From 8a0a30b5f01d8acb928c12e7fc545b86d9c24618 Mon Sep 17 00:00:00 2001 From: Yunzhi Zhang <35828389+zzyunzhi@users.noreply.github.com> Date: Thu, 2 Jan 2020 14:27:59 -0800 Subject: [PATCH] [Dashboard] display actor status and infeasible tasks (#6652) * expose actor status and protobuf message of infeasible tasks * move infeasible tasks into actor tree * add pytest for displaying infeasible tasks info * fix base64 decoding * fix race condition after #6629 merged --- python/ray/dashboard/dashboard.py | 90 +++++++++++++++++++++++------ python/ray/state.py | 4 +- python/ray/tests/test_metrics.py | 32 +++++----- src/ray/protobuf/node_manager.proto | 1 + src/ray/raylet/node_manager.cc | 4 ++ 5 files changed, 95 insertions(+), 36 deletions(-) diff --git a/python/ray/dashboard/dashboard.py b/python/ray/dashboard/dashboard.py index fa98d1d01..c7deba529 100644 --- a/python/ray/dashboard/dashboard.py +++ b/python/ray/dashboard/dashboard.py @@ -21,6 +21,7 @@ import time import traceback import yaml +from base64 import b64decode from collections import defaultdict from operator import itemgetter from typing import Dict @@ -57,6 +58,20 @@ def format_resource(resource_name, quantity): return "{}".format(round_resource_value(quantity)) +def format_reply(reply): + if isinstance(reply, dict): + for k, v in reply.items(): + if isinstance(v, dict) or isinstance(v, list): + format_reply(v) + else: + if k.endswith("Id"): + v = b64decode(v) + reply[k] = ray.utils.binary_to_hex(v) + elif isinstance(reply, list): + for item in reply: + format_reply(item) + + class Dashboard(object): """A dashboard process for monitoring Ray nodes. @@ -158,9 +173,12 @@ class Dashboard(object): async def raylet_info(req) -> aiohttp.web.Response: D = self.raylet_stats.get_raylet_stats() - workers_info = sum((data["workersStats"] for data in D.values()), - []) - actor_tree = self.node_stats.get_actor_tree(workers_info) + workers_info = sum( + (data.get("workersStats", []) for data in D.values()), []) + infeasible_tasks = sum( + (data.get("infeasibleTasks", []) for data in D.values()), []) + actor_tree = self.node_stats.get_actor_tree( + workers_info, infeasible_tasks) for address, data in D.items(): available_resources = data["availableResources"] total_resources = data["totalResources"] @@ -174,16 +192,17 @@ class Dashboard(object): resource_name, occupied, total)) data["extraInfo"] = ", ".join(extra_info) if os.environ.get("RAY_DASHBOARD_DEBUG"): - actor_tree_str = json.dumps(actor_tree, indent=2) - actor_tree_lines = actor_tree_str.split("\n") - max_line_length = max(map(len, actor_tree_lines)) - actor_tree_print = [] - for line in actor_tree_lines: - actor_tree_print.append( - line + (max_line_length - len(line)) * " ") - actor_tree_print = "\n".join(actor_tree_print) - data["extraInfo"] += "\n" + actor_tree_print + actor_tree_str = json.dumps( + actor_tree, indent=2, sort_keys=True) + lines = actor_tree_str.split("\n") + max_line_length = max(map(len, lines)) + to_print = [] + for line in lines: + to_print.append(line + + (max_line_length - len(line)) * " ") + data["extraInfo"] += "\n" + "\n".join(to_print) D["actorInfo"] = actor_tree + D["infeasibleTasks"] = infeasible_tasks return await json_response(result=D) async def logs(req) -> aiohttp.web.Response: @@ -298,7 +317,7 @@ class NodeStats(threading.Thread): "error_counts": self.calculate_error_counts(), } - def get_actor_tree(self, workers_info) -> Dict: + def get_actor_tree(self, workers_info, infeasible_tasks) -> Dict: # construct flattened actor tree flattened_tree = {"root": {"children": {}}} child_to_parent = {} @@ -314,13 +333,31 @@ class NodeStats(threading.Thread): if "coreWorkerStats" in worker_info: core_worker_stats = worker_info["coreWorkerStats"] addr = (core_worker_stats["ipAddress"], - core_worker_stats["port"]) + str(core_worker_stats["port"])) if addr in self._addr_to_actor_id: actor_id = self._addr_to_actor_id[addr] if "currentTaskDesc" in core_worker_stats: core_worker_stats.pop("currentTaskDesc") + if "numPendingTasks" in core_worker_stats: + core_worker_stats.pop("numPendingTasks") + format_reply(core_worker_stats) flattened_tree[actor_id].update(core_worker_stats) + for infeasible_task in infeasible_tasks: + actor_id = ray.utils.binary_to_hex( + b64decode( + infeasible_task["actorCreationTaskSpec"]["actorId"])) + caller_addr = (infeasible_task["callerAddress"]["ipAddress"], + str(infeasible_task["callerAddress"]["port"])) + caller_id = self._addr_to_actor_id.get(caller_addr, "root") + child_to_parent[actor_id] = caller_id + infeasible_task["state"] = -1 + infeasible_task["functionDescriptor"] = list( + map(lambda desc: b64decode(desc).decode("utf-8"), + infeasible_task["functionDescriptor"])) + format_reply(infeasible_tasks) + flattened_tree[actor_id] = infeasible_task + # construct actor tree actor_tree = flattened_tree for actor_id, parent_id in child_to_parent.items(): @@ -359,6 +396,21 @@ class NodeStats(threading.Thread): p.subscribe(actor_channel) logger.info("NodeStats: subscribed to {}".format(actor_channel)) + current_actor_table = ray.actors() + with self._node_stats_lock: + for actor_data in current_actor_table.values(): + addr = (actor_data["Address"]["IPAddress"], + str(actor_data["Address"]["Port"])) + owner_addr = (actor_data["OwnerAddress"]["IPAddress"], + str(actor_data["OwnerAddress"]["Port"])) + self._addr_to_owner_addr[addr] = owner_addr + self._addr_to_actor_id[addr] = actor_data["ActorID"] + self._addr_to_extra_info_dict[addr] = { + "jobId": actor_data["JobID"], + "state": actor_data["State"], + "isDirectCall": actor_data["IsDirectCall"], + } + for x in p.listen(): try: with self._node_stats_lock: @@ -388,16 +440,18 @@ class NodeStats(threading.Thread): gcs_entry = ray.gcs_utils.GcsEntry.FromString(data) actor_data = ray.gcs_utils.ActorTableData.FromString( gcs_entry.entries[0]) - addr = (str(actor_data.address.ip_address), + addr = (actor_data.address.ip_address, str(actor_data.address.port)) - owner_addr = (str(actor_data.owner_address.ip_address), + owner_addr = (actor_data.owner_address.ip_address, str(actor_data.owner_address.port)) self._addr_to_owner_addr[addr] = owner_addr self._addr_to_actor_id[addr] = ray.utils.binary_to_hex( actor_data.actor_id) self._addr_to_extra_info_dict[addr] = { - "job_id": ray.utils.binary_to_hex( - actor_data.job_id) + "jobId": ray.utils.binary_to_hex( + actor_data.job_id), + "state": actor_data.state, + "isDirectCall": actor_data.is_direct_call, } else: data = json.loads(ray.utils.decode(data)) diff --git a/python/ray/state.py b/python/ray/state.py index 4ec9f4702..ad56392e6 100644 --- a/python/ray/state.py +++ b/python/ray/state.py @@ -328,6 +328,7 @@ class GlobalState(object): gcs_entries.entries[0]) actor_info = { + "ActorID": binary_to_hex(actor_table_data.actor_id), "JobID": binary_to_hex(actor_table_data.job_id), "Address": { "IPAddress": actor_table_data.address.ip_address, @@ -337,7 +338,8 @@ class GlobalState(object): "IPAddress": actor_table_data.owner_address.ip_address, "Port": actor_table_data.owner_address.port }, - "IsDirectCall": actor_table_data.is_direct_call + "IsDirectCall": actor_table_data.is_direct_call, + "State": actor_table_data.state, } return actor_info diff --git a/python/ray/tests/test_metrics.py b/python/ray/tests/test_metrics.py index 8aa756142..473a41fb4 100644 --- a/python/ray/tests/test_metrics.py +++ b/python/ray/tests/test_metrics.py @@ -118,29 +118,24 @@ def test_raylet_info_endpoint(shutdown_only): addresses = ray.init(include_webui=True, num_cpus=6) @ray.remote(num_cpus=1) - class A(object): + class ActorA(object): def __init__(self): pass def f(self): return os.getpid() - @ray.remote(num_cpus=2) - class B(object): + @ray.remote(resources={"CustomResource": 1}) + class ActorB(object): def __init__(self): - self.children = [A.remote(), A.remote()] + pass - def f(self): - return os.getpid(), ray.get( - [child.f.remote() for child in self.children]) + @ray.remote(num_cpus=2) + class ActorC(object): + def __init__(self): + self.children = [ActorA.remote(), ActorB.remote()] - # TODO: Currently there is a race condition of Dashboard subscription - # and actor initialization. This will be fixed after #6629 is merged. - time.sleep(10) - - b = B.remote() - pids = ray.get(b.f.remote()) - assert len(pids) == 2 and len(pids[1]) == 2 + _ = ActorC.remote() start_time = time.time() while True: @@ -152,7 +147,6 @@ def test_raylet_info_endpoint(shutdown_only): actor_info = raylet_info["result"]["actorInfo"] try: assert len(actor_info) == 1 - print("actor_info", actor_info) _, parent_actor_info = actor_info.popitem() children = parent_actor_info["children"] assert len(children) == 2 @@ -168,8 +162,12 @@ def test_raylet_info_endpoint(shutdown_only): assert parent_actor_info["usedResources"]["CPU"] == 2 for _, child_actor_info in children.items(): - assert len(child_actor_info["children"]) == 0 - assert child_actor_info["usedResources"]["CPU"] == 1 + if child_actor_info["state"] == -1: + assert child_actor_info["requiredResources"]["CustomResource"] == 1 + else: + assert child_actor_info["state"] == 0 + assert len(child_actor_info["children"]) == 0 + assert child_actor_info["usedResources"]["CPU"] == 1 if __name__ == "__main__": diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index bdfc8c372..6ef1b8d7b 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -87,6 +87,7 @@ message GetNodeStatsReply { map available_resources = 3; map total_resources = 4; uint32 num_workers = 5; + repeated TaskSpec infeasible_tasks = 6; } // Service for inter-node-manager communication. diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index c6c4fc910..788bdf581 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -3018,6 +3018,10 @@ void NodeManager::HandleNodeStatsRequest(const rpc::GetNodeStatsRequest &request (*available_resources_map)[pair.first] = 0.0; } } + for (const auto task : local_queues_.GetTasks(TaskState::INFEASIBLE)) { + auto infeasible_task = reply->add_infeasible_tasks(); + infeasible_task->ParseFromString(task.GetTaskSpecification().Serialize()); + } // Ensure we never report an empty set of metrics. if (!recorded_metrics_) { RecordMetrics();