diff --git a/doc/source/advanced.rst b/doc/source/advanced.rst index c3567ec6b..68c6d0ec7 100644 --- a/doc/source/advanced.rst +++ b/doc/source/advanced.rst @@ -235,3 +235,43 @@ To get information about the current available resource capacity of your cluster .. autofunction:: ray.available_resources :noindex: + +Detached Actors +----------------------------------- + +When the original actor handle goes out of scope or the driver that originally +created the actor exits, Ray will clean up the actor by default. If you want +to make sure the actor is kept alive, you can use +``_remote(name="some_name", detached=True)`` to keep the actor alive after +the driver exits. The actor will have a globally unique name and can be +accessed across different drivers. + +For example, you can instantiate and register a persistent actor as follows: + +.. code-block:: python + + counter = Counter._remote(name="CounterActor", detached=True) + +The CounterActor will be kept alive even after the driver running the above +script exits. Therefore it is possible to run the following script in a +different driver: + +.. code-block:: python + + counter = ray.experimental.get_actor("CounterActor") + print(ray.get(counter.get_counter.remote())) + +Note that creating a named actor without ``detached=True`` is allowed; such an +actor will be cleaned up after the driver exits: + +.. code-block:: python + + Counter._remote(name="CounterActor") + +However, creating a detached actor without a name is not allowed because there +would be no way to retrieve the actor handle and the resource would be leaked. + +.. code-block:: python + + # Can't do this! 
+ Counter._remote(detached=True) \ No newline at end of file diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index a0f349839..83d185282 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -948,7 +948,8 @@ cdef class CoreWorker: uint64_t max_reconstructions, resources, placement_resources, - c_bool is_direct_call): + c_bool is_direct_call, + c_bool is_detached): cdef: CRayFunction ray_function c_vector[CTaskArg] args_vector @@ -969,7 +970,8 @@ cdef class CoreWorker: ray_function, args_vector, CActorCreationOptions( max_reconstructions, is_direct_call, c_resources, - c_placement_resources, dynamic_worker_options), + c_placement_resources, dynamic_worker_options, + is_detached), &c_actor_id)) return ActorID(c_actor_id.Binary()) diff --git a/python/ray/actor.py b/python/ray/actor.py index 7e0f354e6..9f487d281 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -307,7 +307,9 @@ class ActorClass(object): memory=None, object_store_memory=None, resources=None, - is_direct_call=None): + is_direct_call=None, + name=None, + detached=False): """Create an actor. This method allows more flexibility than the remote method because @@ -325,6 +327,9 @@ class ActorClass(object): resources: The custom resources required by the actor creation task. is_direct_call: Use direct actor calls. + name: The globally unique name for the actor. + detached: Whether the actor should be kept alive after driver + exits. Returns: A handle to the newly created actor. @@ -341,6 +346,23 @@ class ActorClass(object): meta = self.__ray_metadata__ + if detached and name is None: + raise Exception("Detached actors must be named. " + "Please use Actor._remote(name='some_name') " + "to associate the name.") + + # Check whether the name is already taken. + if name is not None: + try: + ray.experimental.get_actor(name) + except ValueError: # name is not taken, expected. + pass + else: + raise ValueError( + "The name {name} is already taken. 
Please use " + "a different name or get existing actor using " + "ray.experimental.get_actor('{name}')".format(name=name)) + # Set the actor's default resources if not already set. First three # conditions are to check that no resources were specified in the # decorator. Last three conditions are to check that no resources were @@ -403,7 +425,7 @@ class ActorClass(object): actor_id = worker.core_worker.create_actor( function_descriptor.get_function_descriptor_list(), creation_args, meta.max_reconstructions, resources, - actor_placement_resources, is_direct_call) + actor_placement_resources, is_direct_call, detached) actor_handle = ActorHandle( actor_id, @@ -417,6 +439,9 @@ class ActorClass(object): worker.current_session_and_job, original_handle=True) + if name is not None: + ray.experimental.register_actor(name, actor_handle) + return actor_handle diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 9189850b6..ad3a24c79 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -207,7 +207,8 @@ cdef extern from "ray/core_worker/common.h" nogil: uint64_t max_reconstructions, c_bool is_direct_call, const unordered_map[c_string, double] &resources, const unordered_map[c_string, double] &placement_resources, - const c_vector[c_string] &dynamic_worker_options) + const c_vector[c_string] &dynamic_worker_options, + c_bool is_detached) cdef extern from "ray/gcs/gcs_client_interface.h" nogil: cdef cppclass CGcsClientOptions "ray::gcs::GcsClientOptions": diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py index 7672cbbd8..e20a6a79f 100644 --- a/python/ray/tests/test_actor.py +++ b/python/ray/tests/test_actor.py @@ -21,12 +21,9 @@ import ray.ray_constants as ray_constants import ray.tests.utils import ray.tests.cluster_utils from ray.tests.conftest import generate_internal_config_map -from ray.tests.utils import ( - relevant_errors, - wait_for_condition, - wait_for_errors, - wait_for_pid_to_exit, 
-) +from ray.tests.utils import (relevant_errors, wait_for_condition, + wait_for_errors, wait_for_pid_to_exit, + run_string_as_driver) @pytest.fixture @@ -2816,3 +2813,37 @@ def test_ray_wait_dead_actor(ray_start_cluster): failure_detected = False while not failure_detected: failure_detected = ray.get(parent_actor.wait.remote()) + + +def test_detached_actor(ray_start_regular): + @ray.remote + class DetachedActor(object): + def ping(self): + return "pong" + + with pytest.raises(Exception, match="Detached actors must be named"): + DetachedActor._remote(detached=True) + + with pytest.raises(ValueError, match="Please use a different name"): + _ = DetachedActor._remote(name="d_actor") + DetachedActor._remote(name="d_actor") + + redis_address = ray_start_regular["redis_address"] + + actor_name = "DetachedActor" + driver_script = """ +import ray +ray.init(address="{}") + +@ray.remote +class DetachedActor(object): + def ping(self): + return "pong" + +actor = DetachedActor._remote(name="{}", detached=True) +ray.get(actor.ping.remote()) +""".format(redis_address, actor_name) + + run_string_as_driver(driver_script) + detached_actor = ray.experimental.get_actor(actor_name) + assert ray.get(detached_actor.ping.remote()) == "pong" diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index de53e6dee..115a38ac0 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -189,6 +189,11 @@ bool TaskSpecification::IsDirectCall() const { return message_->actor_creation_task_spec().is_direct_call(); } +bool TaskSpecification::IsDetachedActor() const { + RAY_CHECK(IsActorCreationTask()); + return message_->actor_creation_task_spec().is_detached(); +} + std::string TaskSpecification::DebugString() const { std::ostringstream stream; stream << "Type=" << TaskType_Name(message_->type()) @@ -213,7 +218,8 @@ std::string TaskSpecification::DebugString() const { // Print actor creation task spec. 
stream << ", actor_creation_task_spec={actor_id=" << ActorCreationId() << ", max_reconstructions=" << MaxActorReconstructions() - << ", is_direct_call=" << IsDirectCall() << "}"; + << ", is_direct_call=" << IsDirectCall() + << ", is_detached=" << IsDetachedActor() << "}"; } else if (IsActorTask()) { // Print actor task spec. stream << ", actor_task_spec={actor_id=" << ActorId() diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index 4f9cbe001..29f4ec9be 100644 --- a/src/ray/common/task/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -142,6 +142,8 @@ class TaskSpecification : public MessageWrapper { bool IsDirectCall() const; + bool IsDetachedActor() const; + ObjectID ActorDummyObject() const; std::string DebugString() const; diff --git a/src/ray/common/task/task_util.h b/src/ray/common/task/task_util.h index 6ddd7275b..c723bdc7f 100644 --- a/src/ray/common/task/task_util.h +++ b/src/ray/common/task/task_util.h @@ -93,7 +93,7 @@ class TaskSpecBuilder { TaskSpecBuilder &SetActorCreationTaskSpec( const ActorID &actor_id, uint64_t max_reconstructions = 0, const std::vector &dynamic_worker_options = {}, - bool is_direct_call = false) { + bool is_direct_call = false, bool is_detached = false) { message_->set_type(TaskType::ACTOR_CREATION_TASK); auto actor_creation_spec = message_->mutable_actor_creation_task_spec(); actor_creation_spec->set_actor_id(actor_id.Binary()); @@ -102,6 +102,7 @@ class TaskSpecBuilder { actor_creation_spec->add_dynamic_worker_options(option); } actor_creation_spec->set_is_direct_call(is_direct_call); + actor_creation_spec->set_is_detached(is_detached); return *this; } diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index 943dbaa53..2f171ddef 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -99,12 +99,14 @@ struct ActorCreationOptions { ActorCreationOptions(uint64_t max_reconstructions, bool is_direct_call, const std::unordered_map &resources, const 
std::unordered_map &placement_resources, - const std::vector &dynamic_worker_options) + const std::vector &dynamic_worker_options, + bool is_detached) : max_reconstructions(max_reconstructions), is_direct_call(is_direct_call), resources(resources), placement_resources(placement_resources), - dynamic_worker_options(dynamic_worker_options) {} + dynamic_worker_options(dynamic_worker_options), + is_detached(is_detached){}; /// Maximum number of times that the actor should be reconstructed when it dies /// unexpectedly. It must be non-negative. If it's 0, the actor won't be reconstructed. @@ -119,6 +121,9 @@ struct ActorCreationOptions { /// The dynamic options used in the worker command when starting a worker process for /// an actor creation task. const std::vector dynamic_worker_options; + /// Whether to keep the actor persistent after driver exit. If true, this will set + /// the worker to not be destroyed after the driver shutdown. + const bool is_detached = false; }; } // namespace ray diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 753333d9b..297d4c70b 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -482,7 +482,8 @@ Status CoreWorker::CreateActor(const RayFunction &function, actor_creation_options.placement_resources, TaskTransportType::RAYLET, &return_ids); builder.SetActorCreationTaskSpec(actor_id, actor_creation_options.max_reconstructions, actor_creation_options.dynamic_worker_options, - actor_creation_options.is_direct_call); + actor_creation_options.is_direct_call, + actor_creation_options.is_detached); std::unique_ptr actor_handle(new ActorHandle( actor_id, job_id, /*actor_cursor=*/return_ids[0], function.GetLanguage(), diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index d0a67e2bd..2c1a5edb0 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -61,7 +61,8 
@@ ActorID CreateActorHelper(CoreWorker &worker, args.emplace_back(TaskArg::PassByValue(std::make_shared(buffer, nullptr))); ActorCreationOptions actor_options{ - max_reconstructions, is_direct_call, resources, resources, {}}; + max_reconstructions, is_direct_call, resources, resources, {}, + /*is_detached*/ false}; // Create an actor. ActorID actor_id; @@ -489,8 +490,8 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { args.emplace_back(TaskArg::PassByValue(std::make_shared(buffer, nullptr))); std::unordered_map resources; - ActorCreationOptions actor_options{ - 0, /*is_direct_call*/ true, resources, resources, {}}; + ActorCreationOptions actor_options{0, /*is_direct_call*/ true, resources, resources, + {}, /*is_detached*/ false}; const auto job_id = NextJobId(); ActorHandle actor_handle(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 1), job_id, ObjectID::FromRandom(), function.GetLanguage(), true, diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index b53e253ec..4c2476a44 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -95,6 +95,8 @@ message ActorCreationTaskSpec { repeated string dynamic_worker_options = 4; // Whether direct actor call is used. bool is_direct_call = 5; + // Whether the actor is persistent + bool is_detached = 6; } // Task spec of an actor task. diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 4ea542b4b..55ef646a7 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -111,6 +111,8 @@ message ActorTableData { int32 port = 10; // Whether direct actor call is used. bool is_direct_call = 11; + // Whether the actor is persistent. 
+ bool is_detached = 12; } message ErrorTableData { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 3743c3d0d..cd15034f7 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -275,13 +275,15 @@ void NodeManager::HandleJobTableUpdate(const JobID &id, // Kill all the workers. The actual cleanup for these workers is done // later when we receive the DisconnectClient message from them. for (const auto &worker : workers) { - // Clean up any open ray.wait calls that the worker made. - task_dependency_manager_.UnsubscribeWaitDependencies(worker->WorkerId()); - // Mark the worker as dead so further messages from it are ignored - // (except DisconnectClient). - worker->MarkDead(); - // Then kill the worker process. - KillWorker(worker); + if (!worker->IsDetachedActor()) { + // Clean up any open ray.wait calls that the worker made. + task_dependency_manager_.UnsubscribeWaitDependencies(worker->WorkerId()); + // Mark the worker as dead so further messages from it are ignored + // (except DisconnectClient). + worker->MarkDead(); + // Then kill the worker process. + KillWorker(worker); + } } // Remove all tasks for this job from the scheduling queues, mark @@ -2015,6 +2017,7 @@ std::shared_ptr NodeManager::CreateActorTableDataFromCreationTas // of remaining reconstructions is the max. actor_info_ptr->set_remaining_reconstructions(task_spec.MaxActorReconstructions()); actor_info_ptr->set_is_direct_call(task_spec.IsDirectCall()); + actor_info_ptr->set_is_detached(task_spec.IsDetachedActor()); } else { // If we've already seen this actor, it means that this actor was reconstructed. // Thus, its previous state must be RECONSTRUCTING. @@ -2066,6 +2069,11 @@ void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) { if (task_spec.IsActorCreationTask()) { // This was an actor creation task. Convert the worker to an actor. 
worker.AssignActorId(actor_id); + + if (task_spec.IsDetachedActor()) { + worker.MarkDetachedActor(); + } + // Lookup the parent actor id. auto parent_task_id = task_spec.ParentTaskId(); int port = worker.Port(); diff --git a/src/ray/raylet/worker.cc b/src/ray/raylet/worker.cc index f9182f43a..0218196fd 100644 --- a/src/ray/raylet/worker.cc +++ b/src/ray/raylet/worker.cc @@ -22,7 +22,8 @@ Worker::Worker(const WorkerID &worker_id, pid_t pid, const Language &language, i connection_(connection), dead_(false), blocked_(false), - client_call_manager_(client_call_manager) { + client_call_manager_(client_call_manager), + is_detached_actor_(false) { if (port_ > 0) { rpc_client_ = std::unique_ptr( new rpc::WorkerTaskClient("127.0.0.1", port_, client_call_manager_)); @@ -80,6 +81,10 @@ void Worker::AssignActorId(const ActorID &actor_id) { const ActorID &Worker::GetActorId() const { return actor_id_; } +void Worker::MarkDetachedActor() { is_detached_actor_ = true; } + +bool Worker::IsDetachedActor() const { return is_detached_actor_; } + const std::shared_ptr Worker::Connection() const { return connection_; } diff --git a/src/ray/raylet/worker.h b/src/ray/raylet/worker.h index 878fc9f8e..b474d1569 100644 --- a/src/ray/raylet/worker.h +++ b/src/ray/raylet/worker.h @@ -46,6 +46,8 @@ class Worker { const JobID &GetAssignedJobId() const; void AssignActorId(const ActorID &actor_id); const ActorID &GetActorId() const; + void MarkDetachedActor(); + bool IsDetachedActor() const; const std::shared_ptr Connection() const; const ResourceIdSet &GetLifetimeResourceIds() const; @@ -102,6 +104,9 @@ class Worker { rpc::ClientCallManager &client_call_manager_; /// The rpc client to send tasks to this worker. std::unique_ptr rpc_client_; + /// Whether the worker is detached. This applies when the worker is an actor. + /// Detached actor means the actor's creator can exit without killing this actor. + bool is_detached_actor_; /// The rpc client to send tasks to the direct actor service. 
std::unique_ptr direct_rpc_client_; };