From 3a25f5f5b4dc20be9835abfd91c0ff318dcff41f Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Sat, 9 May 2020 18:43:49 -0700 Subject: [PATCH] Clean up actor state from the GCS (#8261) * parametrize test * Regression test and logging * Test no restart after actor deletion * Unit tests * Refactor to subscribe to and lookup from worker failure table * Refactor ActorManager to remove dependencies * Revert "Regression test and logging" This reverts commit 835e1a9091b51ca8efb00392d4cc4a665145de24. * Revert "parametrize test" This reverts commit f31272082831ba1a494816dd5511d87b24eca4c9. * Revert "Test no restart after actor deletion" This reverts commit 114a83de14329aa6ab787c80cd5757cf074a9072. * doc * merge * Revert "Refactor to subscribe to and lookup from worker failure table" This reverts commit 6aa13a05178d0b9aa1db9dee5c978c911b74fa3a. * Revert "Revert "Test no restart after actor deletion"" This reverts commit 1bd92d09172aa8ab42632551cf9c56463f9598fe. * Revert "Revert "parametrize test"" This reverts commit 639ba4d3b02167fb2b05e9878f9aa600bcec95b3. * Revert "Revert "Regression test and logging"" This reverts commit f18b5f0db699a23cbccde32789e3639425e99ca4. * Clean up actors that have gone out of scope * Use actor ID instead of shared_ptr * Clean up actors owned by dead workers * Use actor ID instead of shared_ptr * TODO and lint * Fix unit tests * Add unit tests for supervision and docs * xx * Fix tests * Fix tests * fix build --- python/ray/tests/test_actor.py | 26 +++ python/ray/tests/test_actor_advanced.py | 41 ++++ python/ray/tests/test_failure.py | 26 ++- src/ray/core_worker/core_worker.cc | 94 ++++++-- src/ray/core_worker/core_worker.h | 12 ++ src/ray/core_worker/task_manager.cc | 20 +- .../gcs/gcs_client/service_based_accessor.cc | 7 +- src/ray/gcs/gcs_server/gcs_actor_manager.cc | 187 +++++++++++++++- src/ray/gcs/gcs_server/gcs_actor_manager.h | 69 ++++-- src/ray/gcs/gcs_server/gcs_node_manager.cc | 2 - src/ray/gcs/gcs_server/gcs_server.cc | 22 +- .../gcs_server/test/gcs_actor_manager_test.cc | 200 ++++++++++++++++-- .../test/gcs_actor_scheduler_test.cc | 45 ++-- .../gcs_server/worker_info_handler_impl.cc | 2 - src/ray/protobuf/core_worker.proto | 15 ++ src/ray/raylet/node_manager.cc | 2 + src/ray/rpc/worker/core_worker_client.h | 9 + src/ray/rpc/worker/core_worker_server.h | 2 + 18 files changed, 672 insertions(+), 109 deletions(-) diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py index 3dbe40715..c926e5f87 100644 --- a/python/ray/tests/test_actor.py +++ b/python/ray/tests/test_actor.py @@ -699,6 +699,32 @@ def test_use_actor_within_actor(ray_start_10_cpus): assert ray.get(actor2.get_values.remote(5)) == (3, 4) +def test_use_actor_twice(ray_start_10_cpus): + # Make sure we can call the same actor using different refs. + + @ray.remote + class Actor1: + def __init__(self): + self.count = 0 + + def inc(self): + self.count += 1 + return self.count + + @ray.remote + class Actor2: + def __init__(self): + pass + + def inc(self, handle): + return ray.get(handle.inc.remote()) + + a = Actor1.remote() + a2 = Actor2.remote() + assert ray.get(a2.inc.remote(a)) == 1 + assert ray.get(a2.inc.remote(a)) == 2 + + def test_define_actor_within_remote_function(ray_start_10_cpus): # Make sure we can define and actors within remote funtions. diff --git a/python/ray/tests/test_actor_advanced.py b/python/ray/tests/test_actor_advanced.py index 7a18fab59..2ab582f89 100644 --- a/python/ray/tests/test_actor_advanced.py +++ b/python/ray/tests/test_actor_advanced.py @@ -110,6 +110,47 @@ def test_actor_lifetime_load_balancing(ray_start_cluster): ray.get([actor.ping.remote() for actor in actors]) +@pytest.mark.parametrize( + "ray_start_regular", [{ + "resources": { + "actor": 1 + }, + "num_cpus": 2, + }], + indirect=True) +def test_deleted_actor_no_restart(ray_start_regular): + @ray.remote(resources={"actor": 1}, max_reconstructions=3) + class Actor: + def method(self): + return 1 + + def getpid(self): + return os.getpid() + + @ray.remote + def f(actor, signal): + ray.get(signal.wait.remote()) + return ray.get(actor.method.remote()) + + signal = ray.test_utils.SignalActor.remote() + a = Actor.remote() + pid = ray.get(a.getpid.remote()) + # Pass the handle to another task that cannot run yet. + x_id = f.remote(a, signal) + # Delete the original handle. The actor should not get killed yet. + del a + + # Once the task finishes, the actor process should get killed. + ray.get(signal.send.remote()) + assert ray.get(x_id) == 1 + ray.test_utils.wait_for_pid_to_exit(pid) + + # Create another actor with the same resource requirement to make sure the + # old one was not restarted. + a = Actor.remote() + pid = ray.get(a.getpid.remote()) + + def test_exception_raised_when_actor_node_dies(ray_start_cluster_head): cluster = ray_start_cluster_head remote_node = cluster.add_node() diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index bf8d29e8d..2c50b08f2 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -1051,7 +1051,10 @@ def test_serialized_id(ray_start_cluster): ray.get(get.remote([obj], True)) -def test_fate_sharing(ray_start_cluster): +@pytest.mark.parametrize("use_actors,node_failure", + [(False, False), (False, True), (True, False), + (True, True)]) +def test_fate_sharing(ray_start_cluster, use_actors, node_failure): config = json.dumps({ "num_heartbeats_timeout": 10, "raylet_heartbeat_timeout_milliseconds": 100, @@ -1074,6 +1077,9 @@ def test_fate_sharing(ray_start_cluster): def probe(): return + # TODO(swang): This test does not pass if max_reconstructions > 0 for the + # raylet codepath. Add this parameter once the GCS actor service is enabled + # by default. @ray.remote class Actor(object): def __init__(self): @@ -1121,10 +1127,20 @@ def test_fate_sharing(ray_start_cluster): assert wait_for_condition(child_resource_available) return node_to_kill - test_process_failure(use_actors=True) - test_process_failure(use_actors=False) - node_to_kill = test_node_failure(node_to_kill, use_actors=True) - node_to_kill = test_node_failure(node_to_kill, use_actors=False) + if node_failure: + test_node_failure(node_to_kill, use_actors) + else: + test_process_failure(use_actors) + + ray.state.state._check_connected() + keys = [ + key for r in ray.state.state.redis_clients + for key in r.keys("WORKER_FAILURE*") + ] + if node_failure: + assert len(keys) <= 1, len(keys) + else: + assert len(keys) <= 2, len(keys) if __name__ == "__main__": diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 13a095949..26316036d 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1151,6 +1151,16 @@ Status CoreWorker::CreateActor(const RayFunction &function, actor_creation_options.max_concurrency, actor_creation_options.is_detached, actor_creation_options.name, actor_creation_options.is_asyncio, extension_data); + // Add the actor handle before we submit the actor creation task, since the + // actor handle must be in scope by the time the GCS sends the + // WaitForActorOutOfScopeRequest. + std::unique_ptr actor_handle(new ActorHandle( + actor_id, GetCallerId(), rpc_address_, job_id, /*actor_cursor=*/return_ids[0], + function.GetLanguage(), function.GetFunctionDescriptor(), extension_data)); + RAY_CHECK(AddActorHandle(std::move(actor_handle), + /*is_owner_handle=*/!actor_creation_options.is_detached)) + << "Actor " << actor_id << " already exists"; + *return_actor_id = actor_id; TaskSpecification task_spec = builder.Build(); Status status; @@ -1163,12 +1173,6 @@ Status CoreWorker::CreateActor(const RayFunction &function, actor_creation_options.max_reconstructions)); status = direct_task_submitter_->SubmitTask(task_spec); } - std::unique_ptr actor_handle(new ActorHandle( - actor_id, GetCallerId(), rpc_address_, job_id, /*actor_cursor=*/return_ids[0], - function.GetLanguage(), function.GetFunctionDescriptor(), extension_data)); - RAY_CHECK(AddActorHandle(std::move(actor_handle), - /*is_owner_handle=*/!actor_creation_options.is_detached)) - << "Actor " << actor_id << " already exists"; return status; } @@ -1283,6 +1287,13 @@ bool CoreWorker::AddActorHandle(std::unique_ptr actor_handle, bool is_owner_handle) { const auto &actor_id = actor_handle->GetActorID(); const auto actor_creation_return_id = ObjectID::ForActorHandle(actor_id); + if (is_owner_handle) { + reference_counter_->AddOwnedObject(actor_creation_return_id, + /*inner_ids=*/{}, GetCallerId(), rpc_address_, + CurrentCallSite(), -1, + /*is_reconstructable=*/true); + } + reference_counter_->AddLocalReference(actor_creation_return_id, CurrentCallSite()); bool inserted; @@ -1334,16 +1345,31 @@ bool CoreWorker::AddActorHandle(std::unique_ptr actor_handle, RAY_CHECK(reference_counter_->SetDeleteCallback( actor_creation_return_id, [this, actor_id, is_owner_handle](const ObjectID &object_id) { - // TODO(swang): Unsubscribe from the actor table. - // TODO(swang): Remove the actor handle entry. - // If we own the actor and the actor handle is no longer in scope, - // terminate the actor. if (is_owner_handle) { - RAY_LOG(INFO) << "Owner's handle and creation ID " << object_id - << " has gone out of scope, sending message to actor " - << actor_id << " to do a clean exit."; - RAY_CHECK_OK( - KillActor(actor_id, /*force_kill=*/false, /*no_reconstruction=*/false)); + // If we own the actor and the actor handle is no longer in scope, + // terminate the actor. We do not do this if the GCS service is + // enabled since then the GCS will terminate the actor for us. + if (!(RayConfig::instance().gcs_service_enabled() && + RayConfig::instance().gcs_actor_service_enabled())) { + RAY_LOG(INFO) << "Owner's handle and creation ID " << object_id + << " has gone out of scope, sending message to actor " + << actor_id << " to do a clean exit."; + RAY_CHECK_OK( + KillActor(actor_id, /*force_kill=*/false, /*no_reconstruction=*/false)); + } + } + + absl::MutexLock lock(&actor_handles_mutex_); + // TODO(swang): Erase the actor handle once all refs to the actor + // have gone out of scope. We cannot erase it here in case the + // language frontend receives another ref to the same actor. In this + // case, we must remember the last task counter that we sent to the + // actor. + // TODO(swang): Unsubscribe from the actor table once all local refs + // to the actor have gone out of scope. + auto callback = actor_out_of_scope_callbacks_.extract(actor_id); + if (callback) { + callback.mapped()(actor_id); } })); } @@ -1611,11 +1637,13 @@ void CoreWorker::ExecuteTaskLocalMode(const TaskSpecification &task_spec, auto resource_ids = std::make_shared(); auto return_objects = std::vector>(); auto borrowed_refs = ReferenceCounter::ReferenceTableProto(); - for (size_t i = 0; i < task_spec.NumReturns(); i++) { - reference_counter_->AddOwnedObject(task_spec.ReturnId(i, TaskTransportType::DIRECT), - /*inner_ids=*/{}, GetCallerId(), rpc_address_, - CurrentCallSite(), -1, - /*is_reconstructable=*/false); + if (!task_spec.IsActorCreationTask()) { + for (size_t i = 0; i < task_spec.NumReturns(); i++) { + reference_counter_->AddOwnedObject(task_spec.ReturnId(i, TaskTransportType::DIRECT), + /*inner_ids=*/{}, GetCallerId(), rpc_address_, + CurrentCallSite(), -1, + /*is_reconstructable=*/false); + } } auto old_id = GetActorId(); SetActorId(actor_id); @@ -1786,6 +1814,32 @@ void CoreWorker::HandleGetObjectStatus(const rpc::GetObjectStatusRequest &reques } } +void CoreWorker::HandleWaitForActorOutOfScope( + const rpc::WaitForActorOutOfScopeRequest &request, + rpc::WaitForActorOutOfScopeReply *reply, rpc::SendReplyCallback send_reply_callback) { + if (HandleWrongRecipient(WorkerID::FromBinary(request.intended_worker_id()), + send_reply_callback)) { + return; + } + + // Send a response to trigger cleaning up the actor state once the handle is + // no longer in scope. + auto respond = [send_reply_callback](const ActorID &actor_id) { + RAY_LOG(DEBUG) << "Replying to HandleWaitForActorOutOfScope for " << actor_id; + send_reply_callback(Status::OK(), nullptr, nullptr); + }; + + const auto actor_id = ActorID::FromBinary(request.actor_id()); + RAY_LOG(DEBUG) << "Received HandleWaitForActorOutOfScope for " << actor_id; + absl::MutexLock lock(&actor_handles_mutex_); + auto it = actor_handles_.find(actor_id); + if (it == actor_handles_.end()) { + respond(actor_id); + } else { + RAY_CHECK(actor_out_of_scope_callbacks_.emplace(actor_id, std::move(respond)).second); + } +} + void CoreWorker::HandleWaitForObjectEviction( const rpc::WaitForObjectEvictionRequest &request, rpc::WaitForObjectEvictionReply *reply, rpc::SendReplyCallback send_reply_callback) { diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index afed1c355..129a6180f 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -697,6 +697,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { rpc::GetObjectStatusReply *reply, rpc::SendReplyCallback send_reply_callback) override; + /// Implements gRPC server handler. + void HandleWaitForActorOutOfScope(const rpc::WaitForActorOutOfScopeRequest &request, + rpc::WaitForActorOutOfScopeReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + /// Implements gRPC server handler. void HandleWaitForObjectEviction(const rpc::WaitForObjectEvictionRequest &request, rpc::WaitForObjectEvictionReply *reply, @@ -995,6 +1000,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// Manages recovery of objects stored in remote plasma nodes. std::unique_ptr object_recovery_manager_; + // TODO(swang): Refactor to merge actor_handles_mutex_ and all fields that it + // protects into the ActorManager. /// The `actor_handles_` field could be mutated concurrently due to multi-threading, we /// need a mutex to protect it. mutable absl::Mutex actor_handles_mutex_; @@ -1003,6 +1010,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { absl::flat_hash_map> actor_handles_ GUARDED_BY(actor_handles_mutex_); + /// Map from actor ID to a callback to call when all local handles to that + /// actor have gone out of scpoe. + absl::flat_hash_map> + actor_out_of_scope_callbacks_ GUARDED_BY(actor_handles_mutex_); + /// /// Fields related to task execution. /// diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 4e69e0b43..f10ed76bb 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -58,15 +58,17 @@ void TaskManager::AddPendingTask(const TaskID &caller_id, if (spec.IsActorTask()) { num_returns--; } - for (size_t i = 0; i < num_returns; i++) { - // We pass an empty vector for inner IDs because we do not know the return - // value of the task yet. If the task returns an ID(s), the worker will - // notify us via the WaitForRefRemoved RPC that we are now a borrower for - // the inner IDs. Note that this RPC can be received *before* the - // PushTaskReply. - reference_counter_->AddOwnedObject(spec.ReturnId(i, TaskTransportType::DIRECT), - /*inner_ids=*/{}, caller_id, caller_address, - call_site, -1, /*is_reconstructable=*/true); + if (!spec.IsActorCreationTask()) { + for (size_t i = 0; i < num_returns; i++) { + // We pass an empty vector for inner IDs because we do not know the return + // value of the task yet. If the task returns an ID(s), the worker will + // notify us via the WaitForRefRemoved RPC that we are now a borrower for + // the inner IDs. Note that this RPC can be received *before* the + // PushTaskReply. + reference_counter_->AddOwnedObject(spec.ReturnId(i, TaskTransportType::DIRECT), + /*inner_ids=*/{}, caller_id, caller_address, + call_site, -1, /*is_reconstructable=*/true); + } } { diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index eabee5b96..e633a9de6 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -560,18 +560,13 @@ Status ServiceBasedNodeInfoAccessor::AsyncSubscribeToResources( Status ServiceBasedNodeInfoAccessor::AsyncReportHeartbeat( const std::shared_ptr &data_ptr, const StatusCallback &callback) { - ClientID node_id = ClientID::FromBinary(data_ptr->client_id()); - RAY_LOG(DEBUG) << "Reporting heartbeat, node id = " << node_id; rpc::ReportHeartbeatRequest request; request.mutable_heartbeat()->CopyFrom(*data_ptr); client_impl_->GetGcsRpcClient().ReportHeartbeat( - request, - [node_id, callback](const Status &status, const rpc::ReportHeartbeatReply &reply) { + request, [callback](const Status &status, const rpc::ReportHeartbeatReply &reply) { if (callback) { callback(status); } - RAY_LOG(DEBUG) << "Finished reporting heartbeat, status = " << status - << ", node id = " << node_id; }); return Status::OK(); } diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index 008330045..0e2391404 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -42,6 +42,18 @@ WorkerID GcsActor::GetWorkerID() const { return WorkerID::FromBinary(address.worker_id()); } +WorkerID GcsActor::GetOwnerID() const { + return WorkerID::FromBinary(GetOwnerAddress().worker_id()); +} + +ClientID GcsActor::GetOwnerNodeID() const { + return ClientID::FromBinary(GetOwnerAddress().raylet_id()); +} + +const rpc::Address &GcsActor::GetOwnerAddress() const { + return actor_table_data_.owner_address(); +} + void GcsActor::UpdateState(rpc::ActorTableData::ActorState state) { actor_table_data_.set_state(state); } @@ -75,9 +87,11 @@ rpc::ActorTableData *GcsActor::GetMutableActorTableData() { return &actor_table_ ///////////////////////////////////////////////////////////////////////////////////////// GcsActorManager::GcsActorManager(std::shared_ptr scheduler, - gcs::ActorInfoAccessor &actor_info_accessor) + gcs::ActorInfoAccessor &actor_info_accessor, + const rpc::ClientFactoryFn &worker_client_factory) : gcs_actor_scheduler_(std::move(scheduler)), - actor_info_accessor_(actor_info_accessor) {} + actor_info_accessor_(actor_info_accessor), + worker_client_factory_(worker_client_factory) {} Status GcsActorManager::RegisterActor( const ray::rpc::CreateActorRequest &request, @@ -121,6 +135,13 @@ Status GcsActorManager::RegisterActor( // created. actor_to_register_callbacks_[actor_id].emplace_back(std::move(callback)); RAY_CHECK(registered_actors_.emplace(actor->GetActorID(), actor).second); + + if (!actor->IsDetached() && worker_client_factory_) { + // This actor is owned. Send a long polling request to the actor's + // owner to determine when the actor should be removed. + PollOwnerForActorOutOfScope(actor); + } + gcs_actor_scheduler_->Schedule(actor); return Status::OK(); } @@ -134,9 +155,140 @@ ActorID GcsActorManager::GetActorIDByName(const std::string &name) { return actor_id; } -void GcsActorManager::ReconstructActorOnWorker(const ray::ClientID &node_id, - const ray::WorkerID &worker_id, - bool need_reschedule) { +void GcsActorManager::PollOwnerForActorOutOfScope( + const std::shared_ptr &actor) { + const auto &actor_id = actor->GetActorID(); + const auto &owner_node_id = actor->GetOwnerNodeID(); + const auto &owner_id = actor->GetOwnerID(); + auto &workers = owners_[owner_node_id]; + auto it = workers.find(owner_id); + if (it == workers.end()) { + RAY_LOG(DEBUG) << "Adding owner " << owner_id << " of actor " << actor_id; + std::shared_ptr client = + worker_client_factory_(actor->GetOwnerAddress()); + it = workers.emplace(owner_id, Owner(std::move(client))).first; + } + it->second.children_actor_ids.insert(actor_id); + + rpc::WaitForActorOutOfScopeRequest wait_request; + wait_request.set_intended_worker_id(owner_id.Binary()); + wait_request.set_actor_id(actor_id.Binary()); + RAY_CHECK_OK(it->second.client->WaitForActorOutOfScope( + wait_request, [this, owner_node_id, owner_id, actor_id]( + Status status, const rpc::WaitForActorOutOfScopeReply &reply) { + if (!status.ok()) { + RAY_LOG(INFO) << "Worker " << owner_id << " failed, destroying actor child"; + } + + auto node_it = owners_.find(owner_node_id); + if (node_it != owners_.end() && node_it->second.count(owner_id)) { + // Only destroy the actor if its owner is still alive. The actor may + // have already been destroyed if the owner died. + DestroyActor(actor_id); + } + })); +} + +void GcsActorManager::DestroyActor(const ActorID &actor_id) { + RAY_LOG(DEBUG) << "Destroying actor " << actor_id; + actor_to_register_callbacks_.erase(actor_id); + auto it = registered_actors_.find(actor_id); + RAY_CHECK(it != registered_actors_.end()) + << "Tried to destroy actor that does not exist " << actor_id; + const auto actor = std::move(it->second); + registered_actors_.erase(it); + + // Clean up the client to the actor's owner, if necessary. + if (!actor->IsDetached()) { + const auto &owner_node_id = actor->GetOwnerNodeID(); + const auto &owner_id = actor->GetOwnerID(); + RAY_LOG(DEBUG) << "Erasing actor " << actor_id << " owned by " << owner_id; + + auto &node = owners_[owner_node_id]; + auto worker_it = node.find(owner_id); + RAY_CHECK(worker_it != node.end()); + auto &owner = worker_it->second; + RAY_CHECK(owner.children_actor_ids.erase(actor_id)); + if (owner.children_actor_ids.empty()) { + node.erase(worker_it); + if (node.empty()) { + owners_.erase(owner_node_id); + } + } + } + + // The actor is already dead, most likely due to process or node failure. + if (actor->GetState() == rpc::ActorTableData::DEAD) { + return; + } + + // The actor is still alive or pending creation. Clean up all remaining + // state. + const auto &node_id = actor->GetNodeID(); + const auto &worker_id = actor->GetWorkerID(); + auto node_it = created_actors_.find(node_id); + if (node_it != created_actors_.end() && node_it->second.count(worker_id)) { + // The actor has already been created. Destroy the process by force-killing + // it. + auto actor_client = worker_client_factory_(actor->GetAddress()); + rpc::KillActorRequest request; + request.set_intended_actor_id(actor_id.Binary()); + request.set_force_kill(true); + request.set_no_reconstruction(true); + RAY_UNUSED(actor_client->KillActor(request, nullptr)); + + RAY_CHECK(node_it->second.erase(actor->GetWorkerID())); + if (node_it->second.empty()) { + created_actors_.erase(node_it); + } + } else { + // The actor has not been created yet. It is either being scheduled or is + // pending scheduling. + auto canceled_actor_id = + gcs_actor_scheduler_->CancelOnWorker(actor->GetNodeID(), actor->GetWorkerID()); + if (!canceled_actor_id.IsNil()) { + // The actor was being scheduled and has now been canceled. + RAY_CHECK(canceled_actor_id == actor_id); + } else { + // The actor was pending scheduling. Remove it from the queue. + auto pending_it = std::find_if(pending_actors_.begin(), pending_actors_.end(), + [actor_id](const std::shared_ptr &actor) { + return actor->GetActorID() == actor_id; + }); + RAY_CHECK(pending_it != pending_actors_.end()); + pending_actors_.erase(pending_it); + } + } + + // Update the actor to DEAD in case any callers are still alive. This can + // happen if the owner of the actor dies while there are still callers. + // TODO(swang): We can skip this step and delete the actor table entry + // entirely if the callers check directly whether the owner is still alive. + actor->UpdateAddress(rpc::Address()); + auto mutable_actor_table_data = actor->GetMutableActorTableData(); + mutable_actor_table_data->set_state(rpc::ActorTableData::DEAD); + auto actor_table_data = + std::make_shared(*mutable_actor_table_data); + // The backend storage is reliable in the future, so the status must be ok. + RAY_CHECK_OK( + actor_info_accessor_.AsyncUpdate(actor->GetActorID(), actor_table_data, nullptr)); +} + +void GcsActorManager::OnWorkerDead(const ray::ClientID &node_id, + const ray::WorkerID &worker_id, + bool intentional_exit) { + // Destroy all actors that are owned by this worker. + const auto it = owners_.find(node_id); + if (it != owners_.end() && it->second.count(worker_id)) { + auto owner = it->second.find(worker_id); + // Make a copy of the children actor IDs since we will delete from the + // list. + const auto children_ids = owner->second.children_actor_ids; + for (const auto &child_id : children_ids) { + DestroyActor(child_id); + } + } + ActorID actor_id; // Find from worker_to_created_actor_. auto iter = created_actors_.find(node_id); @@ -154,11 +306,26 @@ void GcsActorManager::ReconstructActorOnWorker(const ray::ClientID &node_id, RAY_LOG(INFO) << "Worker " << worker_id << " on node " << node_id << " failed, reconstructing actor " << actor_id; // Reconstruct the actor. - ReconstructActor(actor_id, need_reschedule); + ReconstructActor(actor_id, /*need_reschedule=*/!intentional_exit); } } -void GcsActorManager::ReconstructActorsOnNode(const ClientID &node_id) { +void GcsActorManager::OnNodeDead(const ClientID &node_id) { + RAY_LOG(INFO) << "Node " << node_id << " failed, reconstructing actors"; + const auto it = owners_.find(node_id); + if (it != owners_.end()) { + std::vector children_ids; + // Make a copy of all the actor IDs owned by workers on the dead node. + for (const auto &owner : it->second) { + for (const auto &child_id : owner.second.children_actor_ids) { + children_ids.push_back(child_id); + } + } + for (const auto &child_id : children_ids) { + DestroyActor(child_id); + } + } + // Cancel the scheduling of all related actors. auto scheduling_actor_ids = gcs_actor_scheduler_->CancelOnNode(node_id); for (auto &actor_id : scheduling_actor_ids) { @@ -211,6 +378,10 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche // The backend storage is reliable in the future, so the status must be ok. RAY_CHECK_OK( actor_info_accessor_.AsyncUpdate(actor->GetActorID(), actor_table_data, nullptr)); + // The actor is dead, but we should not remove the entry from the + // registered actors yet. If the actor is owned, we will destroy the actor + // once the owner fails or notifies us that the actor's handle has gone out + // of scope. } } @@ -231,7 +402,7 @@ void GcsActorManager::OnActorCreationSuccess(std::shared_ptr actor) { // Invoke all callbacks for all registration requests of this actor (duplicated // requests are included) and remove all of them from actor_to_register_callbacks_. - auto iter = actor_to_register_callbacks_.find(actor->GetActorID()); + auto iter = actor_to_register_callbacks_.find(actor_id); if (iter != actor_to_register_callbacks_.end()) { for (auto &callback : iter->second) { callback(actor); diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h index b9a8ad897..feae6ccf2 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.h +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include "absl/container/flat_hash_map.h" @@ -71,6 +72,12 @@ class GcsActor { ClientID GetNodeID() const; /// Get the id of the worker on which this actor is created. WorkerID GetWorkerID() const; + /// Get the actor's owner ID. + WorkerID GetOwnerID() const; + /// Get the node ID of the actor's owner. + ClientID GetOwnerNodeID() const; + /// Get the address of the actor's owner. + const rpc::Address &GetOwnerAddress() const; /// Update the `Address` of this actor (see gcs.proto). void UpdateAddress(const rpc::Address &address); @@ -112,7 +119,8 @@ class GcsActorManager { /// \param scheduler Used to schedule actor creation tasks. /// \param actor_info_accessor Used to flush actor data to storage. GcsActorManager(std::shared_ptr scheduler, - gcs::ActorInfoAccessor &actor_info_accessor); + gcs::ActorInfoAccessor &actor_info_accessor, + const rpc::ClientFactoryFn &worker_client_factory = nullptr); ~GcsActorManager() = default; @@ -137,23 +145,25 @@ class GcsActorManager { /// change. void SchedulePendingActors(); - /// Reconstruct all actors associated with the specified node id, including actors which - /// are scheduled or have been created on this node. Triggered when the given node goes - /// down. + /// Handle a node death. This will restart all actors associated with the + /// specified node id, including actors which are scheduled or have been + /// created on this node. Actors whose owners have died (possibly due to this + /// node being removed) will not be restarted. If any workers on this node + /// owned an actor, those actors will be destroyed. /// /// \param node_id The specified node id. - void ReconstructActorsOnNode(const ClientID &node_id); + void OnNodeDead(const ClientID &node_id); - /// Reconstruct actor associated with the specified node_id and worker_id. - /// The actor may be pending or already created. + /// Handle a worker failure. This will restart the associated actor, if any, + /// which may be pending or already created. If the worker owned other + /// actors, those actors will be destroyed. /// - /// \param node_id ID of the node where the worker is located - /// \param worker_id ID of the worker that the actor is creating/created on - /// \param need_reschedule Whether to reschedule the actor creation task, sometimes - /// users want to kill an actor intentionally and don't want it to be rescheduled - /// again. - void ReconstructActorOnWorker(const ClientID &node_id, const WorkerID &worker_id, - bool need_reschedule = true); + /// \param node_id ID of the node where the dead worker was located. + /// \param worker_id ID of the dead worker. + /// \param intentional_exit Whether the death was intentional. If yes and the + /// worker was an actor, we should not attempt to restart the actor. + void OnWorkerDead(const ClientID &node_id, const WorkerID &worker_id, + bool intentional_exit = false); /// Handle actor creation task failure. This should be called when scheduling /// an actor creation task is infeasible. @@ -167,6 +177,30 @@ class GcsActorManager { /// \param actor The actor that has been created. void OnActorCreationSuccess(std::shared_ptr actor); + private: + /// A data structure representing an actor's owner. + struct Owner { + Owner(std::shared_ptr client) + : client(std::move(client)) {} + /// A client that can be used to contact the owner. + std::shared_ptr client; + /// The IDs of actors owned by this worker. + absl::flat_hash_set children_actor_ids; + }; + + /// Poll an actor's owner so that we will receive a notification when the + /// actor has gone out of scope, or the owner has died. This should not be + /// called for detached actors. + void PollOwnerForActorOutOfScope(const std::shared_ptr &actor); + + /// Destroy an actor that has gone out of scope. This cleans up all local + /// state associated with the actor and marks the actor as dead. For owned + /// actors, this should be called when all actor handles have gone out of + /// scope or the owner has died. + /// TODO: For detached actors, this should be called when the application + /// deregisters the actor. + void DestroyActor(const ActorID &actor_id); + private: /// Reconstruct the specified actor. /// @@ -191,11 +225,18 @@ class GcsActorManager { /// Map contains the relationship of node and created actors. Each node ID /// maps to a map from worker ID to the actor created on that worker. absl::flat_hash_map> created_actors_; + /// Map from worker ID to a client and the IDs of the actors owned by that + /// worker. An owned actor should be destroyed once it has gone out of scope, + /// according to its owner, or the owner dies. + absl::flat_hash_map> owners_; /// The scheduler to schedule all registered actors. std::shared_ptr gcs_actor_scheduler_; /// Actor table. Used to update actor information upon creation, deletion, etc. gcs::ActorInfoAccessor &actor_info_accessor_; + /// Factory to produce clients to workers. This is used to communicate with + /// actors and their owners. + rpc::ClientFactoryFn worker_client_factory_; }; } // namespace gcs diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index fffbe54a6..e6cffb37a 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -186,7 +186,6 @@ void GcsNodeManager::HandleReportHeartbeat(const rpc::ReportHeartbeatRequest &re rpc::ReportHeartbeatReply *reply, rpc::SendReplyCallback send_reply_callback) { ClientID node_id = ClientID::FromBinary(request.heartbeat().client_id()); - RAY_LOG(DEBUG) << "Reporting heartbeat, node id = " << node_id; auto heartbeat_data = std::make_shared(); heartbeat_data->CopyFrom(request.heartbeat()); node_failure_detector_->HandleHeartbeat(node_id, *heartbeat_data); @@ -195,7 +194,6 @@ void GcsNodeManager::HandleReportHeartbeat(const rpc::ReportHeartbeatRequest &re // The heartbeat data is reported here because some python unit tests rely on the // heartbeat data in redis. RAY_CHECK_OK(node_info_accessor_.AsyncReportHeartbeat(heartbeat_data, nullptr)); - RAY_LOG(DEBUG) << "Finished reporting heartbeat, node id = " << node_id; } void GcsNodeManager::HandleGetResources(const rpc::GetResourcesRequest &request, diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 6f4f7e3b3..66ddfcb5c 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -160,8 +160,10 @@ void GcsServer::InitGcsActorManager() { [this](const rpc::Address &address) { return std::make_shared(address, client_call_manager_); }); - gcs_actor_manager_ = - std::make_shared(scheduler, redis_gcs_client_->Actors()); + gcs_actor_manager_ = std::make_shared( + scheduler, redis_gcs_client_->Actors(), [this](const rpc::Address &address) { + return std::make_shared(address, client_call_manager_); + }); gcs_node_manager_->AddNodeAddedListener( [this](const std::shared_ptr &) { // Because a new node has been added, we need to try to schedule the pending @@ -169,19 +171,19 @@ void GcsServer::InitGcsActorManager() { gcs_actor_manager_->SchedulePendingActors(); }); - gcs_node_manager_->AddNodeRemovedListener([this]( - std::shared_ptr node) { - // All of the related actors should be reconstructed when a node is removed from the - // GCS. - gcs_actor_manager_->ReconstructActorsOnNode(ClientID::FromBinary(node->node_id())); - }); + gcs_node_manager_->AddNodeRemovedListener( + [this](std::shared_ptr node) { + // All of the related actors should be reconstructed when a node is removed from + // the GCS. + gcs_actor_manager_->OnNodeDead(ClientID::FromBinary(node->node_id())); + }); RAY_CHECK_OK(redis_gcs_client_->Workers().AsyncSubscribeToWorkerFailures( [this](const WorkerID &id, const rpc::WorkerFailureData &worker_failure_data) { auto &worker_address = worker_failure_data.worker_address(); WorkerID worker_id = WorkerID::FromBinary(worker_address.worker_id()); ClientID node_id = ClientID::FromBinary(worker_address.raylet_id()); - auto needs_restart = !worker_failure_data.intentional_disconnect(); - gcs_actor_manager_->ReconstructActorOnWorker(node_id, worker_id, needs_restart); + gcs_actor_manager_->OnWorkerDead(node_id, worker_id, + worker_failure_data.intentional_disconnect()); }, /*done_callback=*/nullptr)); } diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc index 6c0de63ce..ed955217a 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc @@ -35,14 +35,48 @@ class MockActorScheduler : public gcs::GcsActorSchedulerInterface { std::vector> actors; }; +class MockWorkerClient : public rpc::CoreWorkerClientInterface { + public: + ray::Status WaitForActorOutOfScope( + const rpc::WaitForActorOutOfScopeRequest &request, + const rpc::ClientCallback &callback) override { + callbacks.push_back(callback); + return Status::OK(); + } + + ray::Status KillActor( + const rpc::KillActorRequest &request, + const rpc::ClientCallback &callback) override { + killed_actors.push_back(ActorID::FromBinary(request.intended_actor_id())); + return Status::OK(); + } + + bool Reply(Status status = Status::OK()) { + if (callbacks.size() == 0) { + return false; + } + auto callback = callbacks.front(); + auto reply = rpc::WaitForActorOutOfScopeReply(); + callback(status, reply); + callbacks.pop_front(); + return true; + } + + std::list> callbacks; + std::vector killed_actors; +}; + class GcsActorManagerTest : public ::testing::Test { public: GcsActorManagerTest() : mock_actor_scheduler_(new MockActorScheduler()), - gcs_actor_manager_(mock_actor_scheduler_, actor_info_accessor_) {} + worker_client_(new MockWorkerClient()), + gcs_actor_manager_(mock_actor_scheduler_, actor_info_accessor_, + [&](const rpc::Address &addr) { return worker_client_; }) {} GcsServerMocker::MockedActorInfoAccessor actor_info_accessor_; std::shared_ptr mock_actor_scheduler_; + std::shared_ptr worker_client_; gcs::GcsActorManager gcs_actor_manager_; }; @@ -61,6 +95,34 @@ TEST_F(GcsActorManagerTest, TestBasic) { auto actor = mock_actor_scheduler_->actors.back(); mock_actor_scheduler_->actors.pop_back(); + // Check that the actor is in state `ALIVE`. + rpc::Address address; + auto node_id = ClientID::FromRandom(); + auto worker_id = WorkerID::FromRandom(); + address.set_raylet_id(node_id.Binary()); + address.set_worker_id(worker_id.Binary()); + actor->UpdateAddress(address); + gcs_actor_manager_.OnActorCreationSuccess(actor); + ASSERT_EQ(finished_actors.size(), 1); + + ASSERT_TRUE(worker_client_->Reply()); + ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD); +} + +TEST_F(GcsActorManagerTest, TestSchedulingFailed) { + auto job_id = JobID::FromInt(1); + auto create_actor_request = Mocker::GenCreateActorRequest(job_id); + std::vector> finished_actors; + RAY_CHECK_OK(gcs_actor_manager_.RegisterActor( + create_actor_request, [&finished_actors](std::shared_ptr actor) { + finished_actors.emplace_back(actor); + })); + + ASSERT_EQ(finished_actors.size(), 0); + ASSERT_EQ(mock_actor_scheduler_->actors.size(), 1); + auto actor = mock_actor_scheduler_->actors.back(); + mock_actor_scheduler_->actors.clear(); + gcs_actor_manager_.OnActorCreationFailed(actor); gcs_actor_manager_.SchedulePendingActors(); ASSERT_EQ(mock_actor_scheduler_->actors.size(), 1); @@ -76,21 +138,48 @@ TEST_F(GcsActorManagerTest, TestBasic) { actor->UpdateAddress(address); gcs_actor_manager_.OnActorCreationSuccess(actor); ASSERT_EQ(finished_actors.size(), 1); +} + +TEST_F(GcsActorManagerTest, TestWorkerFailure) { + auto job_id = JobID::FromInt(1); + auto create_actor_request = Mocker::GenCreateActorRequest(job_id); + std::vector> finished_actors; + RAY_CHECK_OK(gcs_actor_manager_.RegisterActor( + create_actor_request, [&finished_actors](std::shared_ptr actor) { + finished_actors.emplace_back(actor); + })); + + ASSERT_EQ(finished_actors.size(), 0); + ASSERT_EQ(mock_actor_scheduler_->actors.size(), 1); + auto actor = mock_actor_scheduler_->actors.back(); + mock_actor_scheduler_->actors.pop_back(); + + // Check that the actor is in state `ALIVE`. + rpc::Address address; + auto node_id = ClientID::FromRandom(); + auto worker_id = WorkerID::FromRandom(); + address.set_raylet_id(node_id.Binary()); + address.set_worker_id(worker_id.Binary()); + actor->UpdateAddress(address); + gcs_actor_manager_.OnActorCreationSuccess(actor); + ASSERT_EQ(finished_actors.size(), 1); // Killing another worker does not affect this actor. EXPECT_CALL(*mock_actor_scheduler_, CancelOnWorker(node_id, _)); - gcs_actor_manager_.ReconstructActorOnWorker(node_id, WorkerID::FromRandom()); + gcs_actor_manager_.OnWorkerDead(node_id, WorkerID::FromRandom()); ASSERT_EQ(actor->GetState(), rpc::ActorTableData::ALIVE); // Remove worker and then check that the actor is dead. - gcs_actor_manager_.ReconstructActorOnWorker(node_id, worker_id); + gcs_actor_manager_.OnWorkerDead(node_id, worker_id); ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD); // No more actors to schedule. gcs_actor_manager_.SchedulePendingActors(); ASSERT_EQ(mock_actor_scheduler_->actors.size(), 0); + + ASSERT_TRUE(worker_client_->Reply()); } -TEST_F(GcsActorManagerTest, TestBasicNodeFailure) { +TEST_F(GcsActorManagerTest, TestNodeFailure) { auto job_id = JobID::FromInt(1); auto create_actor_request = Mocker::GenCreateActorRequest(job_id); std::vector> finished_actors; @@ -105,12 +194,6 @@ TEST_F(GcsActorManagerTest, TestBasicNodeFailure) { auto actor = mock_actor_scheduler_->actors.back(); mock_actor_scheduler_->actors.pop_back(); - gcs_actor_manager_.OnActorCreationFailed(actor); - gcs_actor_manager_.SchedulePendingActors(); - ASSERT_EQ(mock_actor_scheduler_->actors.size(), 1); - mock_actor_scheduler_->actors.clear(); - ASSERT_EQ(finished_actors.size(), 0); - // Check that the actor is in state `ALIVE`. rpc::Address address; auto node_id = ClientID::FromRandom(); @@ -123,16 +206,18 @@ TEST_F(GcsActorManagerTest, TestBasicNodeFailure) { // Killing another node does not affect this actor. EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(_)); - gcs_actor_manager_.ReconstructActorsOnNode(ClientID::FromRandom()); + gcs_actor_manager_.OnNodeDead(ClientID::FromRandom()); ASSERT_EQ(actor->GetState(), rpc::ActorTableData::ALIVE); // Remove node and then check that the actor is dead. EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(node_id)); - gcs_actor_manager_.ReconstructActorsOnNode(node_id); + gcs_actor_manager_.OnNodeDead(node_id); ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD); // No more actors to schedule. gcs_actor_manager_.SchedulePendingActors(); ASSERT_EQ(mock_actor_scheduler_->actors.size(), 0); + + ASSERT_TRUE(worker_client_->Reply()); } TEST_F(GcsActorManagerTest, TestActorReconstruction) { @@ -161,9 +246,9 @@ TEST_F(GcsActorManagerTest, TestActorReconstruction) { gcs_actor_manager_.OnActorCreationSuccess(actor); ASSERT_EQ(finished_actors.size(), 1); - // Remove node and then check that the actor is being restarted. + // Remove worker and then check that the actor is being restarted. EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(node_id)); - gcs_actor_manager_.ReconstructActorsOnNode(node_id); + gcs_actor_manager_.OnNodeDead(node_id); ASSERT_EQ(actor->GetState(), rpc::ActorTableData::RECONSTRUCTING); // Add node and check that the actor is restarted. @@ -179,18 +264,97 @@ TEST_F(GcsActorManagerTest, TestActorReconstruction) { ASSERT_EQ(actor->GetState(), rpc::ActorTableData::ALIVE); ASSERT_EQ(actor->GetNodeID(), node_id2); - // Killing another node does not affect this actor. + // Killing another worker does not affect this actor. EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(_)); - gcs_actor_manager_.ReconstructActorsOnNode(ClientID::FromRandom()); + gcs_actor_manager_.OnNodeDead(ClientID::FromRandom()); ASSERT_EQ(actor->GetState(), rpc::ActorTableData::ALIVE); - // Remove node and then check that the actor is dead. + // Remove worker and then check that the actor is dead. EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(node_id2)); - gcs_actor_manager_.ReconstructActorsOnNode(node_id2); + gcs_actor_manager_.OnNodeDead(node_id2); ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD); // No more actors to schedule. gcs_actor_manager_.SchedulePendingActors(); ASSERT_EQ(mock_actor_scheduler_->actors.size(), 0); + + ASSERT_TRUE(worker_client_->Reply()); +} + +TEST_F(GcsActorManagerTest, TestActorRestartWhenOwnerDead) { + auto job_id = JobID::FromInt(1); + auto create_actor_request = Mocker::GenCreateActorRequest( + job_id, /*max_reconstructions=*/1, /*detached=*/false); + std::vector> finished_actors; + RAY_CHECK_OK(gcs_actor_manager_.RegisterActor( + create_actor_request, [&finished_actors](std::shared_ptr actor) { + finished_actors.emplace_back(actor); + })); + + ASSERT_EQ(finished_actors.size(), 0); + ASSERT_EQ(mock_actor_scheduler_->actors.size(), 1); + auto actor = mock_actor_scheduler_->actors.back(); + mock_actor_scheduler_->actors.pop_back(); + const auto owner_node_id = actor->GetOwnerNodeID(); + + // Check that the actor is in state `ALIVE`. + rpc::Address address; + auto node_id = ClientID::FromRandom(); + auto worker_id = WorkerID::FromRandom(); + address.set_raylet_id(node_id.Binary()); + address.set_worker_id(worker_id.Binary()); + actor->UpdateAddress(address); + gcs_actor_manager_.OnActorCreationSuccess(actor); + ASSERT_EQ(finished_actors.size(), 1); + + // Remove the owner's node. + EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(owner_node_id)); + gcs_actor_manager_.OnNodeDead(owner_node_id); + // The child actor should be marked as dead. + ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD); + ASSERT_EQ(worker_client_->killed_actors.size(), 1); + ASSERT_EQ(worker_client_->killed_actors.front(), actor->GetActorID()); + + // Remove the actor's node and check that the actor is not restarted, since + // its owner has died. + EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(node_id)); + gcs_actor_manager_.OnNodeDead(node_id); + ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD); + gcs_actor_manager_.SchedulePendingActors(); + ASSERT_TRUE(mock_actor_scheduler_->actors.empty()); +} + +TEST_F(GcsActorManagerTest, TestDetachedActorRestartWhenCreatorDead) { + auto job_id = JobID::FromInt(1); + auto create_actor_request = + Mocker::GenCreateActorRequest(job_id, /*max_reconstructions=*/1, /*detached=*/true); + std::vector> finished_actors; + RAY_CHECK_OK(gcs_actor_manager_.RegisterActor( + create_actor_request, [&finished_actors](std::shared_ptr actor) { + finished_actors.emplace_back(actor); + })); + + ASSERT_EQ(finished_actors.size(), 0); + ASSERT_EQ(mock_actor_scheduler_->actors.size(), 1); + auto actor = mock_actor_scheduler_->actors.back(); + mock_actor_scheduler_->actors.pop_back(); + const auto owner_node_id = actor->GetOwnerNodeID(); + + // Check that the actor is in state `ALIVE`. + rpc::Address address; + auto node_id = ClientID::FromRandom(); + auto worker_id = WorkerID::FromRandom(); + address.set_raylet_id(node_id.Binary()); + address.set_worker_id(worker_id.Binary()); + actor->UpdateAddress(address); + gcs_actor_manager_.OnActorCreationSuccess(actor); + ASSERT_EQ(finished_actors.size(), 1); + + // Remove the owner's node. + EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(owner_node_id)); + gcs_actor_manager_.OnNodeDead(owner_node_id); + // The child actor should not be marked as dead. + ASSERT_TRUE(worker_client_->killed_actors.empty()); + ASSERT_EQ(actor->GetState(), rpc::ActorTableData::ALIVE); } TEST_F(GcsActorManagerTest, TestNamedActors) { diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc index f333e5143..05376a1df 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc @@ -74,6 +74,7 @@ TEST_F(GcsActorSchedulerTest, TestScheduleFailedWithZeroNode) { ASSERT_EQ(0, success_actors_.size()); ASSERT_EQ(1, failure_actors_.size()); ASSERT_EQ(actor, failure_actors_.front()); + ASSERT_TRUE(actor->GetNodeID().IsNil()); } TEST_F(GcsActorSchedulerTest, TestScheduleActorSuccess) { @@ -94,9 +95,10 @@ TEST_F(GcsActorSchedulerTest, TestScheduleActorSuccess) { ASSERT_EQ(0, worker_client_->callbacks.size()); // Grant a worker, then the actor creation request should be send to the worker. - ASSERT_TRUE(raylet_client_->GrantWorkerLease( - node->node_manager_address(), node->node_manager_port(), WorkerID::FromRandom(), - node_id, ClientID::Nil())); + WorkerID worker_id = WorkerID::FromRandom(); + ASSERT_TRUE(raylet_client_->GrantWorkerLease(node->node_manager_address(), + node->node_manager_port(), worker_id, + node_id, ClientID::Nil())); ASSERT_EQ(0, raylet_client_->callbacks.size()); ASSERT_EQ(1, worker_client_->callbacks.size()); @@ -106,6 +108,8 @@ TEST_F(GcsActorSchedulerTest, TestScheduleActorSuccess) { ASSERT_EQ(0, failure_actors_.size()); ASSERT_EQ(1, success_actors_.size()); ASSERT_EQ(actor, success_actors_.front()); + ASSERT_EQ(actor->GetNodeID(), node_id); + ASSERT_EQ(actor->GetWorkerID(), worker_id); } TEST_F(GcsActorSchedulerTest, TestScheduleRetryWhenLeasing) { @@ -136,9 +140,10 @@ TEST_F(GcsActorSchedulerTest, TestScheduleRetryWhenLeasing) { ASSERT_EQ(0, worker_client_->callbacks.size()); // Grant a worker, then the actor creation request should be send to the worker. - ASSERT_TRUE(raylet_client_->GrantWorkerLease( - node->node_manager_address(), node->node_manager_port(), WorkerID::FromRandom(), - node_id, ClientID::Nil())); + WorkerID worker_id = WorkerID::FromRandom(); + ASSERT_TRUE(raylet_client_->GrantWorkerLease(node->node_manager_address(), + node->node_manager_port(), worker_id, + node_id, ClientID::Nil())); ASSERT_EQ(0, raylet_client_->callbacks.size()); ASSERT_EQ(1, worker_client_->callbacks.size()); @@ -148,6 +153,8 @@ TEST_F(GcsActorSchedulerTest, TestScheduleRetryWhenLeasing) { ASSERT_EQ(0, failure_actors_.size()); ASSERT_EQ(1, success_actors_.size()); ASSERT_EQ(actor, success_actors_.front()); + ASSERT_EQ(actor->GetNodeID(), node_id); + ASSERT_EQ(actor->GetWorkerID(), worker_id); } TEST_F(GcsActorSchedulerTest, TestScheduleRetryWhenCreating) { @@ -168,9 +175,10 @@ TEST_F(GcsActorSchedulerTest, TestScheduleRetryWhenCreating) { ASSERT_EQ(0, worker_client_->callbacks.size()); // Grant a worker, then the actor creation request should be send to the worker. - ASSERT_TRUE(raylet_client_->GrantWorkerLease( - node->node_manager_address(), node->node_manager_port(), WorkerID::FromRandom(), - node_id, ClientID::Nil())); + WorkerID worker_id = WorkerID::FromRandom(); + ASSERT_TRUE(raylet_client_->GrantWorkerLease(node->node_manager_address(), + node->node_manager_port(), worker_id, + node_id, ClientID::Nil())); ASSERT_EQ(0, raylet_client_->callbacks.size()); ASSERT_EQ(1, worker_client_->callbacks.size()); ASSERT_EQ(0, gcs_actor_scheduler_->num_retry_creating_count_); @@ -186,6 +194,8 @@ TEST_F(GcsActorSchedulerTest, TestScheduleRetryWhenCreating) { ASSERT_EQ(0, failure_actors_.size()); ASSERT_EQ(1, success_actors_.size()); ASSERT_EQ(actor, success_actors_.front()); + ASSERT_EQ(actor->GetNodeID(), node_id); + ASSERT_EQ(actor->GetWorkerID(), worker_id); } TEST_F(GcsActorSchedulerTest, TestNodeFailedWhenLeasing) { @@ -213,6 +223,7 @@ TEST_F(GcsActorSchedulerTest, TestNodeFailedWhenLeasing) { ASSERT_EQ(actor->GetActorID(), actor_ids.front()); ASSERT_EQ(1, raylet_client_->num_workers_requested); ASSERT_EQ(1, raylet_client_->callbacks.size()); + ASSERT_TRUE(actor->GetNodeID().IsNil()); // Grant a worker, which will influence nothing. ASSERT_TRUE(raylet_client_->GrantWorkerLease( @@ -258,6 +269,7 @@ TEST_F(GcsActorSchedulerTest, TestNodeFailedWhenCreating) { ASSERT_EQ(1, actor_ids.size()); ASSERT_EQ(actor->GetActorID(), actor_ids.front()); ASSERT_EQ(1, worker_client_->callbacks.size()); + ASSERT_TRUE(actor->GetNodeID().IsNil()); // Reply the actor creation request, which will influence nothing. ASSERT_TRUE(worker_client_->ReplyPushTask()); @@ -297,6 +309,8 @@ TEST_F(GcsActorSchedulerTest, TestWorkerFailedWhenCreating) { ASSERT_EQ(actor->GetActorID(), gcs_actor_scheduler_->CancelOnWorker(node_id, worker_id)); ASSERT_EQ(1, worker_client_->callbacks.size()); + ASSERT_TRUE(actor->GetNodeID().IsNil()); + ASSERT_TRUE(actor->GetWorkerID().IsNil()); // Reply the actor creation request, which will influence nothing. ASSERT_TRUE(worker_client_->ReplyPushTask()); @@ -340,9 +354,10 @@ TEST_F(GcsActorSchedulerTest, TestSpillback) { ASSERT_EQ(0, worker_client_->callbacks.size()); // Grant a worker, then the actor creation request should be send to the worker. - ASSERT_TRUE(raylet_client_->GrantWorkerLease( - node2->node_manager_address(), node2->node_manager_port(), WorkerID::FromRandom(), - node_id_2, ClientID::Nil())); + WorkerID worker_id = WorkerID::FromRandom(); + ASSERT_TRUE(raylet_client_->GrantWorkerLease(node2->node_manager_address(), + node2->node_manager_port(), worker_id, + node_id_2, ClientID::Nil())); ASSERT_EQ(0, raylet_client_->callbacks.size()); ASSERT_EQ(1, worker_client_->callbacks.size()); @@ -350,11 +365,11 @@ TEST_F(GcsActorSchedulerTest, TestSpillback) { ASSERT_TRUE(worker_client_->ReplyPushTask()); ASSERT_EQ(0, worker_client_->callbacks.size()); - ASSERT_EQ(node_id_2, actor->GetNodeID()); - ASSERT_EQ(0, failure_actors_.size()); ASSERT_EQ(1, success_actors_.size()); ASSERT_EQ(actor, success_actors_.front()); + ASSERT_EQ(actor->GetNodeID(), node_id_2); + ASSERT_EQ(actor->GetWorkerID(), worker_id); } } // namespace ray @@ -362,4 +377,4 @@ TEST_F(GcsActorSchedulerTest, TestSpillback) { int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); -} \ No newline at end of file +} diff --git a/src/ray/gcs/gcs_server/worker_info_handler_impl.cc b/src/ray/gcs/gcs_server/worker_info_handler_impl.cc index 7a4f13bfd..87495bc4a 100644 --- a/src/ray/gcs/gcs_server/worker_info_handler_impl.cc +++ b/src/ray/gcs/gcs_server/worker_info_handler_impl.cc @@ -34,8 +34,6 @@ void DefaultWorkerInfoHandler::HandleReportWorkerFailure( RAY_CHECK_OK(gcs_pub_sub_->Publish(WORKER_FAILURE_CHANNEL, worker_id.Binary(), worker_failure_data->SerializeAsString(), nullptr)); - RAY_LOG(DEBUG) << "Finished reporting worker failure, " - << worker_address.DebugString(); } GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index 5ea21de3d..bf87fb5c6 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -163,6 +163,16 @@ message GetObjectStatusReply { ObjectStatus status = 1; } +message WaitForActorOutOfScopeRequest { + // The ID of the worker this message is intended for. + bytes intended_worker_id = 1; + // ActorID of the actor in scope. + bytes actor_id = 2; +} + +message WaitForActorOutOfScopeReply { +} + message WaitForObjectEvictionRequest { // The ID of the worker this message is intended for. bytes intended_worker_id = 1; @@ -298,6 +308,11 @@ service CoreWorkerService { returns (DirectActorCallArgWaitCompleteReply); // Ask the object's owner about the object's current status. rpc GetObjectStatus(GetObjectStatusRequest) returns (GetObjectStatusReply); + // Wait for the actor's owner to decide that the actor has gone out of scope. + // Replying to this message indicates that the client should force-kill the + // actor process, if still alive. + rpc WaitForActorOutOfScope(WaitForActorOutOfScopeRequest) + returns (WaitForActorOutOfScopeReply); // Notify the object's owner that it has been pinned by a raylet. Replying // to this message indicates that the raylet should unpin the object. rpc WaitForObjectEviction(WaitForObjectEvictionRequest) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 9483a7895..898df1a4e 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2524,6 +2524,8 @@ void NodeManager::AssignTask(const std::shared_ptr &worker, const Task & failed_nodes_cache_.count(owner_node_id) > 0)) { // TODO(swang): Skip assigning this task to this worker instead of // killing the worker? + RAY_LOG(INFO) << "Owner of assigned task " << task.GetTaskSpecification().TaskId() + << " died, killing leased worker " << worker->WorkerId(); KillWorker(worker); } diff --git a/src/ray/rpc/worker/core_worker_client.h b/src/ray/rpc/worker/core_worker_client.h index d175d0553..81c649be3 100644 --- a/src/ray/rpc/worker/core_worker_client.h +++ b/src/ray/rpc/worker/core_worker_client.h @@ -144,6 +144,13 @@ class CoreWorkerClientInterface { return Status::NotImplemented(""); } + /// Ask the actor's owner to reply when the actor has gone out of scope. + virtual ray::Status WaitForActorOutOfScope( + const WaitForActorOutOfScopeRequest &request, + const ClientCallback &callback) { + return Status::NotImplemented(""); + } + /// Notify the owner of an object that the object has been pinned. virtual ray::Status WaitForObjectEviction( const WaitForObjectEvictionRequest &request, @@ -225,6 +232,8 @@ class CoreWorkerClient : public std::enable_shared_from_this, RPC_CLIENT_METHOD(CoreWorkerService, RemoteCancelTask, grpc_client_, override) + RPC_CLIENT_METHOD(CoreWorkerService, WaitForActorOutOfScope, grpc_client_, override) + RPC_CLIENT_METHOD(CoreWorkerService, WaitForObjectEviction, grpc_client_, override) RPC_CLIENT_METHOD(CoreWorkerService, GetCoreWorkerStats, grpc_client_, override) diff --git a/src/ray/rpc/worker/core_worker_server.h b/src/ray/rpc/worker/core_worker_server.h index a73dfba16..f3e902a45 100644 --- a/src/ray/rpc/worker/core_worker_server.h +++ b/src/ray/rpc/worker/core_worker_server.h @@ -32,6 +32,7 @@ namespace rpc { RPC_SERVICE_HANDLER(CoreWorkerService, PushTask) \ RPC_SERVICE_HANDLER(CoreWorkerService, DirectActorCallArgWaitComplete) \ RPC_SERVICE_HANDLER(CoreWorkerService, GetObjectStatus) \ + RPC_SERVICE_HANDLER(CoreWorkerService, WaitForActorOutOfScope) \ RPC_SERVICE_HANDLER(CoreWorkerService, WaitForObjectEviction) \ RPC_SERVICE_HANDLER(CoreWorkerService, WaitForRefRemoved) \ RPC_SERVICE_HANDLER(CoreWorkerService, KillActor) \ @@ -46,6 +47,7 @@ namespace rpc { DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(PushTask) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(DirectActorCallArgWaitComplete) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(GetObjectStatus) \ + DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(WaitForActorOutOfScope) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(WaitForObjectEviction) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(WaitForRefRemoved) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(KillActor) \