From 860eb6f13a0e570b95bd251eb53105473850cbdc Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Sun, 24 May 2020 20:08:03 -0500 Subject: [PATCH] Update named actor API (#8559) --- doc/source/rllib-training.rst | 9 ++-- python/ray/__init__.py | 2 + python/ray/actor.py | 40 ++++++-------- python/ray/serve/api.py | 3 +- python/ray/serve/master.py | 20 +++---- python/ray/serve/tests/test_failure.py | 18 +++---- python/ray/tests/test_actor_advanced.py | 72 +++++-------------------- python/ray/tests/test_multi_node.py | 5 +- python/ray/util/named_actors.py | 52 +++++++++++------- python/ray/worker.py | 28 +++++++--- 10 files changed, 109 insertions(+), 140 deletions(-) diff --git a/doc/source/rllib-training.rst b/doc/source/rllib-training.rst index 46fe06355..6ea6dea34 100644 --- a/doc/source/rllib-training.rst +++ b/doc/source/rllib-training.rst @@ -467,12 +467,10 @@ For even finer-grained control over training, you can use RLlib's lower-level `b Global Coordination ~~~~~~~~~~~~~~~~~~~ -Sometimes, it is necessary to coordinate between pieces of code that live in different processes managed by RLlib. For example, it can be useful to maintain a global average of a certain variable, or centrally control a hyperparameter used by policies. Ray provides a general way to achieve this through *named actors* (learn more about Ray actors `here `__). As an example, consider maintaining a shared global counter that is incremented by environments and read periodically from your driver program: +Sometimes, it is necessary to coordinate between pieces of code that live in different processes managed by RLlib. For example, it can be useful to maintain a global average of a certain variable, or centrally control a hyperparameter used by policies. Ray provides a general way to achieve this through *detached actors* (learn more about Ray actors `here `__). These actors are assigned a global name and handles to them can be retrieved using these names. As an example, consider maintaining a shared global counter that is incremented by environments and read periodically from your driver program: .. code-block:: python - from ray.util import named_actors - @ray.remote class Counter: def __init__(self): @@ -483,12 +481,11 @@ Sometimes, it is necessary to coordinate between pieces of code that live in dif return self.count # on the driver - counter = Counter.remote() - named_actors.register_actor("global_counter", counter) + counter = Counter.options(name="global_counter").remote() print(ray.get(counter.get.remote())) # get the latest count # in your envs - counter = named_actors.get_actor("global_counter") + counter = ray.get_actor("global_counter") counter.inc.remote(1) # async call to increment the global count Ray actors provide high levels of performance, so in more complex cases they can be used implement communication patterns such as parameter servers and allreduce. diff --git a/python/ray/__init__.py b/python/ray/__init__.py index 0145ac959..2474893b3 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -78,6 +78,7 @@ from ray.worker import ( connect, disconnect, get, + get_actor, get_gpu_ids, get_resource_ids, get_webui_url, @@ -126,6 +127,7 @@ __all__ = [ "connect", "disconnect", "get", + "get_actor", "get_gpu_ids", "get_resource_ids", "get_webui_url", diff --git a/python/ray/actor.py b/python/ray/actor.py index fad72a8b6..ddf7266f4 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -435,8 +435,7 @@ class ActorClass: asyncio execution. Note that the execution order is not guaranteed when max_concurrency > 1. name: The globally unique name for the actor. - detached: Whether the actor should be kept alive after driver - exits. + detached: DEPRECATED. Returns: A handle to the newly created actor. @@ -469,18 +468,16 @@ class ActorClass: raise RuntimeError("Actors cannot be created before ray.init() " "has been called.") - if detached and name is None: - raise ValueError("Detached actors must be named. " - "Please use Actor._remote(name='some_name') " - "to associate the name.") + if detached: + logger.warning("The detached flag is deprecated. To create a " + "detached actor, use the name parameter.") - if name and not detached: - raise ValueError("Only detached actors can be named. " - "Please use Actor._remote(detached=True, " - "name='some_name').") - - if name == "": - raise ValueError("Actor name cannot be an empty string.") + if name is not None: + if not isinstance(name, str): + raise TypeError("name must be None or a string, " + "got: '{}'.".format(type(name))) + if name == "": + raise ValueError("Actor name cannot be an empty string.") # Check whether the name is already taken. # TODO(edoakes): this check has a race condition because two drivers @@ -489,14 +486,17 @@ class ActorClass: # async call. if name is not None: try: - ray.util.get_actor(name) + ray.get_actor(name) except ValueError: # Name is not taken. pass else: raise ValueError( "The name {name} is already taken. Please use " - "a different name or get existing actor using " - "ray.util.get_actor('{name}')".format(name=name)) + "a different name or get the existing actor using " + "ray.get_actor('{name}')".format(name=name)) + detached = True + else: + detached = False # Set the actor's default resources if not already set. First three # conditions are to check that no resources were specified in the @@ -583,7 +583,7 @@ class ActorClass: original_handle=True) if name is not None and not gcs_actor_service_enabled(): - ray.util.register_actor(name, actor_handle) + ray.util.named_actors._register_actor(name, actor_handle) return actor_handle @@ -762,12 +762,6 @@ class ActorHandle: self._ray_actor_creation_function_descriptor.class_name, self._actor_id.hex()) - def __ray_kill__(self): - """Deprecated - use ray.kill() instead.""" - logger.warning("actor.__ray_kill__() is deprecated and will be removed" - " in the near future. Use ray.kill(actor) instead.") - ray.kill(self) - @property def _actor_id(self): return self._ray_actor_id diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index f26f8b3e4..ef45331fe 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -107,7 +107,7 @@ def init(cluster_name=None, global master_actor master_actor_name = format_actor_name(SERVE_MASTER_NAME, cluster_name) try: - master_actor = ray.util.get_actor(master_actor_name) + master_actor = ray.get_actor(master_actor_name) return except ValueError: pass @@ -124,7 +124,6 @@ def init(cluster_name=None, # in the future. http_node_id = ray.state.current_node_id() master_actor = ServeMaster.options( - detached=True, name=master_actor_name, max_restarts=-1, ).remote(cluster_name, start_server, http_node_id, http_host, http_port, diff --git a/python/ray/serve/master.py b/python/ray/serve/master.py index 2507e48d8..887e778b0 100644 --- a/python/ray/serve/master.py +++ b/python/ray/serve/master.py @@ -127,11 +127,10 @@ class ServeMaster: """ router_name = format_actor_name(SERVE_ROUTER_NAME, self.cluster_name) try: - self.router = ray.util.get_actor(router_name) + self.router = ray.get_actor(router_name) except ValueError: logger.info("Starting router with name '{}'".format(router_name)) self.router = async_retryable(ray.remote(Router)).options( - detached=True, name=router_name, max_concurrency=ASYNC_CONCURRENCY, max_restarts=-1, @@ -148,13 +147,12 @@ class ServeMaster: """ proxy_name = format_actor_name(SERVE_PROXY_NAME, self.cluster_name) try: - self.http_proxy = ray.util.get_actor(proxy_name) + self.http_proxy = ray.get_actor(proxy_name) except ValueError: logger.info( "Starting HTTP proxy with name '{}' on node '{}'".format( proxy_name, node_id)) self.http_proxy = async_retryable(HTTPProxyActor).options( - detached=True, name=proxy_name, max_concurrency=ASYNC_CONCURRENCY, max_restarts=-1, @@ -180,12 +178,11 @@ class ServeMaster: metric_sink_name = format_actor_name(SERVE_METRIC_SINK_NAME, self.cluster_name) try: - self.metric_exporter = ray.util.get_actor(metric_sink_name) + self.metric_exporter = ray.get_actor(metric_sink_name) except ValueError: logger.info("Starting metric exporter with name '{}'".format( metric_sink_name)) self.metric_exporter = MetricExporterActor.options( - detached=True, name=metric_sink_name).remote(metric_exporter_class) def get_metric_exporter(self): @@ -246,7 +243,7 @@ class ServeMaster: for replica_tag in replica_tags: replica_name = format_actor_name(replica_tag, self.cluster_name) - self.workers[backend_tag][replica_tag] = ray.util.get_actor( + self.workers[backend_tag][replica_tag] = ray.get_actor( replica_name) # Push configuration state to the router. @@ -311,7 +308,6 @@ class ServeMaster: replica_name = format_actor_name(replica_tag, self.cluster_name) worker_handle = async_retryable(ray.remote(backend_worker)).options( - detached=True, name=replica_name, max_restarts=-1, **replica_config.ray_actor_options).remote( @@ -328,7 +324,7 @@ class ServeMaster: # failed after creating them but before writing a # checkpoint. try: - worker_handle = ray.util.get_actor(replica_tag) + worker_handle = ray.get_actor(replica_tag) except ValueError: worker_handle = await self._start_backend_worker( backend_tag, replica_tag) @@ -371,7 +367,7 @@ class ServeMaster: # NOTE(edoakes): the replicas may already be stopped if we # failed after stopping them but before writing a checkpoint. try: - replica = ray.util.get_actor(replica_tag) + replica = ray.get_actor(replica_tag) except ValueError: continue @@ -384,9 +380,7 @@ class ServeMaster: # use replica.__ray_terminate__, we may send it while the # replica is being restarted and there's no way to tell if it # successfully killed the worker or not. - worker = ray.worker.global_worker - # Kill the actor with no_restart=True. - worker.core_worker.kill_actor(replica._ray_actor_id, True) + ray.kill(replica, no_restart=True) self.replicas_to_stop.clear() diff --git a/python/ray/serve/tests/test_failure.py b/python/ray/serve/tests/test_failure.py index 7b556cdf8..909d82832 100644 --- a/python/ray/serve/tests/test_failure.py +++ b/python/ray/serve/tests/test_failure.py @@ -35,7 +35,7 @@ def test_master_failure(serve_instance): response = request_with_retries("/master_failure", timeout=30) assert response.text == "hello1" - ray.kill(serve.api._get_master_actor()) + ray.kill(serve.api._get_master_actor(), no_restart=False) for _ in range(10): response = request_with_retries("/master_failure", timeout=30) @@ -44,7 +44,7 @@ def test_master_failure(serve_instance): def function(): return "hello2" - ray.kill(serve.api._get_master_actor()) + ray.kill(serve.api._get_master_actor(), no_restart=False) serve.create_backend("master_failure:v2", function) serve.set_traffic("master_failure", {"master_failure:v2": 1.0}) @@ -56,11 +56,11 @@ def test_master_failure(serve_instance): def function(): return "hello3" - ray.kill(serve.api._get_master_actor()) + ray.kill(serve.api._get_master_actor(), no_restart=False) serve.create_endpoint("master_failure_2", "/master_failure_2") - ray.kill(serve.api._get_master_actor()) + ray.kill(serve.api._get_master_actor(), no_restart=False) serve.create_backend("master_failure_2", function) - ray.kill(serve.api._get_master_actor()) + ray.kill(serve.api._get_master_actor(), no_restart=False) serve.set_traffic("master_failure_2", {"master_failure_2": 1.0}) for _ in range(10): @@ -73,7 +73,7 @@ def test_master_failure(serve_instance): def _kill_http_proxy(): [http_proxy] = ray.get( serve.api._get_master_actor().get_http_proxy.remote()) - ray.kill(http_proxy) + ray.kill(http_proxy, no_restart=False) def test_http_proxy_failure(serve_instance): @@ -107,7 +107,7 @@ def test_http_proxy_failure(serve_instance): def _kill_router(): [router] = ray.get(serve.api._get_master_actor().get_router.remote()) - ray.kill(router) + ray.kill(router, no_restart=False) def test_router_failure(serve_instance): @@ -169,7 +169,7 @@ def test_worker_restart(serve_instance): # Kill the worker. handles = _get_worker_handles("worker_failure:v1") assert len(handles) == 1 - ray.kill(handles[0]) + ray.kill(handles[0], no_restart=False) # Wait until the worker is killed and a one is started. start = time.time() @@ -227,7 +227,7 @@ def test_worker_replica_failure(serve_instance): # Kill one of the replicas. handles = _get_worker_handles("replica_failure") assert len(handles) == 2 - ray.kill(handles[0]) + ray.kill(handles[0], no_restart=False) # Check that the other replica still serves requests. for _ in range(10): diff --git a/python/ray/tests/test_actor_advanced.py b/python/ray/tests/test_actor_advanced.py index 93b0cd96b..f59378dc3 100644 --- a/python/ray/tests/test_actor_advanced.py +++ b/python/ray/tests/test_actor_advanced.py @@ -655,63 +655,22 @@ def test_pickled_actor_handle_call_in_method_twice(ray_start_regular): ray.get(b.step.remote()) -def test_register_and_get_named_actors(ray_start_regular): - # TODO(heyucongtom): We should test this from another driver. - - @ray.remote - class Foo: - def __init__(self): - self.x = 0 - - def method(self): - self.x += 1 - return self.x - - f1 = Foo.remote() - # Test saving f. - ray.util.register_actor("f1", f1) - # Test getting f. - f2 = ray.util.get_actor("f1") - assert f1._actor_id == f2._actor_id - - # Test same name register shall raise error. - with pytest.raises(ValueError): - ray.util.register_actor("f1", f2) - - # Test register with wrong object type. - with pytest.raises(TypeError): - ray.util.register_actor("f3", 1) - - # Test getting a nonexistent actor. - with pytest.raises(ValueError): - ray.util.get_actor("nonexistent") - - # Test method - assert ray.get(f1.method.remote()) == 1 - assert ray.get(f2.method.remote()) == 2 - assert ray.get(f1.method.remote()) == 3 - assert ray.get(f2.method.remote()) == 4 - - def test_detached_actor(ray_start_regular): @ray.remote class DetachedActor: def ping(self): return "pong" + with pytest.raises(TypeError): + DetachedActor._remote(name=1) + with pytest.raises( ValueError, match="Actor name cannot be an empty string"): - DetachedActor._remote(detached=True, name="") + DetachedActor._remote(name="") - with pytest.raises(ValueError, match="Detached actors must be named"): - DetachedActor._remote(detached=True) - - with pytest.raises(ValueError, match="Only detached actors can be named"): - DetachedActor._remote(name="d_actor") - - DetachedActor._remote(detached=True, name="d_actor") + DetachedActor._remote(name="d_actor") with pytest.raises(ValueError, match="Please use a different name"): - DetachedActor._remote(detached=True, name="d_actor") + DetachedActor._remote(name="d_actor") redis_address = ray_start_regular["redis_address"] @@ -721,7 +680,7 @@ def test_detached_actor(ray_start_regular): import ray ray.init(address="{}") -existing_actor = ray.util.get_actor("{}") +existing_actor = ray.get_actor("{}") assert ray.get(existing_actor.ping.remote()) == "pong" @ray.remote @@ -729,17 +688,16 @@ class DetachedActor: def ping(self): return "pong" -actor = DetachedActor._remote(name="{}", detached=True) +actor = DetachedActor._remote(name="{}") ray.get(actor.ping.remote()) """.format(redis_address, get_actor_name, create_actor_name) run_string_as_driver(driver_script) - detached_actor = ray.util.get_actor(create_actor_name) + detached_actor = ray.get_actor(create_actor_name) assert ray.get(detached_actor.ping.remote()) == "pong" -@pytest.mark.parametrize("deprecated_codepath", [False, True]) -def test_kill(ray_start_regular, deprecated_codepath): +def test_kill(ray_start_regular): @ray.remote class Actor: def hang(self): @@ -750,17 +708,13 @@ def test_kill(ray_start_regular, deprecated_codepath): result = actor.hang.remote() ready, _ = ray.wait([result], timeout=0.5) assert len(ready) == 0 - if deprecated_codepath: - actor.__ray_kill__() - else: - ray.kill(actor) + ray.kill(actor, no_restart=False) with pytest.raises(ray.exceptions.RayActorError): ray.get(result) - if not deprecated_codepath: - with pytest.raises(ValueError): - ray.kill("not_an_actor_handle") + with pytest.raises(ValueError): + ray.kill("not_an_actor_handle") # This test verifies actor creation task failure will not diff --git a/python/ray/tests/test_multi_node.py b/python/ray/tests/test_multi_node.py index 5979161dd..21ad6f28a 100644 --- a/python/ray/tests/test_multi_node.py +++ b/python/ray/tests/test_multi_node.py @@ -219,8 +219,7 @@ class Counter: def increment(self): self.count += 1 return self.count -counter = Counter.remote() -ray.util.register_actor("Counter", counter) +counter = Counter.options(name="Counter").remote() time.sleep(100) """.format(address) @@ -231,7 +230,7 @@ import time ray.init(address="{}") while True: try: - counter = ray.util.get_actor("Counter") + counter = ray.get_actor("Counter") break except ValueError: time.sleep(1) diff --git a/python/ray/util/named_actors.py b/python/ray/util/named_actors.py index f02839171..28a70dac1 100644 --- a/python/ray/util/named_actors.py +++ b/python/ray/util/named_actors.py @@ -1,7 +1,11 @@ +import logging + import ray import ray.cloudpickle as pickle from ray.experimental.internal_kv import _internal_kv_get, _internal_kv_put +logger = logging.getLogger(__name__) + def _calculate_key(name): """Generate a Redis key with the given name. @@ -15,17 +19,7 @@ def _calculate_key(name): return b"Actor:" + name.encode("ascii") -def get_actor(name): - """Get a named actor which was previously created. - - If the actor doesn't exist, an exception will be raised. - - Args: - name: The name of the named actor. - - Returns: - The ActorHandle object corresponding to the name. - """ +def _get_actor(name): if ray._raylet.gcs_actor_service_enabled(): worker = ray.worker.global_worker handle = worker.core_worker.get_named_actor_handle(name) @@ -40,13 +34,23 @@ def get_actor(name): return handle -def register_actor(name, actor_handle): - """Register a named actor under a string key. +def get_actor(name): + """Get a named actor which was previously created. - Args: - name: The name of the named actor. - actor_handle: The actor object to be associated with this name - """ + If the actor doesn't exist, an exception will be raised. + + Args: + name: The name of the named actor. + + Returns: + The ActorHandle object corresponding to the name. + """ + logger.warning("ray.util.get_actor has been moved to ray.get_actor and " + "will be removed in the future.") + return _get_actor(name) + + +def _register_actor(name, actor_handle): if not isinstance(name, str): raise TypeError("The name argument must be a string.") if not isinstance(actor_handle, ray.actor.ActorHandle): @@ -56,7 +60,7 @@ def register_actor(name, actor_handle): # First check if the actor already exists. try: - get_actor(name) + _get_actor(name) exists = True except ValueError: exists = False @@ -66,3 +70,15 @@ def register_actor(name, actor_handle): # Add the actor to Redis if it does not already exist. _internal_kv_put(actor_name, pickle.dumps(actor_handle)) + + +def register_actor(name, actor_handle): + """Register a named actor under a string key. + + Args: + name: The name of the named actor. + actor_handle: The actor object to be associated with this name + """ + logger.warning("ray.util.register_actor is deprecated. To create a " + "named, detached actor, use Actor.options(name=\"name\").") + return _register_actor(name, actor_handle) diff --git a/python/ray/worker.py b/python/ray/worker.py index e663afea5..1521340a0 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1657,7 +1657,22 @@ def wait(object_ids, num_returns=1, timeout=None): return ready_ids, remaining_ids -def kill(actor): +def get_actor(name): + """Get a handle to a detached actor. + + Gets a handle to a detached actor with the given name. The actor must + have been created with Actor.options(name="name").remote(). + + Returns: + ActorHandle to the actor. + + Raises: + ValueError if the named actor does not exist. + """ + return ray.util.named_actors._get_actor(name) + + +def kill(actor, no_restart=True): """Kill an actor forcefully. This will interrupt any running tasks on the actor, causing them to fail @@ -1667,21 +1682,20 @@ def kill(actor): you can call ``actor.__ray_terminate__.remote()`` instead to queue a termination task. - In both cases, the worker is actually killed, but it will be restarted by - Ray. - - If this actor is reconstructable, an attempt will be made to reconstruct - it. + If the actor is a detached actor, subsequent calls to get its handle via + ray.get_actor will fail. Args: actor (ActorHandle): Handle to the actor to kill. + no_restart (bool): Whether or not this actor should be restarted if + it's a restartable actor. """ if not isinstance(actor, ray.actor.ActorHandle): raise ValueError("ray.kill() only supported for actors. " "Got: {}.".format(type(actor))) worker = ray.worker.global_worker worker.check_connected() - worker.core_worker.kill_actor(actor._ray_actor_id, False) + worker.core_worker.kill_actor(actor._ray_actor_id, no_restart) def cancel(object_id, force=False):