From 30b2fc1d818fc7b8923adcda47d90d6844772bb2 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 26 Nov 2019 15:21:03 -0800 Subject: [PATCH] Fix actor creation hang due to race in SWAP queue (#6280) --- python/ray/cluster_utils.py | 2 ++ python/ray/tests/BUILD | 2 +- python/ray/tests/test_multi_node_2.py | 2 -- src/ray/core_worker/core_worker.cc | 4 ++-- src/ray/raylet/node_manager.cc | 14 +------------- 5 files changed, 6 insertions(+), 18 deletions(-) diff --git a/python/ray/cluster_utils.py b/python/ray/cluster_utils.py index 4bbaf275f..f42d63114 100644 --- a/python/ray/cluster_utils.py +++ b/python/ray/cluster_utils.py @@ -92,6 +92,8 @@ class Cluster(object): self.webui_url = self.head_node.webui_url else: ray_params.update_if_absent(redis_address=self.redis_address) + # We only need one log monitor per physical node. + ray_params.update_if_absent(include_log_monitor=False) # Let grpc pick a port. ray_params.update(node_manager_port=0) node = ray.node.Node( diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 37d11e9ee..6a567b62c 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -10,7 +10,7 @@ py_test( name = "test_actor_direct", size = "medium", srcs = ["test_actor_direct.py", "test_actor.py"], - tags = ["exclusive", "manual"], + tags = ["exclusive"], deps = ["//:ray_lib"], ) diff --git a/python/ray/tests/test_multi_node_2.py b/python/ray/tests/test_multi_node_2.py index fba8925b0..4812b4383 100644 --- a/python/ray/tests/test_multi_node_2.py +++ b/python/ray/tests/test_multi_node_2.py @@ -215,8 +215,6 @@ def test_worker_plasma_store_failure(ray_start_cluster_head): cluster = ray_start_cluster_head worker = cluster.add_node() cluster.wait_for_nodes() - # Log monitor doesn't die for some reason - worker.kill_log_monitor() worker.kill_reporter() worker.kill_plasma_store() worker.all_processes[ray_constants.PROCESS_TYPE_RAYLET][0].process.wait() diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc 
index d1d8b8703..db9d7d3e9 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -709,9 +709,9 @@ bool CoreWorker::AddActorHandle(std::unique_ptr<ActorHandle> actor_handle) { it->second->Reset(); } } else if (actor_data.state() == gcs::ActorTableData::DEAD) { - RAY_CHECK_OK(gcs_client_->Actors().AsyncUnsubscribe(actor_id, nullptr)); // We cannot erase the actor handle here because clients can still - // submit tasks to dead actors. + // submit tasks to dead actors. This also means we defer unsubscription, + // otherwise we crash when bulk unsubscribing all actor handles. } direct_actor_submitter_->HandleActorUpdate(actor_id, actor_data); diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 566b96bd4..d41918e1e 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2632,21 +2632,9 @@ void NodeManager::ForwardTask( task_entry.second.TaskData().GetTaskExecutionSpec().GetMessage()); } - // Move the FORWARDING task to the SWAP queue so that we remember that we - // have it queued locally. Once the ForwardTaskRequest has been sent, the - // task will get re-queued, depending on whether the message succeeded or - // not. - local_queues_.QueueTasks({task}, TaskState::SWAP); - client->ForwardTask(request, [this, on_error, task_id, node_id]( + client->ForwardTask(request, [this, on_error, task, task_id, node_id]( Status status, const rpc::ForwardTaskReply &reply) { // Remove the FORWARDING task from the SWAP queue. - Task task; - TaskState state; - if (!local_queues_.RemoveTask(task_id, &task, &state)) { - return; - } - RAY_CHECK(state == TaskState::SWAP); - if (status.ok()) { const auto &spec = task.GetTaskSpecification(); // Mark as forwarded so that the task and its lineage are not