diff --git a/doc/source/actors.rst b/doc/source/actors.rst index e3839b593..6d3127158 100644 --- a/doc/source/actors.rst +++ b/doc/source/actors.rst @@ -199,6 +199,37 @@ If we instantiate an actor, we can pass the handle around to various tasks. time.sleep(1) print(ray.get(counter.get_counter.remote())) +Named Actors +------------ + +An actor can be given a globally unique name via ``.options(name="some_name")``, +which allows you to retrieve the actor from any job in the Ray cluster via +``ray.get_actor("some_name")``. This can be useful if you cannot directly +pass the actor handle to the task that needs it, or if you are trying to +access an actor launched by another driver. + +Actor Lifetimes +--------------- + +Separately, actor lifetimes can be decoupled from the job, allowing an actor to +persist even after the driver process of the job exits. + +.. code-block:: python + + counter = Counter.options(name="CounterActor", lifetime="detached").remote() + +The CounterActor will be kept alive even after the driver running above script +exits. Therefore it is possible to run the following script in a different +driver: + +.. code-block:: python + + counter = ray.get_actor("CounterActor") + print(ray.get(counter.get_counter.remote())) + +Note that the lifetime option is decoupled from the name. If we only specified +the name without specifying ``lifetime="detached"``, then the CounterActor can +only be retrieved as long as the original driver is still running. Actor Pool ---------- diff --git a/doc/source/advanced.rst b/doc/source/advanced.rst index 8c2c0df8b..fc742fb5e 100644 --- a/doc/source/advanced.rst +++ b/doc/source/advanced.rst @@ -239,28 +239,3 @@ To get information about the current available resource capacity of your cluster .. autofunction:: ray.available_resources :noindex: - -Detached Actors ------------------------------------ - -When original actor handles goes out of scope or the driver that originally -created the actor exits, ray will clean up the actor by default. If you want -to make sure the actor is kept alive, you can use -``_remote(name="some_name")`` to keep the actor alive after -the driver exits. The actor will have a globally unique name and can be -accessed across different drivers. - -For example, you can instantiate and register a persistent actor as follows: - -.. code-block:: python - - counter = Counter.options(name="CounterActor").remote() - -The CounterActor will be kept alive even after the driver running above script -exits. Therefore it is possible to run the following script in a different -driver: - -.. code-block:: python - - counter = ray.get_actor("CounterActor") - print(ray.get(counter.get_counter.remote())) diff --git a/doc/source/rllib-training.rst b/doc/source/rllib-training.rst index 4623dc8a2..991a19df2 100644 --- a/doc/source/rllib-training.rst +++ b/doc/source/rllib-training.rst @@ -519,7 +519,7 @@ For even finer-grained control over training, you can use RLlib's lower-level `b Global Coordination ~~~~~~~~~~~~~~~~~~~ -Sometimes, it is necessary to coordinate between pieces of code that live in different processes managed by RLlib. For example, it can be useful to maintain a global average of a certain variable, or centrally control a hyperparameter used by policies. Ray provides a general way to achieve this through *detached actors* (learn more about Ray actors `here `__). These actors are assigned a global name and handles to them can be retrieved using these names. As an example, consider maintaining a shared global counter that is incremented by environments and read periodically from your driver program: +Sometimes, it is necessary to coordinate between pieces of code that live in different processes managed by RLlib. For example, it can be useful to maintain a global average of a certain variable, or centrally control a hyperparameter used by policies. Ray provides a general way to achieve this through *named actors* (learn more about Ray actors `here `__). These actors are assigned a global name and handles to them can be retrieved using these names. As an example, consider maintaining a shared global counter that is incremented by environments and read periodically from your driver program: .. code-block:: python diff --git a/python/ray/actor.py b/python/ray/actor.py index cc95e8c35..9ae43d77b 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -411,7 +411,7 @@ class ActorClass: max_restarts=None, max_task_retries=None, name=None, - detached=False, + lifetime=None, placement_group=None, placement_group_bundle_index=-1): """Create an actor. @@ -436,8 +436,13 @@ class ActorClass: concurrency defaults to 1 for threaded execution, and 1000 for asyncio execution. Note that the execution order is not guaranteed when max_concurrency > 1. - name: The globally unique name for the actor. - detached: DEPRECATED. + name: The globally unique name for the actor, which can be used + to retrieve the actor via ray.get_actor(name) as long as the + actor is still alive. + lifetime: Either `None`, which defaults to the actor will fate + share with its creator and will be deleted once its refcount + drops to zero, or "detached", which means the actor will live + as a global object independent of the creator. placement_group: the placement group this actor belongs to, or None if it doesn't belong to any group. placement_group_bundle_index: the index of the bundle @@ -472,10 +477,6 @@ class ActorClass: worker = ray.worker.global_worker worker.check_connected() - if detached: - logger.warning("The detached flag is deprecated. To create a " - "detached actor, use the name parameter.") - if name is not None: if not isinstance(name, str): raise TypeError( @@ -498,9 +499,13 @@ class ActorClass: f"The name {name} is already taken. Please use " "a different name or get the existing actor using " f"ray.get_actor('{name}')") + + if lifetime is None: + detached = False + elif lifetime == "detached": detached = True else: - detached = False + raise ValueError("lifetime must be either `None` or 'detached'") if placement_group is None: placement_group = PlacementGroup.empty() diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index 202c98db9..f34623c6d 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -94,6 +94,7 @@ def init(name: Optional[str] = None, controller = ServeController.options( name=controller_name, + lifetime="detached", max_restarts=-1, max_task_retries=-1, ).remote(name, http_host, http_port, _http_middlewares) diff --git a/python/ray/serve/config.py b/python/ray/serve/config.py index e18328553..4fbe0dccd 100644 --- a/python/ray/serve/config.py +++ b/python/ray/serve/config.py @@ -127,9 +127,9 @@ class ReplicaConfig: if not isinstance(self.ray_actor_options, dict): raise TypeError("ray_actor_options must be a dictionary.") - elif "detached" in self.ray_actor_options: + elif "lifetime" in self.ray_actor_options: raise ValueError( - "Specifying detached in actor_init_args is not allowed.") + "Specifying lifetime in actor_init_args is not allowed.") elif "name" in self.ray_actor_options: raise ValueError( "Specifying name in actor_init_args is not allowed.") diff --git a/python/ray/serve/controller.py b/python/ray/serve/controller.py index 5ff50acc3..bf2faa2a8 100644 --- a/python/ray/serve/controller.py +++ b/python/ray/serve/controller.py @@ -180,6 +180,7 @@ class ServeController: self.http_port)) router = HTTPProxyActor.options( name=router_name, + lifetime="detached", max_concurrency=ASYNC_CONCURRENCY, max_restarts=-1, max_task_retries=-1, @@ -391,6 +392,7 @@ class ServeController: replica_name = format_actor_name(replica_tag, self.instance_name) worker_handle = ray.remote(backend_info.worker_class).options( name=replica_name, + lifetime="detached", max_restarts=-1, max_task_retries=-1, **backend_info.replica_config.ray_actor_options).remote( diff --git a/python/ray/tests/test_actor_advanced.py b/python/ray/tests/test_actor_advanced.py index 1915d7f6d..3d7446b02 100644 --- a/python/ray/tests/test_actor_advanced.py +++ b/python/ray/tests/test_actor_advanced.py @@ -620,6 +620,33 @@ def test_calling_put_on_actor_handle(ray_start_regular): ray.get(g.remote()) +def test_named_but_not_detached(ray_start_regular): + redis_address = ray_start_regular["redis_address"] + + driver_script = """ +import ray +ray.init(address="{}") + +@ray.remote +class NotDetached: + def ping(self): + return "pong" + +actor = NotDetached.options(name="actor").remote() +assert ray.get(actor.ping.remote()) == "pong" +handle = ray.get_actor("actor") +assert ray.get(handle.ping.remote()) == "pong" +""".format(redis_address) + + # Creates and kills actor once the driver exits. + run_string_as_driver(driver_script) + + # Must raise an exception since lifetime is not detached. + with pytest.raises(Exception): + detached_actor = ray.get_actor("actor") + ray.get(detached_actor.ping.remote()) + + def test_detached_actor(ray_start_regular): @ray.remote class DetachedActor: @@ -627,17 +654,17 @@ def test_detached_actor(ray_start_regular): return "pong" with pytest.raises(TypeError): - DetachedActor._remote(name=1) + DetachedActor._remote(lifetime="detached", name=1) with pytest.raises( ValueError, match="Actor name cannot be an empty string"): - DetachedActor._remote(name="") + DetachedActor._remote(lifetime="detached", name="") - d = DetachedActor._remote(name="d_actor") + d = DetachedActor._remote(lifetime="detached", name="d_actor") assert ray.get(d.ping.remote()) == "pong" with pytest.raises(ValueError, match="Please use a different name"): - DetachedActor._remote(name="d_actor") + DetachedActor._remote(lifetime="detached", name="d_actor") redis_address = ray_start_regular["redis_address"] @@ -655,7 +682,7 @@ class DetachedActor: def ping(self): return "pong" -actor = DetachedActor._remote(name="{}") +actor = DetachedActor._remote(lifetime="detached", name="{}") ray.get(actor.ping.remote()) """.format(redis_address, get_actor_name, create_actor_name) @@ -674,7 +701,8 @@ def test_detached_actor_cleanup(ray_start_regular): def create_and_kill_actor(actor_name): # Make sure same name is creatable after killing it. - detached_actor = DetachedActor.options(name=actor_name).remote() + detached_actor = DetachedActor.options( + lifetime="detached", name=actor_name).remote() # Wait for detached actor creation. assert ray.get(detached_actor.ping.remote()) == "pong" del detached_actor @@ -711,7 +739,7 @@ class DetachedActor: return "pong" # Make sure same name is creatable after killing it. -detached_actor = DetachedActor.options(name="{}").remote() +detached_actor = DetachedActor.options(lifetime="detached", name="{}").remote() assert ray.get(detached_actor.ping.remote()) == "pong" ray.kill(detached_actor) # Wait until actor dies. @@ -745,7 +773,7 @@ def test_detached_actor_local_mode(ray_start_regular): def f(self): return RETURN_VALUE - Y.options(name="test").remote() + Y.options(lifetime="detached", name="test").remote() y = ray.get_actor("test") assert ray.get(y.f.remote()) == RETURN_VALUE @@ -799,7 +827,8 @@ def test_detached_actor_cleanup_due_to_failure(ray_start_cluster): if schedule_in_second_node\ else {"first_node": 1} actor_handle = DetachedActor.options( - name=actor_name, resources=resources).remote() + lifetime="detached", name=actor_name, + resources=resources).remote() # Wait for detached actor creation. assert ray.get(actor_handle.ping.remote()) == "pong" return actor_handle diff --git a/python/ray/tests/test_actor_failures.py b/python/ray/tests/test_actor_failures.py index 468c12a99..7985050ad 100644 --- a/python/ray/tests/test_actor_failures.py +++ b/python/ray/tests/test_actor_failures.py @@ -241,7 +241,7 @@ def test_actor_restart_on_node_failure(ray_start_cluster): def ready(self): return - actor = RestartableActor.options(detached=True).remote() + actor = RestartableActor.options(lifetime="detached").remote() ray.get(actor.ready.remote()) results = [actor.increase.remote() for _ in range(100)] # Kill actor node, while the above task is still being executed. diff --git a/python/ray/tests/test_job.py b/python/ray/tests/test_job.py index 7f953d48d..395ae9f96 100644 --- a/python/ray/tests/test_job.py +++ b/python/ray/tests/test_job.py @@ -66,7 +66,7 @@ class Actor: def value(self): return 1 -_ = Actor.options(name="DetachedActor").remote() +_ = Actor.options(lifetime="detached", name="DetachedActor").remote() """.format(address) p = run_string_as_driver_nonblocking(driver) diff --git a/python/ray/tests/test_placement_group.py b/python/ray/tests/test_placement_group.py index 0d69e75da..c23420196 100644 --- a/python/ray/tests/test_placement_group.py +++ b/python/ray/tests/test_placement_group.py @@ -472,15 +472,15 @@ def test_placement_group_reschedule_when_node_dead(ray_start_cluster): actor_1 = Actor.options( placement_group=placement_group, placement_group_bundle_index=0, - detached=True).remote() + lifetime="detached").remote() actor_2 = Actor.options( placement_group=placement_group, placement_group_bundle_index=1, - detached=True).remote() + lifetime="detached").remote() actor_3 = Actor.options( placement_group=placement_group, placement_group_bundle_index=2, - detached=True).remote() + lifetime="detached").remote() ray.get(actor_1.value.remote()) ray.get(actor_2.value.remote()) ray.get(actor_3.value.remote()) @@ -491,15 +491,15 @@ def test_placement_group_reschedule_when_node_dead(ray_start_cluster): actor_4 = Actor.options( placement_group=placement_group, placement_group_bundle_index=0, - detached=True).remote() + lifetime="detached").remote() actor_5 = Actor.options( placement_group=placement_group, placement_group_bundle_index=1, - detached=True).remote() + lifetime="detached").remote() actor_6 = Actor.options( placement_group=placement_group, placement_group_bundle_index=2, - detached=True).remote() + lifetime="detached").remote() ray.get(actor_4.value.remote()) ray.get(actor_5.value.remote()) ray.get(actor_6.value.remote())