diff --git a/python/ray/tests/test_actor_advanced.py b/python/ray/tests/test_actor_advanced.py index d8b2acbed..32740eb34 100644 --- a/python/ray/tests/test_actor_advanced.py +++ b/python/ray/tests/test_actor_advanced.py @@ -668,7 +668,9 @@ def test_detached_actor(ray_start_regular): ValueError, match="Actor name cannot be an empty string"): DetachedActor._remote(name="") - DetachedActor._remote(name="d_actor") + d = DetachedActor._remote(name="d_actor") + assert ray.get(d.ping.remote()) == "pong" + with pytest.raises(ValueError, match="Please use a different name"): DetachedActor._remote(name="d_actor") @@ -697,6 +699,141 @@ ray.get(actor.ping.remote()) assert ray.get(detached_actor.ping.remote()) == "pong" +def test_detached_actor_cleanup(ray_start_regular): + @ray.remote + class DetachedActor: + def ping(self): + return "pong" + + dup_actor_name = "actor" + + def create_and_kill_actor(actor_name): + # Make sure same name is creatable after killing it. + detached_actor = DetachedActor.options(name=actor_name).remote() + # Wait for detached actor creation. + assert ray.get(detached_actor.ping.remote()) == "pong" + ray.kill(detached_actor) + # Wait until actor dies. + actor_status = ray.actors(actor_id=detached_actor._actor_id.hex()) + max_wait_time = 10 + wait_time = 0 + while actor_status["State"] != 3: + actor_status = ray.actors(actor_id=detached_actor._actor_id.hex()) + time.sleep(1.0) + wait_time += 1 + if wait_time >= max_wait_time: + assert None, ( + "It took too much time to kill an actor: {}".format( + detached_actor._actor_id)) + + create_and_kill_actor(dup_actor_name) + + # This shouldn't be broken because actor + # name should have been cleaned up from GCS. + create_and_kill_actor(dup_actor_name) + + redis_address = ray_start_regular["redis_address"] + driver_script = """ +import ray +import time +ray.init(address="{}") + +@ray.remote +class DetachedActor: + def ping(self): + return "pong" + +# Make sure same name is creatable after killing it. +detached_actor = DetachedActor.options(name="{}").remote() +assert ray.get(detached_actor.ping.remote()) == "pong" +ray.kill(detached_actor) +# Wait until actor dies. +actor_status = ray.actors(actor_id=detached_actor._actor_id.hex()) +max_wait_time = 10 +wait_time = 0 +while actor_status["State"] != 3: + actor_status = ray.actors(actor_id=detached_actor._actor_id.hex()) + time.sleep(1.0) + wait_time += 1 + if wait_time >= max_wait_time: + assert None, ( + "It took too much time to kill an actor") +""".format(redis_address, dup_actor_name) + + run_string_as_driver(driver_script) + # Make sure we can create a detached actor created/killed + # at other scripts. + create_and_kill_actor(dup_actor_name) + + +@pytest.mark.parametrize( + "ray_start_cluster", [{ + "num_cpus": 3, + "num_nodes": 1, + "resources": { + "first_node": 5 + } + }], + indirect=True) +def test_detached_actor_cleanup_due_to_failure(ray_start_cluster): + cluster = ray_start_cluster + node = cluster.add_node(resources={"second_node": 1}) + cluster.wait_for_nodes() + + @ray.remote + class DetachedActor: + def ping(self): + return "pong" + + def kill_itself(self): + # kill itself. + os._exit(0) + + worker_failure_actor_name = "worker_failure_actor_name" + node_failure_actor_name = "node_failure_actor_name" + + def wait_until_actor_dead(handle): + actor_status = ray.actors(actor_id=handle._actor_id.hex()) + max_wait_time = 10 + wait_time = 0 + while actor_status["State"] != 3: + actor_status = ray.actors(actor_id=handle._actor_id.hex()) + time.sleep(1.0) + wait_time += 1 + if wait_time >= max_wait_time: + assert None, ( + "It took too much time to kill an actor: {}".format( + handle._actor_id)) + + def create_detached_actor_blocking(actor_name, + schedule_in_second_node=False): + resources = {"second_node": 1}\ + if schedule_in_second_node\ + else {"first_node": 1} + actor_handle = DetachedActor.options( + name=actor_name, resources=resources).remote() + # Wait for detached actor creation. + assert ray.get(actor_handle.ping.remote()) == "pong" + return actor_handle + + # Name should be cleaned when workers fail + deatched_actor = create_detached_actor_blocking(worker_failure_actor_name) + deatched_actor.kill_itself.remote() + wait_until_actor_dead(deatched_actor) + # Name should be available now. + deatched_actor = create_detached_actor_blocking(worker_failure_actor_name) + assert ray.get(deatched_actor.ping.remote()) == "pong" + + # Name should be cleaned when nodes fail. + deatched_actor = create_detached_actor_blocking( + node_failure_actor_name, schedule_in_second_node=True) + cluster.remove_node(node) + wait_until_actor_dead(deatched_actor) + # Name should be available now. + deatched_actor = create_detached_actor_blocking(node_failure_actor_name) + assert ray.get(deatched_actor.ping.remote()) == "pong" + + def test_kill(ray_start_regular): @ray.remote class Actor: diff --git a/python/ray/util/named_actors.py b/python/ray/util/named_actors.py index 28a70dac1..247fbd88a 100644 --- a/python/ray/util/named_actors.py +++ b/python/ray/util/named_actors.py @@ -3,6 +3,7 @@ import logging import ray import ray.cloudpickle as pickle from ray.experimental.internal_kv import _internal_kv_get, _internal_kv_put +from ray.gcs_utils import ActorTableData logger = logging.getLogger(__name__) @@ -30,7 +31,14 @@ def _get_actor(name): raise ValueError( "The actor with name={} doesn't exist".format(name)) handle = pickle.loads(pickled_state) - + # If the actor state is dead, that means that this name is reusable. + # We don't delete the name entry from key value store when + # the actor is killed because ray.kill is asynchronous, + # and it can cause worker leaks. + actor_info = ray.actors(actor_id=handle._actor_id.hex()) + actor_state = actor_info.get("State", None) + if actor_state and actor_state == ActorTableData.DEAD: + raise ValueError("The actor with name={} is dead.".format(name)) return handle diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 74690e67d..c19b4f9d0 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1380,15 +1380,14 @@ Status CoreWorker::GetNamedActorHandle(const std::string &name, RAY_CHECK_OK(gcs_client_->Actors().AsyncGetByName( name, [this, &actor_id, name, ready, m, cv]( Status status, const boost::optional &result) { - if (!status.ok()) { - RAY_LOG(INFO) << "Failed to look up actor with name: " << name; - // Use a NIL actor ID to signal that the actor wasn't found. - actor_id = ActorID::Nil(); - } else { - RAY_CHECK(result); + if (status.ok() && result) { auto actor_handle = std::unique_ptr(new ActorHandle(*result)); actor_id = actor_handle->GetActorID(); AddActorHandle(std::move(actor_handle), /*is_owner_handle=*/false); + } else { + RAY_LOG(INFO) << "Failed to look up actor with name: " << name; + // Use a NIL actor ID to signal that the actor wasn't found. + actor_id = ActorID::Nil(); } // Notify the main thread that the RPC has finished. @@ -1409,7 +1408,10 @@ Status CoreWorker::GetNamedActorHandle(const std::string &name, Status status; if (actor_id.IsNil()) { std::stringstream stream; - stream << "Failed to look up actor with name '" << name << "'."; + stream + << "Failed to look up actor with name '" << name + << "'. It is either you look up the named actor you didn't create or the named " + "actor hasn't been created because named actor creation is asynchronous."; status = Status::NotFound(stream.str()); } else { status = GetActorHandle(actor_id, actor_handle); diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index f1efdf55a..cbae579e8 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "ray/gcs/gcs_client/service_based_accessor.h" + #include "ray/gcs/gcs_client/service_based_gcs_client.h" namespace ray { diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index cda42c0de..06096eee8 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -657,11 +657,23 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche })); gcs_actor_scheduler_->Schedule(actor); } else { + // For detached actors, make sure to remove its name. + if (actor->IsDetached()) { + auto it = named_actors_.find(actor->GetName()); + if (it != named_actors_.end()) { + RAY_CHECK(it->second == actor->GetActorID()); + named_actors_.erase(it); + } + } mutable_actor_table_data->set_state(rpc::ActorTableData::DEAD); // The backend storage is reliable in the future, so the status must be ok. RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put( actor_id, *mutable_actor_table_data, - [this, actor_id, mutable_actor_table_data](Status status) { + [this, actor, actor_id, mutable_actor_table_data](Status status) { + // if actor was an detached actor, make sure to destroy it. + // We need to do this because detached actors are not destroyed + // when its owners are dead because it doesn't have owners. + if (actor->IsDetached()) DestroyActor(actor_id); RAY_CHECK_OK(gcs_pub_sub_->Publish( ACTOR_CHANNEL, actor_id.Hex(), mutable_actor_table_data->SerializeAsString(), nullptr)); diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc index 58c2e7fe9..a410278e6 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc @@ -409,6 +409,127 @@ TEST_F(GcsActorManagerTest, TestNamedActors) { request2.task_spec().actor_creation_task_spec().actor_id()); } +TEST_F(GcsActorManagerTest, TestNamedActorDeletionWorkerFailure) { + // Make sure named actor deletion succeeds when workers fail. + const auto actor_name = "actor_to_delete"; + const auto job_id_1 = JobID::FromInt(1); + const auto request1 = Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true, + /*name=*/actor_name); + Status status = gcs_actor_manager_->RegisterActor( + request1, [](std::shared_ptr actor) {}); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(gcs_actor_manager_->GetActorIDByName(actor_name).Binary(), + request1.task_spec().actor_creation_task_spec().actor_id()); + + auto actor = mock_actor_scheduler_->actors.back(); + mock_actor_scheduler_->actors.pop_back(); + + // Check that the actor is in state `ALIVE`. + rpc::Address address; + auto node_id = ClientID::FromRandom(); + auto worker_id = WorkerID::FromRandom(); + address.set_raylet_id(node_id.Binary()); + address.set_worker_id(worker_id.Binary()); + actor->UpdateAddress(address); + gcs_actor_manager_->OnActorCreationSuccess(actor); + + // Remove worker and then check that the actor is dead. + gcs_actor_manager_->OnWorkerDead(node_id, worker_id); + ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD); + ASSERT_EQ(gcs_actor_manager_->GetActorIDByName(actor_name), ActorID::Nil()); + + // Create an actor with the same name. This ensures that the name has been properly + // deleted. + const auto request2 = Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true, + /*name=*/actor_name); + status = gcs_actor_manager_->RegisterActor(request2, + [](std::shared_ptr actor) {}); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(gcs_actor_manager_->GetActorIDByName(actor_name).Binary(), + request2.task_spec().actor_creation_task_spec().actor_id()); +} + +TEST_F(GcsActorManagerTest, TestNamedActorDeletionNodeFailure) { + // Make sure named actor deletion succeeds when nodes fail. + const auto job_id_1 = JobID::FromInt(1); + const auto request1 = + Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true, /*name=*/"actor"); + Status status = gcs_actor_manager_->RegisterActor( + request1, [](std::shared_ptr actor) {}); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(gcs_actor_manager_->GetActorIDByName("actor").Binary(), + request1.task_spec().actor_creation_task_spec().actor_id()); + + auto actor = mock_actor_scheduler_->actors.back(); + mock_actor_scheduler_->actors.pop_back(); + + // Check that the actor is in state `ALIVE`. + rpc::Address address; + auto node_id = ClientID::FromRandom(); + auto worker_id = WorkerID::FromRandom(); + address.set_raylet_id(node_id.Binary()); + address.set_worker_id(worker_id.Binary()); + actor->UpdateAddress(address); + gcs_actor_manager_->OnActorCreationSuccess(actor); + + // Remove node and then check that the actor is dead. + EXPECT_CALL(*mock_actor_scheduler_, CancelOnNode(node_id)); + gcs_actor_manager_->OnNodeDead(node_id); + ASSERT_EQ(actor->GetState(), rpc::ActorTableData::DEAD); + + // Create an actor with the same name. This ensures that the name has been properly + // deleted. + const auto request2 = + Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true, /*name=*/"actor"); + status = gcs_actor_manager_->RegisterActor(request2, + [](std::shared_ptr actor) {}); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(gcs_actor_manager_->GetActorIDByName("actor").Binary(), + request2.task_spec().actor_creation_task_spec().actor_id()); +} + +TEST_F(GcsActorManagerTest, TestNamedActorDeletionNotHappendWhenReconstructed) { + // Make sure named actor deletion succeeds when nodes fail. + const auto job_id_1 = JobID::FromInt(1); + // The dead actor will be reconstructed. + const auto request1 = + Mocker::GenCreateActorRequest(job_id_1, 1, /*is_detached=*/true, /*name=*/"actor"); + Status status = gcs_actor_manager_->RegisterActor( + request1, [](std::shared_ptr actor) {}); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(gcs_actor_manager_->GetActorIDByName("actor").Binary(), + request1.task_spec().actor_creation_task_spec().actor_id()); + + auto actor = mock_actor_scheduler_->actors.back(); + mock_actor_scheduler_->actors.pop_back(); + + // Check that the actor is in state `ALIVE`. + rpc::Address address; + auto node_id = ClientID::FromRandom(); + auto worker_id = WorkerID::FromRandom(); + address.set_raylet_id(node_id.Binary()); + address.set_worker_id(worker_id.Binary()); + actor->UpdateAddress(address); + gcs_actor_manager_->OnActorCreationSuccess(actor); + + // Remove worker and then check that the actor is dead. The actor should be + // reconstructed. + gcs_actor_manager_->OnWorkerDead(node_id, worker_id); + ASSERT_EQ(actor->GetState(), rpc::ActorTableData::RESTARTING); + + // Create an actor with the same name. + // It should fail because actor has been reconstructed, and names shouldn't have been + // cleaned. + const auto job_id_2 = JobID::FromInt(2); + const auto request2 = + Mocker::GenCreateActorRequest(job_id_2, 0, /*is_detached=*/true, /*name=*/"actor"); + status = gcs_actor_manager_->RegisterActor(request2, + [](std::shared_ptr actor) {}); + ASSERT_TRUE(status.IsInvalid()); + ASSERT_EQ(gcs_actor_manager_->GetActorIDByName("actor").Binary(), + request1.task_spec().actor_creation_task_spec().actor_id()); +} + } // namespace ray int main(int argc, char **argv) {