[Dashboard] Logical view backend for dashboard (#6590)

This commit is contained in:
Yunzhi Zhang
2019-12-30 13:08:08 -08:00
committed by Philipp Moritz
parent 8b16847c02
commit 65acb54553
9 changed files with 162 additions and 15 deletions
@@ -38,7 +38,7 @@ const styles = (theme: Theme) =>
},
extraInfo: {
fontFamily: "SFMono-Regular,Consolas,Liberation Mono,Menlo,monospace",
whiteSpace: "pre-wrap"
whiteSpace: "pre"
}
});
+65 -1
View File
@@ -31,7 +31,6 @@ import ray
from ray.core.generated import node_manager_pb2
from ray.core.generated import node_manager_pb2_grpc
import ray.ray_constants as ray_constants
import ray.utils
# Logger for this module. It should be configured at the entry point
# into the program using Ray. Ray provides a default configuration at
@@ -159,6 +158,9 @@ class Dashboard(object):
async def raylet_info(req) -> aiohttp.web.Response:
D = self.raylet_stats.get_raylet_stats()
workers_info = sum((data["workersStats"] for data in D.values()),
[])
actor_tree = self.node_stats.get_actor_tree(workers_info)
for address, data in D.items():
available_resources = data["availableResources"]
total_resources = data["totalResources"]
@@ -171,6 +173,17 @@ class Dashboard(object):
extra_info.append("{}: {} / {}".format(
resource_name, occupied, total))
data["extraInfo"] = ", ".join(extra_info)
if os.environ.get("RAY_DASHBOARD_DEBUG"):
actor_tree_str = json.dumps(actor_tree, indent=2)
actor_tree_lines = actor_tree_str.split("\n")
max_line_length = max(map(len, actor_tree_lines))
actor_tree_print = []
for line in actor_tree_lines:
actor_tree_print.append(
line + (max_line_length - len(line)) * " ")
actor_tree_print = "\n".join(actor_tree_print)
data["extraInfo"] += "\n" + actor_tree_print
D["actorInfo"] = actor_tree
return await json_response(result=D)
async def logs(req) -> aiohttp.web.Response:
@@ -226,6 +239,9 @@ class NodeStats(threading.Thread):
redis_address, password=redis_password)
self._node_stats = {}
self._addr_to_owner_addr = {}
self._addr_to_actor_id = {}
self._addr_to_extra_info_dict = {}
self._node_stats_lock = threading.Lock()
# Mapping from IP address to PID to list of log lines
@@ -281,6 +297,35 @@ class NodeStats(threading.Thread):
"error_counts": self.calculate_error_counts(),
}
def get_actor_tree(self, workers_info) -> Dict:
# construct flattened actor tree
flattened_tree = {"root": {"children": {}}}
child_to_parent = {}
with self._node_stats_lock:
for addr, actor_id in self._addr_to_actor_id.items():
flattened_tree[actor_id] = self._addr_to_extra_info_dict[addr]
flattened_tree[actor_id]["children"] = {}
parent_id = self._addr_to_actor_id.get(
self._addr_to_owner_addr[addr], "root")
child_to_parent[actor_id] = parent_id
for worker_info in workers_info:
if "coreWorkerStats" in worker_info:
core_worker_stats = worker_info["coreWorkerStats"]
addr = (core_worker_stats["ipAddress"],
core_worker_stats["port"])
if addr in self._addr_to_actor_id:
actor_id = self._addr_to_actor_id[addr]
if "currentTaskDesc" in core_worker_stats:
core_worker_stats.pop("currentTaskDesc")
flattened_tree[actor_id].update(core_worker_stats)
# construct actor tree
actor_tree = flattened_tree
for actor_id, parent_id in child_to_parent.items():
actor_tree[parent_id]["children"][actor_id] = actor_tree[actor_id]
return actor_tree["root"]["children"]
def get_logs(self, hostname, pid):
ip = self._node_stats.get(hostname, {"ip": None})["ip"]
logs = self._logs.get(ip, {})
@@ -309,6 +354,10 @@ class NodeStats(threading.Thread):
p.subscribe(error_channel)
logger.info("NodeStats: subscribed to {}".format(error_channel))
actor_channel = ray.gcs_utils.TablePubsub.Value("ACTOR_PUBSUB")
p.subscribe(actor_channel)
logger.info("NodeStats: subscribed to {}".format(actor_channel))
for x in p.listen():
try:
with self._node_stats_lock:
@@ -334,6 +383,21 @@ class NodeStats(threading.Thread):
"timestamp": error_data.timestamp,
"type": error_data.type
})
elif channel == str(actor_channel):
gcs_entry = ray.gcs_utils.GcsEntry.FromString(data)
actor_data = ray.gcs_utils.ActorTableData.FromString(
gcs_entry.entries[0])
addr = (str(actor_data.address.ip_address),
str(actor_data.address.port))
owner_addr = (str(actor_data.owner_address.ip_address),
str(actor_data.owner_address.port))
self._addr_to_owner_addr[addr] = owner_addr
self._addr_to_actor_id[addr] = ray.utils.binary_to_hex(
actor_data.actor_id)
self._addr_to_extra_info_dict[addr] = {
"job_id": ray.utils.binary_to_hex(
actor_data.job_id)
}
else:
data = json.loads(ray.utils.decode(data))
self._node_stats[data["hostname"]] = data