From a269ae9bc459d1ed0227d32b410219590d428704 Mon Sep 17 00:00:00 2001 From: ZhuSenlin Date: Mon, 27 Jul 2020 10:56:52 +0800 Subject: [PATCH] [GCS] Fix actor task hang when its owner exits before local dependencies resolved (#8045) --- python/ray/tests/test_actor.py | 20 ++ python/ray/tests/test_actor_advanced.py | 6 +- python/ray/tests/test_actor_failures.py | 137 +++++++++ python/ray/tests/test_global_state.py | 6 +- python/ray/tests/test_metrics.py | 3 +- src/ray/core_worker/actor_manager.cc | 9 +- src/ray/core_worker/actor_manager.h | 42 +++ src/ray/core_worker/core_worker.cc | 10 +- .../core_worker/test/actor_manager_test.cc | 18 +- .../transport/direct_actor_transport.h | 2 +- .../transport/direct_task_transport.cc | 27 +- .../transport/direct_task_transport.h | 14 +- src/ray/gcs/accessor.h | 15 +- .../gcs/gcs_client/service_based_accessor.cc | 16 + .../gcs/gcs_client/service_based_accessor.h | 3 + .../test/service_based_gcs_client_test.cc | 30 +- src/ray/gcs/gcs_server/gcs_actor_manager.cc | 281 ++++++++++++++---- src/ray/gcs/gcs_server/gcs_actor_manager.h | 74 +++-- .../gcs_server/test/gcs_actor_manager_test.cc | 271 +++++++++++++---- .../test/gcs_actor_scheduler_test.cc | 24 +- .../gcs_server/test/gcs_server_rpc_test.cc | 18 +- src/ray/gcs/redis_accessor.cc | 11 +- src/ray/gcs/redis_accessor.h | 3 + src/ray/gcs/test/gcs_test_util.h | 31 +- src/ray/protobuf/gcs.proto | 12 +- src/ray/protobuf/gcs_service.proto | 12 +- src/ray/raylet/node_manager.cc | 3 - src/ray/rpc/gcs_server/gcs_rpc_client.h | 4 + src/ray/rpc/gcs_server/gcs_rpc_server.h | 5 + 29 files changed, 871 insertions(+), 236 deletions(-) diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py index e6917779d..6c02bd069 100644 --- a/python/ray/tests/test_actor.py +++ b/python/ray/tests/test_actor.py @@ -8,6 +8,7 @@ try: except ImportError: pytest_timeout = None import sys +import datetime import ray import ray.test_utils @@ -816,5 +817,24 @@ def 
test_inherit_actor_from_class(ray_start_regular): assert ray.get(actor.g.remote(5)) == 6 +@pytest.mark.skip( + "This test is just used to print the latency of creating 100 actors.") +def test_actor_creation_latency(ray_start_regular): + # This test is just used to test the latency of actor creation. + @ray.remote + class Actor: + def get_value(self): + return 1 + + start = datetime.datetime.now() + actor_handles = [Actor.remote() for _ in range(100)] + actor_create_time = datetime.datetime.now() + for actor_handle in actor_handles: + ray.get(actor_handle.get_value.remote()) + end = datetime.datetime.now() + print("actor_create_time_consume = {}, total_time_consume = {}".format( + actor_create_time - start, end - start)) + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/tests/test_actor_advanced.py b/python/ray/tests/test_actor_advanced.py index 320b8f947..833b4d1a4 100644 --- a/python/ray/tests/test_actor_advanced.py +++ b/python/ray/tests/test_actor_advanced.py @@ -719,7 +719,7 @@ def test_detached_actor_cleanup(ray_start_regular): actor_status = ray.actors(actor_id=detached_actor._actor_id.hex()) max_wait_time = 10 wait_time = 0 - while actor_status["State"] != 3: + while actor_status["State"] != ray.gcs_utils.ActorTableData.DEAD: actor_status = ray.actors(actor_id=detached_actor._actor_id.hex()) time.sleep(1.0) wait_time += 1 @@ -753,7 +753,7 @@ ray.kill(detached_actor) actor_status = ray.actors(actor_id=detached_actor._actor_id.hex()) max_wait_time = 10 wait_time = 0 -while actor_status["State"] != 3: +while actor_status["State"] != ray.gcs_utils.ActorTableData.DEAD: actor_status = ray.actors(actor_id=detached_actor._actor_id.hex()) time.sleep(1.0) wait_time += 1 @@ -798,7 +798,7 @@ def test_detached_actor_cleanup_due_to_failure(ray_start_cluster): actor_status = ray.actors(actor_id=handle._actor_id.hex()) max_wait_time = 10 wait_time = 0 - while actor_status["State"] != 3: + while actor_status["State"] != 
ray.gcs_utils.ActorTableData.DEAD: actor_status = ray.actors(actor_id=handle._actor_id.hex()) time.sleep(1.0) wait_time += 1 diff --git a/python/ray/tests/test_actor_failures.py b/python/ray/tests/test_actor_failures.py index 997fc2c34..3f87730f4 100644 --- a/python/ray/tests/test_actor_failures.py +++ b/python/ray/tests/test_actor_failures.py @@ -18,6 +18,7 @@ from ray.test_utils import ( wait_for_pid_to_exit, generate_internal_config_map, get_other_nodes, + SignalActor, ) SIGKILL = signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM @@ -891,6 +892,142 @@ def test_ray_wait_dead_actor(ray_start_cluster): assert wait_for_condition(lambda: ray.get(parent_actor.wait.remote())) +@pytest.mark.parametrize( + "ray_start_cluster", [{ + "num_cpus": 1, + "num_nodes": 1, + }], indirect=True) +def test_actor_owner_worker_dies_before_dependency_ready(ray_start_cluster): + """Test actor owner worker dies before local dependencies are resolved. + This test verifies the scenario where owner worker + has failed before actor dependencies are resolved. + Reference: https://github.com/ray-project/ray/pull/8045 + """ + + @ray.remote + class Actor: + def __init__(self, dependency): + print("actor: {}".format(os.getpid())) + self.dependency = dependency + + def f(self): + return self.dependency + + @ray.remote + class Owner: + def get_pid(self): + return os.getpid() + + def create_actor(self, caller_handle): + s = SignalActor.remote() + # Create an actor which depends on an object that can never be + # resolved. + actor_handle = Actor.remote(s.wait.remote()) + + pid = os.getpid() + signal_handle = SignalActor.remote() + caller_handle.call.remote(pid, signal_handle, actor_handle) + # Wait until the `Caller` starts executing the remote `call` method. 
+ ray.get(signal_handle.wait.remote()) + # exit + os._exit(0) + + @ray.remote + class Caller: + def call(self, owner_pid, signal_handle, actor_handle): + # Notify the `Owner` that the `Caller` is executing the remote + # `call` method. + ray.get(signal_handle.send.remote()) + # Wait for the `Owner` to exit. + wait_for_pid_to_exit(owner_pid) + oid = actor_handle.f.remote() + # It will hang without location resolution protocol. + ray.get(oid) + + def hang(self): + return True + + owner = Owner.remote() + owner_pid = ray.get(owner.get_pid.remote()) + + caller = Caller.remote() + owner.create_actor.remote(caller) + # Wait for the `Owner` to exit. + wait_for_pid_to_exit(owner_pid) + # It will hang here if location is not properly resolved. + assert (wait_for_condition(lambda: ray.get(caller.hang.remote()))) + + +@pytest.mark.parametrize( + "ray_start_cluster", [{ + "num_cpus": 3, + "num_nodes": 1, + }], indirect=True) +def test_actor_owner_node_dies_before_dependency_ready(ray_start_cluster): + """Test actor owner node dies before local dependencies are resolved. + This test verifies the scenario where owner node + has failed before actor dependencies are resolved. + Reference: https://github.com/ray-project/ray/pull/8045 + """ + + @ray.remote + class Actor: + def __init__(self, dependency): + print("actor: {}".format(os.getpid())) + self.dependency = dependency + + def f(self): + return self.dependency + + # Make sure it is scheduled in the second node. + @ray.remote(resources={"node": 1}, num_cpus=1) + class Owner: + def get_pid(self): + return os.getpid() + + def create_actor(self, caller_handle): + s = SignalActor.remote() + # Create an actor which depends on an object that can never be + # resolved. + actor_handle = Actor.remote(s.wait.remote()) + + pid = os.getpid() + signal_handle = SignalActor.remote() + caller_handle.call.remote(pid, signal_handle, actor_handle) + # Wait until the `Caller` starts executing the remote `call` method. 
+ ray.get(signal_handle.wait.remote()) + + @ray.remote + class Caller: + def call(self, owner_pid, signal_handle, actor_handle): + # Notify the `Owner` that the `Caller` is executing the remote + # `call` method. + ray.get(signal_handle.send.remote()) + # Wait for the `Owner` to exit. + wait_for_pid_to_exit(owner_pid) + oid = actor_handle.f.remote() + # It will hang without location resolution protocol. + ray.get(oid) + + def hang(self): + return True + + cluster = ray_start_cluster + node_to_be_broken = cluster.add_node(num_cpus=1, resources={"node": 1}) + + owner = Owner.remote() + owner_pid = ray.get(owner.get_pid.remote()) + + caller = Caller.remote() + owner.create_actor.remote(caller) + cluster.remove_node(node_to_be_broken) + # Wait for the `Owner` to exit. + wait_for_pid_to_exit(owner_pid) + + # It will hang here if location is not properly resolved. + assert (wait_for_condition(lambda: ray.get(caller.hang.remote()))) + + if __name__ == "__main__": import pytest sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/tests/test_global_state.py b/python/ray/tests/test_global_state.py index 1dd40a995..e691f99df 100644 --- a/python/ray/tests/test_global_state.py +++ b/python/ray/tests/test_global_state.py @@ -123,9 +123,11 @@ def test_global_state_actor_entry(ray_start_regular): a_actor_id = a._actor_id.hex() b_actor_id = b._actor_id.hex() assert ray.actors(actor_id=a_actor_id)["ActorID"] == a_actor_id - assert ray.actors(actor_id=a_actor_id)["State"] == 1 + assert ray.actors( + actor_id=a_actor_id)["State"] == ray.gcs_utils.ActorTableData.ALIVE assert ray.actors(actor_id=b_actor_id)["ActorID"] == b_actor_id - assert ray.actors(actor_id=b_actor_id)["State"] == 1 + assert ray.actors( + actor_id=b_actor_id)["State"] == ray.gcs_utils.ActorTableData.ALIVE if __name__ == "__main__": diff --git a/python/ray/tests/test_metrics.py b/python/ray/tests/test_metrics.py index 760c0fab4..55087cf20 100644 --- a/python/ray/tests/test_metrics.py +++ 
b/python/ray/tests/test_metrics.py @@ -236,7 +236,8 @@ def test_raylet_info_endpoint(shutdown_only): if child_actor_info["state"] == -1: assert child_actor_info["requiredResources"]["CustomResource"] == 1 else: - assert child_actor_info["state"] == 1 + assert child_actor_info[ + "state"] == ray.gcs_utils.ActorTableData.ALIVE assert len(child_actor_info["children"]) == 0 assert cpu_resources(child_actor_info) == 1 diff --git a/src/ray/core_worker/actor_manager.cc b/src/ray/core_worker/actor_manager.cc index 04b2536ce..12876968d 100644 --- a/src/ray/core_worker/actor_manager.cc +++ b/src/ray/core_worker/actor_manager.cc @@ -164,19 +164,18 @@ void ActorManager::HandleActorStateNotification(const ActorID &actor_id, << ", raylet_id: " << ClientID::FromBinary(actor_data.address().raylet_id()) << ", num_restarts: " << actor_data.num_restarts(); - - if (actor_data.state() == gcs::ActorTableData::PENDING) { - // The actor is being created and not yet ready, just ignore! - } else if (actor_data.state() == gcs::ActorTableData::RESTARTING) { + if (actor_data.state() == gcs::ActorTableData::RESTARTING) { direct_actor_submitter_->DisconnectActor(actor_id, actor_data.num_restarts(), false); } else if (actor_data.state() == gcs::ActorTableData::DEAD) { direct_actor_submitter_->DisconnectActor(actor_id, actor_data.num_restarts(), true); // We cannot erase the actor handle here because clients can still // submit tasks to dead actors. This also means we defer unsubscription, // otherwise we crash when bulk unsubscribing all actor handles. - } else { + } else if (actor_data.state() == gcs::ActorTableData::ALIVE) { direct_actor_submitter_->ConnectActor(actor_id, actor_data.address(), actor_data.num_restarts()); + } else { + // The actor is being created and not yet ready, just ignore! 
} } diff --git a/src/ray/core_worker/actor_manager.h b/src/ray/core_worker/actor_manager.h index 9a4c46ac0..37ef89590 100644 --- a/src/ray/core_worker/actor_manager.h +++ b/src/ray/core_worker/actor_manager.h @@ -22,6 +22,48 @@ namespace ray { +class ActorCreatorInterface { + public: + virtual ~ActorCreatorInterface() = default; + /// Register actor to GCS synchronously. + /// + /// \param task_spec The specification for the actor creation task. + /// \return Status + virtual Status RegisterActor(const TaskSpecification &task_spec) = 0; + + /// Asynchronously request GCS to create the actor. + /// + /// \param task_spec The specification for the actor creation task. + /// \param callback Callback that will be called after the actor info is written to GCS. + /// \return Status + virtual Status AsyncCreateActor(const TaskSpecification &task_spec, + const gcs::StatusCallback &callback) = 0; +}; + +class DefaultActorCreator : public ActorCreatorInterface { + public: + explicit DefaultActorCreator(std::shared_ptr gcs_client) + : gcs_client_(std::move(gcs_client)) {} + + Status RegisterActor(const TaskSpecification &task_spec) override { + auto promise = std::make_shared>(); + auto status = gcs_client_->Actors().AsyncRegisterActor( + task_spec, [promise](const Status &status) { promise->set_value(); }); + if (status.ok()) { + promise->get_future().wait(); + } + return status; + } + + Status AsyncCreateActor(const TaskSpecification &task_spec, + const gcs::StatusCallback &callback) override { + return gcs_client_->Actors().AsyncCreateActor(task_spec, callback); + } + + private: + std::shared_ptr gcs_client_; +}; + /// Class to manage lifetimes of actors that we create (actor children). /// Currently this class is only used to publish actor DEAD event /// for actor creation task failures. 
All other cases are managed diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 951c5561c..c61c7236f 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -436,13 +436,9 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ new raylet::RayletClient(std::move(grpc_client))); }; - std::function - actor_create_callback = nullptr; + std::shared_ptr actor_creator = nullptr; if (RayConfig::instance().gcs_actor_service_enabled()) { - actor_create_callback = [this](const TaskSpecification &task_spec, - const gcs::StatusCallback &callback) { - return gcs_client_->Actors().AsyncCreateActor(task_spec, callback); - }; + actor_creator = std::make_shared(gcs_client_); } direct_actor_submitter_ = std::shared_ptr( @@ -455,7 +451,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ memory_store_, task_manager_, local_raylet_id, RayConfig::instance().worker_lease_timeout_milliseconds(), RayConfig::instance().max_tasks_in_flight_per_worker(), - std::move(actor_create_callback), boost::asio::steady_timer(io_service_))); + std::move(actor_creator), boost::asio::steady_timer(io_service_))); future_resolver_.reset(new FutureResolver(memory_store_, client_factory, rpc_address_)); // Unfortunately the raylet client has to be constructed after the receivers. 
if (direct_task_receiver_ != nullptr) { diff --git a/src/ray/core_worker/test/actor_manager_test.cc b/src/ray/core_worker/test/actor_manager_test.cc index cafb97ec4..f8a871ae7 100644 --- a/src/ray/core_worker/test/actor_manager_test.cc +++ b/src/ray/core_worker/test/actor_manager_test.cc @@ -194,15 +194,13 @@ TEST_F(ActorManagerTest, TestAddAndGetActorHandleEndToEnd) { EXPECT_CALL(*direct_actor_submitter_, ConnectActor(_, _, _)).Times(1); rpc::ActorTableData actor_table_data; actor_table_data.set_actor_id(actor_id.Binary()); - actor_table_data.set_state( - rpc::ActorTableData_ActorState::ActorTableData_ActorState_ALIVE); + actor_table_data.set_state(rpc::ActorTableData::ALIVE); actor_info_accessor_->ActorStateNotificationPublished(actor_id, actor_table_data); // Now actor state is updated to DEAD. Make sure it is diconnected. EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _, _)).Times(1); actor_table_data.set_actor_id(actor_id.Binary()); - actor_table_data.set_state( - rpc::ActorTableData_ActorState::ActorTableData_ActorState_DEAD); + actor_table_data.set_state(rpc::ActorTableData::DEAD); actor_info_accessor_->ActorStateNotificationPublished(actor_id, actor_table_data); } @@ -248,8 +246,7 @@ TEST_F(ActorManagerTest, TestActorStateNotificationPending) { EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _, _)).Times(0); rpc::ActorTableData actor_table_data; actor_table_data.set_actor_id(actor_id.Binary()); - actor_table_data.set_state( - rpc::ActorTableData_ActorState::ActorTableData_ActorState_PENDING); + actor_table_data.set_state(rpc::ActorTableData::PENDING_CREATION); ASSERT_TRUE( actor_info_accessor_->ActorStateNotificationPublished(actor_id, actor_table_data)); } @@ -261,8 +258,7 @@ TEST_F(ActorManagerTest, TestActorStateNotificationRestarting) { EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _, _)).Times(1); rpc::ActorTableData actor_table_data; actor_table_data.set_actor_id(actor_id.Binary()); - actor_table_data.set_state( - 
rpc::ActorTableData_ActorState::ActorTableData_ActorState_RESTARTING); + actor_table_data.set_state(rpc::ActorTableData::RESTARTING); ASSERT_TRUE( actor_info_accessor_->ActorStateNotificationPublished(actor_id, actor_table_data)); } @@ -274,8 +270,7 @@ TEST_F(ActorManagerTest, TestActorStateNotificationDead) { EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _, _)).Times(1); rpc::ActorTableData actor_table_data; actor_table_data.set_actor_id(actor_id.Binary()); - actor_table_data.set_state( - rpc::ActorTableData_ActorState::ActorTableData_ActorState_DEAD); + actor_table_data.set_state(rpc::ActorTableData::DEAD); ASSERT_TRUE( actor_info_accessor_->ActorStateNotificationPublished(actor_id, actor_table_data)); } @@ -287,8 +282,7 @@ TEST_F(ActorManagerTest, TestActorStateNotificationAlive) { EXPECT_CALL(*direct_actor_submitter_, DisconnectActor(_, _, _)).Times(0); rpc::ActorTableData actor_table_data; actor_table_data.set_actor_id(actor_id.Binary()); - actor_table_data.set_state( - rpc::ActorTableData_ActorState::ActorTableData_ActorState_ALIVE); + actor_table_data.set_state(rpc::ActorTableData::ALIVE); ASSERT_TRUE( actor_info_accessor_->ActorStateNotificationPublished(actor_id, actor_table_data)); } diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index 1436e988b..ada51d242 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ -121,7 +121,7 @@ class CoreWorkerDirectActorTaskSubmitter /// The current state of the actor. If this is ALIVE, then we should have /// an RPC client to the actor. If this is DEAD, then all tasks in the /// queue will be marked failed and all other ClientQueue state is ignored. 
- rpc::ActorTableData::ActorState state = rpc::ActorTableData::PENDING; + rpc::ActorTableData::ActorState state = rpc::ActorTableData::DEPENDENCIES_UNREADY; /// How many times this actor has been restarted before. Starts at -1 to /// indicate that the actor is not yet created. This is used to drop stale /// messages from the GCS. diff --git a/src/ray/core_worker/transport/direct_task_transport.cc b/src/ray/core_worker/transport/direct_task_transport.cc index de3ffb71f..52cad6e7f 100644 --- a/src/ray/core_worker/transport/direct_task_transport.cc +++ b/src/ray/core_worker/transport/direct_task_transport.cc @@ -20,25 +20,40 @@ namespace ray { Status CoreWorkerDirectTaskSubmitter::SubmitTask(TaskSpecification task_spec) { RAY_LOG(DEBUG) << "Submit task " << task_spec.TaskId(); + + if (actor_creator_ && task_spec.IsActorCreationTask()) { + // Synchronously register the actor to GCS server. + // Previously, we asynchronously registered the actor after all its dependencies were + // resolved. This caused a problem: if the owner of the actor dies before dependencies + // are resolved, the actor will never be created. But the actor handle may already be + // passed to other workers. In this case, the actor tasks will hang forever. + // So we fixed this issue by synchronously registering the actor. If the owner dies + // before dependencies are resolved, GCS will notice this and mark the actor as dead. + auto status = actor_creator_->RegisterActor(task_spec); + if (!status.ok()) { + return status; + } + } + resolver_.ResolveDependencies(task_spec, [this, task_spec]() { RAY_LOG(DEBUG) << "Task dependencies resolved " << task_spec.TaskId(); - if (actor_create_callback_ && task_spec.IsActorCreationTask()) { + if (actor_creator_ && task_spec.IsActorCreationTask()) { // If gcs actor management is enabled, the actor creation task will be sent to // gcs server directly after the in-memory dependent objects are resolved. 
For // more details please see the protocol of actor management based on gcs. // https://docs.google.com/document/d/1EAWide-jy05akJp6OMtDn58XOK7bUyruWMia4E-fV28/edit?usp=sharing auto actor_id = task_spec.ActorCreationId(); auto task_id = task_spec.TaskId(); - RAY_LOG(INFO) << "Submitting actor creation task to GCS: " << actor_id; - RAY_CHECK_OK( - actor_create_callback_(task_spec, [this, actor_id, task_id](Status status) { + RAY_LOG(INFO) << "Creating actor via GCS actor id = : " << actor_id; + RAY_CHECK_OK(actor_creator_->AsyncCreateActor( + task_spec, [this, actor_id, task_id](Status status) { if (status.ok()) { - RAY_LOG(INFO) << "Actor creation task submitted to GCS: " << actor_id; + RAY_LOG(INFO) << "Created actor, actor id = " << actor_id; task_finisher_->CompletePendingTask(task_id, rpc::PushTaskReply(), rpc::Address()); } else { RAY_LOG(ERROR) << "Failed to create actor " << actor_id - << " with: " << status.ToString(); + << " with status: " << status.ToString(); RAY_UNUSED(task_finisher_->PendingTaskFailed( task_id, rpc::ErrorType::ACTOR_CREATION_FAILED, &status)); } diff --git a/src/ray/core_worker/transport/direct_task_transport.h b/src/ray/core_worker/transport/direct_task_transport.h index b1dbec78d..a53a298f0 100644 --- a/src/ray/core_worker/transport/direct_task_transport.h +++ b/src/ray/core_worker/transport/direct_task_transport.h @@ -20,6 +20,7 @@ #include "absl/synchronization/mutex.h" #include "ray/common/id.h" #include "ray/common/ray_object.h" +#include "ray/core_worker/actor_manager.h" #include "ray/core_worker/context.h" #include "ray/core_worker/store_provider/memory_store/memory_store.h" #include "ray/core_worker/task_manager.h" @@ -56,8 +57,7 @@ class CoreWorkerDirectTaskSubmitter { int64_t lease_timeout_ms, uint32_t max_tasks_in_flight_per_worker = RayConfig::instance().max_tasks_in_flight_per_worker(), - std::function - actor_create_callback = nullptr, + std::shared_ptr actor_creator = nullptr, absl::optional cancel_timer = 
absl::nullopt) : rpc_address_(rpc_address), local_lease_client_(lease_client), @@ -67,7 +67,7 @@ class CoreWorkerDirectTaskSubmitter { task_finisher_(task_finisher), lease_timeout_ms_(lease_timeout_ms), local_raylet_id_(local_raylet_id), - actor_create_callback_(std::move(actor_create_callback)), + actor_creator_(std::move(actor_creator)), max_tasks_in_flight_per_worker_(max_tasks_in_flight_per_worker), cancel_retry_timer_(std::move(cancel_timer)) {} @@ -163,12 +163,8 @@ class CoreWorkerDirectTaskSubmitter { /// if a remote raylet tells us to spill the task back to the local raylet. const ClientID local_raylet_id_; - /// A function to override actor creation. The callback will be called once the actor - /// creation task has been accepted for submission, but the actor may not be created - /// yet. - std::function - actor_create_callback_; + /// Interface for actor creation. + std::shared_ptr actor_creator_; // Protects task submission state below. absl::Mutex mu_; diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/accessor.h index fcba7bb59..35e329c36 100644 --- a/src/ray/gcs/accessor.h +++ b/src/ray/gcs/accessor.h @@ -62,7 +62,20 @@ class ActorInfoAccessor { const std::string &name, const OptionalItemCallback &callback) = 0; - /// Create an actor to GCS asynchronously. + /// Register actor to GCS asynchronously. + /// + /// \param task_spec The specification for the actor creation task. + /// \param callback Callback that will be called after the actor info is written to GCS. + /// \return Status + virtual Status AsyncRegisterActor(const TaskSpecification &task_spec, + const StatusCallback &callback) = 0; + + /// Asynchronously request GCS to create the actor. + /// + /// This should be called after the worker has resolved the actor dependencies. + /// TODO(...): Currently this request will only reply after the actor is created. + /// We should change it to reply immediately after GCS has persisted the actor + /// dependencies in storage. 
/// /// \param task_spec The specification for the actor creation task. /// \param callback Callback that will be called after the actor info is written to GCS. diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index 16140672f..289a2dcee 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -178,6 +178,22 @@ Status ServiceBasedActorInfoAccessor::AsyncGetByName( return Status::OK(); } +Status ServiceBasedActorInfoAccessor::AsyncRegisterActor( + const ray::TaskSpecification &task_spec, const ray::gcs::StatusCallback &callback) { + RAY_CHECK(task_spec.IsActorCreationTask() && callback); + rpc::RegisterActorRequest request; + request.mutable_task_spec()->CopyFrom(task_spec.GetMessage()); + client_impl_->GetGcsRpcClient().RegisterActor( + request, [callback](const Status &, const rpc::RegisterActorReply &reply) { + auto status = + reply.status().code() == (int)StatusCode::OK + ? 
Status() + : Status(StatusCode(reply.status().code()), reply.status().message()); + callback(status); + }); + return Status::OK(); +} + Status ServiceBasedActorInfoAccessor::AsyncCreateActor( const ray::TaskSpecification &task_spec, const ray::gcs::StatusCallback &callback) { RAY_CHECK(task_spec.IsActorCreationTask() && callback); diff --git a/src/ray/gcs/gcs_client/service_based_accessor.h b/src/ray/gcs/gcs_client/service_based_accessor.h index 5ee5f7464..d820aee88 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.h +++ b/src/ray/gcs/gcs_client/service_based_accessor.h @@ -81,6 +81,9 @@ class ServiceBasedActorInfoAccessor : public ActorInfoAccessor { const std::string &name, const OptionalItemCallback &callback) override; + Status AsyncRegisterActor(const TaskSpecification &task_spec, + const StatusCallback &callback) override; + Status AsyncCreateActor(const TaskSpecification &task_spec, const StatusCallback &callback) override; diff --git a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc index 013406efb..d0901b597 100644 --- a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc @@ -543,18 +543,15 @@ TEST_F(ServiceBasedGcsClientTest, TestActorInfo) { // Register an actor to GCS. ASSERT_TRUE(RegisterActor(actor_table_data)); - ASSERT_TRUE(GetActor(actor_id).state() == - rpc::ActorTableData_ActorState::ActorTableData_ActorState_ALIVE); + ASSERT_TRUE(GetActor(actor_id).state() == rpc::ActorTableData::ALIVE); // Cancel subscription to an actor. UnsubscribeActor(actor_id); // Update dynamic states of actor in GCS. 
- actor_table_data->set_state( - rpc::ActorTableData_ActorState::ActorTableData_ActorState_DEAD); + actor_table_data->set_state(rpc::ActorTableData::DEAD); ASSERT_TRUE(UpdateActor(actor_id, actor_table_data)); - ASSERT_TRUE(GetActor(actor_id).state() == - rpc::ActorTableData_ActorState::ActorTableData_ActorState_DEAD); + ASSERT_TRUE(GetActor(actor_id).state() == rpc::ActorTableData::DEAD); WaitPendingDone(actor_update_count, 1); } @@ -915,10 +912,8 @@ TEST_F(ServiceBasedGcsClientTest, TestActorTableResubscribe) { // We should receive a new ALIVE notification from the subscribe channel. WaitPendingDone(num_subscribe_all_notifications, 1); WaitPendingDone(num_subscribe_one_notifications, 1); - CheckActorData(subscribe_all_notifications[0], - rpc::ActorTableData_ActorState::ActorTableData_ActorState_ALIVE); - CheckActorData(subscribe_one_notifications[0], - rpc::ActorTableData_ActorState::ActorTableData_ActorState_ALIVE); + CheckActorData(subscribe_all_notifications[0], rpc::ActorTableData::ALIVE); + CheckActorData(subscribe_one_notifications[0], rpc::ActorTableData::ALIVE); // Restart GCS server. RestartGcsServer(); @@ -928,23 +923,18 @@ TEST_F(ServiceBasedGcsClientTest, TestActorTableResubscribe) { // another notification of ALIVE state. WaitPendingDone(num_subscribe_all_notifications, 2); WaitPendingDone(num_subscribe_one_notifications, 2); - CheckActorData(subscribe_all_notifications[1], - rpc::ActorTableData_ActorState::ActorTableData_ActorState_ALIVE); - CheckActorData(subscribe_one_notifications[1], - rpc::ActorTableData_ActorState::ActorTableData_ActorState_ALIVE); + CheckActorData(subscribe_all_notifications[1], rpc::ActorTableData::ALIVE); + CheckActorData(subscribe_one_notifications[1], rpc::ActorTableData::ALIVE); // Update the actor state to DEAD. 
- actor_table_data->set_state( - rpc::ActorTableData_ActorState::ActorTableData_ActorState_DEAD); + actor_table_data->set_state(rpc::ActorTableData::DEAD); ASSERT_TRUE(UpdateActor(actor_id, actor_table_data)); // We should receive a new DEAD notification from the subscribe channel. WaitPendingDone(num_subscribe_all_notifications, 3); WaitPendingDone(num_subscribe_one_notifications, 3); - CheckActorData(subscribe_all_notifications[2], - rpc::ActorTableData_ActorState::ActorTableData_ActorState_DEAD); - CheckActorData(subscribe_one_notifications[2], - rpc::ActorTableData_ActorState::ActorTableData_ActorState_DEAD); + CheckActorData(subscribe_all_notifications[2], rpc::ActorTableData::DEAD); + CheckActorData(subscribe_one_notifications[2], rpc::ActorTableData::DEAD); } TEST_F(ServiceBasedGcsClientTest, TestObjectTableResubscribe) { diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index 7988cc502..33c056c8f 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -92,9 +92,9 @@ GcsActorManager::GcsActorManager(std::shared_ptr sch gcs_pub_sub_(std::move(gcs_pub_sub)), worker_client_factory_(worker_client_factory) {} -void GcsActorManager::HandleCreateActor(const rpc::CreateActorRequest &request, - rpc::CreateActorReply *reply, - rpc::SendReplyCallback send_reply_callback) { +void GcsActorManager::HandleRegisterActor(const rpc::RegisterActorRequest &request, + rpc::RegisterActorReply *reply, + rpc::SendReplyCallback send_reply_callback) { RAY_CHECK(request.task_spec().type() == TaskType::ACTOR_CREATION_TASK); auto actor_id = ActorID::FromBinary(request.task_spec().actor_creation_task_spec().actor_id()); @@ -107,7 +107,27 @@ void GcsActorManager::HandleCreateActor(const rpc::CreateActorRequest &request, GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); }); if (!status.ok()) { - RAY_LOG(ERROR) << "Failed to create actor: " << status.ToString(); + 
RAY_LOG(ERROR) << "Failed to register actor: " << status.ToString(); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); + } +} + +void GcsActorManager::HandleCreateActor(const rpc::CreateActorRequest &request, + rpc::CreateActorReply *reply, + rpc::SendReplyCallback send_reply_callback) { + RAY_CHECK(request.task_spec().type() == TaskType::ACTOR_CREATION_TASK); + auto actor_id = + ActorID::FromBinary(request.task_spec().actor_creation_task_spec().actor_id()); + + RAY_LOG(INFO) << "Creating actor, actor id = " << actor_id; + Status status = CreateActor(request, [reply, send_reply_callback, actor_id]( + const std::shared_ptr &actor) { + RAY_LOG(INFO) << "Created actor, actor id = " << actor_id; + GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); + }); + if (!status.ok()) { + RAY_LOG(ERROR) << "Failed to create actor, actor id = " << actor_id + << " status: " << status.ToString(); GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); } } @@ -360,7 +380,7 @@ void GcsActorManager::HandleGetActorCheckpointID( } Status GcsActorManager::RegisterActor( - const ray::rpc::CreateActorRequest &request, + const ray::rpc::RegisterActorRequest &request, std::function)> callback) { RAY_CHECK(callback); const auto &actor_creation_task_spec = request.task_spec().actor_creation_task_spec(); @@ -369,10 +389,9 @@ Status GcsActorManager::RegisterActor( auto iter = registered_actors_.find(actor_id); if (iter != registered_actors_.end() && iter->second->GetState() == rpc::ActorTableData::ALIVE) { - // When the network fails, Driver/Worker is not sure whether GcsServer has received - // the request of actor creation task, so Driver/Worker will try again and again until - // receiving the reply from GcsServer. If the actor has been created successfully then - // just reply to the caller. + // In case of temporary network failures, workers will re-send multiple duplicate + // requests to GCS server. + // In this case, we can just reply. 
callback(iter->second); return Status::OK(); } @@ -380,12 +399,12 @@ Status GcsActorManager::RegisterActor( auto pending_register_iter = actor_to_register_callbacks_.find(actor_id); if (pending_register_iter != actor_to_register_callbacks_.end()) { // It is a duplicate message, just mark the callback as pending and invoke it after - // the actor has been successfully created. + // the actor has been flushed to the storage. pending_register_iter->second.emplace_back(std::move(callback)); return Status::OK(); } - auto actor = std::make_shared(request); + auto actor = std::make_shared(request.task_spec()); if (!actor->GetName().empty()) { auto it = named_actors_.find(actor->GetName()); if (it == named_actors_.end()) { @@ -398,16 +417,91 @@ Status GcsActorManager::RegisterActor( } // Mark the callback as pending and invoke it after the actor has been successfully - // created. + // flushed to the storage. actor_to_register_callbacks_[actor_id].emplace_back(std::move(callback)); RAY_CHECK(registered_actors_.emplace(actor->GetActorID(), actor).second); + const auto &owner_address = actor->GetOwnerAddress(); + auto node_id = ClientID::FromBinary(owner_address.raylet_id()); + auto worker_id = WorkerID::FromBinary(owner_address.worker_id()); + RAY_CHECK(unresolved_actors_[node_id][worker_id].emplace(actor->GetActorID()).second); + if (!actor->IsDetached() && worker_client_factory_) { // This actor is owned. Send a long polling request to the actor's // owner to determine when the actor should be removed. PollOwnerForActorOutOfScope(actor); } + // The backend storage is supposed to be reliable, so the status must be ok. + RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put( + actor->GetActorID(), *actor->GetMutableActorTableData(), + [this, actor](const Status &status) { + // The backend storage is supposed to be reliable, so the status must be ok. 
+ RAY_CHECK_OK(status); + // Invoke all callbacks for all registration requests of this actor (duplicated + // requests are included) and remove all of them from + // actor_to_register_callbacks_. + // Reply to the owner to indicate that the actor has been registered. + auto iter = actor_to_register_callbacks_.find(actor->GetActorID()); + RAY_CHECK(iter != actor_to_register_callbacks_.end() && !iter->second.empty()); + auto callbacks = std::move(iter->second); + actor_to_register_callbacks_.erase(iter); + for (auto &callback : callbacks) { + callback(actor); + } + })); + return Status::OK(); +} + +Status GcsActorManager::CreateActor(const ray::rpc::CreateActorRequest &request, + CreateActorCallback callback) { + RAY_CHECK(callback); + const auto &actor_creation_task_spec = request.task_spec().actor_creation_task_spec(); + auto actor_id = ActorID::FromBinary(actor_creation_task_spec.actor_id()); + + auto iter = registered_actors_.find(actor_id); + if (iter != registered_actors_.end() && + iter->second->GetState() == rpc::ActorTableData::ALIVE) { + // In case of temporary network failures, workers will re-send multiple duplicate + // requests to GCS server. + // In this case, we can just reply. + callback(iter->second); + return Status::OK(); + } + + auto actor_creation_iter = actor_to_create_callbacks_.find(actor_id); + if (actor_creation_iter != actor_to_create_callbacks_.end()) { + // It is a duplicate message, just mark the callback as pending and invoke it after + // the actor has been successfully created. + actor_creation_iter->second.emplace_back(std::move(callback)); + return Status::OK(); + } + // Mark the callback as pending and invoke it after the actor has been successfully + // created. + actor_to_create_callbacks_[actor_id].emplace_back(std::move(callback)); + + // Remove the actor from the unresolved actor map. 
+ auto actor = std::make_shared(request.task_spec()); + actor->GetMutableActorTableData()->set_state(rpc::ActorTableData::PENDING_CREATION); + const auto &owner_address = actor->GetOwnerAddress(); + auto node_id = ClientID::FromBinary(owner_address.raylet_id()); + auto worker_id = WorkerID::FromBinary(owner_address.worker_id()); + auto it = unresolved_actors_.find(node_id); + RAY_CHECK(it != unresolved_actors_.end()); + auto worker_to_actors_it = it->second.find(worker_id); + RAY_CHECK(worker_to_actors_it != it->second.end()); + RAY_CHECK(worker_to_actors_it->second.erase(actor_id) != 0); + if (worker_to_actors_it->second.empty()) { + it->second.erase(worker_to_actors_it); + if (it->second.empty()) { + unresolved_actors_.erase(it); + } + } + // Update the registered actor as its creation task specification may have changed due + // to resolved dependencies. + registered_actors_[actor_id] = actor; + + // Schedule the actor. gcs_actor_scheduler_->Schedule(actor); return Status::OK(); } @@ -458,6 +552,7 @@ void GcsActorManager::PollOwnerForActorOutOfScope( void GcsActorManager::DestroyActor(const ActorID &actor_id) { RAY_LOG(DEBUG) << "Destroying actor " << actor_id; actor_to_register_callbacks_.erase(actor_id); + actor_to_create_callbacks_.erase(actor_id); auto it = registered_actors_.find(actor_id); RAY_CHECK(it != registered_actors_.end()) << "Tried to destroy actor that does not exist " << actor_id; @@ -488,49 +583,70 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id) { return; } - // The actor is still alive or pending creation. Clean up all remaining - // state. - const auto &node_id = actor->GetNodeID(); - const auto &worker_id = actor->GetWorkerID(); - auto node_it = created_actors_.find(node_id); - if (node_it != created_actors_.end() && node_it->second.count(worker_id)) { - // The actor has already been created. Destroy the process by force-killing - // it. 
- auto actor_client = worker_client_factory_(actor->GetAddress()); - rpc::KillActorRequest request; - request.set_intended_actor_id(actor_id.Binary()); - request.set_force_kill(true); - request.set_no_restart(true); - RAY_UNUSED(actor_client->KillActor(request, nullptr)); - - RAY_CHECK(node_it->second.erase(actor->GetWorkerID())); - if (node_it->second.empty()) { - created_actors_.erase(node_it); + if (actor->GetState() == rpc::ActorTableData::DEPENDENCIES_UNREADY) { + // The actor creation task still has unresolved dependencies. Remove from the + // unresolved actors map. + const auto &owner_address = actor->GetOwnerAddress(); + auto node_id = ClientID::FromBinary(owner_address.raylet_id()); + auto worker_id = WorkerID::FromBinary(owner_address.worker_id()); + auto iter = unresolved_actors_.find(node_id); + if (iter != unresolved_actors_.end()) { + auto it = iter->second.find(worker_id); + RAY_CHECK(it != iter->second.end()); + RAY_CHECK(it->second.erase(actor_id) != 0); + if (it->second.empty()) { + iter->second.erase(it); + if (iter->second.empty()) { + unresolved_actors_.erase(iter); + } + } } } else { - // The actor has not been created yet. It is either being scheduled or is - // pending scheduling. - auto canceled_actor_id = - gcs_actor_scheduler_->CancelOnWorker(actor->GetNodeID(), actor->GetWorkerID()); - if (!canceled_actor_id.IsNil()) { - // The actor was being scheduled and has now been canceled. - RAY_CHECK(canceled_actor_id == actor_id); - } else { - auto pending_it = std::find_if(pending_actors_.begin(), pending_actors_.end(), - [actor_id](const std::shared_ptr &actor) { - return actor->GetActorID() == actor_id; - }); + // The actor is still alive or pending creation. Clean up all remaining + // state. 
+ const auto &node_id = actor->GetNodeID(); + const auto &worker_id = actor->GetWorkerID(); + auto node_it = created_actors_.find(node_id); + if (node_it != created_actors_.end() && node_it->second.count(worker_id)) { + // The actor has already been created. Destroy the process by force-killing + // it. + auto actor_client = worker_client_factory_(actor->GetAddress()); + rpc::KillActorRequest request; + request.set_intended_actor_id(actor_id.Binary()); + request.set_force_kill(true); + request.set_no_restart(true); + RAY_UNUSED(actor_client->KillActor(request, nullptr)); - // The actor was pending scheduling. Remove it from the queue. - if (pending_it != pending_actors_.end()) { - pending_actors_.erase(pending_it); + RAY_CHECK(node_it->second.erase(actor->GetWorkerID())); + if (node_it->second.empty()) { + created_actors_.erase(node_it); + } + } else { + // The actor has not been created yet. It is either being scheduled or is + // pending scheduling. + auto canceled_actor_id = + gcs_actor_scheduler_->CancelOnWorker(actor->GetNodeID(), actor->GetWorkerID()); + if (!canceled_actor_id.IsNil()) { + // The actor was being scheduled and has now been canceled. + RAY_CHECK(canceled_actor_id == actor_id); } else { - // When actor creation request of this actor id is pending in raylet, - // it doesn't responds, and the actor should be still in leasing state. - // NOTE: Raylet will cancel the lease request once it receives the - // actor state notification. So this method doesn't have to cancel - // outstanding lease request by calling raylet_client->CancelWorkerLease - gcs_actor_scheduler_->CancelOnLeasing(node_id, actor_id); + auto pending_it = + std::find_if(pending_actors_.begin(), pending_actors_.end(), + [actor_id](const std::shared_ptr &actor) { + return actor->GetActorID() == actor_id; + }); + + // The actor was pending scheduling. Remove it from the queue. 
+ if (pending_it != pending_actors_.end()) { + pending_actors_.erase(pending_it); + } else { + // When actor creation request of this actor id is pending in raylet, + // it doesn't responds, and the actor should be still in leasing state. + // NOTE: Raylet will cancel the lease request once it receives the + // actor state notification. So this method doesn't have to cancel + // outstanding lease request by calling raylet_client->CancelWorkerLease + gcs_actor_scheduler_->CancelOnLeasing(node_id, actor_id); + } } } } @@ -553,6 +669,31 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id) { })); } +absl::flat_hash_set GcsActorManager::GetUnresolvedActorsByOwnerNode( + const ClientID &node_id) const { + absl::flat_hash_set actor_ids; + auto iter = unresolved_actors_.find(node_id); + if (iter != unresolved_actors_.end()) { + for (auto &entry : iter->second) { + actor_ids.insert(entry.second.begin(), entry.second.end()); + } + } + return actor_ids; +} + +absl::flat_hash_set GcsActorManager::GetUnresolvedActorsByOwnerWorker( + const ClientID &node_id, const WorkerID &worker_id) const { + absl::flat_hash_set actor_ids; + auto iter = unresolved_actors_.find(node_id); + if (iter != unresolved_actors_.end()) { + auto it = iter->second.find(worker_id); + if (it != iter->second.end()) { + actor_ids.insert(it->second.begin(), it->second.end()); + } + } + return actor_ids; +} + void GcsActorManager::OnWorkerDead(const ray::ClientID &node_id, const ray::WorkerID &worker_id, bool intentional_exit) { @@ -568,6 +709,16 @@ void GcsActorManager::OnWorkerDead(const ray::ClientID &node_id, } } + // The creator worker of these actors died before resolving their dependencies. In this + // case, these actors will never be created successfully. So we need to destroy them, + // to prevent actor tasks hang forever. 
+ auto unresolved_actors = GetUnresolvedActorsByOwnerWorker(node_id, worker_id); + for (auto &actor_id : unresolved_actors) { + if (registered_actors_.count(actor_id)) { + DestroyActor(actor_id); + } + } + // Find if actor is already created or in the creation process (lease request is // granted) ActorID actor_id; @@ -627,6 +778,16 @@ void GcsActorManager::OnNodeDead(const ClientID &node_id) { ReconstructActor(entry.second); } } + + // The creator node of these actors died before resolving their dependencies. In this + // case, these actors will never be created successfully. So we need to destroy them, + // to prevent actor tasks hang forever. + auto unresolved_actors = GetUnresolvedActorsByOwnerNode(node_id); + for (auto &actor_id : unresolved_actors) { + if (registered_actors_.count(actor_id)) { + DestroyActor(actor_id); + } + } } void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_reschedule) { @@ -727,13 +888,13 @@ void GcsActorManager::OnActorCreationSuccess(const std::shared_ptr &ac // Invoke all callbacks for all registration requests of this actor (duplicated // requests are included) and remove all of them from - // actor_to_register_callbacks_. - auto iter = actor_to_register_callbacks_.find(actor_id); - if (iter != actor_to_register_callbacks_.end()) { + // actor_to_create_callbacks_. 
+ auto iter = actor_to_create_callbacks_.find(actor_id); + if (iter != actor_to_create_callbacks_.end()) { for (auto &callback : iter->second) { callback(actor); } - actor_to_register_callbacks_.erase(iter); + actor_to_create_callbacks_.erase(iter); } auto worker_id = actor->GetWorkerID(); @@ -862,5 +1023,15 @@ const absl::flat_hash_map> return created_actors_; } +const absl::flat_hash_map> + &GcsActorManager::GetRegisteredActors() const { + return registered_actors_; +} + +const absl::flat_hash_map> + &GcsActorManager::GetActorRegisterCallbacks() const { + return actor_to_register_callbacks_; +} + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h index a2721af80..b00332190 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.h +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h @@ -42,28 +42,26 @@ class GcsActor { explicit GcsActor(rpc::ActorTableData actor_table_data) : actor_table_data_(std::move(actor_table_data)) {} - /// Create a GcsActor by CreateActorRequest. + /// Create a GcsActor by TaskSpec. /// - /// \param request Contains the actor creation task specification. - explicit GcsActor(const ray::rpc::CreateActorRequest &request) { - RAY_CHECK(request.task_spec().type() == TaskType::ACTOR_CREATION_TASK); - const auto &actor_creation_task_spec = request.task_spec().actor_creation_task_spec(); + /// \param task_spec Contains the actor creation task specification. 
+ explicit GcsActor(const ray::rpc::TaskSpec &task_spec) { + RAY_CHECK(task_spec.type() == TaskType::ACTOR_CREATION_TASK); + const auto &actor_creation_task_spec = task_spec.actor_creation_task_spec(); actor_table_data_.set_actor_id(actor_creation_task_spec.actor_id()); - actor_table_data_.set_job_id(request.task_spec().job_id()); + actor_table_data_.set_job_id(task_spec.job_id()); actor_table_data_.set_max_restarts(actor_creation_task_spec.max_actor_restarts()); actor_table_data_.set_num_restarts(0); - auto dummy_object = - TaskSpecification(request.task_spec()).ActorDummyObject().Binary(); + auto dummy_object = TaskSpecification(task_spec).ActorDummyObject().Binary(); actor_table_data_.set_actor_creation_dummy_object_id(dummy_object); actor_table_data_.set_is_detached(actor_creation_task_spec.is_detached()); actor_table_data_.set_name(actor_creation_task_spec.name()); - actor_table_data_.mutable_owner_address()->CopyFrom( - request.task_spec().caller_address()); + actor_table_data_.mutable_owner_address()->CopyFrom(task_spec.caller_address()); - actor_table_data_.set_state(rpc::ActorTableData::PENDING); - actor_table_data_.mutable_task_spec()->CopyFrom(request.task_spec()); + actor_table_data_.set_state(rpc::ActorTableData::DEPENDENCIES_UNREADY); + actor_table_data_.mutable_task_spec()->CopyFrom(task_spec); actor_table_data_.mutable_address()->set_raylet_id(ClientID::Nil().Binary()); actor_table_data_.mutable_address()->set_worker_id(WorkerID::Nil().Binary()); @@ -111,6 +109,7 @@ class GcsActor { }; using RegisterActorCallback = std::function)>; +using CreateActorCallback = std::function)>; /// GcsActorManager is responsible for managing the lifecycle of all actors. /// This class is not thread-safe. 
class GcsActorManager : public rpc::ActorInfoHandler { @@ -127,6 +126,10 @@ class GcsActorManager : public rpc::ActorInfoHandler { ~GcsActorManager() = default; + void HandleRegisterActor(const rpc::RegisterActorRequest &request, + rpc::RegisterActorReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + void HandleCreateActor(const rpc::CreateActorRequest &request, rpc::CreateActorReply *reply, rpc::SendReplyCallback send_reply_callback) override; @@ -171,9 +174,20 @@ class GcsActorManager : public rpc::ActorInfoHandler { /// its state is `ALIVE`. /// \return Status::Invalid if this is a named actor and an actor with the specified /// name already exists. The callback will not be called in this case. - Status RegisterActor(const rpc::CreateActorRequest &request, + Status RegisterActor(const rpc::RegisterActorRequest &request, RegisterActorCallback callback); + /// Create actor asynchronously. + /// + /// \param request Contains the meta info to create the actor. + /// \param callback Will be invoked after the actor is created successfully or be + /// invoked immediately if the actor is already registered to `registered_actors_` and + /// its state is `ALIVE`. + /// \return Status::Invalid if this is a named actor and an actor with the specified + /// name already exists. The callback will not be called in this case. + Status CreateActor(const rpc::CreateActorRequest &request, + CreateActorCallback callback); + /// Get the actor ID for the named actor. Returns nil if the actor was not found. /// \param name The name of the detached actor to look up. /// \returns ActorID The ID of the actor. Nil if the actor was not found. @@ -234,6 +248,12 @@ class GcsActorManager : public rpc::ActorInfoHandler { const absl::flat_hash_map> &GetCreatedActors() const; + const absl::flat_hash_map> &GetRegisteredActors() + const; + + const absl::flat_hash_map> + &GetActorRegisterCallbacks() const; + private: /// A data structure representing an actor's owner. 
struct Owner { @@ -258,6 +278,14 @@ class GcsActorManager : public rpc::ActorInfoHandler { /// deregisters the actor. void DestroyActor(const ActorID &actor_id); + /// Get unresolved actors that were submitted from the specified node. + absl::flat_hash_set GetUnresolvedActorsByOwnerNode( + const ClientID &node_id) const; + + /// Get unresolved actors that were submitted from the specified worker. + absl::flat_hash_set GetUnresolvedActorsByOwnerWorker( + const ClientID &node_id, const WorkerID &worker_id) const; + private: /// Reconstruct the specified actor. /// @@ -267,16 +295,28 @@ class GcsActorManager : public rpc::ActorInfoHandler { /// again. void ReconstructActor(const ActorID &actor_id, bool need_reschedule = true); - /// Callbacks of actor registration requests that are not yet flushed. - /// This map is used to filter duplicated messages from a Driver/Worker caused by some - /// network problems. + /// Callbacks of pending `RegisterActor` requests. + /// Maps actor ID to actor registration callbacks, which is used to filter duplicated + /// messages from a driver/worker caused by some network problems. absl::flat_hash_map> actor_to_register_callbacks_; - /// All registered actors (pending actors are also included). + /// Callbacks of actor creation requests. + /// Maps actor ID to actor creation callbacks, which is used to filter duplicated + /// messages come from a Driver/Worker caused by some network problems. + absl::flat_hash_map> + actor_to_create_callbacks_; + /// All registered actors (unresoved and pending actors are also included). /// TODO(swang): Use unique_ptr instead of shared_ptr. absl::flat_hash_map> registered_actors_; /// Maps actor names to their actor ID for lookups by name. absl::flat_hash_map named_actors_; + /// The actors which dependencies have not been resolved. + /// Maps from worker ID to a client and the IDs of the actors owned by that worker. 
+ /// The actor whose dependencies are not resolved should be destroyed once it creator + /// dies. + absl::flat_hash_map>> + unresolved_actors_; /// The pending actors which will not be scheduled until there's a resource change. std::vector> pending_actors_; /// Map contains the relationship of node and created actors. Each node ID diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc index 1506105b4..411192a72 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc @@ -91,7 +91,7 @@ class GcsActorManagerTest : public ::testing::Test { gcs_table_storage_ = std::make_shared(io_service_); gcs_actor_manager_.reset(new gcs::GcsActorManager( mock_actor_scheduler_, gcs_table_storage_, gcs_pub_sub_, - [&](const rpc::Address &addr) { return worker_client_; })); + [this](const rpc::Address &addr) { return worker_client_; })); } virtual ~GcsActorManagerTest() { @@ -131,6 +131,22 @@ class GcsActorManagerTest : public ::testing::Test { return address; } + std::shared_ptr RegisterActor(const JobID &job_id, int max_restarts = 0, + bool detached = false, + const std::string name = "") { + auto promise = std::make_shared>>(); + auto register_actor_request = + Mocker::GenRegisterActorRequest(job_id, max_restarts, detached, name); + auto status = gcs_actor_manager_->RegisterActor( + register_actor_request, [promise](std::shared_ptr actor) { + promise->set_value(std::move(actor)); + }); + if (!status.ok()) { + promise->set_value(nullptr); + } + return promise->get_future().get(); + } + boost::asio::io_service io_service_; std::unique_ptr thread_io_service_; std::shared_ptr store_client_; @@ -146,9 +162,13 @@ class GcsActorManagerTest : public ::testing::Test { TEST_F(GcsActorManagerTest, TestBasic) { auto job_id = JobID::FromInt(1); - auto create_actor_request = Mocker::GenCreateActorRequest(job_id); + auto registered_actor = 
RegisterActor(job_id); + rpc::CreateActorRequest create_actor_request; + create_actor_request.mutable_task_spec()->CopyFrom( + registered_actor->GetActorTableData().task_spec()); + std::vector> finished_actors; - Status status = gcs_actor_manager_->RegisterActor( + Status status = gcs_actor_manager_->CreateActor( create_actor_request, [&finished_actors](const std::shared_ptr &actor) { finished_actors.emplace_back(actor); @@ -172,9 +192,13 @@ TEST_F(GcsActorManagerTest, TestBasic) { TEST_F(GcsActorManagerTest, TestSchedulingFailed) { auto job_id = JobID::FromInt(1); - auto create_actor_request = Mocker::GenCreateActorRequest(job_id); + auto registered_actor = RegisterActor(job_id); + rpc::CreateActorRequest create_actor_request; + create_actor_request.mutable_task_spec()->CopyFrom( + registered_actor->GetActorTableData().task_spec()); + std::vector> finished_actors; - RAY_CHECK_OK(gcs_actor_manager_->RegisterActor( + RAY_CHECK_OK(gcs_actor_manager_->CreateActor( create_actor_request, [&finished_actors](std::shared_ptr actor) { finished_actors.emplace_back(actor); })); @@ -199,9 +223,13 @@ TEST_F(GcsActorManagerTest, TestSchedulingFailed) { TEST_F(GcsActorManagerTest, TestWorkerFailure) { auto job_id = JobID::FromInt(1); - auto create_actor_request = Mocker::GenCreateActorRequest(job_id); + auto registered_actor = RegisterActor(job_id); + rpc::CreateActorRequest create_actor_request; + create_actor_request.mutable_task_spec()->CopyFrom( + registered_actor->GetActorTableData().task_spec()); + std::vector> finished_actors; - RAY_CHECK_OK(gcs_actor_manager_->RegisterActor( + RAY_CHECK_OK(gcs_actor_manager_->CreateActor( create_actor_request, [&finished_actors](std::shared_ptr actor) { finished_actors.emplace_back(actor); })); @@ -237,9 +265,13 @@ TEST_F(GcsActorManagerTest, TestWorkerFailure) { TEST_F(GcsActorManagerTest, TestNodeFailure) { auto job_id = JobID::FromInt(1); - auto create_actor_request = Mocker::GenCreateActorRequest(job_id); + auto registered_actor = 
RegisterActor(job_id); + rpc::CreateActorRequest create_actor_request; + create_actor_request.mutable_task_spec()->CopyFrom( + registered_actor->GetActorTableData().task_spec()); + std::vector> finished_actors; - Status status = gcs_actor_manager_->RegisterActor( + Status status = gcs_actor_manager_->CreateActor( create_actor_request, [&finished_actors](std::shared_ptr actor) { finished_actors.emplace_back(actor); }); @@ -276,10 +308,14 @@ TEST_F(GcsActorManagerTest, TestNodeFailure) { TEST_F(GcsActorManagerTest, TestActorReconstruction) { auto job_id = JobID::FromInt(1); - auto create_actor_request = - Mocker::GenCreateActorRequest(job_id, /*max_restarts=*/1, /*detached=*/false); + auto registered_actor = RegisterActor(job_id, /*max_restarts=*/1, + /*detached=*/false); + rpc::CreateActorRequest create_actor_request; + create_actor_request.mutable_task_spec()->CopyFrom( + registered_actor->GetActorTableData().task_spec()); + std::vector> finished_actors; - Status status = gcs_actor_manager_->RegisterActor( + Status status = gcs_actor_manager_->CreateActor( create_actor_request, [&finished_actors](std::shared_ptr actor) { finished_actors.emplace_back(actor); }); @@ -335,10 +371,14 @@ TEST_F(GcsActorManagerTest, TestActorReconstruction) { TEST_F(GcsActorManagerTest, TestActorRestartWhenOwnerDead) { auto job_id = JobID::FromInt(1); - auto create_actor_request = - Mocker::GenCreateActorRequest(job_id, /*max_restarts=*/1, /*detached=*/false); + auto registered_actor = RegisterActor(job_id, /*max_restarts=*/1, + /*detached=*/false); + rpc::CreateActorRequest create_actor_request; + create_actor_request.mutable_task_spec()->CopyFrom( + registered_actor->GetActorTableData().task_spec()); + std::vector> finished_actors; - RAY_CHECK_OK(gcs_actor_manager_->RegisterActor( + RAY_CHECK_OK(gcs_actor_manager_->CreateActor( create_actor_request, [&finished_actors](std::shared_ptr actor) { finished_actors.emplace_back(actor); })); @@ -376,10 +416,14 @@ TEST_F(GcsActorManagerTest, 
TestActorRestartWhenOwnerDead) { TEST_F(GcsActorManagerTest, TestDetachedActorRestartWhenCreatorDead) { auto job_id = JobID::FromInt(1); - auto create_actor_request = - Mocker::GenCreateActorRequest(job_id, /*max_restarts=*/1, /*detached=*/true); + auto registered_actor = RegisterActor(job_id, /*max_restarts=*/1, + /*detached=*/true); + rpc::CreateActorRequest create_actor_request; + create_actor_request.mutable_task_spec()->CopyFrom( + registered_actor->GetActorTableData().task_spec()); + std::vector> finished_actors; - RAY_CHECK_OK(gcs_actor_manager_->RegisterActor( + RAY_CHECK_OK(gcs_actor_manager_->CreateActor( create_actor_request, [&finished_actors](std::shared_ptr actor) { finished_actors.emplace_back(actor); })); @@ -409,8 +453,8 @@ TEST_F(GcsActorManagerTest, TestActorWithEmptyName) { // Gen `CreateActorRequest` with an empty name. // (name,actor_id) => ("", actor_id_1) - auto request1 = - Mocker::GenCreateActorRequest(job_id, 0, /*is_detached=*/true, /*name=*/""); + auto request1 = Mocker::GenRegisterActorRequest(job_id, /*max_restarts=*/0, + /*detached=*/true, /*name=*/""); Status status = gcs_actor_manager_->RegisterActor( request1, [](std::shared_ptr actor) {}); // Ensure successful registration. @@ -420,8 +464,8 @@ TEST_F(GcsActorManagerTest, TestActorWithEmptyName) { // Gen another `CreateActorRequest` with an empty name. // (name,actor_id) => ("", actor_id_2) - auto request2 = - Mocker::GenCreateActorRequest(job_id, 0, /*is_detached=*/true, /*name=*/""); + auto request2 = Mocker::GenRegisterActorRequest(job_id, /*max_restarts=*/0, + /*detached=*/true, /*name=*/""); status = gcs_actor_manager_->RegisterActor(request2, [](std::shared_ptr actor) {}); // Ensure successful registration. 
@@ -432,16 +476,16 @@ TEST_F(GcsActorManagerTest, TestNamedActors) { auto job_id_1 = JobID::FromInt(1); auto job_id_2 = JobID::FromInt(2); - auto request1 = Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true, - /*name=*/"actor1"); + auto request1 = Mocker::GenRegisterActorRequest(job_id_1, /*max_restarts=*/0, + /*detached=*/true, /*name=*/"actor1"); Status status = gcs_actor_manager_->RegisterActor( request1, [](std::shared_ptr actor) {}); ASSERT_TRUE(status.ok()); ASSERT_EQ(gcs_actor_manager_->GetActorIDByName("actor1").Binary(), request1.task_spec().actor_creation_task_spec().actor_id()); - auto request2 = Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true, - /*name=*/"actor2"); + auto request2 = Mocker::GenRegisterActorRequest(job_id_1, /*max_restarts=*/0, + /*detached=*/true, /*name=*/"actor2"); status = gcs_actor_manager_->RegisterActor(request2, [](std::shared_ptr actor) {}); ASSERT_TRUE(status.ok()); @@ -452,8 +496,8 @@ TEST_F(GcsActorManagerTest, TestNamedActors) { ASSERT_EQ(gcs_actor_manager_->GetActorIDByName("actor3"), ActorID::Nil()); // Check that naming collisions return Status::Invalid. - auto request3 = Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true, - /*name=*/"actor2"); + auto request3 = Mocker::GenRegisterActorRequest(job_id_1, /*max_restarts=*/0, + /*detached=*/true, /*name=*/"actor2"); status = gcs_actor_manager_->RegisterActor(request3, [](std::shared_ptr actor) {}); ASSERT_TRUE(status.IsInvalid()); @@ -461,8 +505,8 @@ TEST_F(GcsActorManagerTest, TestNamedActors) { request2.task_spec().actor_creation_task_spec().actor_id()); // Check that naming collisions are enforced across JobIDs. 
- auto request4 = Mocker::GenCreateActorRequest(job_id_2, 0, /*is_detached=*/true, - /*name=*/"actor2"); + auto request4 = Mocker::GenRegisterActorRequest(job_id_2, /*max_restarts=*/0, + /*detached=*/true, /*name=*/"actor2"); status = gcs_actor_manager_->RegisterActor(request4, [](std::shared_ptr actor) {}); ASSERT_TRUE(status.IsInvalid()); @@ -474,9 +518,13 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionWorkerFailure) { // Make sure named actor deletion succeeds when workers fail. const auto actor_name = "actor_to_delete"; const auto job_id_1 = JobID::FromInt(1); - const auto request1 = Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true, - /*name=*/actor_name); - Status status = gcs_actor_manager_->RegisterActor( + auto registered_actor_1 = RegisterActor(job_id_1, /*max_restarts=*/0, + /*detached=*/true, /*name=*/actor_name); + rpc::CreateActorRequest request1; + request1.mutable_task_spec()->CopyFrom( + registered_actor_1->GetActorTableData().task_spec()); + + Status status = gcs_actor_manager_->CreateActor( request1, [](std::shared_ptr actor) {}); ASSERT_TRUE(status.ok()); ASSERT_EQ(gcs_actor_manager_->GetActorIDByName(actor_name).Binary(), @@ -500,10 +548,14 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionWorkerFailure) { // Create an actor with the same name. This ensures that the name has been properly // deleted. 
- const auto request2 = Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true, - /*name=*/actor_name); - status = gcs_actor_manager_->RegisterActor(request2, - [](std::shared_ptr actor) {}); + auto registered_actor_2 = RegisterActor(job_id_1, /*max_restarts=*/0, + /*detached=*/true, /*name=*/actor_name); + rpc::CreateActorRequest request2; + request2.mutable_task_spec()->CopyFrom( + registered_actor_2->GetActorTableData().task_spec()); + + status = gcs_actor_manager_->CreateActor(request2, + [](std::shared_ptr actor) {}); ASSERT_TRUE(status.ok()); ASSERT_EQ(gcs_actor_manager_->GetActorIDByName(actor_name).Binary(), request2.task_spec().actor_creation_task_spec().actor_id()); @@ -512,9 +564,13 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionWorkerFailure) { TEST_F(GcsActorManagerTest, TestNamedActorDeletionNodeFailure) { // Make sure named actor deletion succeeds when nodes fail. const auto job_id_1 = JobID::FromInt(1); - const auto request1 = Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true, - /*name=*/"actor"); - Status status = gcs_actor_manager_->RegisterActor( + auto registered_actor_1 = RegisterActor(job_id_1, /*max_restarts=*/0, + /*detached=*/true, /*name=*/"actor"); + rpc::CreateActorRequest request1; + request1.mutable_task_spec()->CopyFrom( + registered_actor_1->GetActorTableData().task_spec()); + + Status status = gcs_actor_manager_->CreateActor( request1, [](std::shared_ptr actor) {}); ASSERT_TRUE(status.ok()); ASSERT_EQ(gcs_actor_manager_->GetActorIDByName("actor").Binary(), @@ -537,10 +593,14 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionNodeFailure) { // Create an actor with the same name. This ensures that the name has been properly // deleted. 
- const auto request2 = Mocker::GenCreateActorRequest(job_id_1, 0, /*is_detached=*/true, - /*name=*/"actor"); - status = gcs_actor_manager_->RegisterActor(request2, - [](std::shared_ptr actor) {}); + auto registered_actor_2 = RegisterActor(job_id_1, /*max_restarts=*/0, + /*detached=*/true, /*name=*/"actor"); + rpc::CreateActorRequest request2; + request2.mutable_task_spec()->CopyFrom( + registered_actor_2->GetActorTableData().task_spec()); + + status = gcs_actor_manager_->CreateActor(request2, + [](std::shared_ptr actor) {}); ASSERT_TRUE(status.ok()); ASSERT_EQ(gcs_actor_manager_->GetActorIDByName("actor").Binary(), request2.task_spec().actor_creation_task_spec().actor_id()); @@ -550,9 +610,13 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionNotHappendWhenReconstructed) { // Make sure named actor deletion succeeds when nodes fail. const auto job_id_1 = JobID::FromInt(1); // The dead actor will be reconstructed. - const auto request1 = Mocker::GenCreateActorRequest(job_id_1, 1, /*is_detached=*/true, - /*name=*/"actor"); - Status status = gcs_actor_manager_->RegisterActor( + auto registered_actor_1 = RegisterActor(job_id_1, /*max_restarts=*/1, + /*detached=*/true, /*name=*/"actor"); + rpc::CreateActorRequest request1; + request1.mutable_task_spec()->CopyFrom( + registered_actor_1->GetActorTableData().task_spec()); + + Status status = gcs_actor_manager_->CreateActor( request1, [](std::shared_ptr actor) {}); ASSERT_TRUE(status.ok()); ASSERT_EQ(gcs_actor_manager_->GetActorIDByName("actor").Binary(), @@ -578,8 +642,8 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionNotHappendWhenReconstructed) { // It should fail because actor has been reconstructed, and names shouldn't have been // cleaned. 
const auto job_id_2 = JobID::FromInt(2); - const auto request2 = Mocker::GenCreateActorRequest(job_id_2, 0, /*is_detached=*/true, - /*name=*/"actor"); + auto request2 = Mocker::GenRegisterActorRequest(job_id_2, /*max_restarts=*/0, + /*detached=*/true, /*name=*/"actor"); status = gcs_actor_manager_->RegisterActor(request2, [](std::shared_ptr actor) {}); ASSERT_TRUE(status.IsInvalid()); @@ -589,9 +653,13 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionNotHappendWhenReconstructed) { TEST_F(GcsActorManagerTest, TestDestroyActorBeforeActorCreationCompletes) { auto job_id = JobID::FromInt(1); - auto create_actor_request = Mocker::GenCreateActorRequest(job_id); + auto registered_actor = RegisterActor(job_id); + rpc::CreateActorRequest create_actor_request; + create_actor_request.mutable_task_spec()->CopyFrom( + registered_actor->GetActorTableData().task_spec()); + std::vector> finished_actors; - RAY_CHECK_OK(gcs_actor_manager_->RegisterActor( + RAY_CHECK_OK(gcs_actor_manager_->CreateActor( create_actor_request, [&finished_actors](std::shared_ptr actor) { finished_actors.emplace_back(actor); })); @@ -613,10 +681,14 @@ TEST_F(GcsActorManagerTest, TestDestroyActorBeforeActorCreationCompletes) { TEST_F(GcsActorManagerTest, TestRaceConditionCancelLease) { // Covers a scenario 1 in this PR https://github.com/ray-project/ray/pull/9215. 
auto job_id = JobID::FromInt(1); - auto create_actor_request = - Mocker::GenCreateActorRequest(job_id, /*max_restarts=*/1, /*detached=*/false); + auto registered_actor = RegisterActor(job_id, /*max_restarts=*/1, + /*detached=*/false); + rpc::CreateActorRequest create_actor_request; + create_actor_request.mutable_task_spec()->CopyFrom( + registered_actor->GetActorTableData().task_spec()); + std::vector> finished_actors; - RAY_CHECK_OK(gcs_actor_manager_->RegisterActor( + RAY_CHECK_OK(gcs_actor_manager_->CreateActor( create_actor_request, [&finished_actors](std::shared_ptr actor) { finished_actors.emplace_back(actor); })); @@ -640,6 +712,97 @@ TEST_F(GcsActorManagerTest, TestRaceConditionCancelLease) { gcs_actor_manager_->OnWorkerDead(owner_node_id, owner_worker_id, false); } +TEST_F(GcsActorManagerTest, TestRegisterActor) { + auto job_id = JobID::FromInt(1); + auto registered_actor = RegisterActor(job_id); + // Make sure the actor state is `DEPENDENCIES_UNREADY`. + ASSERT_EQ(registered_actor->GetState(), rpc::ActorTableData::DEPENDENCIES_UNREADY); + // Make sure the actor has not been scheduled yet. + ASSERT_TRUE(mock_actor_scheduler_->actors.empty()); + + std::vector> finished_actors; + rpc::CreateActorRequest request; + request.mutable_task_spec()->CopyFrom( + registered_actor->GetActorTableData().task_spec()); + RAY_CHECK_OK(gcs_actor_manager_->CreateActor( + request, [&finished_actors](std::shared_ptr actor) { + finished_actors.emplace_back(std::move(actor)); + })); + // Make sure the actor is being scheduled. + ASSERT_EQ(mock_actor_scheduler_->actors.size(), 1); + auto actor = mock_actor_scheduler_->actors.back(); + mock_actor_scheduler_->actors.pop_back(); + // Make sure the actor state is `PENDING_CREATION`.
+ ASSERT_EQ(actor->GetState(), rpc::ActorTableData::PENDING_CREATION); + + actor->UpdateAddress(RandomAddress()); + gcs_actor_manager_->OnActorCreationSuccess(actor); + WaitActorCreated(actor->GetActorID()); + ASSERT_EQ(actor->GetState(), rpc::ActorTableData::ALIVE); +} + +TEST_F(GcsActorManagerTest, TestOwnerWorkerDieBeforeActorDependenciesResolved) { + auto job_id = JobID::FromInt(1); + auto registered_actor = RegisterActor(job_id); + const auto &owner_address = registered_actor->GetOwnerAddress(); + auto node_id = ClientID::FromBinary(owner_address.raylet_id()); + auto worker_id = WorkerID::FromBinary(owner_address.worker_id()); + gcs_actor_manager_->OnWorkerDead(node_id, worker_id); + ASSERT_EQ(registered_actor->GetState(), rpc::ActorTableData::DEAD); + + // Make sure the actor gets cleaned up. + const auto &registered_actors = gcs_actor_manager_->GetRegisteredActors(); + ASSERT_FALSE(registered_actors.count(registered_actor->GetActorID())); + const auto &callbacks = gcs_actor_manager_->GetActorRegisterCallbacks(); + ASSERT_FALSE(callbacks.count(registered_actor->GetActorID())); +} + +TEST_F(GcsActorManagerTest, TestOwnerWorkerDieBeforeDetachedActorDependenciesResolved) { + auto job_id = JobID::FromInt(1); + auto registered_actor = RegisterActor(job_id, /*max_restarts=*/1, /*detached=*/true); + const auto &owner_address = registered_actor->GetOwnerAddress(); + auto node_id = ClientID::FromBinary(owner_address.raylet_id()); + auto worker_id = WorkerID::FromBinary(owner_address.worker_id()); + gcs_actor_manager_->OnWorkerDead(node_id, worker_id); + ASSERT_EQ(registered_actor->GetState(), rpc::ActorTableData::DEAD); + + // Make sure the actor gets cleaned up.
+ const auto &registered_actors = gcs_actor_manager_->GetRegisteredActors(); + ASSERT_FALSE(registered_actors.count(registered_actor->GetActorID())); + const auto &callbacks = gcs_actor_manager_->GetActorRegisterCallbacks(); + ASSERT_FALSE(callbacks.count(registered_actor->GetActorID())); +} + +TEST_F(GcsActorManagerTest, TestOwnerNodeDieBeforeActorDependenciesResolved) { + auto job_id = JobID::FromInt(1); + auto registered_actor = RegisterActor(job_id); + const auto &owner_address = registered_actor->GetOwnerAddress(); + auto node_id = ClientID::FromBinary(owner_address.raylet_id()); + gcs_actor_manager_->OnNodeDead(node_id); + ASSERT_EQ(registered_actor->GetState(), rpc::ActorTableData::DEAD); + + // Make sure the actor gets cleaned up. + const auto &registered_actors = gcs_actor_manager_->GetRegisteredActors(); + ASSERT_FALSE(registered_actors.count(registered_actor->GetActorID())); + const auto &callbacks = gcs_actor_manager_->GetActorRegisterCallbacks(); + ASSERT_FALSE(callbacks.count(registered_actor->GetActorID())); +} + +TEST_F(GcsActorManagerTest, TestOwnerNodeDieBeforeDetachedActorDependenciesResolved) { + auto job_id = JobID::FromInt(1); + auto registered_actor = RegisterActor(job_id, /*max_restarts=*/1, /*detached=*/true); + const auto &owner_address = registered_actor->GetOwnerAddress(); + auto node_id = ClientID::FromBinary(owner_address.raylet_id()); + gcs_actor_manager_->OnNodeDead(node_id); + ASSERT_EQ(registered_actor->GetState(), rpc::ActorTableData::DEAD); + + // Make sure the actor gets cleaned up.
+ const auto &registered_actors = gcs_actor_manager_->GetRegisteredActors(); + ASSERT_FALSE(registered_actors.count(registered_actor->GetActorID())); + const auto &callbacks = gcs_actor_manager_->GetActorRegisterCallbacks(); + ASSERT_FALSE(callbacks.count(registered_actor->GetActorID())); +} + } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc index 427795d69..d17436edf 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc @@ -71,7 +71,7 @@ TEST_F(GcsActorSchedulerTest, TestScheduleFailedWithZeroNode) { auto job_id = JobID::FromInt(1); auto create_actor_request = Mocker::GenCreateActorRequest(job_id); - auto actor = std::make_shared(create_actor_request); + auto actor = std::make_shared(create_actor_request.task_spec()); // Schedule the actor with zero node. gcs_actor_scheduler_->Schedule(actor); @@ -93,7 +93,7 @@ TEST_F(GcsActorSchedulerTest, TestScheduleActorSuccess) { auto job_id = JobID::FromInt(1); auto create_actor_request = Mocker::GenCreateActorRequest(job_id); - auto actor = std::make_shared(create_actor_request); + auto actor = std::make_shared(create_actor_request.task_spec()); // Schedule the actor with 1 available node, and the lease request should be send to the // node. @@ -128,7 +128,7 @@ TEST_F(GcsActorSchedulerTest, TestScheduleRetryWhenLeasing) { auto job_id = JobID::FromInt(1); auto create_actor_request = Mocker::GenCreateActorRequest(job_id); - auto actor = std::make_shared(create_actor_request); + auto actor = std::make_shared(create_actor_request.task_spec()); // Schedule the actor with 1 available node, and the lease request should be send to the // node.
@@ -173,7 +173,7 @@ TEST_F(GcsActorSchedulerTest, TestScheduleRetryWhenCreating) { auto job_id = JobID::FromInt(1); auto create_actor_request = Mocker::GenCreateActorRequest(job_id); - auto actor = std::make_shared(create_actor_request); + auto actor = std::make_shared(create_actor_request.task_spec()); // Schedule the actor with 1 available node, and the lease request should be send to the // node. @@ -214,7 +214,7 @@ TEST_F(GcsActorSchedulerTest, TestNodeFailedWhenLeasing) { auto job_id = JobID::FromInt(1); auto create_actor_request = Mocker::GenCreateActorRequest(job_id); - auto actor = std::make_shared(create_actor_request); + auto actor = std::make_shared(create_actor_request.task_spec()); // Schedule the actor with 1 available node, and the lease request should be send to the // node. @@ -252,7 +252,7 @@ TEST_F(GcsActorSchedulerTest, TestLeasingCancelledWhenLeasing) { auto job_id = JobID::FromInt(1); auto create_actor_request = Mocker::GenCreateActorRequest(job_id); - auto actor = std::make_shared(create_actor_request); + auto actor = std::make_shared(create_actor_request.task_spec()); // Schedule the actor with 1 available node, and the lease request should be send to the // node. @@ -285,7 +285,7 @@ TEST_F(GcsActorSchedulerTest, TestNodeFailedWhenCreating) { auto job_id = JobID::FromInt(1); auto create_actor_request = Mocker::GenCreateActorRequest(job_id); - auto actor = std::make_shared(create_actor_request); + auto actor = std::make_shared(create_actor_request.task_spec()); // Schedule the actor with 1 available node, and the lease request should be send to the // node. 
@@ -327,7 +327,7 @@ TEST_F(GcsActorSchedulerTest, TestWorkerFailedWhenCreating) { auto job_id = JobID::FromInt(1); auto create_actor_request = Mocker::GenCreateActorRequest(job_id); - auto actor = std::make_shared(create_actor_request); + auto actor = std::make_shared(create_actor_request.task_spec()); // Schedule the actor with 1 available node, and the lease request should be send to the // node. @@ -366,7 +366,7 @@ TEST_F(GcsActorSchedulerTest, TestSpillback) { auto job_id = JobID::FromInt(1); auto create_actor_request = Mocker::GenCreateActorRequest(job_id); - auto actor = std::make_shared(create_actor_request); + auto actor = std::make_shared(create_actor_request.task_spec()); // Schedule the actor with 1 available node, and the lease request should be send to the // node. @@ -427,7 +427,7 @@ TEST_F(GcsActorSchedulerTest, TestReschedule) { // 1.Actor is already tied to a leased worker. auto job_id = JobID::FromInt(1); auto create_actor_request = Mocker::GenCreateActorRequest(job_id); - auto actor = std::make_shared(create_actor_request); + auto actor = std::make_shared(create_actor_request.task_spec()); rpc::Address address; WorkerID worker_id = WorkerID::FromRandom(); address.set_raylet_id(node_id_1.Binary()); @@ -490,8 +490,8 @@ TEST_F(GcsActorSchedulerTest, TestReleaseUnusedWorkers) { // won't send `RequestWorkerLease` request to node immediately. But instead, it will // invoke the `RetryLeasingWorkerFromNode` to retry later. 
auto job_id = JobID::FromInt(1); - auto create_actor_request = Mocker::GenCreateActorRequest(job_id); - auto actor = std::make_shared(create_actor_request); + auto request = Mocker::GenCreateActorRequest(job_id); + auto actor = std::make_shared(request.task_spec()); gcs_actor_scheduler_->Schedule(actor); ASSERT_EQ(2, gcs_actor_scheduler_->num_retry_leasing_count_); ASSERT_EQ(raylet_client_->num_workers_requested, 0); diff --git a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc index 898e36844..dd330ffc4 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc @@ -473,19 +473,16 @@ TEST_F(GcsServerTest, TestActorInfo) { ASSERT_TRUE(RegisterActorInfo(register_actor_info_request)); boost::optional result = GetActorInfo(actor_table_data->actor_id()); - ASSERT_TRUE(result->state() == - rpc::ActorTableData_ActorState::ActorTableData_ActorState_ALIVE); + ASSERT_TRUE(result->state() == rpc::ActorTableData::ALIVE); // Update actor state rpc::UpdateActorInfoRequest update_actor_info_request; - actor_table_data->set_state( - rpc::ActorTableData_ActorState::ActorTableData_ActorState_DEAD); + actor_table_data->set_state(rpc::ActorTableData::DEAD); update_actor_info_request.set_actor_id(actor_table_data->actor_id()); update_actor_info_request.mutable_actor_table_data()->CopyFrom(*actor_table_data); ASSERT_TRUE(UpdateActorInfo(update_actor_info_request)); result = GetActorInfo(actor_table_data->actor_id()); - ASSERT_TRUE(result->state() == - rpc::ActorTableData_ActorState::ActorTableData_ActorState_DEAD); + ASSERT_TRUE(result->state() == rpc::ActorTableData::DEAD); // Add actor checkpoint ActorCheckpointID checkpoint_id = ActorCheckpointID::FromRandom(); @@ -540,8 +537,7 @@ TEST_F(GcsServerTest, TestJobGarbageCollection) { ASSERT_TRUE(RegisterActorInfo(register_actor_info_request)); boost::optional actor_result = GetActorInfo(actor_table_data->actor_id()); - 
ASSERT_TRUE(actor_result->state() == - rpc::ActorTableData_ActorState::ActorTableData_ActorState_ALIVE); + ASSERT_TRUE(actor_result->state() == rpc::ActorTableData::ALIVE); // Add actor checkpoint ActorCheckpointID checkpoint_id = ActorCheckpointID::FromRandom(); @@ -571,8 +567,7 @@ TEST_F(GcsServerTest, TestJobGarbageCollection) { ASSERT_TRUE(RegisterActorInfo(register_detached_actor_info_request)); boost::optional detached_actor_result = GetActorInfo(detached_actor_table_data->actor_id()); - ASSERT_TRUE(detached_actor_result->state() == - rpc::ActorTableData_ActorState::ActorTableData_ActorState_ALIVE); + ASSERT_TRUE(detached_actor_result->state() == rpc::ActorTableData::ALIVE); // Add checkpoint for detached actor ActorCheckpointID detached_checkpoint_id = ActorCheckpointID::FromRandom(); @@ -620,8 +615,7 @@ TEST_F(GcsServerTest, TestJobGarbageCollection) { ASSERT_TRUE(WaitForCondition(condition_func, 10 * 1000)); detached_actor_result = GetActorInfo(detached_actor_table_data->actor_id()); - ASSERT_TRUE(detached_actor_result->state() == - rpc::ActorTableData_ActorState::ActorTableData_ActorState_ALIVE); + ASSERT_TRUE(detached_actor_result->state() == rpc::ActorTableData::ALIVE); detached_checkpoint_result = GetActorCheckpoint(detached_actor_table_data->actor_id(), detached_checkpoint_id.Binary()); diff --git a/src/ray/gcs/redis_accessor.cc b/src/ray/gcs/redis_accessor.cc index 3ca27345d..89007cba1 100644 --- a/src/ray/gcs/redis_accessor.cc +++ b/src/ray/gcs/redis_accessor.cc @@ -65,10 +65,19 @@ Status RedisLogBasedActorInfoAccessor::AsyncGet( on_done); } +Status RedisLogBasedActorInfoAccessor::AsyncRegisterActor( + const ray::TaskSpecification &task_spec, const ray::gcs::StatusCallback &callback) { + const std::string error_msg = + "Unsupported method of AsyncRegisterActor in RedisLogBasedActorInfoAccessor."; + RAY_LOG(FATAL) << error_msg; + return Status::Invalid(error_msg); +} + Status RedisLogBasedActorInfoAccessor::AsyncCreateActor( const 
ray::TaskSpecification &task_spec, const ray::gcs::StatusCallback &callback) { const std::string error_msg = - "Unsupported method of AsyncCreateActor in RedisLogBasedActorInfoAccessor."; + "Unsupported method of AsyncCreateActor in " + "RedisLogBasedActorInfoAccessor."; RAY_LOG(FATAL) << error_msg; return Status::Invalid(error_msg); } diff --git a/src/ray/gcs/redis_accessor.h b/src/ray/gcs/redis_accessor.h index 6f5f7278d..f279e6056 100644 --- a/src/ray/gcs/redis_accessor.h +++ b/src/ray/gcs/redis_accessor.h @@ -52,6 +52,9 @@ class RedisLogBasedActorInfoAccessor : public ActorInfoAccessor { "RedisLogBasedActorInfoAccessor does not support named detached actors."); } + Status AsyncRegisterActor(const TaskSpecification &task_spec, + const StatusCallback &callback) override; + Status AsyncCreateActor(const TaskSpecification &task_spec, const StatusCallback &callback) override; diff --git a/src/ray/gcs/test/gcs_test_util.h b/src/ray/gcs/test/gcs_test_util.h index 5ade2bb6e..c8b4d86fe 100644 --- a/src/ray/gcs/test/gcs_test_util.h +++ b/src/ray/gcs/test/gcs_test_util.h @@ -49,16 +49,30 @@ struct Mocker { int max_restarts = 0, bool detached = false, const std::string name = "") { - rpc::CreateActorRequest request; rpc::Address owner_address; - if (owner_address.raylet_id().empty()) { - owner_address.set_raylet_id(ClientID::FromRandom().Binary()); - owner_address.set_ip_address("1234"); - owner_address.set_port(5678); - owner_address.set_worker_id(WorkerID::FromRandom().Binary()); - } + owner_address.set_raylet_id(ClientID::FromRandom().Binary()); + owner_address.set_ip_address("1234"); + owner_address.set_port(5678); + owner_address.set_worker_id(WorkerID::FromRandom().Binary()); auto actor_creation_task_spec = GenActorCreationTask(job_id, max_restarts, detached, name, owner_address); + rpc::CreateActorRequest request; + request.mutable_task_spec()->CopyFrom(actor_creation_task_spec.GetMessage()); + return request; + } + + static rpc::RegisterActorRequest 
GenRegisterActorRequest(const JobID &job_id, + int max_restarts = 0, + bool detached = false, + const std::string name = "") { + rpc::Address owner_address; + owner_address.set_raylet_id(ClientID::FromRandom().Binary()); + owner_address.set_ip_address("1234"); + owner_address.set_port(5678); + owner_address.set_worker_id(WorkerID::FromRandom().Binary()); + auto actor_creation_task_spec = + GenActorCreationTask(job_id, max_restarts, detached, name, owner_address); + rpc::RegisterActorRequest request; request.mutable_task_spec()->CopyFrom(actor_creation_task_spec.GetMessage()); return request; } @@ -113,8 +127,7 @@ struct Mocker { ActorID actor_id = ActorID::Of(job_id, RandomTaskId(), 0); actor_table_data->set_actor_id(actor_id.Binary()); actor_table_data->set_job_id(job_id.Binary()); - actor_table_data->set_state( - rpc::ActorTableData_ActorState::ActorTableData_ActorState_ALIVE); + actor_table_data->set_state(rpc::ActorTableData::ALIVE); actor_table_data->set_max_restarts(1); actor_table_data->set_num_restarts(0); return actor_table_data; diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 5db1ebb99..a7c691828 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -103,15 +103,17 @@ message TaskTableData { message ActorTableData { // State of an actor. enum ActorState { - // Actor is pending. - PENDING = 0; + // Actor info is registered in GCS, but its dependencies are not ready. + DEPENDENCIES_UNREADY = 0; + // Actor local dependencies are ready. This actor is being created. + PENDING_CREATION = 1; // Actor is alive. - ALIVE = 1; + ALIVE = 2; // Actor is dead, now being restarted. // After reconstruction finishes, the state will become alive again. - RESTARTING = 2; + RESTARTING = 3; // Actor is already dead and won't be restarted. - DEAD = 3; + DEAD = 4; } // The ID of the actor that was created.
bytes actor_id = 1; diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index bd137759e..b796fa743 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -141,7 +141,9 @@ message GetActorCheckpointIDReply { // Service for actor info access. service ActorInfoGcsService { - // Create actor via gcs service. + // Register actor to gcs service. + rpc RegisterActor(RegisterActorRequest) returns (RegisterActorReply); + // Create an actor whose local dependencies are resolved. rpc CreateActor(CreateActorRequest) returns (CreateActorReply); // Get actor data from GCS Service by actor id. rpc GetActorInfo(GetActorInfoRequest) returns (GetActorInfoReply); @@ -493,6 +495,14 @@ message CreateActorReply { GcsStatus status = 1; } +message RegisterActorRequest { + TaskSpec task_spec = 1; +} + +message RegisterActorReply { + GcsStatus status = 1; +} + message CreatePlacementGroupRequest { PlacementGroupSpec placement_group_spec = 1; } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 6cd75b6f6..bb6d29cc1 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1014,9 +1014,6 @@ void NodeManager::HandleActorStateTransition(const ActorID &actor_id, for (auto const &task : removed_tasks) { SubmitTask(task, Lineage()); } - } else { - RAY_CHECK(actor_registration.GetState() == ActorTableData::PENDING); - // Do nothing. } } diff --git a/src/ray/rpc/gcs_server/gcs_rpc_client.h b/src/ray/rpc/gcs_server/gcs_rpc_client.h index cd65097f4..aadbc5813 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_client.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_client.h @@ -122,6 +122,10 @@ class GcsRpcClient { /// Get information of all jobs from GCS Service. VOID_GCS_RPC_CLIENT_METHOD(JobInfoGcsService, GetAllJobInfo, job_info_grpc_client_, ) + /// Register actor via GCS Service.
+ VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, RegisterActor, + actor_info_grpc_client_, ) + /// Create actor via GCS Service. VOID_GCS_RPC_CLIENT_METHOD(ActorInfoGcsService, CreateActor, actor_info_grpc_client_, ) diff --git a/src/ray/rpc/gcs_server/gcs_rpc_server.h b/src/ray/rpc/gcs_server/gcs_rpc_server.h index 4d460df21..030c0d3f4 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_server.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_server.h @@ -103,6 +103,10 @@ class ActorInfoGcsServiceHandler { public: virtual ~ActorInfoGcsServiceHandler() = default; + virtual void HandleRegisterActor(const RegisterActorRequest &request, + RegisterActorReply *reply, + SendReplyCallback send_reply_callback) = 0; + virtual void HandleCreateActor(const CreateActorRequest &request, CreateActorReply *reply, SendReplyCallback send_reply_callback) = 0; @@ -156,6 +160,7 @@ class ActorInfoGrpcService : public GrpcService { void InitServerCallFactories( const std::unique_ptr &cq, std::vector> *server_call_factories) override { + ACTOR_INFO_SERVICE_RPC_HANDLER(RegisterActor); ACTOR_INFO_SERVICE_RPC_HANDLER(CreateActor); ACTOR_INFO_SERVICE_RPC_HANDLER(GetActorInfo); ACTOR_INFO_SERVICE_RPC_HANDLER(GetNamedActorInfo);