diff --git a/dashboard/consts.py b/dashboard/consts.py index 80c658882..66f738bbb 100644 --- a/dashboard/consts.py +++ b/dashboard/consts.py @@ -8,6 +8,7 @@ UPDATE_NODES_INTERVAL_SECONDS = 5 CONNECT_GCS_INTERVAL_SECONDS = 2 CONNECT_REDIS_INTERNAL_SECONDS = 2 PURGE_DATA_INTERVAL_SECONDS = 60 * 10 +ORGANIZE_DATA_INTERVAL_SECONDS = 2 REDIS_KEY_DASHBOARD = "dashboard" REDIS_KEY_DASHBOARD_RPC = "dashboard_rpc" REDIS_KEY_GCS_SERVER_ADDRESS = "GcsServerAddress" @@ -19,6 +20,11 @@ AIOHTTP_CACHE_MAX_SIZE = 128 AIOHTTP_CACHE_DISABLE_ENVIRONMENT_KEY = "RAY_DASHBOARD_NO_CACHE" # Named signals SIGNAL_NODE_INFO_FETCHED = "node_info_fetched" +SIGNAL_NODE_SUMMARY_FETCHED = "node_summary_fetched" +SIGNAL_WORKER_INFO_FETCHED = "worker_info_fetched" # Default param for RotatingFileHandler LOGGING_ROTATE_BYTES = 100 * 1000 * 1000 # maxBytes LOGGING_ROTATE_BACKUP_COUNT = 5 # backupCount +# Default value for datacenter (the default value in protobuf) +DEFAULT_LANGUAGE = "PYTHON" +DEFAULT_JOB_ID = "ffff" diff --git a/dashboard/datacenter.py b/dashboard/datacenter.py index 109bdc13e..059704db4 100644 --- a/dashboard/datacenter.py +++ b/dashboard/datacenter.py @@ -1,15 +1,16 @@ import logging import ray.new_dashboard.consts as dashboard_consts import ray.new_dashboard.memory_utils as memory_utils -from collections import defaultdict from ray.new_dashboard.actor_utils import actor_classname_from_task_spec -from ray.new_dashboard.utils import Dict, Signal +from ray.new_dashboard.utils import Dict, Signal, async_loop_forever logger = logging.getLogger(__name__) class GlobalSignals: node_info_fetched = Signal(dashboard_consts.SIGNAL_NODE_INFO_FETCHED) + node_summary_fetched = Signal(dashboard_consts.SIGNAL_NODE_SUMMARY_FETCHED) + worker_info_fetched = Signal(dashboard_consts.SIGNAL_WORKER_INFO_FETCHED) class DataSource: @@ -29,6 +30,16 @@ class DataSource: node_id_to_ip = Dict() # {node id hex(str): hostname(str)} node_id_to_hostname = Dict() + # {node id hex(str): worker list} + node_workers = Dict() + # {node id hex(str): {actor id hex(str): actor table data}} + node_actors = Dict() + # {job id hex(str): worker list} + job_workers = Dict() + # {job id hex(str): {actor id hex(str): actor table data}} + job_actors = Dict() + # {worker id(str): core worker stats} + core_worker_stats = Dict() # {node ip (str): log entries by pid # (dict from pid to list of latest log entries)} ip_and_pid_to_logs = Dict() @@ -39,6 +50,7 @@ class DataSource: class DataOrganizer: @staticmethod + @async_loop_forever(dashboard_consts.PURGE_DATA_INTERVAL_SECONDS) async def purge(): # Purge data that is out of date. # These data sources are maintained by DashboardHead, @@ -60,56 +72,57 @@ class DataOrganizer: DataSource.node_physical_stats.pop(key) @classmethod - async def get_node_actors(cls, node_id): - node_stats = DataSource.node_stats.get(node_id, {}) + @async_loop_forever(dashboard_consts.ORGANIZE_DATA_INTERVAL_SECONDS) + async def organize(cls): + job_workers = {} + node_workers = {} + core_worker_stats = {} + for node_id in DataSource.nodes.keys(): + workers = await cls.get_node_workers(node_id) + for worker in workers: + job_id = worker["jobId"] + job_workers.setdefault(job_id, []).append(worker) + for stats in worker.get("coreWorkerStats", []): + worker_id = stats["workerId"] + core_worker_stats[worker_id] = stats + node_workers[node_id] = workers + DataSource.job_workers.reset(job_workers) + DataSource.node_workers.reset(node_workers) + DataSource.core_worker_stats.reset(core_worker_stats) + + @classmethod + async def get_node_workers(cls, node_id): + workers = [] node_physical_stats = DataSource.node_physical_stats.get(node_id, {}) - worker_id_to_raylet_info = {} - pid_to_worker_id = {} + node_stats = DataSource.node_stats.get(node_id, {}) + # Merge coreWorkerStats (node stats) to workers (node physical stats) + pid_to_worker_stats = {} + pid_to_language = {} + pid_to_job_id = {} + for core_worker_stats in node_stats.get("coreWorkersStats", []): + pid = core_worker_stats["pid"] + pid_to_worker_stats.setdefault(pid, []).append(core_worker_stats) + pid_to_language[pid] = core_worker_stats["language"] + pid_to_job_id[pid] = core_worker_stats["jobId"] + for worker in node_physical_stats.get("workers", []): + worker = dict(worker) + pid = worker["pid"] + worker["coreWorkerStats"] = pid_to_worker_stats.get(pid, []) + worker["language"] = pid_to_language.get( + pid, dashboard_consts.DEFAULT_LANGUAGE) + worker["jobId"] = pid_to_job_id.get( + pid, dashboard_consts.DEFAULT_JOB_ID) - for worker_stats in node_stats.get("workersStats", []): - worker_id_to_raylet_info[worker_stats["workerId"]] = worker_stats - pid_to_worker_id[worker_stats["pid"]] = worker_stats["workerId"] - worker_id_to_process_info = {} + await GlobalSignals.worker_info_fetched.send(node_id, worker) - for process_stats in node_physical_stats.get("workers"): - if process_stats["pid"] in pid_to_worker_id: - worker_id = pid_to_worker_id[process_stats["pid"]] - worker_id_to_process_info[worker_id] = process_stats - - worker_id_to_gpu_stats = defaultdict(list) - for gpu_stats in node_physical_stats.get("gpus"): - for process in gpu_stats.get("processes", []): - if process["pid"] in pid_to_worker_id: - worker_id = pid_to_worker_id[process["pid"]] - worker_id_to_gpu_stats[worker_id].append(gpu_stats) - - node_actors = {} - for actor_id, actor_table_data in DataSource.actors.items(): - worker_id = actor_table_data["address"]["workerId"] - if worker_id in worker_id_to_raylet_info: - worker_raylet_stats = worker_id_to_raylet_info[worker_id] - core_worker = worker_raylet_stats.get("coreWorkerStats", {}) - actor_constructor = core_worker.get( - "actorTitle", "Unknown actor constructor") - - actor_table_data["actorConstructor"] = actor_constructor - - actor_class = actor_classname_from_task_spec( - actor_table_data.get("taskSpec", {})) - - actor_table_data["actorClass"] = actor_class - actor_table_data.update(core_worker) - node_actors[actor_id] = actor_table_data - actor_table_data["gpus"] = worker_id_to_gpu_stats.get( - worker_id, []) - actor_table_data["processStats"] = worker_id_to_process_info.get( - worker_id, {}) - return node_actors + workers.append(worker) + return workers @classmethod async def get_node_info(cls, node_id): - node_physical_stats = DataSource.node_physical_stats.get(node_id, {}) - node_stats = DataSource.node_stats.get(node_id, {}) + node_physical_stats = dict( + DataSource.node_physical_stats.get(node_id, {})) + node_stats = dict(DataSource.node_stats.get(node_id, {})) node = DataSource.nodes.get(node_id, {}) node_ip = DataSource.node_id_to_ip.get(node_id) # Merge node log count information into the payload @@ -122,29 +135,9 @@ class DataOrganizer: for entries in error_info.values(): node_err_count += len(entries) - # Merge coreWorkerStats (node stats) to workers (node physical stats) - workers_stats = node_stats.pop("workersStats", {}) - pid_to_worker_stats = {} - pid_to_language = {} - pid_to_job_id = {} - for stats in workers_stats: - d = pid_to_worker_stats.setdefault(stats["pid"], {}).setdefault( - stats["workerId"], stats["coreWorkerStats"]) - d["workerId"] = stats["workerId"] - pid_to_language.setdefault(stats["pid"], - stats.get("language", "PYTHON")) - pid_to_job_id.setdefault(stats["pid"], - stats["coreWorkerStats"]["jobId"]) + node_stats.pop("coreWorkersStats", None) - for worker in node_physical_stats.get("workers", []): - worker_stats = pid_to_worker_stats.get(worker["pid"], {}) - worker["coreWorkerStats"] = list(worker_stats.values()) - worker["language"] = pid_to_language.get(worker["pid"], "") - worker["jobId"] = pid_to_job_id.get(worker["pid"], "ffff") - worker["logCount"] = len(log_info.get(str(worker["pid"]), [])) - worker["errorCount"] = len(error_info.get(str(worker["pid"]), [])) - - ray_stats = _extract_view_data( + ray_stats = cls._extract_view_data( node_stats["viewData"], {"object_store_used_memory", "object_store_available_memory"}) @@ -157,57 +150,132 @@ class DataOrganizer: node_info["raylet"].update(node) # Merge actors to node physical stats node_info["actors"] = await cls.get_node_actors(node_id) + # Update workers to node physical stats + node_info["workers"] = DataSource.node_workers.get(node_id, []) node_info["logCount"] = node_log_count node_info["errorCount"] = node_err_count await GlobalSignals.node_info_fetched.send(node_info) return node_info + @staticmethod + async def get_node_summary(node_id): + node_physical_stats = dict( + DataSource.node_physical_stats.get(node_id, {})) + node_stats = dict(DataSource.node_stats.get(node_id, {})) + node = DataSource.nodes.get(node_id, {}) + + node_physical_stats.pop("workers", None) + node_stats.pop("workersStats", None) + node_stats.pop("viewData", None) + + node_summary = node_physical_stats + # Merge node stats to node physical stats + node_summary["raylet"] = node_stats + # Merge GcsNodeInfo to node physical stats + node_summary["raylet"].update(node) + + await GlobalSignals.node_summary_fetched.send(node_summary) + + return node_summary + @classmethod async def get_all_node_summary(cls): - all_nodes_summary = [] - for node_id in DataSource.nodes.keys(): - node_info = await cls.get_node_info(node_id) - node_info.pop("workers", None) - node_info.pop("actors", None) - node_info["raylet"].pop("workersStats", None) - node_info["raylet"].pop("viewData", None) - all_nodes_summary.append(node_info) - return all_nodes_summary + return [ + await DataOrganizer.get_node_summary(node_id) + for node_id in DataSource.nodes.keys() + ] @classmethod async def get_all_node_details(cls): - node_details = [] - for node_id in DataSource.nodes.keys(): - node_details.append(await cls.get_node_info(node_id)) - return node_details + return [ + await DataOrganizer.get_node_info(node_id) + for node_id in DataSource.nodes.keys() + ] + + @classmethod + async def get_node_actors(cls, node_id): + node_actors = DataSource.node_actors.get(node_id, {}) + return { + actor_id: await cls._get_actor(actor) + for actor_id, actor in node_actors.items() + } + + @classmethod + async def get_job_actors(cls, job_id): + job_actors = DataSource.job_actors.get(job_id, {}) + return { + actor_id: await cls._get_actor(actor) + for actor_id, actor in job_actors.items() + } @classmethod async def get_all_actors(cls): - all_actors = {} - for node_id in DataSource.nodes.keys(): - all_actors.update(await cls.get_node_actors(node_id)) - return all_actors + return { + actor_id: await cls._get_actor(actor) + for actor_id, actor in DataSource.actors.items() + } + + @staticmethod + async def _get_actor(actor): + actor = dict(actor) + worker_id = actor["address"]["workerId"] + core_worker_stats = DataSource.core_worker_stats.get(worker_id, {}) + actor_constructor = core_worker_stats.get("actorTitle", + "Unknown actor constructor") + actor["actorConstructor"] = actor_constructor + actor.update(core_worker_stats) + + # TODO(fyrestone): remove this, give a link from actor + # info to worker info in front-end. + node_id = actor["address"]["rayletId"] + pid = core_worker_stats["pid"] + node_physical_stats = DataSource.node_physical_stats.get(node_id, {}) + + actor_process_stats = None + for process_stats in node_physical_stats.get("workers"): + if process_stats["pid"] == pid: + actor_process_stats = process_stats + break + + actor_process_gpu_stats = None + for gpu_stats in node_physical_stats.get("gpus"): + for process in gpu_stats.get("processes", []): + if process["pid"] == pid: + actor_process_gpu_stats = gpu_stats + break + if actor_process_gpu_stats is not None: + break + actor["gpus"] = actor_process_gpu_stats + actor["processStats"] = actor_process_stats + + return actor @classmethod async def get_actor_creation_tasks(cls): infeasible_tasks = sum( - (node_stats.get("infeasibleTasks", []) + (list(node_stats.get("infeasibleTasks", [])) for node_stats in DataSource.node_stats.values()), []) + new_infeasible_tasks = [] for task in infeasible_tasks: + task = dict(task) task["actorClass"] = actor_classname_from_task_spec(task) task["state"] = "INFEASIBLE" + new_infeasible_tasks.append(task) resource_pending_tasks = sum( - (data.get("readyTasks", []) + (list(data.get("readyTasks", [])) for data in DataSource.node_stats.values()), []) + new_resource_pending_tasks = [] for task in resource_pending_tasks: + task = dict(task) task["actorClass"] = actor_classname_from_task_spec(task) task["state"] = "PENDING_RESOURCES" + new_resource_pending_tasks.append(task) results = { task["actorCreationTaskSpec"]["actorId"]: task - for task in resource_pending_tasks + infeasible_tasks + for task in new_resource_pending_tasks + new_infeasible_tasks } return results @@ -217,27 +285,27 @@ class DataOrganizer: group_by=memory_utils.GroupByType.STACK_TRACE): all_worker_stats = [] for node_stats in DataSource.node_stats.values(): - all_worker_stats.extend(node_stats.get("workersStats", [])) + all_worker_stats.extend(node_stats.get("coreWorkersStats", [])) memory_information = memory_utils.construct_memory_table( all_worker_stats, group_by=group_by, sort_by=sort_by) return memory_information + @staticmethod + def _extract_view_data(views, data_keys): + view_data = {} + for view in views: + view_name = view["viewName"] + if view_name in data_keys: + if not view.get("measures"): + view_data[view_name] = 0 + continue + measure = view["measures"][0] + if "doubleValue" in measure: + measure_value = measure["doubleValue"] + elif "intValue" in measure: + measure_value = measure["intValue"] + else: + measure_value = 0 + view_data[view_name] = measure_value -def _extract_view_data(views, data_keys): - view_data = {} - for view in views: - view_name = view["viewName"] - if view_name in data_keys: - if not view.get("measures"): - view_data[view_name] = 0 - continue - measure = view["measures"][0] - if "doubleValue" in measure: - measure_value = measure["doubleValue"] - elif "intValue" in measure: - measure_value = measure["intValue"] - else: - measure_value = 0 - view_data[view_name] = measure_value - - return view_data + return view_data diff --git a/dashboard/head.py b/dashboard/head.py index 1e61b6197..e8e911913 100644 --- a/dashboard/head.py +++ b/dashboard/head.py @@ -191,16 +191,6 @@ class DashboardHead: except Exception: logger.exception(f"Error notifying coroutine {co}") - async def _purge_data(): - """Purge data in datacenter.""" - while True: - await asyncio.sleep( - dashboard_consts.PURGE_DATA_INTERVAL_SECONDS) - try: - await DataOrganizer.purge() - except Exception: - logger.exception("Error purging data.") - modules = self._load_modules() # Http server should be initialized after all modules loaded. @@ -219,7 +209,13 @@ class DashboardHead: # Freeze signal after all modules loaded. dashboard_utils.SignalManager.freeze() - await asyncio.gather(self._update_nodes(), _async_notify(), - _purge_data(), web_server, + concurrent_tasks = [ + self._update_nodes(), + _async_notify(), + DataOrganizer.purge(), + DataOrganizer.organize(), + web_server, + ] + await asyncio.gather(*concurrent_tasks, *(m.run(self.server) for m in modules)) await self.server.wait_for_termination() diff --git a/dashboard/memory_utils.py b/dashboard/memory_utils.py index 68ba3f877..48bf4b8a7 100644 --- a/dashboard/memory_utils.py +++ b/dashboard/memory_utils.py @@ -278,14 +278,13 @@ class MemoryTable: return self.__repr__() -def construct_memory_table(workers_info: List, +def construct_memory_table(workers_stats: List, group_by: GroupByType = GroupByType.NODE_ADDRESS, sort_by=SortingType.OBJECT_SIZE) -> MemoryTable: memory_table_entries = [] - for worker_info in workers_info: - pid = worker_info["pid"] - is_driver = worker_info.get("isDriver", False) - core_worker_stats = worker_info["coreWorkerStats"] + for core_worker_stats in workers_stats: + pid = core_worker_stats["pid"] + is_driver = core_worker_stats.get("workerType") == "DRIVER" node_address = core_worker_stats["ipAddress"] object_refs = core_worker_stats.get("objectRefs", []) diff --git a/dashboard/modules/log/log_head.py b/dashboard/modules/log/log_head.py index abf4f0405..f16e75e59 100644 --- a/dashboard/modules/log/log_head.py +++ b/dashboard/modules/log/log_head.py @@ -20,6 +20,8 @@ class LogHead(dashboard_utils.DashboardHeadModule): routes.static("/logs", self._dashboard_head.log_dir, show_index=True) GlobalSignals.node_info_fetched.append( self.insert_log_url_to_node_info) + GlobalSignals.node_summary_fetched.append( + self.insert_log_url_to_node_info) async def insert_log_url_to_node_info(self, node_info): node_id = node_info.get("raylet", {}).get("nodeId") diff --git a/dashboard/modules/stats_collector/stats_collector_head.py b/dashboard/modules/stats_collector/stats_collector_head.py index 38e45933e..c819f988c 100644 --- a/dashboard/modules/stats_collector/stats_collector_head.py +++ b/dashboard/modules/stats_collector/stats_collector_head.py @@ -10,6 +10,7 @@ import ray.gcs_utils import ray.new_dashboard.modules.stats_collector.stats_collector_consts \ as stats_collector_consts import ray.new_dashboard.utils as dashboard_utils +from ray.new_dashboard.actor_utils import actor_classname_from_task_spec from ray.new_dashboard.utils import async_loop_forever from ray.new_dashboard.memory_utils import GroupByType, SortingType from ray.core.generated import node_manager_pb2 @@ -17,25 +18,36 @@ from ray.core.generated import node_manager_pb2_grpc from ray.core.generated import gcs_service_pb2 from ray.core.generated import gcs_service_pb2_grpc from ray.new_dashboard.datacenter import DataSource, DataOrganizer -from ray.utils import binary_to_hex logger = logging.getLogger(__name__) routes = dashboard_utils.ClassMethodRouteTable def node_stats_to_dict(message): - return dashboard_utils.message_to_dict( - message, { - "actorId", "jobId", "taskId", "parentTaskId", "sourceActorId", - "callerId", "rayletId", "workerId" - }) + decode_keys = { + "actorId", "jobId", "taskId", "parentTaskId", "sourceActorId", + "callerId", "rayletId", "workerId", "placementGroupId" + } + core_workers_stats = message.core_workers_stats + message.ClearField("core_workers_stats") + try: + result = dashboard_utils.message_to_dict(message, decode_keys) + result["coreWorkersStats"] = [ + dashboard_utils.message_to_dict( + m, decode_keys, including_default_value_fields=True) + for m in core_workers_stats + ] + return result + finally: + message.core_workers_stats.extend(core_workers_stats) def actor_table_data_to_dict(message): return dashboard_utils.message_to_dict( message, { "actorId", "parentId", "jobId", "workerId", "rayletId", - "actorCreationDummyObjectId" + "actorCreationDummyObjectId", "callerId", "taskId", "parentTaskId", + "sourceActorId", "placementGroupId" }, including_default_value_fields=True) @@ -160,21 +172,40 @@ class StatsCollector(dashboard_utils.DashboardHeadModule): await aioredis_client.psubscribe(pattern) logger.info("Subscribed to %s", key) + def _process_actor_table_data(data): + actor_class = actor_classname_from_task_spec( + data.get("taskSpec", {})) + data["actorClass"] = actor_class + # Get all actor info. while True: try: logger.info("Getting all actor info from GCS.") request = gcs_service_pb2.GetAllActorInfoRequest() reply = await self._gcs_actor_info_stub.GetAllActorInfo( - request, timeout=2) + request, timeout=5) if reply.status.code == 0: - result = {} - for actor_info in reply.actor_table_data: - result[binary_to_hex(actor_info.actor_id)] = \ - actor_table_data_to_dict(actor_info) - DataSource.actors.reset(result) + actors = {} + for message in reply.actor_table_data: + actor_table_data = actor_table_data_to_dict(message) + _process_actor_table_data(actor_table_data) + actors[actor_table_data["actorId"]] = actor_table_data + # Update actors. + DataSource.actors.reset(actors) + # Update node actors and job actors. + job_actors = {} + node_actors = {} + for actor_id, actor_table_data in actors.items(): + job_id = actor_table_data["jobId"] + node_id = actor_table_data["address"]["rayletId"] + job_actors.setdefault(job_id, + {})[actor_id] = actor_table_data + node_actors.setdefault(node_id, + {})[actor_id] = actor_table_data + DataSource.job_actors.reset(job_actors) + DataSource.node_actors.reset(node_actors) logger.info("Received %d actor info from GCS.", - len(result)) + len(actors)) break else: raise Exception( @@ -187,12 +218,26 @@ class StatsCollector(dashboard_utils.DashboardHeadModule): # Receive actors from channel. async for sender, msg in receiver.iter(): try: - _, data = msg - pubsub_message = ray.gcs_utils.PubSubMessage.FromString(data) - actor_info = ray.gcs_utils.ActorTableData.FromString( + _, actor_table_data = msg + pubsub_message = ray.gcs_utils.PubSubMessage.FromString( + actor_table_data) + message = ray.gcs_utils.ActorTableData.FromString( pubsub_message.data) - DataSource.actors[binary_to_hex(actor_info.actor_id)] = \ - actor_table_data_to_dict(actor_info) + actor_table_data = actor_table_data_to_dict(message) + _process_actor_table_data(actor_table_data) + actor_id = actor_table_data["actorId"] + job_id = actor_table_data["jobId"] + node_id = actor_table_data["address"]["rayletId"] + # Update actors. + DataSource.actors[actor_id] = actor_table_data + # Update node actors. + node_actors = dict(DataSource.node_actors.get(node_id, {})) + node_actors[actor_id] = actor_table_data + DataSource.node_actors[node_id] = node_actors + # Update job actors. + job_actors = dict(DataSource.job_actors.get(job_id, {})) + job_actors[actor_id] = actor_table_data + DataSource.job_actors[job_id] = job_actors except Exception: logger.exception("Error receiving actor info.") @@ -224,11 +269,10 @@ class StatsCollector(dashboard_utils.DashboardHeadModule): async for sender, msg in receiver.iter(): try: data = json.loads(ray.utils.decode(msg)) - logger.error(f"data={data}") ip = data["ip"] pid = str(data["pid"]) - logs_for_ip = DataSource.ip_and_pid_to_logs.get(ip, {}) - logs_for_pid = logs_for_ip.get(pid, []) + logs_for_ip = dict(DataSource.ip_and_pid_to_logs.get(ip, {})) + logs_for_pid = list(logs_for_ip.get(pid, [])) logs_for_pid.extend(data["lines"]) logs_for_ip[pid] = logs_for_pid DataSource.ip_and_pid_to_logs[ip] = logs_for_ip diff --git a/dashboard/modules/stats_collector/tests/test_stats_collector.py b/dashboard/modules/stats_collector/tests/test_stats_collector.py index 6e371dfa8..06824656f 100644 --- a/dashboard/modules/stats_collector/tests/test_stats_collector.py +++ b/dashboard/modules/stats_collector/tests/test_stats_collector.py @@ -91,7 +91,7 @@ def test_node_info(disable_aiohttp_cache, ray_start_with_dashboard): raise Exception(f"Timed out while testing, {ex_stack}") -def test_memory_table(ray_start_with_dashboard): +def test_memory_table(disable_aiohttp_cache, ray_start_with_dashboard): assert (wait_until_server_available(ray_start_with_dashboard["webui_url"])) @ray.remote @@ -129,7 +129,7 @@ def test_memory_table(ray_start_with_dashboard): wait_for_condition(check_mem_table, 10) -def test_get_all_node_details(ray_start_with_dashboard): +def test_get_all_node_details(disable_aiohttp_cache, ray_start_with_dashboard): assert (wait_until_server_available(ray_start_with_dashboard["webui_url"])) webui_url = format_web_url(ray_start_with_dashboard["webui_url"]) @@ -193,6 +193,10 @@ def test_multi_nodes_info(enable_test_module, disable_aiohttp_cache, assert detail["result"] is True, detail["msg"] detail = detail["data"]["detail"] assert detail["raylet"]["state"] == "ALIVE" + response = requests.get(webui_url + "/test/dump?key=agents") + response.raise_for_status() + agents = response.json() + assert len(agents["data"]["agents"]) == 3 return True except Exception as ex: logger.info(ex) diff --git a/dashboard/tests/test_dashboard.py b/dashboard/tests/test_dashboard.py index 07e30985c..b8e64bec8 100644 --- a/dashboard/tests/test_dashboard.py +++ b/dashboard/tests/test_dashboard.py @@ -1,11 +1,13 @@ import os import sys +import copy import json import time import logging import asyncio import collections +import numpy as np import aiohttp.web import ray import psutil @@ -493,5 +495,81 @@ def test_get_cluster_status(ray_start_with_dashboard): assert response.json()["data"]["autoscalingError"] == "world" +def test_immutable_types(): + d = {str(i): i for i in range(1000)} + d["list"] = list(range(1000)) + d["list"][0] = {str(i): i for i in range(1000)} + d["dict"] = {str(i): i for i in range(1000)} + immutable_dict = dashboard_utils.make_immutable(d) + assert type(immutable_dict) == dashboard_utils.ImmutableDict + assert immutable_dict == dashboard_utils.ImmutableDict(d) + assert immutable_dict == d + assert dashboard_utils.ImmutableDict(immutable_dict) == immutable_dict + assert dashboard_utils.ImmutableList( + immutable_dict["list"]) == immutable_dict["list"] + assert "512" in d + assert "512" in d["list"][0] + assert "512" in d["dict"] + + # Test type conversion + assert type(dict(immutable_dict)["list"]) == dashboard_utils.ImmutableList + assert type(list( + immutable_dict["list"])[0]) == dashboard_utils.ImmutableDict + + # Test json dumps / loads + json_str = json.dumps(immutable_dict, cls=dashboard_utils.CustomEncoder) + deserialized_immutable_dict = json.loads(json_str) + assert type(deserialized_immutable_dict) == dict + assert type(deserialized_immutable_dict["list"]) == list + assert immutable_dict.mutable() == deserialized_immutable_dict + dashboard_utils.rest_response(True, "OK", data=immutable_dict) + dashboard_utils.rest_response(True, "OK", **immutable_dict) + + # Test copy + copy_of_immutable = copy.copy(immutable_dict) + assert copy_of_immutable == immutable_dict + deepcopy_of_immutable = copy.deepcopy(immutable_dict) + assert deepcopy_of_immutable == immutable_dict + + # Test get default immutable + immutable_default_value = immutable_dict.get("not exist list", [1, 2]) + assert type(immutable_default_value) == dashboard_utils.ImmutableList + + # Test recursive immutable + assert type(immutable_dict["list"]) == dashboard_utils.ImmutableList + assert type(immutable_dict["dict"]) == dashboard_utils.ImmutableDict + assert type(immutable_dict["list"][0]) == dashboard_utils.ImmutableDict + + # Test exception + with pytest.raises(TypeError): + dashboard_utils.ImmutableList((1, 2)) + + with pytest.raises(TypeError): + dashboard_utils.ImmutableDict([1, 2]) + + with pytest.raises(TypeError): + immutable_dict["list"] = [] + + with pytest.raises(AttributeError): + immutable_dict.update({1: 3}) + + with pytest.raises(TypeError): + immutable_dict["list"][0] = 0 + + with pytest.raises(AttributeError): + immutable_dict["list"].extend([1, 2]) + + with pytest.raises(AttributeError): + immutable_dict["list"].insert(1, 2) + + d2 = dashboard_utils.ImmutableDict({1: np.zeros([3, 5])}) + with pytest.raises(TypeError): + print(d2[1]) + + d3 = dashboard_utils.ImmutableList([1, np.zeros([3, 5])]) + with pytest.raises(TypeError): + print(d3[1]) + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/dashboard/utils.py b/dashboard/utils.py index e9cfcaf68..e1379eea8 100644 --- a/dashboard/utils.py +++ b/dashboard/utils.py @@ -1,9 +1,9 @@ import abc +import os import socket import time import asyncio import collections -import copy import json import datetime import functools @@ -13,10 +13,11 @@ import logging import pkgutil import traceback from base64 import b64decode -from collections.abc import MutableMapping, Mapping +from abc import ABCMeta, abstractmethod +from collections.abc import MutableMapping, Mapping, Sequence from collections import namedtuple from typing import Any -import os + import aioredis import aiohttp.web import ray.new_dashboard.consts as dashboard_consts @@ -129,6 +130,7 @@ class ClassMethodRouteTable: req = args[-1] return await handler(bind_info.instance, req) except Exception: + logger.exception("Handle %s %s failed.", method, path) return rest_response( success=False, message=traceback.format_exc()) @@ -224,6 +226,8 @@ class CustomEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, bytes): return binary_to_hex(obj) + if isinstance(obj, Immutable): + return obj.mutable() # Let the base class default method raise the TypeError return json.JSONEncoder.default(self, obj) @@ -427,7 +431,8 @@ class Change: self.new = new def __str__(self): - return f"Change(owner: {self.owner}, old: {self.old}, new: {self.new}" + return f"Change(owner: {type(self.owner)}), " \ + f"old: {self.old}, new: {self.new}" class NotifyQueue: @@ -444,7 +449,163 @@ class NotifyQueue: return await cls._queue.get() -class Dict(MutableMapping): +""" +https://docs.python.org/3/library/json.html?highlight=json#json.JSONEncoder + +-------------------+---------------+ + | Python | JSON | + +===================+===============+ + | dict | object | + +-------------------+---------------+ + | list, tuple | array | + +-------------------+---------------+ + | str | string | + +-------------------+---------------+ + | int, float | number | + +-------------------+---------------+ + | True | true | + +-------------------+---------------+ + | False | false | + +-------------------+---------------+ + | None | null | + +-------------------+---------------+ +""" +_json_compatible_types = { + dict, list, tuple, str, int, float, bool, + type(None), bytes +} + + +def is_immutable(self): + raise TypeError("%r objects are immutable" % self.__class__.__name__) + + +def make_immutable(value, strict=True): + value_type = type(value) + if value_type is dict: + return ImmutableDict(value) + if value_type is list: + return ImmutableList(value) + if strict: + if value_type not in _json_compatible_types: + raise TypeError("Type {} can't be immutable.".format(value_type)) + return value + + +class Immutable(metaclass=ABCMeta): + @abstractmethod + def mutable(self): + pass + + +class ImmutableList(Immutable, Sequence): + """Makes a :class:`list` immutable. + """ + + __slots__ = ("_list", "_proxy") + + def __init__(self, list_value): + if type(list_value) not in (list, ImmutableList): + raise TypeError(f"{type(list_value)} object is not a list.") + if isinstance(list_value, ImmutableList): + list_value = list_value.mutable() + self._list = list_value + self._proxy = [None] * len(list_value) + + def __reduce_ex__(self, protocol): + return type(self), (self._list, ) + + def mutable(self): + return self._list + + def __eq__(self, other): + if isinstance(other, ImmutableList): + other = other.mutable() + return list.__eq__(self._list, other) + + def __ne__(self, other): + if isinstance(other, ImmutableList): + other = other.mutable() + return list.__ne__(self._list, other) + + def __contains__(self, item): + if isinstance(item, Immutable): + item = item.mutable() + return list.__contains__(self._list, item) + + def __getitem__(self, item): + proxy = self._proxy[item] + if proxy is None: + proxy = self._proxy[item] = make_immutable(self._list[item]) + return proxy + + def __len__(self): + return len(self._list) + + def __repr__(self): + return "%s(%s)" % (self.__class__.__name__, list.__repr__(self._list)) + + +class ImmutableDict(Immutable, Mapping): + """Makes a :class:`dict` immutable. + """ + + __slots__ = ("_dict", "_proxy") + + def __init__(self, dict_value): + if type(dict_value) not in (dict, ImmutableDict): + raise TypeError(f"{type(dict_value)} object is not a dict.") + if isinstance(dict_value, ImmutableDict): + dict_value = dict_value.mutable() + self._dict = dict_value + self._proxy = {} + + def __reduce_ex__(self, protocol): + return type(self), (self._dict, ) + + def mutable(self): + return self._dict + + def get(self, key, default=None): + try: + return self[key] + except KeyError: + return make_immutable(default) + + def __eq__(self, other): + if isinstance(other, ImmutableDict): + other = other.mutable() + return dict.__eq__(self._dict, other) + + def __ne__(self, other): + if isinstance(other, ImmutableDict): + other = other.mutable() + return dict.__ne__(self._dict, other) + + def __contains__(self, item): + if isinstance(item, Immutable): + item = item.mutable() + return dict.__contains__(self._dict, item) + + def __getitem__(self, item): + proxy = self._proxy.get(item, None) + if proxy is None: + proxy = self._proxy[item] = make_immutable(self._dict[item]) + return proxy + + def __len__(self) -> int: + return len(self._dict) + + def __iter__(self): + if len(self._proxy) != len(self._dict): + for key in self._dict.keys() - self._proxy.keys(): + self._proxy[key] = make_immutable(self._dict[key]) + return iter(self._proxy) + + def __repr__(self): + return "%s(%s)" % (self.__class__.__name__, dict.__repr__(self._dict)) + + +class Dict(ImmutableDict, MutableMapping): """A simple descriptor for dict type to notify data changes. :note: Only the first level data report change. @@ -453,12 +614,13 @@ class Dict(MutableMapping): ChangeItem = namedtuple("DictChangeItem", ["key", "value"]) def __init__(self, *args, **kwargs): - self._data = dict(*args, **kwargs) + super().__init__(dict(*args, **kwargs)) self.signal = Signal(self) def __setitem__(self, key, value): - old = self._data.pop(key, None) - self._data[key] = value + old = self._dict.pop(key, None) + self._proxy.pop(key, None) + self._dict[key] = value if len(self.signal) and old != value: if old is None: co = self.signal.send( @@ -471,30 +633,25 @@ class Dict(MutableMapping): new=Dict.ChangeItem(key, value))) NotifyQueue.put(co) - def __getitem__(self, item): - return copy.deepcopy(self._data[item]) - def __delitem__(self, key): - old = self._data.pop(key, None) + old = self._dict.pop(key, None) + self._proxy.pop(key, None) if len(self.signal) and old is not None: co = self.signal.send( Change(owner=self, old=Dict.ChangeItem(key, old))) NotifyQueue.put(co) - def __len__(self): - return len(self._data) - - def __iter__(self): - return iter(copy.deepcopy(self._data)) - - def __str__(self): - return str(self._data) - def reset(self, d): assert isinstance(d, Mapping) - for key in self._data.keys() - d.keys(): - self.pop(key) - self.update(d) + for key in self._dict.keys() - d.keys(): + del self[key] + for key, value in d.items(): + self[key] = value + + +# Register immutable types. +for immutable_type in Immutable.__subclasses__(): + _json_compatible_types.add(immutable_type) async def get_aioredis_client(redis_address, redis_password, diff --git a/python/build-wheel-macos.sh b/python/build-wheel-macos.sh index 19588921e..60b35ed04 100755 --- a/python/build-wheel-macos.sh +++ b/python/build-wheel-macos.sh @@ -13,6 +13,7 @@ MACPYTHON_URL=https://www.python.org/ftp/python MACPYTHON_PY_PREFIX=/Library/Frameworks/Python.framework/Versions DOWNLOAD_DIR=python_downloads +NODE_VERSION="14" PY_VERSIONS=("3.6.1" "3.7.0" "3.8.2") @@ -36,6 +37,7 @@ mkdir -p .whl # Use the latest version of Node.js in order to build the dashboard. source "$HOME"/.nvm/nvm.sh +nvm install $NODE_VERSION nvm use node # Build the dashboard so its static assets can be included in the wheel. diff --git a/python/ray/dashboard/dashboard.py b/python/ray/dashboard/dashboard.py index a6dbe1e15..5e8fac428 100644 --- a/python/ray/dashboard/dashboard.py +++ b/python/ray/dashboard/dashboard.py @@ -81,7 +81,7 @@ class DashboardController(BaseDashboardController): def _construct_raylet_info(self): D = self.raylet_stats.get_raylet_stats() workers_info_by_node = { - data["nodeId"]: data.get("workersStats") + data["nodeId"]: data.get("coreWorkersStats") for data in D.values() } diff --git a/python/ray/dashboard/node_stats.py b/python/ray/dashboard/node_stats.py index c688250c5..616ef5547 100644 --- a/python/ray/dashboard/node_stats.py +++ b/python/ray/dashboard/node_stats.py @@ -159,20 +159,18 @@ class NodeStats(threading.Thread): actors[actor_id].update(self._addr_to_extra_info_dict[addr]) for node_id, workers_info in workers_info_by_node.items(): - for worker_info in workers_info: - if "coreWorkerStats" in worker_info: - core_worker_stats = worker_info["coreWorkerStats"] - addr = (core_worker_stats["ipAddress"], - str(core_worker_stats["port"])) - if addr in self._addr_to_actor_id: - actor_info = actors[self._addr_to_actor_id[addr]] - format_reply_id(core_worker_stats) - actor_info.update(core_worker_stats) - actor_info["averageTaskExecutionSpeed"] = round( - actor_info["numExecutedTasks"] / - (now - actor_info["timestamp"] / 1000), 2) - actor_info["nodeId"] = node_id - actor_info["pid"] = worker_info["pid"] + for core_worker_stats in workers_info: + addr = (core_worker_stats["ipAddress"], + str(core_worker_stats["port"])) + if addr in self._addr_to_actor_id: + actor_info = actors[self._addr_to_actor_id[addr]] + format_reply_id(core_worker_stats) + actor_info.update(core_worker_stats) + actor_info["averageTaskExecutionSpeed"] = round( + actor_info["numExecutedTasks"] / + (now - actor_info["timestamp"] / 1000), 2) + actor_info["nodeId"] = node_id + actor_info["pid"] = core_worker_stats["pid"] def _update_from_actor_tasks(task, task_spec_type, invalid_state_type): @@ -183,8 +181,9 @@ class NodeStats(threading.Thread): elif invalid_state_type == "infeasibleActor": task["state"] = -2 else: - raise ValueError(f"Invalid argument" - "invalid_state_type={invalid_state_type}") + raise ValueError( + "Invalid argument" + f"invalid_state_type={invalid_state_type}") task["actorTitle"] = task["functionDescriptor"][ "pythonFunctionDescriptor"]["className"] format_reply_id(task) diff --git a/python/ray/tests/test_metrics.py b/python/ray/tests/test_metrics.py index 5b97adc93..b43747889 100644 --- a/python/ray/tests/test_metrics.py +++ b/python/ray/tests/test_metrics.py @@ -4,6 +4,7 @@ import requests import time import ray +from ray.core.generated import common_pb2 from ray.core.generated import node_manager_pb2 from ray.core.generated import node_manager_pb2_grpc from ray.test_utils import (RayTestTimeoutException, @@ -36,7 +37,10 @@ def test_worker_stats(shutdown_only): reply = try_get_node_stats() # Check that there is one connected driver. - drivers = [worker for worker in reply.workers_stats if worker.is_driver] + drivers = [ + worker for worker in reply.core_workers_stats + if worker.worker_type == common_pb2.DRIVER + ] assert len(drivers) == 1 assert os.getpid() == drivers[0].pid @@ -58,11 +62,10 @@ def test_worker_stats(shutdown_only): worker_pid = ray.get(f.remote()) reply = try_get_node_stats() target_worker_present = False - for worker in reply.workers_stats: - stats = worker.core_worker_stats + for stats in reply.core_workers_stats: if stats.webui_display[""] == '{"message": "test", "dtype": "text"}': target_worker_present = True - assert worker.pid == worker_pid + assert stats.pid == worker_pid else: assert stats.webui_display[""] == "" # Empty proto assert target_worker_present @@ -72,11 +75,10 @@ def test_worker_stats(shutdown_only): worker_pid = ray.get(a.f.remote()) reply = try_get_node_stats() target_worker_present = False - for worker in reply.workers_stats: - stats = worker.core_worker_stats + for stats in reply.core_workers_stats: if stats.webui_display[""] == '{"message": "test", "dtype": "text"}': target_worker_present = True - assert worker.pid == worker_pid + assert stats.pid == worker_pid else: assert stats.webui_display[""] == "" # Empty proto assert target_worker_present @@ -89,15 +91,15 @@ def test_worker_stats(shutdown_only): "Timed out while waiting for worker processes") # Wait for the workers to start. - if len(reply.workers_stats) < num_cpus + 1: + if len(reply.core_workers_stats) < num_cpus + 1: time.sleep(1) reply = try_get_node_stats() continue # Check that the rest of the processes are workers, 1 for each CPU. - assert len(reply.workers_stats) == num_cpus + 1 + assert len(reply.core_workers_stats) == num_cpus + 1 # Check that all processes are Python. - pids = [worker.pid for worker in reply.workers_stats] + pids = [worker.pid for worker in reply.core_workers_stats] processes = [ p.info["name"] for p in psutil.process_iter(attrs=["pid", "name"]) if p.info["pid"] in pids diff --git a/python/ray/tests/test_multi_tenancy.py b/python/ray/tests/test_multi_tenancy.py index 3e937976e..74fa2e5b2 100644 --- a/python/ray/tests/test_multi_tenancy.py +++ b/python/ray/tests/test_multi_tenancy.py @@ -8,6 +8,7 @@ import pytest import ray import ray.test_utils +from ray.core.generated import common_pb2 from ray.core.generated import node_manager_pb2, node_manager_pb2_grpc from ray.test_utils import (wait_for_condition, wait_for_pid_to_exit, run_string_as_driver, @@ -22,8 +23,8 @@ def get_workers(): stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel) return [ worker for worker in stub.GetNodeStats( - node_manager_pb2.GetNodeStatsRequest()).workers_stats - if not worker.is_driver + node_manager_pb2.GetNodeStatsRequest()).core_workers_stats + if worker.worker_type != common_pb2.DRIVER ] diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 6ba2cc23b..6196fc02b 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2182,8 +2182,12 @@ void CoreWorker::HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest & stats->set_current_task_func_desc(current_task_.FunctionDescriptor()->ToString()); stats->set_ip_address(rpc_address_.ip_address()); stats->set_port(rpc_address_.port()); + stats->set_pid(getpid()); + stats->set_language(options_.language); stats->set_job_id(worker_context_.GetCurrentJobID().Binary()); + stats->set_worker_id(worker_context_.GetWorkerID().Binary()); stats->set_actor_id(actor_id_.Binary()); + stats->set_worker_type(worker_context_.GetWorkerType()); auto used_resources_map = stats->mutable_used_resources(); for (auto const &it : *resource_ids_) { rpc::ResourceAllocations allocations; diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 18223a534..2c60cf3b8 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -392,6 +392,14 @@ message CoreWorkerStats { repeated ObjectRefInfo object_refs = 18; // Job ID. bytes job_id = 19; + // Worker id of core worker. + bytes worker_id = 20; + // Language + Language language = 21; + // PID of the worker process. + uint32 pid = 22; + // The worker type. + WorkerType worker_type = 23; } message MetricPoint { diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index 05f3ee754..887787cc4 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -115,28 +115,12 @@ message GetNodeStatsRequest { bool include_memory_info = 1; } -message WorkerStats { - // PID of the worker process. - uint32 pid = 1; - // Whether this is a driver. - bool is_driver = 2; - // Debug information returned from the core worker. - CoreWorkerStats core_worker_stats = 3; - // Error string if fetching core worker stats failed. - string fetch_error = 4; - // Worker id of core worker. - bytes worker_id = 5; - // Worker language. - Language language = 6; -} - message GetNodeStatsReply { - repeated WorkerStats workers_stats = 1; + repeated CoreWorkerStats core_workers_stats = 1; repeated ViewData view_data = 2; uint32 num_workers = 3; repeated TaskSpec infeasible_tasks = 4; repeated TaskSpec ready_tasks = 5; - int32 pid = 6; } message GlobalGCRequest { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 55049beea..8b884587d 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -3174,7 +3174,6 @@ void NodeManager::FlushObjectsToFree() { void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &node_stats_request, rpc::GetNodeStatsReply *reply, rpc::SendReplyCallback send_reply_callback) { - reply->set_pid(getpid()); for (const auto &task : local_queues_.GetTasks(TaskState::INFEASIBLE)) { if (task.GetTaskSpecification().IsActorCreationTask()) { auto infeasible_task = reply->add_infeasible_tasks(); @@ -3254,22 +3253,8 @@ void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &node_stats_ worker->rpc_client()->GetCoreWorkerStats( request, [reply, worker, all_workers, driver_ids, send_reply_callback]( const ray::Status &status, const rpc::GetCoreWorkerStatsReply &r) { - auto worker_stats = reply->add_workers_stats(); - worker_stats->set_pid(worker->GetProcess().GetId()); - worker_stats->set_worker_id(worker->WorkerId().Binary()); - worker_stats->set_is_driver(driver_ids.contains(worker->WorkerId())); - worker_stats->set_language(worker->GetLanguage()); + reply->add_core_workers_stats()->MergeFrom(r.core_worker_stats()); reply->set_num_workers(reply->num_workers() + 1); - if (status.ok()) { - worker_stats->mutable_core_worker_stats()->MergeFrom(r.core_worker_stats()); - } else { - RAY_LOG(WARNING) << "Failed to send get core worker stats request, " - << "worker id is " << worker->WorkerId() << ", status is " - << status.ToString() - << ". This is likely since the worker has died before the " - "request was sent."; - worker_stats->set_fetch_error(status.ToString()); - } if (reply->num_workers() == all_workers.size()) { send_reply_callback(Status::OK(), nullptr, nullptr); } @@ -3281,8 +3266,8 @@ std::string FormatMemoryInfo(std::vector node_stats) { // First pass to compute object sizes. absl::flat_hash_map object_sizes; for (const auto &reply : node_stats) { - for (const auto &worker_stats : reply.workers_stats()) { - for (const auto &object_ref : worker_stats.core_worker_stats().object_refs()) { + for (const auto &core_worker_stats : reply.core_workers_stats()) { + for (const auto &object_ref : core_worker_stats.object_refs()) { auto obj_id = ObjectID::FromBinary(object_ref.object_id()); if (object_ref.object_size() > 0) { object_sizes[obj_id] = object_ref.object_size(); @@ -3304,9 +3289,9 @@ std::string FormatMemoryInfo(std::vector node_stats) { // Second pass builds the summary string for each node. for (const auto &reply : node_stats) { - for (const auto &worker_stats : reply.workers_stats()) { + for (const auto &core_worker_stats : reply.core_workers_stats()) { bool pid_printed = false; - for (const auto &object_ref : worker_stats.core_worker_stats().object_refs()) { + for (const auto &object_ref : core_worker_stats.object_refs()) { auto obj_id = ObjectID::FromBinary(object_ref.object_id()); if (!object_ref.pinned_in_memory() && object_ref.local_ref_count() == 0 && object_ref.submitted_task_ref_count() == 0 && @@ -3317,10 +3302,10 @@ std::string FormatMemoryInfo(std::vector node_stats) { continue; } if (!pid_printed) { - if (worker_stats.is_driver()) { - builder << "; driver pid=" << worker_stats.pid() << "\n"; + if (core_worker_stats.worker_type() == rpc::WorkerType::DRIVER) { + builder << "; driver pid=" << core_worker_stats.pid() << "\n"; } else { - builder << "; worker pid=" << worker_stats.pid() << "\n"; + builder << "; worker pid=" << core_worker_stats.pid() << "\n"; } pid_printed = true; }