mirror of
https://github.com/wassname/ray.git
synced 2026-07-02 22:13:16 +08:00
Visualize computation graph
This commit is contained in:
@@ -8,6 +8,6 @@ PYTHON_MODE = 3
|
||||
|
||||
import libraylib as lib
|
||||
import serialization
|
||||
from worker import scheduler_info, dump_computation_graph, task_info, register_module, connect, disconnect, get, put, remote, kill_workers
|
||||
from worker import scheduler_info, visualize_computation_graph, task_info, register_module, connect, disconnect, get, put, remote, kill_workers
|
||||
from libraylib import ObjRef
|
||||
import internal
|
||||
|
||||
@@ -0,0 +1,34 @@
|
||||
# Utilities to deal with computation graphs
|
||||
|
||||
import graphviz
|
||||
|
||||
def graph_to_graphviz(computation_graph):
    """
    Build a graphviz digraph from a computation graph.

    Every operation becomes a box node named "op<index>". Task results, put
    objrefs and by-reference task arguments become plain nodes named after the
    object reference. A dotted edge links each operation to the operation that
    created it.

    Args:
      computation_graph [graph_pb2.CompGraph]: protocol buffer description of
        the computation graph

    Returns:
      A graphviz.Digraph (pdf output format) describing the computation graph
    """
    # Operations whose creator id equals this value hang off the "op-root" node.
    root_creator_id = 2 ** 64 - 1

    digraph = graphviz.Digraph(format="pdf")
    digraph.node("op-root", shape="box")

    for index, operation in enumerate(computation_graph.operation):
        node_name = "op" + str(index)

        if operation.HasField("task"):
            # Label with the last dotted component of the task name.
            short_name = operation.task.name.split(".")[-1]
            digraph.node(node_name, shape="box", label=str(index) + "\n" + short_name)
            for result_ref in operation.task.result:
                digraph.edge(node_name, str(result_ref))
        elif operation.HasField("put"):
            digraph.node(node_name, shape="box", label=str(index) + "\n" + "put")
            digraph.edge(node_name, str(operation.put.objref))
        elif operation.HasField("get"):
            digraph.node(node_name, shape="box", label=str(index) + "\n" + "get")

        # Dotted, non-constraining edge from the creating operation.
        if operation.creator_operationid != root_creator_id:
            creator_name = "op" + str(operation.creator_operationid)
        else:
            creator_name = "op" + str("-root")
        digraph.edge(creator_name, node_name, style="dotted", constraint="false")

        # Only arguments without an "obj" field are drawn, keyed by their ref.
        for argument in operation.task.arg:
            if argument.HasField("obj"):
                continue
            digraph.node(str(argument.ref))
            digraph.edge(str(argument.ref), node_name)

    return digraph
|
||||
@@ -12,6 +12,8 @@ import copy
|
||||
import ray
|
||||
from ray.config import LOG_DIRECTORY, LOG_TIMESTAMP
|
||||
import serialization
|
||||
import ray.internal.graph_pb2
|
||||
import ray.graph
|
||||
|
||||
class RayFailedObject(object):
|
||||
"""If a task throws an exception during execution, a RayFailedObject is stored in the object store for each of the tasks outputs."""
|
||||
@@ -143,8 +145,42 @@ def print_task_info(task_data, mode):
|
||||
def scheduler_info(worker=global_worker):
    """Return whatever ray.lib.scheduler_info reports for this worker's handle."""
    worker_handle = worker.handle
    return ray.lib.scheduler_info(worker_handle)
|
||||
|
||||
def dump_computation_graph(file_name, worker=global_worker):
    """Have ray.lib dump the computation graph, passing it the worker's handle and file_name."""
    worker_handle = worker.handle
    ray.lib.dump_computation_graph(worker_handle, file_name)
|
||||
def visualize_computation_graph(file_path=None, view=False, worker=global_worker):
    """
    Write the computation graph to a pdf file.

    Besides the pdf, two sibling files sharing the same base path are written:
    the graphviz dot description (no extension) and the protocol buffer dump
    (".binaryproto").

    Args:
      file_path: A .pdf file that the rendered computation graph will be written
        to. If None, a timestamped "-computation-graph.pdf" file inside
        ray.config.LOG_DIRECTORY is used.
      view: If true, the python graphviz package will try to open the result in
        a viewer
      worker: The worker whose handle is used to dump the computation graph

    Raises:
      Exception: If file_path does not have a ".pdf" extension

    Example:
      In ray/scripts, call "python shell.py" and paste in the following code.

      x = da.zeros([20, 20])
      y = da.zeros([20, 20])
      z = da.dot(x, y)

      ray.visualize_computation_graph("computation_graph.pdf")
    """

    if file_path is None:
        file_path = os.path.join(ray.config.LOG_DIRECTORY, (ray.config.LOG_TIMESTAMP + "-computation-graph.pdf").format(datetime.datetime.now()))

    base_path, extension = os.path.splitext(file_path)
    if extension != ".pdf":
        raise Exception("File path must be a .pdf file")
    proto_path = base_path + ".binaryproto"

    ray.lib.dump_computation_graph(worker.handle, proto_path)
    graph = ray.internal.graph_pb2.CompGraph()
    # The dump is a binary protocol buffer: read it in binary mode so no newline
    # or encoding translation is applied, and close the file deterministically.
    with open(proto_path, "rb") as proto_file:
        graph.ParseFromString(proto_file.read())
    ray.graph.graph_to_graphviz(graph).render(base_path, view=view)

    # Single-argument parenthesised print behaves the same as the print statement.
    print("Wrote graph dot description to file {}".format(base_path))
    print("Wrote graph protocol buffer description to file {}".format(proto_path))
    print("Wrote computation graph to file {}.pdf".format(base_path))
