From a484947742b242d03c8cccd1a50f532d157f458b Mon Sep 17 00:00:00 2001 From: fangfengbin <869218239a@zju.edu.cn> Date: Wed, 29 Jul 2020 14:16:03 +0800 Subject: [PATCH] Fix leased worker leak bug if lease worker requests that are still waiting to be scheduled when GCS restarts (#9719) --- python/ray/tests/test_gcs_fault_tolerance.py | 14 +++++++++--- src/ray/core_worker/reference_count.cc | 7 +++++- src/ray/gcs/gcs_server/gcs_actor_manager.cc | 24 +++++++++++++++++--- src/ray/raylet/node_manager.cc | 14 +++++++++--- 4 files changed, 49 insertions(+), 10 deletions(-) diff --git a/python/ray/tests/test_gcs_fault_tolerance.py b/python/ray/tests/test_gcs_fault_tolerance.py index 367f668ae..4dae6bec4 100644 --- a/python/ray/tests/test_gcs_fault_tolerance.py +++ b/python/ray/tests/test_gcs_fault_tolerance.py @@ -22,8 +22,12 @@ def increase(x): @pytest.mark.skipif( - os.environ.get("RAY_GCS_ACTOR_SERVICE_ENABLED") != "true", + os.environ.get("RAY_GCS_ACTOR_SERVICE_ENABLED", "true") != "true", reason=("This testcase can only be run when GCS actor management is on.")) +@pytest.mark.parametrize( + "ray_start_regular", + [generate_internal_config_map(num_heartbeats_timeout=20)], + indirect=True) def test_gcs_server_restart(ray_start_regular): actor1 = Increase.remote() result = ray.get(actor1.method.remote(1)) @@ -44,8 +48,12 @@ def test_gcs_server_restart(ray_start_regular): @pytest.mark.skipif( - os.environ.get("RAY_GCS_ACTOR_SERVICE_ENABLED") != "true", + os.environ.get("RAY_GCS_ACTOR_SERVICE_ENABLED", "true") != "true", reason=("This testcase can only be run when GCS actor management is on.")) +@pytest.mark.parametrize( + "ray_start_regular", + [generate_internal_config_map(num_heartbeats_timeout=20)], + indirect=True) def test_gcs_server_restart_during_actor_creation(ray_start_regular): ids = [] for i in range(0, 100): @@ -62,7 +70,7 @@ def test_gcs_server_restart_during_actor_creation(ray_start_regular): @pytest.mark.skipif( - 
os.environ.get("RAY_GCS_ACTOR_SERVICE_ENABLED") != "true", + os.environ.get("RAY_GCS_ACTOR_SERVICE_ENABLED", "true") != "true", reason=("This testcase can only be run when GCS actor management is on.")) @pytest.mark.parametrize( "ray_start_cluster_head", diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index 587fb8c64..cfcd255ca 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -493,7 +493,12 @@ bool ReferenceCounter::SetDeleteCallback( return false; } - RAY_CHECK(!it->second.on_delete) << object_id; + // NOTE: In two cases, `GcsActorManager` will send `WaitForActorOutOfScope` request more + // than once, causing the delete callback to be set repeatedly. + // 1.If actors have not been registered successfully before GCS restarts, gcs client + // will resend the registration request after GCS restarts. + // 2.After GCS restarts, GCS will send `WaitForActorOutOfScope` request to owned actors + // again. it->second.on_delete = callback; return true; } diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index 33c056c8f..afe44be5d 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -931,8 +931,22 @@ void GcsActorManager::LoadInitialData(const EmptyCallback &done) { named_actors_.emplace(actor->GetName(), actor->GetActorID()); } - created_actors_[actor->GetNodeID()].emplace(actor->GetWorkerID(), - actor->GetActorID()); + if (item.second.state() == ray::rpc::ActorTableData::DEPENDENCIES_UNREADY) { + const auto &owner = actor->GetOwnerAddress(); + const auto &owner_node = ClientID::FromBinary(owner.raylet_id()); + const auto &owner_worker = WorkerID::FromBinary(owner.worker_id()); + RAY_CHECK(unresolved_actors_[owner_node][owner_worker] + .emplace(actor->GetActorID()) + .second); + if (!actor->IsDetached() && worker_client_factory_) { + // This actor is owned. 
Send a long polling request to the actor's + // owner to determine when the actor should be removed. + PollOwnerForActorOutOfScope(actor); + } + } else if (item.second.state() == ray::rpc::ActorTableData::ALIVE) { + created_actors_[actor->GetNodeID()].emplace(actor->GetWorkerID(), + actor->GetActorID()); + } auto &workers = owners_[actor->GetNodeID()]; auto it = workers.find(actor->GetWorkerID()); @@ -958,7 +972,11 @@ void GcsActorManager::LoadInitialData(const EmptyCallback &done) { << ", and the number of created actors is " << created_actors_.size(); for (auto &item : registered_actors_) { auto &actor = item.second; - if (actor->GetState() != ray::rpc::ActorTableData::ALIVE) { + if (actor->GetState() == ray::rpc::ActorTableData::PENDING_CREATION || + actor->GetState() == ray::rpc::ActorTableData::RESTARTING) { + // We should not reschedule actors in the `ALIVE` state. + // We cannot reschedule actors in the `DEPENDENCIES_UNREADY` state because + // their dependencies may not have been resolved yet. RAY_LOG(DEBUG) << "Rescheduling a non-alive actor, actor id = " << actor->GetActorID() << ", state = " << actor->GetState(); gcs_actor_scheduler_->Reschedule(actor); diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index dbc6ef025..4762e67f0 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1763,7 +1763,17 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest rid->set_quantity(id.second.ToDouble()); } } - send_reply_callback(Status::OK(), nullptr, nullptr); + + auto reply_failure_handler = [this, worker_id]() { + if (RayConfig::instance().gcs_actor_service_enabled()) { + RAY_LOG(WARNING) + << "Failed to reply to GCS server, because it might have restarted. 
GCS " + "cannot obtain the information of the leased worker, so we need to " + "release the leased worker to avoid leakage."; + leased_workers_.erase(worker_id); + } + }; + send_reply_callback(Status::OK(), nullptr, reply_failure_handler); RAY_CHECK(leased_workers_.find(worker_id) == leased_workers_.end()) << "Worker is already leased out " << worker_id; @@ -1855,8 +1865,6 @@ void NodeManager::HandleReturnWorker(const rpc::ReturnWorkerRequest &request, void NodeManager::HandleReleaseUnusedWorkers( const rpc::ReleaseUnusedWorkersRequest &request, rpc::ReleaseUnusedWorkersReply *reply, rpc::SendReplyCallback send_reply_callback) { - // TODO(ffbin): At present, we have not cleaned up the lease worker requests that are - // still waiting to be scheduled, which will be implemented in the next pr. std::unordered_set in_use_worker_ids; for (int index = 0; index < request.worker_ids_in_use_size(); ++index) { auto worker_id = WorkerID::FromBinary(request.worker_ids_in_use(index));