From 68899e2f8eac8be1fa50f8ffcceff65ad724d66f Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Wed, 5 Aug 2020 12:22:12 -0700 Subject: [PATCH] [GCS Actor Management] Fix race condition for DEPENDENCIES_UNREADY states. (#9883) * Fix issues. * Address code review. * Addressed code review 2. * Fix formatting. * Addressed code review 3/ * Addressed code review. --- src/ray/common/ray_config_def.h | 3 +++ src/ray/core_worker/actor_manager.h | 9 +++++-- src/ray/core_worker/core_worker.cc | 3 ++- src/ray/gcs/gcs_server/gcs_actor_manager.cc | 26 ++++++++++++++------- src/ray/gcs/gcs_server/gcs_actor_manager.h | 13 ++++++----- 5 files changed, 36 insertions(+), 18 deletions(-) diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index ad1667fbf..ebb4c6657 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -338,5 +338,8 @@ RAY_CONFIG(uint32_t, max_tasks_in_flight_per_worker, 1) /// load reported by each raylet. RAY_CONFIG(int64_t, max_resource_shapes_per_load_report, 100) +/// The timeout for synchronous GCS requests in seconds. +RAY_CONFIG(int64_t, gcs_server_request_timeout_seconds, 5) + /// Whether to enable multi tenancy features. RAY_CONFIG(bool, enable_multi_tenancy, false) diff --git a/src/ray/core_worker/actor_manager.h b/src/ray/core_worker/actor_manager.h index 24f66d8f5..e3c72913a 100644 --- a/src/ray/core_worker/actor_manager.h +++ b/src/ray/core_worker/actor_manager.h @@ -49,8 +49,13 @@ class DefaultActorCreator : public ActorCreatorInterface { auto promise = std::make_shared>(); auto status = gcs_client_->Actors().AsyncRegisterActor( task_spec, [promise](const Status &status) { promise->set_value(); }); - if (status.ok()) { - promise->get_future().wait(); + if (status.ok() && promise->get_future().wait_for(std::chrono::seconds( + RayConfig::instance().gcs_server_request_timeout_seconds())) != + std::future_status::ready) { + std::ostringstream stream; + stream << "There was timeout in registering an actor. It is probably " + "because GCS server is dead or there's a high load there."; + return Status::TimedOut(stream.str()); } return status; } diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index f6479bea7..e8f1bdbf9 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1451,7 +1451,8 @@ std::pair CoreWorker::GetNamedActorHandle( })); // Block until the RPC completes. Set a timeout to avoid hangs if the // GCS service crashes. - if (ready_promise->get_future().wait_for(std::chrono::seconds(5)) != + if (ready_promise->get_future().wait_for(std::chrono::seconds( + RayConfig::instance().gcs_server_request_timeout_seconds())) != std::future_status::ready) { std::ostringstream stream; stream << "There was timeout in getting the actor handle. It is probably " diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index b75e4ef78..8ae2fb4f6 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -385,10 +385,9 @@ void GcsActorManager::HandleGetActorCheckpointID( } } -Status GcsActorManager::RegisterActor( - const ray::rpc::RegisterActorRequest &request, - std::function)> callback) { - RAY_CHECK(callback); +Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &request, + RegisterActorCallback success_callback) { + RAY_CHECK(success_callback); const auto &actor_creation_task_spec = request.task_spec().actor_creation_task_spec(); auto actor_id = ActorID::FromBinary(actor_creation_task_spec.actor_id()); @@ -398,7 +397,7 @@ Status GcsActorManager::RegisterActor( // In case of temporary network failures, workers will re-send multiple duplicate // requests to GCS server. // In this case, we can just reply. - callback(iter->second); + success_callback(iter->second); return Status::OK(); } @@ -406,7 +405,7 @@ Status GcsActorManager::RegisterActor( if (pending_register_iter != actor_to_register_callbacks_.end()) { // It is a duplicate message, just mark the callback as pending and invoke it after // the actor has been flushed to the storage. - pending_register_iter->second.emplace_back(std::move(callback)); + pending_register_iter->second.emplace_back(std::move(success_callback)); return Status::OK(); } @@ -422,9 +421,7 @@ Status GcsActorManager::RegisterActor( } } - // Mark the callback as pending and invoke it after the actor has been successfully - // flushed to the storage. - actor_to_register_callbacks_[actor_id].emplace_back(std::move(callback)); + actor_to_register_callbacks_[actor_id].emplace_back(std::move(success_callback)); RAY_CHECK(registered_actors_.emplace(actor->GetActorID(), actor).second); const auto &owner_address = actor->GetOwnerAddress(); @@ -444,6 +441,17 @@ Status GcsActorManager::RegisterActor( [this, actor](const Status &status) { // The backend storage is supposed to be reliable, so the status must be ok. RAY_CHECK_OK(status); + // If a creator dies before this callback is called, the actor could have been + // already destroyed. It is okay not to invoke a callback because we don't need + // to reply to the creator as it is already dead. + auto registered_actor_it = registered_actors_.find(actor->GetActorID()); + if (registered_actor_it == registered_actors_.end()) { + // NOTE(sang): This logic assumes that the ordering of backend call is + // guaranteed. It is currently true because we use a single TCP socket to call + // the default Redis backend. If ordering is not guaranteed, we should overwrite + // the actor state to DEAD to avoid race condition. + return; + } // Invoke all callbacks for all registration requests of this actor (duplicated // requests are included) and remove all of them from // actor_to_register_callbacks_. diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h index 6ce4f2a2a..1f84cc2f5 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.h +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h @@ -211,13 +211,14 @@ class GcsActorManager : public rpc::ActorInfoHandler { /// Register actor asynchronously. /// /// \param request Contains the meta info to create the actor. - /// \param callback Will be invoked after the actor is created successfully or be - /// invoked immediately if the actor is already registered to `registered_actors_` and - /// its state is `ALIVE`. - /// \return Status::Invalid if this is a named actor and an actor with the specified - /// name already exists. The callback will not be called in this case. + /// \param success_callback Will be invoked after the actor is created successfully or + /// be invoked immediately if the actor is already registered to `registered_actors_` + /// and its state is `ALIVE`. + /// \return Status::Invalid if this is a named actor and an + /// actor with the specified name already exists. The callback will not be called in + /// this case. Status RegisterActor(const rpc::RegisterActorRequest &request, - RegisterActorCallback callback); + RegisterActorCallback success_callback); /// Create actor asynchronously. ///