[Dashboard] display actor status and infeasible tasks (#6652)

* expose actor status and protobuf message of infeasible tasks

* move infeasible tasks into actor tree

* add pytest for displaying infeasible tasks info

* fix base64 decoding

* fix race condition after #6629 merged
This commit is contained in:
Yunzhi Zhang
2020-01-02 14:27:59 -08:00
committed by Philipp Moritz
parent 895f2727fb
commit 8a0a30b5f0
5 changed files with 95 additions and 36 deletions
+72 -18
View File
@@ -21,6 +21,7 @@ import time
import traceback
import yaml
from base64 import b64decode
from collections import defaultdict
from operator import itemgetter
from typing import Dict
@@ -57,6 +58,20 @@ def format_resource(resource_name, quantity):
return "{}".format(round_resource_value(quantity))
def format_reply(reply):
if isinstance(reply, dict):
for k, v in reply.items():
if isinstance(v, dict) or isinstance(v, list):
format_reply(v)
else:
if k.endswith("Id"):
v = b64decode(v)
reply[k] = ray.utils.binary_to_hex(v)
elif isinstance(reply, list):
for item in reply:
format_reply(item)
class Dashboard(object):
"""A dashboard process for monitoring Ray nodes.
@@ -158,9 +173,12 @@ class Dashboard(object):
async def raylet_info(req) -> aiohttp.web.Response:
D = self.raylet_stats.get_raylet_stats()
workers_info = sum((data["workersStats"] for data in D.values()),
[])
actor_tree = self.node_stats.get_actor_tree(workers_info)
workers_info = sum(
(data.get("workersStats", []) for data in D.values()), [])
infeasible_tasks = sum(
(data.get("infeasibleTasks", []) for data in D.values()), [])
actor_tree = self.node_stats.get_actor_tree(
workers_info, infeasible_tasks)
for address, data in D.items():
available_resources = data["availableResources"]
total_resources = data["totalResources"]
@@ -174,16 +192,17 @@ class Dashboard(object):
resource_name, occupied, total))
data["extraInfo"] = ", ".join(extra_info)
if os.environ.get("RAY_DASHBOARD_DEBUG"):
actor_tree_str = json.dumps(actor_tree, indent=2)
actor_tree_lines = actor_tree_str.split("\n")
max_line_length = max(map(len, actor_tree_lines))
actor_tree_print = []
for line in actor_tree_lines:
actor_tree_print.append(
line + (max_line_length - len(line)) * " ")
actor_tree_print = "\n".join(actor_tree_print)
data["extraInfo"] += "\n" + actor_tree_print
actor_tree_str = json.dumps(
actor_tree, indent=2, sort_keys=True)
lines = actor_tree_str.split("\n")
max_line_length = max(map(len, lines))
to_print = []
for line in lines:
to_print.append(line +
(max_line_length - len(line)) * " ")
data["extraInfo"] += "\n" + "\n".join(to_print)
D["actorInfo"] = actor_tree
D["infeasibleTasks"] = infeasible_tasks
return await json_response(result=D)
async def logs(req) -> aiohttp.web.Response:
@@ -298,7 +317,7 @@ class NodeStats(threading.Thread):
"error_counts": self.calculate_error_counts(),
}
def get_actor_tree(self, workers_info) -> Dict:
def get_actor_tree(self, workers_info, infeasible_tasks) -> Dict:
# construct flattened actor tree
flattened_tree = {"root": {"children": {}}}
child_to_parent = {}
@@ -314,13 +333,31 @@ class NodeStats(threading.Thread):
if "coreWorkerStats" in worker_info:
core_worker_stats = worker_info["coreWorkerStats"]
addr = (core_worker_stats["ipAddress"],
core_worker_stats["port"])
str(core_worker_stats["port"]))
if addr in self._addr_to_actor_id:
actor_id = self._addr_to_actor_id[addr]
if "currentTaskDesc" in core_worker_stats:
core_worker_stats.pop("currentTaskDesc")
if "numPendingTasks" in core_worker_stats:
core_worker_stats.pop("numPendingTasks")
format_reply(core_worker_stats)
flattened_tree[actor_id].update(core_worker_stats)
for infeasible_task in infeasible_tasks:
actor_id = ray.utils.binary_to_hex(
b64decode(
infeasible_task["actorCreationTaskSpec"]["actorId"]))
caller_addr = (infeasible_task["callerAddress"]["ipAddress"],
str(infeasible_task["callerAddress"]["port"]))
caller_id = self._addr_to_actor_id.get(caller_addr, "root")
child_to_parent[actor_id] = caller_id
infeasible_task["state"] = -1
infeasible_task["functionDescriptor"] = list(
map(lambda desc: b64decode(desc).decode("utf-8"),
infeasible_task["functionDescriptor"]))
format_reply(infeasible_tasks)
flattened_tree[actor_id] = infeasible_task
# construct actor tree
actor_tree = flattened_tree
for actor_id, parent_id in child_to_parent.items():
@@ -359,6 +396,21 @@ class NodeStats(threading.Thread):
p.subscribe(actor_channel)
logger.info("NodeStats: subscribed to {}".format(actor_channel))
current_actor_table = ray.actors()
with self._node_stats_lock:
for actor_data in current_actor_table.values():
addr = (actor_data["Address"]["IPAddress"],
str(actor_data["Address"]["Port"]))
owner_addr = (actor_data["OwnerAddress"]["IPAddress"],
str(actor_data["OwnerAddress"]["Port"]))
self._addr_to_owner_addr[addr] = owner_addr
self._addr_to_actor_id[addr] = actor_data["ActorID"]
self._addr_to_extra_info_dict[addr] = {
"jobId": actor_data["JobID"],
"state": actor_data["State"],
"isDirectCall": actor_data["IsDirectCall"],
}
for x in p.listen():
try:
with self._node_stats_lock:
@@ -388,16 +440,18 @@ class NodeStats(threading.Thread):
gcs_entry = ray.gcs_utils.GcsEntry.FromString(data)
actor_data = ray.gcs_utils.ActorTableData.FromString(
gcs_entry.entries[0])
addr = (str(actor_data.address.ip_address),
addr = (actor_data.address.ip_address,
str(actor_data.address.port))
owner_addr = (str(actor_data.owner_address.ip_address),
owner_addr = (actor_data.owner_address.ip_address,
str(actor_data.owner_address.port))
self._addr_to_owner_addr[addr] = owner_addr
self._addr_to_actor_id[addr] = ray.utils.binary_to_hex(
actor_data.actor_id)
self._addr_to_extra_info_dict[addr] = {
"job_id": ray.utils.binary_to_hex(
actor_data.job_id)
"jobId": ray.utils.binary_to_hex(
actor_data.job_id),
"state": actor_data.state,
"isDirectCall": actor_data.is_direct_call,
}
else:
data = json.loads(ray.utils.decode(data))
+3 -1
View File
@@ -328,6 +328,7 @@ class GlobalState(object):
gcs_entries.entries[0])
actor_info = {
"ActorID": binary_to_hex(actor_table_data.actor_id),
"JobID": binary_to_hex(actor_table_data.job_id),
"Address": {
"IPAddress": actor_table_data.address.ip_address,
@@ -337,7 +338,8 @@ class GlobalState(object):
"IPAddress": actor_table_data.owner_address.ip_address,
"Port": actor_table_data.owner_address.port
},
"IsDirectCall": actor_table_data.is_direct_call
"IsDirectCall": actor_table_data.is_direct_call,
"State": actor_table_data.state,
}
return actor_info
+15 -17
View File
@@ -118,29 +118,24 @@ def test_raylet_info_endpoint(shutdown_only):
addresses = ray.init(include_webui=True, num_cpus=6)
@ray.remote(num_cpus=1)
class A(object):
class ActorA(object):
def __init__(self):
pass
def f(self):
return os.getpid()
@ray.remote(num_cpus=2)
class B(object):
@ray.remote(resources={"CustomResource": 1})
class ActorB(object):
def __init__(self):
self.children = [A.remote(), A.remote()]
pass
def f(self):
return os.getpid(), ray.get(
[child.f.remote() for child in self.children])
@ray.remote(num_cpus=2)
class ActorC(object):
def __init__(self):
self.children = [ActorA.remote(), ActorB.remote()]
# TODO: Currently there is a race condition of Dashboard subscription
# and actor initialization. This will be fixed after #6629 is merged.
time.sleep(10)
b = B.remote()
pids = ray.get(b.f.remote())
assert len(pids) == 2 and len(pids[1]) == 2
_ = ActorC.remote()
start_time = time.time()
while True:
@@ -152,7 +147,6 @@ def test_raylet_info_endpoint(shutdown_only):
actor_info = raylet_info["result"]["actorInfo"]
try:
assert len(actor_info) == 1
print("actor_info", actor_info)
_, parent_actor_info = actor_info.popitem()
children = parent_actor_info["children"]
assert len(children) == 2
@@ -168,8 +162,12 @@ def test_raylet_info_endpoint(shutdown_only):
assert parent_actor_info["usedResources"]["CPU"] == 2
for _, child_actor_info in children.items():
assert len(child_actor_info["children"]) == 0
assert child_actor_info["usedResources"]["CPU"] == 1
if child_actor_info["state"] == -1:
assert child_actor_info["requiredResources"]["CustomResource"] == 1
else:
assert child_actor_info["state"] == 0
assert len(child_actor_info["children"]) == 0
assert child_actor_info["usedResources"]["CPU"] == 1
if __name__ == "__main__":