diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index 2e0ba773a..2e919c5ab 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -387,25 +387,28 @@ void GcsActorManager::HandleGetActorCheckpointID( Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &request, RegisterActorCallback success_callback) { + // NOTE: After the abnormal recovery of the network between GCS client and GCS server or + // the GCS server is restarted, it is required to continue to register actor + // successfully. RAY_CHECK(success_callback); const auto &actor_creation_task_spec = request.task_spec().actor_creation_task_spec(); auto actor_id = ActorID::FromBinary(actor_creation_task_spec.actor_id()); auto iter = registered_actors_.find(actor_id); - if (iter != registered_actors_.end() && - iter->second->GetState() == rpc::ActorTableData::ALIVE) { - // In case of temporary network failures, workers will re-send multiple duplicate - // requests to GCS server. - // In this case, we can just reply. - success_callback(iter->second); - return Status::OK(); - } - - auto pending_register_iter = actor_to_register_callbacks_.find(actor_id); - if (pending_register_iter != actor_to_register_callbacks_.end()) { - // It is a duplicate message, just mark the callback as pending and invoke it after - // the actor has been flushed to the storage. - pending_register_iter->second.emplace_back(std::move(success_callback)); + if (iter != registered_actors_.end()) { + auto pending_register_iter = actor_to_register_callbacks_.find(actor_id); + if (pending_register_iter != actor_to_register_callbacks_.end()) { + // 1. The GCS client sends the `RegisterActor` request to the GCS server. + // 2. The GCS client receives some network errors. + // 3. The GCS client resends the `RegisterActor` request to the GCS server. + pending_register_iter->second.emplace_back(std::move(success_callback)); + } else { + // 1. The GCS client sends the `RegisterActor` request to the GCS server. + // 2. The GCS server flushes the actor to the storage and restarts before replying + // to the GCS client. + // 3. The GCS client resends the `RegisterActor` request to the GCS server. + success_callback(iter->second); + } return Status::OK(); } @@ -422,7 +425,7 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ } actor_to_register_callbacks_[actor_id].emplace_back(std::move(success_callback)); - RAY_CHECK(registered_actors_.emplace(actor->GetActorID(), actor).second); + registered_actors_.emplace(actor->GetActorID(), actor); const auto &owner_address = actor->GetOwnerAddress(); auto node_id = NodeID::FromBinary(owner_address.raylet_id()); @@ -469,13 +472,20 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ Status GcsActorManager::CreateActor(const ray::rpc::CreateActorRequest &request, CreateActorCallback callback) { + // NOTE: After the abnormal recovery of the network between GCS client and GCS server or + // the GCS server is restarted, it is required to continue to create actor + // successfully. RAY_CHECK(callback); const auto &actor_creation_task_spec = request.task_spec().actor_creation_task_spec(); auto actor_id = ActorID::FromBinary(actor_creation_task_spec.actor_id()); auto iter = registered_actors_.find(actor_id); - if (iter != registered_actors_.end() && - iter->second->GetState() == rpc::ActorTableData::ALIVE) { + if (iter == registered_actors_.end()) { + RAY_LOG(WARNING) << "Actor " << actor_id << " may be already destroyed."; + return Status::Invalid("Actor may be already destroyed."); + } + + if (iter->second->GetState() == rpc::ActorTableData::ALIVE) { // In case of temporary network failures, workers will re-send multiple duplicate // requests to GCS server. // In this case, we can just reply. @@ -494,6 +504,15 @@ Status GcsActorManager::CreateActor(const ray::rpc::CreateActorRequest &request, // created. actor_to_create_callbacks_[actor_id].emplace_back(std::move(callback)); + // If GCS restarts while processing `CreateActor` request, GCS client will resend the + // `CreateActor` request. + // After GCS restarts, the state of the actor may not be `DEPENDENCIES_UNREADY`. + if (iter->second->GetState() != rpc::ActorTableData::DEPENDENCIES_UNREADY) { + RAY_LOG(INFO) << "Actor " << actor_id + << " is already in the process of creation. Skip it directly."; + return Status::OK(); + } + // Remove the actor from the unresolved actor map. auto actor = std::make_shared(request.task_spec()); actor->GetMutableActorTableData()->set_state(rpc::ActorTableData::PENDING_CREATION);