|
||||
|
||||
def task_info(worker=global_worker):
|
||||
"""Tell the scheduler to return task information. Currently includes a list of all failed tasks since the start of the cluster."""
|
||||
|
||||
@@ -12,6 +12,10 @@ message Put {
|
||||
uint64 objref = 1; // The objref for the object that was put
|
||||
}
|
||||
|
||||
// Describes a get operation: the retrieval of an object identified by objref.
message Get {
|
||||
uint64 objref = 1; // The objref for the object that is retrieved
|
||||
}
|
||||
|
||||
// This is used internally by the scheduler. From the scheduler's perspective,
|
||||
// the submission of tasks (via SubmitTask) and the submission of puts (via
|
||||
// PutObj) look very similar, and so it is useful to be able to handle them
|
||||
@@ -19,6 +23,7 @@ message Put {
|
||||
// A single entry in the computation graph. Consumers check which of task, put
// or get is present to tell the kinds of operation apart.
message Operation {
|
||||
Task task = 1; // Present when the operation is a task submission
|
||||
Put put = 2; // Present when the operation is a put
|
||||
Get get = 4; // Present when the operation is a get
|
||||
uint64 creator_operationid = 3; // The id of the task that called this task, put or get.
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
typing
|
||||
funcsigs
|
||||
subprocess32
|
||||
protobuf==3.0.0-alpha-2
|
||||
IPython
|
||||
boto3
|
||||
botocore
|
||||
Pillow
|
||||
colorama
|
||||
graphviz
|
||||
|
||||
@@ -30,10 +30,10 @@ fi
|
||||
|
||||
if [[ $platform == "linux" ]]; then
|
||||
sudo apt-get update
|
||||
sudo apt-get install -y git cmake build-essential autoconf curl libtool python-dev python-numpy python-pip libboost-all-dev unzip libjpeg8-dev
|
||||
sudo apt-get install -y git cmake build-essential autoconf curl libtool python-dev python-numpy python-pip libboost-all-dev unzip libjpeg8-dev graphviz
|
||||
sudo pip install -r requirements.txt
|
||||
elif [[ $platform == "macosx" ]]; then
|
||||
brew install git cmake automake autoconf libtool boost libjpeg
|
||||
brew install git cmake automake autoconf libtool boost libjpeg graphviz
|
||||
sudo easy_install pip
|
||||
sudo pip install numpy
|
||||
sudo pip install -r requirements.txt --ignore-installed six
|
||||
|
||||
+8
-1
@@ -49,7 +49,10 @@ Status SchedulerService::SubmitTask(ServerContext* context, const SubmitTaskRequ
|
||||
|
||||
Status SchedulerService::PutObj(ServerContext* context, const PutObjRequest* request, PutObjReply* reply) {
|
||||
ObjRef objref = register_new_object();
|
||||
ObjStoreId objstoreid = get_store(request->workerid());
|
||||
auto operation = std::unique_ptr<Operation>(new Operation());
|
||||
operation->mutable_put()->set_objref(objref);
|
||||
operation->set_creator_operationid((*workers_.get())[request->workerid()].current_task);
|
||||
computation_graph_.get()->add_operation(std::move(operation));
|
||||
reply->set_objref(objref);
|
||||
schedule();
|
||||
return Status::OK;
|
||||
@@ -59,6 +62,10 @@ Status SchedulerService::RequestObj(ServerContext* context, const RequestObjRequ
|
||||
size_t size = objtable_.get()->size();
|
||||
ObjRef objref = request->objref();
|
||||
RAY_CHECK_LT(objref, size, "internal error: no object with objref " << objref << " exists");
|
||||
auto operation = std::unique_ptr<Operation>(new Operation());
|
||||
operation->mutable_get()->set_objref(objref);
|
||||
operation->set_creator_operationid((*workers_.get())[request->workerid()].current_task);
|
||||
computation_graph_.get()->add_operation(std::move(operation));
|
||||
get_queue_.get()->push_back(std::make_pair(request->workerid(), objref));
|
||||
schedule();
|
||||
return Status::OK;
|
||||
|
||||
Reference in New Issue
Block a user