From 65acb54553c201af5c6ff97e31cb4567af7cd5b3 Mon Sep 17 00:00:00 2001 From: Yunzhi Zhang <35828389+zzyunzhi@users.noreply.github.com> Date: Mon, 30 Dec 2019 13:08:08 -0800 Subject: [PATCH] [Dashboard] Logical view backend for dashboard (#6590) --- .../dashboard/node-info/NodeRowGroup.tsx | 2 +- python/ray/dashboard/dashboard.py | 66 ++++++++++++++++- python/ray/gcs_utils.py | 2 + python/ray/tests/test_metrics.py | 70 +++++++++++++++++-- src/ray/core_worker/core_worker.cc | 16 ++++- src/ray/protobuf/common.proto | 12 ++++ src/ray/protobuf/core_worker.proto | 4 +- src/ray/protobuf/node_manager.proto | 4 +- src/ray/raylet/node_manager.cc | 1 - 9 files changed, 162 insertions(+), 15 deletions(-) diff --git a/python/ray/dashboard/client/src/pages/dashboard/node-info/NodeRowGroup.tsx b/python/ray/dashboard/client/src/pages/dashboard/node-info/NodeRowGroup.tsx index 9c842e882..56dc43f1b 100644 --- a/python/ray/dashboard/client/src/pages/dashboard/node-info/NodeRowGroup.tsx +++ b/python/ray/dashboard/client/src/pages/dashboard/node-info/NodeRowGroup.tsx @@ -38,7 +38,7 @@ const styles = (theme: Theme) => }, extraInfo: { fontFamily: "SFMono-Regular,Consolas,Liberation Mono,Menlo,monospace", - whiteSpace: "pre-wrap" + whiteSpace: "pre" } }); diff --git a/python/ray/dashboard/dashboard.py b/python/ray/dashboard/dashboard.py index 9809c0eb4..8cfbf50a0 100644 --- a/python/ray/dashboard/dashboard.py +++ b/python/ray/dashboard/dashboard.py @@ -31,7 +31,6 @@ import ray from ray.core.generated import node_manager_pb2 from ray.core.generated import node_manager_pb2_grpc import ray.ray_constants as ray_constants -import ray.utils # Logger for this module. It should be configured at the entry point # into the program using Ray. Ray provides a default configuration at @@ -159,6 +158,9 @@ class Dashboard(object): async def raylet_info(req) -> aiohttp.web.Response: D = self.raylet_stats.get_raylet_stats() + workers_info = sum((data["workersStats"] for data in D.values()), + []) + actor_tree = self.node_stats.get_actor_tree(workers_info) for address, data in D.items(): available_resources = data["availableResources"] total_resources = data["totalResources"] @@ -171,6 +173,17 @@ class Dashboard(object): extra_info.append("{}: {} / {}".format( resource_name, occupied, total)) data["extraInfo"] = ", ".join(extra_info) + if os.environ.get("RAY_DASHBOARD_DEBUG"): + actor_tree_str = json.dumps(actor_tree, indent=2) + actor_tree_lines = actor_tree_str.split("\n") + max_line_length = max(map(len, actor_tree_lines)) + actor_tree_print = [] + for line in actor_tree_lines: + actor_tree_print.append( + line + (max_line_length - len(line)) * " ") + actor_tree_print = "\n".join(actor_tree_print) + data["extraInfo"] += "\n" + actor_tree_print + D["actorInfo"] = actor_tree return await json_response(result=D) async def logs(req) -> aiohttp.web.Response: @@ -226,6 +239,9 @@ class NodeStats(threading.Thread): redis_address, password=redis_password) self._node_stats = {} + self._addr_to_owner_addr = {} + self._addr_to_actor_id = {} + self._addr_to_extra_info_dict = {} self._node_stats_lock = threading.Lock() # Mapping from IP address to PID to list of log lines @@ -281,6 +297,35 @@ class NodeStats(threading.Thread): "error_counts": self.calculate_error_counts(), } + def get_actor_tree(self, workers_info) -> Dict: + # construct flattened actor tree + flattened_tree = {"root": {"children": {}}} + child_to_parent = {} + with self._node_stats_lock: + for addr, actor_id in self._addr_to_actor_id.items(): + flattened_tree[actor_id] = self._addr_to_extra_info_dict[addr] + flattened_tree[actor_id]["children"] = {} + parent_id = self._addr_to_actor_id.get( + self._addr_to_owner_addr[addr], "root") + child_to_parent[actor_id] = parent_id + + for worker_info in workers_info: + if "coreWorkerStats" in worker_info: + core_worker_stats = worker_info["coreWorkerStats"] + addr = (core_worker_stats["ipAddress"], + core_worker_stats["port"]) + if addr in self._addr_to_actor_id: + actor_id = self._addr_to_actor_id[addr] + if "currentTaskDesc" in core_worker_stats: + core_worker_stats.pop("currentTaskDesc") + flattened_tree[actor_id].update(core_worker_stats) + + # construct actor tree + actor_tree = flattened_tree + for actor_id, parent_id in child_to_parent.items(): + actor_tree[parent_id]["children"][actor_id] = actor_tree[actor_id] + return actor_tree["root"]["children"] + def get_logs(self, hostname, pid): ip = self._node_stats.get(hostname, {"ip": None})["ip"] logs = self._logs.get(ip, {}) @@ -309,6 +354,10 @@ class NodeStats(threading.Thread): p.subscribe(error_channel) logger.info("NodeStats: subscribed to {}".format(error_channel)) + actor_channel = ray.gcs_utils.TablePubsub.Value("ACTOR_PUBSUB") + p.subscribe(actor_channel) + logger.info("NodeStats: subscribed to {}".format(actor_channel)) + for x in p.listen(): try: with self._node_stats_lock: @@ -334,6 +383,21 @@ class NodeStats(threading.Thread): "timestamp": error_data.timestamp, "type": error_data.type }) + elif channel == str(actor_channel): + gcs_entry = ray.gcs_utils.GcsEntry.FromString(data) + actor_data = ray.gcs_utils.ActorTableData.FromString( + gcs_entry.entries[0]) + addr = (str(actor_data.address.ip_address), + str(actor_data.address.port)) + owner_addr = (str(actor_data.owner_address.ip_address), + str(actor_data.owner_address.port)) + self._addr_to_owner_addr[addr] = owner_addr + self._addr_to_actor_id[addr] = ray.utils.binary_to_hex( + actor_data.actor_id) + self._addr_to_extra_info_dict[addr] = { + "job_id": ray.utils.binary_to_hex( + actor_data.job_id) + } else: data = json.loads(ray.utils.decode(data)) self._node_stats[data["hostname"]] = data diff --git a/python/ray/gcs_utils.py b/python/ray/gcs_utils.py index 2aed699c8..53f0b67ff 100644 --- a/python/ray/gcs_utils.py +++ b/python/ray/gcs_utils.py @@ -4,6 +4,7 @@ from __future__ import print_function from ray.core.generated.gcs_pb2 import ( ActorCheckpointIdData, + ActorTableData, GcsNodeInfo, JobTableData, ErrorTableData, @@ -21,6 +22,7 @@ from ray.core.generated.gcs_pb2 import ( __all__ = [ "ActorCheckpointIdData", + "ActorTableData", "GcsNodeInfo", "JobTableData", "ErrorTableData", diff --git a/python/ray/tests/test_metrics.py b/python/ray/tests/test_metrics.py index 5954de6f9..bc3292c0c 100644 --- a/python/ray/tests/test_metrics.py +++ b/python/ray/tests/test_metrics.py @@ -5,6 +5,7 @@ from __future__ import print_function import os import grpc import psutil +import requests import time import ray @@ -60,11 +61,12 @@ def test_worker_stats(shutdown_only): reply = try_get_node_stats() target_worker_present = False for worker in reply.workers_stats: - if worker.webui_display == "test": + stats = worker.core_worker_stats + if stats.webui_display == "test": target_worker_present = True assert worker.pid == worker_pid else: - assert worker.webui_display == "" + assert stats.webui_display == "" assert target_worker_present # Test show_in_webui for remote actors. @@ -73,11 +75,12 @@ def test_worker_stats(shutdown_only): reply = try_get_node_stats() target_worker_present = False for worker in reply.workers_stats: - if worker.webui_display == "test": + stats = worker.core_worker_stats + if stats.webui_display == "test": target_worker_present = True assert worker.pid == worker_pid else: - assert worker.webui_display == "" + assert stats.webui_display == "" assert target_worker_present timeout_seconds = 20 @@ -94,7 +97,6 @@ def test_worker_stats(shutdown_only): continue # Check that the rest of the processes are workers, 1 for each CPU. - print(reply) assert len(reply.workers_stats) == num_cpus + 1 views = [view.view_name for view in reply.view_data] assert "redis_latency" in views @@ -112,6 +114,64 @@ def test_worker_stats(shutdown_only): break +def test_raylet_info_endpoint(shutdown_only): + addresses = ray.init(include_webui=True, num_cpus=6) + + @ray.remote(num_cpus=1) + class A(object): + def __init__(self): + pass + + def f(self): + return os.getpid() + + @ray.remote(num_cpus=2) + class B(object): + def __init__(self): + self.children = [A.remote(), A.remote()] + + def f(self): + return os.getpid(), ray.get( + [child.f.remote() for child in self.children]) + + # TODO: Currently there is a race condition of Dashboard subscription + # and actor initialization. This will be fixed after #6629 is merged. + time.sleep(10) + + b = B.remote() + pids = ray.get(b.f.remote()) + assert len(pids) == 2 and len(pids[1]) == 2 + + start_time = time.time() + while True: + time.sleep(1) + try: + webui_url = addresses["webui_url"] + webui_url = webui_url.replace("localhost", "http://127.0.0.1") + raylet_info = requests.get(webui_url + "/api/raylet_info").json() + actor_info = raylet_info["result"]["actorInfo"] + try: + assert len(actor_info) == 1 + print("actor_info", actor_info) + _, parent_actor_info = actor_info.popitem() + children = parent_actor_info["children"] + assert len(children) == 2 + break + except AssertionError: + if time.time() > start_time + 30: + raise Exception( + "Timed out while waiting for actorInfo to show up.") + except requests.exceptions.ConnectionError: + if time.time() > start_time + 30: + raise Exception( + "Timed out while waiting for dashboard to start.") + + assert parent_actor_info["usedResources"]["CPU"] == 2 + for _, child_actor_info in children.items(): + assert len(child_actor_info["children"]) == 0 + assert child_actor_info["usedResources"]["CPU"] == 1 + + if __name__ == "__main__": import pytest import sys diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 0156b472e..2375d3ab8 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1118,13 +1118,27 @@ void CoreWorker::HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest & rpc::GetCoreWorkerStatsReply *reply, rpc::SendReplyCallback send_reply_callback) { absl::MutexLock lock(&mutex_); - reply->set_webui_display(webui_display_); auto stats = reply->mutable_core_worker_stats(); stats->set_num_pending_tasks(task_manager_->NumPendingTasks()); stats->set_num_object_ids_in_scope(reference_counter_->NumObjectIDsInScope()); if (!current_task_.TaskId().IsNil()) { stats->set_current_task_desc(current_task_.DebugString()); + for (auto const it : current_task_.FunctionDescriptor()) { + stats->add_current_task_func_desc(it); + } } + stats->set_ip_address(rpc_address_.ip_address()); + stats->set_port(rpc_address_.port()); + stats->set_actor_id(actor_id_.Binary()); + auto used_resources_map = stats->mutable_used_resources(); + for (auto const &it : *resource_ids_) { + double quantity = 0; + for (auto const &pair : it.second) { + quantity += pair.second; + } + (*used_resources_map)[it.first] = quantity; + } + stats->set_webui_display(webui_display_); send_reply_callback(Status::OK(), nullptr, nullptr); } diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 35214840b..a5cefa6f8 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -170,4 +170,16 @@ message CoreWorkerStats { int32 num_pending_tasks = 2; // Number of object ids in local scope. int32 num_object_ids_in_scope = 3; + // Function descriptor of the currently executing task. + repeated string current_task_func_desc = 4; + // IP address of the core worker. + string ip_address = 6; + // Port of the core worker. + int64 port = 7; + // Actor ID. + bytes actor_id = 8; + // A map from the resource name (e.g. "CPU") to the amount of resource used. + map used_resources = 9; + // A string displayed on Dashboard. + string webui_display = 10; } diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index 8f9dd5439..36e610dfd 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -131,10 +131,8 @@ message GetCoreWorkerStatsRequest { } message GetCoreWorkerStatsReply { - // String displayed on Web UI. - string webui_display = 1; // Debug information returned from the core worker. - CoreWorkerStats core_worker_stats = 2; + CoreWorkerStats core_worker_stats = 1; } service CoreWorkerService { diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index cfbe0e591..f4988cd6a 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -53,10 +53,8 @@ message WorkerStats { uint32 pid = 1; // Whether this is a driver. bool is_driver = 2; - // String displayed on Web UI. - string webui_display = 3; // Debug information returned from the core worker. - CoreWorkerStats core_worker_stats = 4; + CoreWorkerStats core_worker_stats = 3; } message ViewData { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 7bf9316fe..07b6785d4 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -3012,7 +3012,6 @@ void NodeManager::HandleNodeStatsRequest(const rpc::NodeStatsRequest &request, worker_stats->set_pid(worker->Pid()); worker_stats->set_is_driver(false); reply->set_num_workers(reply->num_workers() + 1); - worker_stats->set_webui_display(r.webui_display()); worker_stats->mutable_core_worker_stats()->MergeFrom(r.core_worker_stats()); if (reply->num_workers() == all_workers.size()) { send_reply_callback(Status::OK(), nullptr, nullptr);