diff --git a/BUILD.bazel b/BUILD.bazel index d8f7c0462..a91afeccf 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -60,6 +60,17 @@ python_grpc_compile( deps = [":node_manager_proto"], ) +proto_library( + name = "reporter_proto", + srcs = ["src/ray/protobuf/reporter.proto"], + deps = [":common_proto"], +) + +python_grpc_compile( + name = "reporter_py_proto", + deps = [":reporter_proto"], +) + proto_library( name = "gcs_service_proto", srcs = ["src/ray/protobuf/gcs_service.proto"], @@ -1154,6 +1165,7 @@ filegroup( "common_py_proto", "gcs_py_proto", "node_manager_py_proto", + "reporter_py_proto", ], ) @@ -1221,6 +1233,8 @@ genrule( sed -i -E 's/from src.ray.protobuf/from ./' "$$WORK_DIR/python/ray/core/generated/common_pb2.py" && sed -i -E 's/from src.ray.protobuf/from ./' "$$WORK_DIR/python/ray/core/generated/node_manager_pb2.py" && sed -i -E 's/from src.ray.protobuf/from ./' "$$WORK_DIR/python/ray/core/generated/node_manager_pb2_grpc.py" && + sed -i -E 's/from src.ray.protobuf/from ./' "$$WORK_DIR/python/ray/core/generated/reporter_pb2.py" && + sed -i -E 's/from src.ray.protobuf/from ./' "$$WORK_DIR/python/ray/core/generated/reporter_pb2_grpc.py" && echo "$$WORK_DIR" > $@ """, local = 1, diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 1f6c043a7..973f778d8 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -633,6 +633,13 @@ cdef execute_task( with core_worker.profile_event(b"task:deserialize_arguments"): args, kwargs = deserialize_args(c_args, c_arg_reference_ids) + if (task_type == TASK_TYPE_ACTOR_CREATION_TASK): + actor = worker.actors[core_worker.get_actor_id()] + class_name = actor.__class__.__name__ + actor_title = "{}({}, {})".format( + class_name, repr(args), repr(kwargs)) + core_worker.set_actor_title(actor_title.encode("utf-8")) + # Execute the task. with ray.worker._changeproctitle(title, next_title): with core_worker.profile_event(b"task:execute"): @@ -810,6 +817,9 @@ cdef class CoreWorker: def set_webui_display(self, message): self.core_worker.get().SetWebuiDisplay(message) + def set_actor_title(self, title): + self.core_worker.get().SetActorTitle(title) + def get_objects(self, object_ids, TaskID current_task_id, int64_t timeout_ms=-1): cdef: diff --git a/python/ray/dashboard/client/src/api.ts b/python/ray/dashboard/client/src/api.ts index d650f6fef..ee4d46c1e 100644 --- a/python/ray/dashboard/client/src/api.ts +++ b/python/ray/dashboard/client/src/api.ts @@ -100,19 +100,25 @@ export interface RayletInfoResponse { [actorId: string]: | { actorId: string; + actorTitle: string; + averageTaskExecutionSpeed: number; children: RayletInfoResponse["actors"]; + currentTaskFuncDesc: string[]; ipAddress: string; isDirectCall: boolean; jobId: string; + nodeId: string; + numExecutedTasks: number; numLocalObjects: number; numObjectIdsInScope: number; + pid: number; port: number; state: 0 | 1 | 2; taskQueueLength: number; + timestamp: number; usedObjectStoreMemory: number; usedResources: { [key: string]: number }; currentTaskDesc?: string; - currentTaskFuncDesc?: string[]; numPendingTasks?: number; webuiDisplay?: string; } diff --git a/python/ray/dashboard/client/src/pages/dashboard/logical-view/Actor.tsx b/python/ray/dashboard/client/src/pages/dashboard/logical-view/Actor.tsx index 364adfe18..c42409cae 100644 --- a/python/ray/dashboard/client/src/pages/dashboard/logical-view/Actor.tsx +++ b/python/ray/dashboard/client/src/pages/dashboard/logical-view/Actor.tsx @@ -67,8 +67,14 @@ class Actor extends React.Component, State> { actor.state !== -1 ? [ { - label: "ID", - value: actor.actorId + label: "ActorTitle", + value: + actor.actorTitle + }, + { + label: "State", + value: + actor.state.toLocaleString() }, { label: "Resources", @@ -81,14 +87,32 @@ class Actor extends React.Component, State> { { label: "Pending", value: - actor.taskQueueLength !== undefined && - actor.taskQueueLength > 0 && actor.taskQueueLength.toLocaleString() }, + { + label: "Executed", + value: + actor.numExecutedTasks.toLocaleString() + }, + { + label: "NumObjectIdsInScope", + value: + actor.numObjectIdsInScope.toLocaleString() + }, + { + label: "NumLocalObjects", + value: + actor.numLocalObjects.toLocaleString() + }, + { + label: "UsedLocalObjectMemory", + value: + actor.usedObjectStoreMemory.toLocaleString() + }, { label: "Task", value: - actor.currentTaskFuncDesc && actor.currentTaskFuncDesc.join(".") + actor.currentTaskFuncDesc.join(".") } ] : [ diff --git a/python/ray/dashboard/dashboard.py b/python/ray/dashboard/dashboard.py index 7b52f7164..86c9715aa 100644 --- a/python/ray/dashboard/dashboard.py +++ b/python/ray/dashboard/dashboard.py @@ -16,6 +16,7 @@ import threading import time import traceback import yaml +import uuid from base64 import b64decode from collections import defaultdict @@ -27,6 +28,8 @@ from google.protobuf.json_format import MessageToDict import ray from ray.core.generated import node_manager_pb2 from ray.core.generated import node_manager_pb2_grpc +from ray.core.generated import reporter_pb2 +from ray.core.generated import reporter_pb2_grpc import ray.ray_constants as ray_constants # Logger for this module. It should be configured at the entry point @@ -54,18 +57,18 @@ def format_resource(resource_name, quantity): return "{}".format(round_resource_value(quantity)) -def format_reply(reply): +def format_reply_id(reply): if isinstance(reply, dict): for k, v in reply.items(): if isinstance(v, dict) or isinstance(v, list): - format_reply(v) + format_reply_id(v) else: if k.endswith("Id"): v = b64decode(v) reply[k] = ray.utils.binary_to_hex(v) elif isinstance(reply, list): for item in reply: - format_reply(item) + format_reply_id(item) def measures_to_dict(measures): @@ -190,12 +193,14 @@ class Dashboard(object): async def raylet_info(req) -> aiohttp.web.Response: D = self.raylet_stats.get_raylet_stats() - workers_info = sum( - (data.get("workersStats", []) for data in D.values()), []) + workers_info_by_node = { + data["nodeId"]: data.get("workersStats") + for data in D.values() + } infeasible_tasks = sum( (data.get("infeasibleTasks", []) for data in D.values()), []) actor_tree = self.node_stats.get_actor_tree( - workers_info, infeasible_tasks) + workers_info_by_node, infeasible_tasks) for address, data in D.items(): # process view data measures_dicts = {} @@ -246,6 +251,24 @@ class Dashboard(object): result = {"nodes": D, "actors": actor_tree} return await json_response(result=result) + async def launch_profiling(req) -> aiohttp.web.Response: + node_id = req.query.get("node_id") + pid = int(req.query.get("pid")) + duration = int(req.query.get("duration")) + profiling_id = self.raylet_stats.launch_profiling( + node_id=node_id, pid=pid, duration=duration) + return aiohttp.web.json_response(str(profiling_id)) + + async def check_profiling_status(req) -> aiohttp.web.Response: + profiling_id = req.query.get("profiling_id") + return aiohttp.web.json_response( + self.raylet_stats.check_profiling_status(profiling_id)) + + async def get_profiling_info(req) -> aiohttp.web.Response: + profiling_id = req.query.get("profiling_id") + return aiohttp.web.json_response( + self.raylet_stats.get_profiling_info(profiling_id)) + async def logs(req) -> aiohttp.web.Response: hostname = req.query.get("hostname") pid = req.query.get("pid") @@ -280,6 +303,10 @@ class Dashboard(object): self.app.router.add_get("/api/ray_config", ray_config) self.app.router.add_get("/api/node_info", node_info) self.app.router.add_get("/api/raylet_info", raylet_info) + self.app.router.add_get("/api/launch_profiling", launch_profiling) + self.app.router.add_get("/api/check_profiling_status", + check_profiling_status) + self.app.router.add_get("/api/get_profiling_info", get_profiling_info) self.app.router.add_get("/api/logs", logs) self.app.router.add_get("/api/errors", errors) @@ -313,6 +340,7 @@ class NodeStats(threading.Thread): self._default_info = { "actorId": "", "children": {}, + "currentTaskFuncDesc": [], "ipAddress": "", "isDirectCall": False, "jobId": "", @@ -380,7 +408,7 @@ class NodeStats(threading.Thread): "error_counts": self.calculate_error_counts(), } - def get_actor_tree(self, workers_info, infeasible_tasks) -> Dict: + def get_actor_tree(self, workers_info_by_node, infeasible_tasks) -> Dict: now = time.time() # construct flattened actor tree flattened_tree = {"root": {"children": {}}} @@ -394,23 +422,28 @@ class NodeStats(threading.Thread): self._addr_to_owner_addr[addr], "root") child_to_parent[actor_id] = parent_id - for worker_info in workers_info: - if "coreWorkerStats" in worker_info: - core_worker_stats = worker_info["coreWorkerStats"] - addr = (core_worker_stats["ipAddress"], - str(core_worker_stats["port"])) - if addr in self._addr_to_actor_id: - actor_info = flattened_tree[self._addr_to_actor_id[ - addr]] - if "currentTaskFuncDesc" in core_worker_stats: - core_worker_stats["currentTaskFuncDesc"] = list( - map(b64_decode, - core_worker_stats["currentTaskFuncDesc"])) - format_reply(core_worker_stats) - actor_info.update(core_worker_stats) - actor_info["averageTaskExecutionSpeed"] = round( - actor_info["numExecutedTasks"] / - (now - actor_info["timestamp"] / 1000), 2) + for node_id, workers_info in workers_info_by_node.items(): + for worker_info in workers_info: + if "coreWorkerStats" in worker_info: + core_worker_stats = worker_info["coreWorkerStats"] + addr = (core_worker_stats["ipAddress"], + str(core_worker_stats["port"])) + if addr in self._addr_to_actor_id: + actor_info = flattened_tree[self._addr_to_actor_id[ + addr]] + if "currentTaskFuncDesc" in core_worker_stats: + core_worker_stats[ + "currentTaskFuncDesc"] = list( + map( + b64_decode, core_worker_stats[ + "currentTaskFuncDesc"])) + format_reply_id(core_worker_stats) + actor_info.update(core_worker_stats) + actor_info["averageTaskExecutionSpeed"] = round( + actor_info["numExecutedTasks"] / + (now - actor_info["timestamp"] / 1000), 2) + actor_info["nodeId"] = node_id + actor_info["pid"] = worker_info["pid"] for infeasible_task in infeasible_tasks: actor_id = ray.utils.binary_to_hex( @@ -423,7 +456,7 @@ class NodeStats(threading.Thread): infeasible_task["state"] = -1 infeasible_task["functionDescriptor"] = list( map(b64_decode, infeasible_task["functionDescriptor"])) - format_reply(infeasible_tasks) + format_reply_id(infeasible_tasks) flattened_tree[actor_id] = infeasible_task # construct actor tree @@ -536,9 +569,13 @@ class RayletStats(threading.Thread): self.nodes_lock = threading.Lock() self.nodes = [] self.stubs = {} + self.reporter_stubs = {} + self.redis_client = ray.services.create_redis_client( + redis_address, password=redis_password) self._raylet_stats_lock = threading.Lock() self._raylet_stats = {} + self._profiling_stats = {} self.update_nodes() @@ -554,21 +591,71 @@ class RayletStats(threading.Thread): if node_id not in node_ids: stub = self.stubs.pop(node_id) stub.close() + reporter_stub = self.reporter_stubs.pop(node_id) + reporter_stub.close() # Now add node connections of new nodes. for node in self.nodes: node_id = node["NodeID"] if node_id not in self.stubs: + node_ip = node["NodeManagerAddress"] channel = grpc.insecure_channel("{}:{}".format( - node["NodeManagerAddress"], node["NodeManagerPort"])) + node_ip, node["NodeManagerPort"])) stub = node_manager_pb2_grpc.NodeManagerServiceStub( channel) self.stubs[node_id] = stub + # Block wait until the reporter for the node starts. + while True: + reporter_port = self.redis_client.get( + "REPORTER_PORT:{}".format(node_ip)) + if reporter_port: + break + reporter_channel = grpc.insecure_channel("{}:{}".format( + node_ip, int(reporter_port))) + reporter_stub = reporter_pb2_grpc.ReporterServiceStub( + reporter_channel) + self.reporter_stubs[node_id] = reporter_stub + + assert len(self.stubs) == len( + self.reporter_stubs), (self.stubs.keys(), + self.reporter_stubs.keys()) def get_raylet_stats(self) -> Dict: with self._raylet_stats_lock: return copy.deepcopy(self._raylet_stats) + def launch_profiling(self, node_id, pid, duration): + profiling_id = str(uuid.uuid4()) + + def _callback(reply_future): + reply = reply_future.result() + with self._raylet_stats_lock: + self._profiling_stats[profiling_id] = reply + + reporter_stub = self.reporter_stubs[node_id] + reply_future = reporter_stub.GetProfilingStats.future( + reporter_pb2.GetProfilingStatsRequest(pid=pid, duration=duration)) + reply_future.add_done_callback(_callback) + return profiling_id + + def check_profiling_status(self, profiling_id): + with self._raylet_stats_lock: + is_present = profiling_id in self._profiling_stats + if is_present: + reply = self._profiling_stats[profiling_id] + if reply.stderr: + return {"status": "error", "error": reply.stderr} + else: + return {"status": "finished"} + else: + return {"status": "pending"} + + def get_profiling_info(self, profiling_id): + with self._raylet_stats_lock: + profiling_stats = self._profiling_stats.get(profiling_id) + assert profiling_stats, "profiling not finished" + return json.loads(profiling_stats.profiling_stats) + def run(self): counter = 0 while True: @@ -579,10 +666,12 @@ class RayletStats(threading.Thread): stub = self.stubs[node_id] reply = stub.GetNodeStats( node_manager_pb2.GetNodeStatsRequest(), timeout=2) - replies[node["NodeManagerAddress"]] = reply + reply_dict = MessageToDict(reply) + reply_dict["nodeId"] = node_id + replies[node["NodeManagerAddress"]] = reply_dict with self._raylet_stats_lock: - for address, reply in replies.items(): - self._raylet_stats[address] = MessageToDict(reply) + for address, reply_dict in replies.items(): + self._raylet_stats[address] = reply_dict counter += 1 # From time to time, check if new nodes have joined the cluster # and update self.nodes diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 320e695a8..09d826381 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -114,6 +114,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CTaskID GetCurrentTaskId() const CActorID &GetActorId() void SetWebuiDisplay(const c_string &message) + void SetActorTitle(const c_string &title) CTaskID GetCallerId() const ResourceMappingType &GetResourceIDs() const CActorID DeserializeAndRegisterActorHandle(const c_string &bytes) diff --git a/python/ray/reporter.py b/python/ray/reporter.py index 1cdaa40e9..88211070f 100644 --- a/python/ray/reporter.py +++ b/python/ray/reporter.py @@ -5,6 +5,9 @@ import os import traceback import time import datetime +import grpc +import subprocess +from concurrent import futures try: import psutil @@ -16,6 +19,8 @@ except ImportError: import ray.ray_constants as ray_constants import ray.services import ray.utils +from ray.core.generated import reporter_pb2 +from ray.core.generated import reporter_pb2_grpc # Logger for this module. It should be configured at the entry point # into the program using Ray. Ray provides a default configuration at @@ -23,6 +28,31 @@ import ray.utils logger = logging.getLogger(__name__) +class ReporterServer(reporter_pb2_grpc.ReporterServiceServicer): + def __init__(self): + pass + + def GetProfilingStats(self, request, context): + pid = request.pid + duration = request.duration + profiling_file_path = os.path.join("/tmp/ray/", + "{}_profiling.txt".format(pid)) + process = subprocess.Popen( + "sudo $(which py-spy) record -o {} -p {} -d {} -f speedscope" + .format(profiling_file_path, pid, duration), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + shell=True) + stdout, stderr = process.communicate() + if process.returncode != 0: + profiling_stats = "" + else: + with open(profiling_file_path, "r") as f: + profiling_stats = f.read() + return reporter_pb2.GetProfilingStatsReply( + profiling_stats=profiling_stats, stdout=stdout, stderr=stderr) + + def recursive_asdict(o): if isinstance(o, tuple) and hasattr(o, "_asdict"): return recursive_asdict(o._asdict()) @@ -157,6 +187,14 @@ class Reporter: ) def run(self): + """Publish the port.""" + thread_pool = futures.ThreadPoolExecutor(max_workers=10) + server = grpc.server(thread_pool, options=(("grpc.so_reuseport", 0), )) + reporter_pb2_grpc.add_ReporterServiceServicer_to_server( + ReporterServer(), server) + port = server.add_insecure_port("[::]:0") + server.start() + self.redis_client.set("REPORTER_PORT:{}".format(self.ip), port) """Run the reporter.""" while True: try: diff --git a/python/ray/tests/test_metrics.py b/python/ray/tests/test_metrics.py index a18c13d0a..42368059d 100644 --- a/python/ray/tests/test_metrics.py +++ b/python/ray/tests/test_metrics.py @@ -1,4 +1,5 @@ import os +import json import grpc import psutil import requests @@ -7,6 +8,8 @@ import time import ray from ray.core.generated import node_manager_pb2 from ray.core.generated import node_manager_pb2_grpc +from ray.core.generated import reporter_pb2 +from ray.core.generated import reporter_pb2_grpc from ray.test_utils import RayTestTimeoutException @@ -138,7 +141,11 @@ def test_raylet_info_endpoint(shutdown_only): def remote_store(self): self.remote_storage = ray.put("test") + def getpid(self): + return os.getpid() + c = ActorC.remote() + actor_pid = ray.get(c.getpid.remote()) c.local_store.remote() c.remote_store.remote() @@ -168,7 +175,7 @@ def test_raylet_info_endpoint(shutdown_only): "Timed out while waiting for dashboard to start.") assert parent_actor_info["usedResources"]["CPU"] == 2 - assert parent_actor_info["numExecutedTasks"] == 3 + assert parent_actor_info["numExecutedTasks"] == 4 for _, child_actor_info in children.items(): if child_actor_info["state"] == -1: assert child_actor_info["requiredResources"]["CustomResource"] == 1 @@ -177,6 +184,61 @@ def test_raylet_info_endpoint(shutdown_only): assert len(child_actor_info["children"]) == 0 assert child_actor_info["usedResources"]["CPU"] == 1 + profiling_id = requests.get( + webui_url + "/api/launch_profiling", + params={ + "node_id": ray.nodes()[0]["NodeID"], + "pid": actor_pid, + "duration": 5 + }).json() + start_time = time.time() + while True: + time.sleep(1) + try: + profiling_info = requests.get( + webui_url + "/api/check_profiling_status", + params={ + "profiling_id": profiling_id, + }).json() + assert profiling_info["status"] in ("finished", "pending", "error") + break + except AssertionError: + if time.time() - start_time + 10: + raise Exception("Timed out while collecting profiling stats.") + + +def test_profiling_info_endpoint(shutdown_only): + ray.init(num_cpus=1) + + redis_client = ray.worker.global_worker.redis_client + + node_ip = ray.nodes()[0]["NodeManagerAddress"] + + while True: + reporter_port = redis_client.get("REPORTER_PORT:{}".format(node_ip)) + if reporter_port: + break + + reporter_channel = grpc.insecure_channel("{}:{}".format( + node_ip, int(reporter_port))) + reporter_stub = reporter_pb2_grpc.ReporterServiceStub(reporter_channel) + + @ray.remote(num_cpus=1) + class ActorA: + def __init__(self): + pass + + def getpid(self): + return os.getpid() + + a = ActorA.remote() + actor_pid = ray.get(a.getpid.remote()) + + reply = reporter_stub.GetProfilingStats( + reporter_pb2.GetProfilingStatsRequest(pid=actor_pid, duration=10)) + profiling_stats = json.loads(reply.profiling_stats) + assert profiling_stats is not None + if __name__ == "__main__": import pytest diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 08925312c..edc205a42 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1189,6 +1189,7 @@ void CoreWorker::HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest & (*used_resources_map)[it.first] = quantity; } stats->set_webui_display(webui_display_); + stats->set_actor_title(actor_title_); MemoryStoreStats memory_store_stats = memory_store_->GetMemoryStoreStatisticalData(); stats->set_num_local_objects(memory_store_stats.num_local_objects); stats->set_used_object_store_memory(memory_store_stats.used_object_store_memory); @@ -1225,4 +1226,9 @@ void CoreWorker::SetWebuiDisplay(const std::string &message) { webui_display_ = message; } +void CoreWorker::SetActorTitle(const std::string &title) { + absl::MutexLock lock(&mutex_); + actor_title_ = title; +} + } // namespace ray diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 68623dafd..57b833759 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -99,6 +99,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { void SetWebuiDisplay(const std::string &message); + void SetActorTitle(const std::string &title); + /// Increase the reference count for this object ID. /// Increase the local reference count for this object ID. Should be called /// by the language frontend when a new reference is created. @@ -657,6 +659,9 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// String to be displayed on Web UI. std::string webui_display_ GUARDED_BY(mutex_); + /// Actor title that consists of class name, args, kwargs for actor construction. + std::string actor_title_ GUARDED_BY(mutex_); + /// Number of tasks that have been pushed to the actor but not executed. std::atomic task_queue_length_; diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index f0860af98..b1d9db3c0 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -214,4 +214,6 @@ message CoreWorkerStats { int32 task_queue_length = 13; // Number of executed tasks. int32 num_executed_tasks = 14; + // Actor constructor. + string actor_title = 15; } diff --git a/src/ray/protobuf/reporter.proto b/src/ray/protobuf/reporter.proto new file mode 100644 index 000000000..67b830460 --- /dev/null +++ b/src/ray/protobuf/reporter.proto @@ -0,0 +1,27 @@ +syntax = "proto3"; + +package ray.rpc; + +import "src/ray/protobuf/common.proto"; + +message GetProfilingStatsRequest { + // PID of the worker process. + uint32 pid = 1; + // Duration of the profiling in seconds. + int32 duration = 2; +} + +message GetProfilingStatsReply { + // Profiling stats. + string profiling_stats = 1; + // Standard output of the profiler process. + string stdout = 2; + // Standard error of the profiler process. + string stderr = 3; +} + +// Service for communicating with the reporter.py process on a remote node. +service ReporterService { + // Get the profiling stats. + rpc GetProfilingStats(GetProfilingStatsRequest) returns (GetProfilingStatsReply); +}