[Dashboard] Display memory usage of nodes and core workers (#6671)

This commit is contained in:
Yunzhi Zhang
2020-01-03 20:12:42 -08:00
committed by Philipp Moritz
parent fd379934b6
commit 816b84808d
10 changed files with 161 additions and 66 deletions
+66 -15
View File
@@ -72,7 +72,18 @@ def format_reply(reply):
format_reply(item)
class Dashboard:
def measures_to_dict(measures):
measures_dict = {}
for measure in measures:
tags = measure["tags"].split(",")[-1]
if "intValue" in measure:
measures_dict[tags] = measure["intValue"]
elif "doubleValue" in measure:
measures_dict[tags] = measure["doubleValue"]
return measures_dict
class Dashboard(object):
"""A dashboard process for monitoring Ray nodes.
This dashboard is made up of a REST API which collates data published by
@@ -180,18 +191,43 @@ class Dashboard:
actor_tree = self.node_stats.get_actor_tree(
workers_info, infeasible_tasks)
for address, data in D.items():
available_resources = data["availableResources"]
total_resources = data["totalResources"]
extra_info = []
for resource_name in sorted(available_resources.keys()):
total = total_resources[resource_name]
occupied = total - available_resources[resource_name]
total = format_resource(resource_name, total)
occupied = format_resource(resource_name, occupied)
extra_info.append("{}: {} / {}".format(
resource_name, occupied, total))
data["extraInfo"] = ", ".join(extra_info)
# process view data
measures_dicts = {}
for view_data in data["viewData"]:
view_name = view_data["viewName"]
if view_name in ("local_available_resource",
"local_total_resource",
"object_manager_stats"):
measures_dicts[view_name] = measures_to_dict(
view_data["measures"])
# process resources info
extra_info_strings = []
prefix = "ResourceName:"
for resource_name, total_resource in measures_dicts[
"local_total_resource"].items():
available_resource = measures_dicts[
"local_available_resource"].get(resource_name, .0)
resource_name = resource_name[len(prefix):]
extra_info_strings.append("{}: {} / {}".format(
resource_name,
format_resource(resource_name,
total_resource - available_resource),
format_resource(resource_name, total_resource)))
data["extraInfo"] = ",".join(extra_info_strings) + "\n"
if os.environ.get("RAY_DASHBOARD_DEBUG"):
# process object store info
extra_info_strings = []
prefix = "ValueType:"
for stats_name in [
"used_object_store_memory", "num_local_objects"
]:
stats_value = measures_dicts[
"object_manager_stats"].get(
prefix + stats_name, .0)
extra_info_strings.append("{}: {}".format(
stats_name, stats_value))
data["extraInfo"] += ", ".join(extra_info_strings)
# process actor info
actor_tree_str = json.dumps(
actor_tree, indent=2, sort_keys=True)
lines = actor_tree_str.split("\n")
@@ -202,7 +238,6 @@ class Dashboard:
(max_line_length - len(line)) * " ")
data["extraInfo"] += "\n" + "\n".join(to_print)
D["actorInfo"] = actor_tree
D["infeasibleTasks"] = infeasible_tasks
return await json_response(result=D)
async def logs(req) -> aiohttp.web.Response:
@@ -263,6 +298,21 @@ class NodeStats(threading.Thread):
self._addr_to_extra_info_dict = {}
self._node_stats_lock = threading.Lock()
self._default_info = {
"actorId": "",
"children": {},
"ipAddress": "",
"isDirectCall": False,
"jobId": "",
"numLocalObjects": 0,
"numObjectIdsInScope": 0,
"port": 0,
"state": 0,
"taskQueueLength": 0,
"usedObjectStoreMemory": 0,
"usedResources": {},
}
# Mapping from IP address to PID to list of log lines
self._logs = defaultdict(lambda: defaultdict(list))
@@ -323,8 +373,9 @@ class NodeStats(threading.Thread):
child_to_parent = {}
with self._node_stats_lock:
for addr, actor_id in self._addr_to_actor_id.items():
flattened_tree[actor_id] = self._addr_to_extra_info_dict[addr]
flattened_tree[actor_id]["children"] = {}
flattened_tree[actor_id] = copy.deepcopy(self._default_info)
flattened_tree[actor_id].update(
self._addr_to_extra_info_dict[addr])
parent_id = self._addr_to_actor_id.get(
self._addr_to_owner_addr[addr], "root")
child_to_parent[actor_id] = parent_id
+17 -6
View File
@@ -117,14 +117,15 @@ def test_worker_stats(shutdown_only):
def test_raylet_info_endpoint(shutdown_only):
addresses = ray.init(include_webui=True, num_cpus=6)
@ray.remote
def f():
return "test"
@ray.remote(num_cpus=1)
class ActorA:
def __init__(self):
pass
def f(self):
return os.getpid()
@ray.remote(resources={"CustomResource": 1})
class ActorB:
def __init__(self):
@@ -135,7 +136,15 @@ def test_raylet_info_endpoint(shutdown_only):
def __init__(self):
self.children = [ActorA.remote(), ActorB.remote()]
_ = ActorC.remote()
def local_store(self):
self.local_storage = [f.remote() for _ in range(10)]
def remote_store(self):
self.remote_storage = ray.put("test")
c = ActorC.remote()
c.local_store.remote()
c.remote_store.remote()
start_time = time.time()
while True:
@@ -148,13 +157,15 @@ def test_raylet_info_endpoint(shutdown_only):
try:
assert len(actor_info) == 1
_, parent_actor_info = actor_info.popitem()
assert parent_actor_info["numObjectIdsInScope"] == 11
assert parent_actor_info["numLocalObjects"] == 10
children = parent_actor_info["children"]
assert len(children) == 2
break
except AssertionError:
if time.time() > start_time + 30:
raise Exception(
"Timed out while waiting for actorInfo to show up.")
raise Exception("Timed out while waiting for actor info \
or object store info update.")
except requests.exceptions.ConnectionError:
if time.time() > start_time + 30:
raise Exception(