From 12e1175dd1d33673c3fc6a33fb2caa3ac9f1bf36 Mon Sep 17 00:00:00 2001 From: Amog Kamsetty Date: Wed, 13 Jan 2021 18:03:15 -0800 Subject: [PATCH] Revert "[Dashboard] Fix missing actor pid (#13229)" This reverts commit 4853aa96cbbea76e69c3e48802ce7408f08669ee. --- dashboard/datacenter.py | 18 +++++++++++++++++- dashboard/modules/job/job_head.py | 17 ++++++++++++----- dashboard/modules/job/tests/test_job.py | 8 ++++---- .../tests/test_logical_view_head.py | 2 +- .../raylet/scheduling/cluster_task_manager.cc | 2 -- 5 files changed, 34 insertions(+), 13 deletions(-) diff --git a/dashboard/datacenter.py b/dashboard/datacenter.py index 42bae3017..319354068 100644 --- a/dashboard/datacenter.py +++ b/dashboard/datacenter.py @@ -159,7 +159,7 @@ class DataOrganizer: # Merge GcsNodeInfo to node physical stats node_info["raylet"].update(node) # Merge actors to node physical stats - node_info["actors"] = DataSource.node_actors.get(node_id, {}) + node_info["actors"] = await cls.get_node_actors(node_id) # Update workers to node physical stats node_info["workers"] = DataSource.node_workers.get(node_id, []) node_info["logCount"] = node_log_count @@ -203,6 +203,22 @@ class DataOrganizer: for node_id in DataSource.nodes.keys() ] + @classmethod + async def get_node_actors(cls, node_id): + node_actors = DataSource.node_actors.get(node_id, {}) + return { + actor_id: await cls._get_actor(actor) + for actor_id, actor in node_actors.items() + } + + @classmethod + async def get_job_actors(cls, job_id): + job_actors = DataSource.job_actors.get(job_id, {}) + return { + actor_id: await cls._get_actor(actor) + for actor_id, actor in job_actors.items() + } + @classmethod async def get_all_actors(cls): return { diff --git a/dashboard/modules/job/job_head.py b/dashboard/modules/job/job_head.py index be2972e94..671482b82 100644 --- a/dashboard/modules/job/job_head.py +++ b/dashboard/modules/job/job_head.py @@ -12,6 +12,7 @@ from ray.core.generated import gcs_service_pb2 from ray.core.generated import gcs_service_pb2_grpc from ray.new_dashboard.datacenter import ( DataSource, + DataOrganizer, GlobalSignals, ) @@ -52,7 +53,7 @@ class JobHead(dashboard_utils.DashboardHeadModule): if view is None: job_detail = { "jobInfo": DataSource.jobs.get(job_id, {}), - "jobActors": DataSource.job_actors.get(job_id, {}), + "jobActors": await DataOrganizer.get_job_actors(job_id), "jobWorkers": DataSource.job_workers.get(job_id, []), } await GlobalSignals.job_info_fetched.send(job_detail) @@ -103,10 +104,16 @@ class JobHead(dashboard_utils.DashboardHeadModule): pubsub_message = ray.gcs_utils.PubSubMessage.FromString(data) message = ray.gcs_utils.JobTableData.FromString( pubsub_message.data) - job_table_data = job_table_data_to_dict(message) - job_id = job_table_data["jobId"] - # Update jobs. - DataSource.jobs[job_id] = job_table_data + job_id = ray._raylet.JobID(message.job_id) + if job_id.is_submitted_from_dashboard(): + job_table_data = job_table_data_to_dict(message) + job_id = job_table_data["jobId"] + # Update jobs. + DataSource.jobs[job_id] = job_table_data + else: + logger.info( + "Ignore job %s which is not submitted from dashboard.", + job_id.hex()) except Exception: logger.exception("Error receiving job info.") diff --git a/dashboard/modules/job/tests/test_job.py b/dashboard/modules/job/tests/test_job.py index ed45ea608..4a5e38727 100644 --- a/dashboard/modules/job/tests/test_job.py +++ b/dashboard/modules/job/tests/test_job.py @@ -42,7 +42,7 @@ def test_get_job_info(disable_aiohttp_cache, ray_start_with_dashboard): result = resp.json() assert result["result"] is True, resp.text job_summary = result["data"]["summary"] - assert len(job_summary) == 1, resp.text + assert len(job_summary) == 1 one_job = job_summary[0] assert "jobId" in one_job job_id = one_job["jobId"] @@ -67,7 +67,7 @@ def test_get_job_info(disable_aiohttp_cache, ray_start_with_dashboard): assert len(one_job_summary_keys - job_detail["jobInfo"].keys()) == 0 assert "jobActors" in job_detail job_actors = job_detail["jobActors"] - assert len(job_actors) == 1, resp.text + assert len(job_actors) == 1 one_job_actor = job_actors[actor_id] assert "taskSpec" in one_job_actor assert type(one_job_actor["taskSpec"]) is dict @@ -82,7 +82,7 @@ def test_get_job_info(disable_aiohttp_cache, ray_start_with_dashboard): assert k in one_job_actor assert "jobWorkers" in job_detail job_workers = job_detail["jobWorkers"] - assert len(job_workers) == 1, resp.text + assert len(job_workers) == 1 one_job_worker = job_workers[0] check_worker_keys = [ "cmdline", "pid", "cpuTimes", "memoryInfo", "cpuPercent", @@ -91,7 +91,7 @@ def test_get_job_info(disable_aiohttp_cache, ray_start_with_dashboard): for k in check_worker_keys: assert k in one_job_worker - timeout_seconds = 10 + timeout_seconds = 5 start_time = time.time() last_ex = None while True: diff --git a/dashboard/modules/logical_view/tests/test_logical_view_head.py b/dashboard/modules/logical_view/tests/test_logical_view_head.py index 5c7680d93..ceee063dd 100644 --- a/dashboard/modules/logical_view/tests/test_logical_view_head.py +++ b/dashboard/modules/logical_view/tests/test_logical_view_head.py @@ -121,7 +121,7 @@ def test_actors(disable_aiohttp_cache, ray_start_with_dashboard): assert "name" in one_entry assert "numRestarts" in one_entry assert "pid" in one_entry - all_pids = {entry["pid"] for entry in actors.values()} + all_pids = [entry["pid"] for entry in actors.values()] assert 0 in all_pids # The infeasible actor assert len(all_pids) > 1 break diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc index 5bda64560..eb08e1dd9 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager.cc @@ -1,7 +1,6 @@ #include "ray/raylet/scheduling/cluster_task_manager.h" #include - #include #include "ray/util/logging.h" @@ -648,7 +647,6 @@ void ClusterTaskManager::Dispatch( const auto &task_spec = task.GetTaskSpecification(); RAY_LOG(DEBUG) << "Dispatching task " << task_spec.TaskId(); // Pass the contact info of the worker to use. - reply->set_worker_pid(worker->GetProcess().GetId()); reply->mutable_worker_address()->set_ip_address(worker->IpAddress()); reply->mutable_worker_address()->set_port(worker->Port()); reply->mutable_worker_address()->set_worker_id(worker->WorkerId().Binary());