diff --git a/python/ray/tests/test_reference_counting.py b/python/ray/tests/test_reference_counting.py index 222bea61e..c995f6aea 100644 --- a/python/ray/tests/test_reference_counting.py +++ b/python/ray/tests/test_reference_counting.py @@ -11,7 +11,7 @@ import pytest import ray import ray.cluster_utils -from ray.test_utils import SignalActor, put_object +from ray.test_utils import SignalActor, put_object, wait_for_condition logger = logging.getLogger(__name__) @@ -527,6 +527,37 @@ def test_basic_nested_ids(one_worker_100MiB): _fill_object_store_and_get(inner_oid_bytes, succeed=False) +def _all_actors_dead(): + return all(actor["State"] == ray.gcs_utils.ActorTableData.DEAD + for actor in list(ray.actors().values())) + + +def test_kill_actor_immediately_after_creation(ray_start_regular): + @ray.remote + class A: + pass + + a = A.remote() + b = A.remote() + + ray.kill(a) + ray.kill(b) + wait_for_condition(_all_actors_dead, timeout=10) + + +def test_remove_actor_immediately_after_creation(ray_start_regular): + @ray.remote + class A: + pass + + a = A.remote() + b = A.remote() + + del a + del b + wait_for_condition(_all_actors_dead, timeout=10) + + if __name__ == "__main__": import sys sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/core_worker/actor_manager.cc b/src/ray/core_worker/actor_manager.cc index 89ce290ba..4cf07147a 100644 --- a/src/ray/core_worker/actor_manager.cc +++ b/src/ray/core_worker/actor_manager.cc @@ -93,15 +93,16 @@ bool ActorManager::AddActorHandle(std::unique_ptr actor_handle, RAY_CHECK_OK(gcs_client_->Actors().AsyncSubscribe( actor_id, actor_notification_callback, nullptr)); - RAY_CHECK(reference_counter_->SetDeleteCallback( - actor_creation_return_id, - [this, actor_id, is_owner_handle](const ObjectID &object_id) { - if (is_owner_handle) { - // If we own the actor and the actor handle is no longer in scope, - // terminate the actor. 
We do not do this if the GCS service is - // enabled since then the GCS will terminate the actor for us. - // TODO(sang): Remove this block once gcs_actor_service is enabled by default. - if (!RayConfig::instance().gcs_actor_service_enabled()) { + if (!RayConfig::instance().gcs_actor_service_enabled()) { + RAY_CHECK(reference_counter_->SetDeleteCallback( + actor_creation_return_id, + [this, actor_id, is_owner_handle](const ObjectID &object_id) { + if (is_owner_handle) { + // If we own the actor and the actor handle is no longer in scope, + // terminate the actor. We do not do this if the GCS service is + // enabled since then the GCS will terminate the actor for us. + // TODO(sang): Remove this block once gcs_actor_service is enabled by + // default. RAY_LOG(INFO) << "Owner's handle and creation ID " << object_id << " has gone out of scope, sending message to actor " << actor_id << " to do a clean exit."; @@ -109,38 +110,44 @@ bool ActorManager::AddActorHandle(std::unique_ptr actor_handle, direct_actor_submitter_->KillActor(actor_id, /*force_kill=*/false, /*no_restart=*/false); - } - } - absl::MutexLock lock(&mutex_); - // TODO(swang): Erase the actor handle once all refs to the actor - // have gone out of scope. We cannot erase it here in case the - // language frontend receives another ref to the same actor. In this - // case, we must remember the last task counter that we sent to the - // actor. - // TODO(ekl) we can't unsubscribe to actor notifications here due to - // https://github.com/ray-project/ray/pull/6885 - auto callback = actor_out_of_scope_callbacks_.extract(actor_id); - if (callback) { - callback.mapped()(actor_id); - } - })); + // TODO(swang): Erase the actor handle once all refs to the actor + // have gone out of scope. We cannot erase it here in case the + // language frontend receives another ref to the same actor. In this + // case, we must remember the last task counter that we sent to the + // actor. 
+ // TODO(ekl) we can't unsubscribe to actor notifications here due to + // https://github.com/ray-project/ray/pull/6885 + } + })); + } } return inserted; } -void ActorManager::AddActorOutOfScopeCallback( +void ActorManager::WaitForActorOutOfScope( const ActorID &actor_id, - std::function<void(const ActorID &)> actor_out_of_scope_callbacks) { + std::function<void(const ActorID &)> actor_out_of_scope_callback) { absl::MutexLock lock(&mutex_); auto it = actor_handles_.find(actor_id); if (it == actor_handles_.end()) { - actor_out_of_scope_callbacks(actor_id); + actor_out_of_scope_callback(actor_id); } else { - RAY_CHECK(actor_out_of_scope_callbacks_ - .emplace(actor_id, std::move(actor_out_of_scope_callbacks)) - .second); + // GCS actor manager will wait until the actor has been created before polling the + // owner. This should avoid any asynchronous problems. + auto callback = [actor_id, actor_out_of_scope_callback](const ObjectID &object_id) { + actor_out_of_scope_callback(actor_id); + }; + + // Returns true if the object was present and the callback was added. It might have + // already been evicted by the time we get this request, in which case we should + // respond immediately so the gcs server can destroy the actor. + const auto actor_creation_return_id = ObjectID::ForActorHandle(actor_id); + if (!reference_counter_->SetDeleteCallback(actor_creation_return_id, callback)) { + RAY_LOG(DEBUG) << "ActorID reference already gone for " << actor_id; + actor_out_of_scope_callback(actor_id); + } } } diff --git a/src/ray/core_worker/actor_manager.h b/src/ray/core_worker/actor_manager.h index 0a6426d33..a63f774ca 100644 --- a/src/ray/core_worker/actor_manager.h +++ b/src/ray/core_worker/actor_manager.h @@ -89,14 +89,14 @@ class ActorManager { const TaskID &caller_id, const std::string &call_site, const rpc::Address &caller_address, bool is_detached); - /// Add a callback that is called when an actor goes out of scope. + /// Wait for actor out of scope. /// /// \param actor_id The actor id that owns the callback. 
- /// \param actor_out_of_scope_callbacks The callback function that will be called when + /// \param actor_out_of_scope_callback The callback function that will be called when /// an actor_id goes out of scope. - void AddActorOutOfScopeCallback( + void WaitForActorOutOfScope( const ActorID &actor_id, - std::function<void(const ActorID &)> actor_out_of_scope_callbacks); + std::function<void(const ActorID &)> actor_out_of_scope_callback); /// Get a list of actor_ids from existing actor handles. /// This is used for debugging purpose. @@ -148,11 +148,6 @@ class ActorManager { /// Actor handle is a logical abstraction that holds actor handle's states. absl::flat_hash_map<ActorID, std::unique_ptr<ActorHandle>> actor_handles_ GUARDED_BY(mutex_); - - /// Map from actor ID to a callback. Callback is called when - /// the corresponding handles are gone out of scope. - absl::flat_hash_map<ActorID, std::function<void(const ActorID &)>> - actor_out_of_scope_callbacks_ GUARDED_BY(mutex_); }; } // namespace ray diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index c761309f1..d640b228d 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1697,6 +1697,8 @@ void CoreWorker::HandleGetObjectStatus(const rpc::GetObjectStatusRequest &reques void CoreWorker::HandleWaitForActorOutOfScope( const rpc::WaitForActorOutOfScopeRequest &request, rpc::WaitForActorOutOfScopeReply *reply, rpc::SendReplyCallback send_reply_callback) { + // Currently WaitForActorOutOfScope is only used when GCS actor service is enabled. 
+ RAY_CHECK(RayConfig::instance().gcs_actor_service_enabled()); if (HandleWrongRecipient(WorkerID::FromBinary(request.intended_worker_id()), send_reply_callback)) { return; @@ -1711,7 +1713,7 @@ void CoreWorker::HandleWaitForActorOutOfScope( const auto actor_id = ActorID::FromBinary(request.actor_id()); RAY_LOG(DEBUG) << "Received HandleWaitForActorOutOfScope for " << actor_id; - actor_manager_->AddActorOutOfScopeCallback(actor_id, std::move(respond)); + actor_manager_->WaitForActorOutOfScope(actor_id, std::move(respond)); } void CoreWorker::HandleWaitForObjectEviction(