mirror of
https://github.com/wassname/ray.git
synced 2026-07-01 08:53:44 +08:00
[api] Clean up and document Actor name / lifetime API (#10332)
This commit is contained in:
+13
-8
@@ -411,7 +411,7 @@ class ActorClass:
|
||||
max_restarts=None,
|
||||
max_task_retries=None,
|
||||
name=None,
|
||||
detached=False,
|
||||
lifetime=None,
|
||||
placement_group=None,
|
||||
placement_group_bundle_index=-1):
|
||||
"""Create an actor.
|
||||
@@ -436,8 +436,13 @@ class ActorClass:
|
||||
concurrency defaults to 1 for threaded execution, and 1000 for
|
||||
asyncio execution. Note that the execution order is not
|
||||
guaranteed when max_concurrency > 1.
|
||||
name: The globally unique name for the actor.
|
||||
detached: DEPRECATED.
|
||||
name: The globally unique name for the actor, which can be used
|
||||
to retrieve the actor via ray.get_actor(name) as long as the
|
||||
actor is still alive.
|
||||
lifetime: Either `None`, which defaults to the actor will fate
|
||||
share with its creator and will be deleted once its refcount
|
||||
drops to zero, or "detached", which means the actor will live
|
||||
as a global object independent of the creator.
|
||||
placement_group: the placement group this actor belongs to,
|
||||
or None if it doesn't belong to any group.
|
||||
placement_group_bundle_index: the index of the bundle
|
||||
@@ -472,10 +477,6 @@ class ActorClass:
|
||||
worker = ray.worker.global_worker
|
||||
worker.check_connected()
|
||||
|
||||
if detached:
|
||||
logger.warning("The detached flag is deprecated. To create a "
|
||||
"detached actor, use the name parameter.")
|
||||
|
||||
if name is not None:
|
||||
if not isinstance(name, str):
|
||||
raise TypeError(
|
||||
@@ -498,9 +499,13 @@ class ActorClass:
|
||||
f"The name {name} is already taken. Please use "
|
||||
"a different name or get the existing actor using "
|
||||
f"ray.get_actor('{name}')")
|
||||
|
||||
if lifetime is None:
|
||||
detached = False
|
||||
elif lifetime == "detached":
|
||||
detached = True
|
||||
else:
|
||||
detached = False
|
||||
raise ValueError("lifetime must be either `None` or 'detached'")
|
||||
|
||||
if placement_group is None:
|
||||
placement_group = PlacementGroup.empty()
|
||||
|
||||
@@ -94,6 +94,7 @@ def init(name: Optional[str] = None,
|
||||
|
||||
controller = ServeController.options(
|
||||
name=controller_name,
|
||||
lifetime="detached",
|
||||
max_restarts=-1,
|
||||
max_task_retries=-1,
|
||||
).remote(name, http_host, http_port, _http_middlewares)
|
||||
|
||||
@@ -127,9 +127,9 @@ class ReplicaConfig:
|
||||
|
||||
if not isinstance(self.ray_actor_options, dict):
|
||||
raise TypeError("ray_actor_options must be a dictionary.")
|
||||
elif "detached" in self.ray_actor_options:
|
||||
elif "lifetime" in self.ray_actor_options:
|
||||
raise ValueError(
|
||||
"Specifying detached in actor_init_args is not allowed.")
|
||||
"Specifying lifetime in actor_init_args is not allowed.")
|
||||
elif "name" in self.ray_actor_options:
|
||||
raise ValueError(
|
||||
"Specifying name in actor_init_args is not allowed.")
|
||||
|
||||
@@ -180,6 +180,7 @@ class ServeController:
|
||||
self.http_port))
|
||||
router = HTTPProxyActor.options(
|
||||
name=router_name,
|
||||
lifetime="detached",
|
||||
max_concurrency=ASYNC_CONCURRENCY,
|
||||
max_restarts=-1,
|
||||
max_task_retries=-1,
|
||||
@@ -391,6 +392,7 @@ class ServeController:
|
||||
replica_name = format_actor_name(replica_tag, self.instance_name)
|
||||
worker_handle = ray.remote(backend_info.worker_class).options(
|
||||
name=replica_name,
|
||||
lifetime="detached",
|
||||
max_restarts=-1,
|
||||
max_task_retries=-1,
|
||||
**backend_info.replica_config.ray_actor_options).remote(
|
||||
|
||||
@@ -620,6 +620,33 @@ def test_calling_put_on_actor_handle(ray_start_regular):
|
||||
ray.get(g.remote())
|
||||
|
||||
|
||||
def test_named_but_not_detached(ray_start_regular):
|
||||
redis_address = ray_start_regular["redis_address"]
|
||||
|
||||
driver_script = """
|
||||
import ray
|
||||
ray.init(address="{}")
|
||||
|
||||
@ray.remote
|
||||
class NotDetached:
|
||||
def ping(self):
|
||||
return "pong"
|
||||
|
||||
actor = NotDetached.options(name="actor").remote()
|
||||
assert ray.get(actor.ping.remote()) == "pong"
|
||||
handle = ray.get_actor("actor")
|
||||
assert ray.get(handle.ping.remote()) == "pong"
|
||||
""".format(redis_address)
|
||||
|
||||
# Creates and kills actor once the driver exits.
|
||||
run_string_as_driver(driver_script)
|
||||
|
||||
# Must raise an exception since lifetime is not detached.
|
||||
with pytest.raises(Exception):
|
||||
detached_actor = ray.get_actor("actor")
|
||||
ray.get(detached_actor.ping.remote())
|
||||
|
||||
|
||||
def test_detached_actor(ray_start_regular):
|
||||
@ray.remote
|
||||
class DetachedActor:
|
||||
@@ -627,17 +654,17 @@ def test_detached_actor(ray_start_regular):
|
||||
return "pong"
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
DetachedActor._remote(name=1)
|
||||
DetachedActor._remote(lifetime="detached", name=1)
|
||||
|
||||
with pytest.raises(
|
||||
ValueError, match="Actor name cannot be an empty string"):
|
||||
DetachedActor._remote(name="")
|
||||
DetachedActor._remote(lifetime="detached", name="")
|
||||
|
||||
d = DetachedActor._remote(name="d_actor")
|
||||
d = DetachedActor._remote(lifetime="detached", name="d_actor")
|
||||
assert ray.get(d.ping.remote()) == "pong"
|
||||
|
||||
with pytest.raises(ValueError, match="Please use a different name"):
|
||||
DetachedActor._remote(name="d_actor")
|
||||
DetachedActor._remote(lifetime="detached", name="d_actor")
|
||||
|
||||
redis_address = ray_start_regular["redis_address"]
|
||||
|
||||
@@ -655,7 +682,7 @@ class DetachedActor:
|
||||
def ping(self):
|
||||
return "pong"
|
||||
|
||||
actor = DetachedActor._remote(name="{}")
|
||||
actor = DetachedActor._remote(lifetime="detached", name="{}")
|
||||
ray.get(actor.ping.remote())
|
||||
""".format(redis_address, get_actor_name, create_actor_name)
|
||||
|
||||
@@ -674,7 +701,8 @@ def test_detached_actor_cleanup(ray_start_regular):
|
||||
|
||||
def create_and_kill_actor(actor_name):
|
||||
# Make sure same name is creatable after killing it.
|
||||
detached_actor = DetachedActor.options(name=actor_name).remote()
|
||||
detached_actor = DetachedActor.options(
|
||||
lifetime="detached", name=actor_name).remote()
|
||||
# Wait for detached actor creation.
|
||||
assert ray.get(detached_actor.ping.remote()) == "pong"
|
||||
del detached_actor
|
||||
@@ -711,7 +739,7 @@ class DetachedActor:
|
||||
return "pong"
|
||||
|
||||
# Make sure same name is creatable after killing it.
|
||||
detached_actor = DetachedActor.options(name="{}").remote()
|
||||
detached_actor = DetachedActor.options(lifetime="detached", name="{}").remote()
|
||||
assert ray.get(detached_actor.ping.remote()) == "pong"
|
||||
ray.kill(detached_actor)
|
||||
# Wait until actor dies.
|
||||
@@ -745,7 +773,7 @@ def test_detached_actor_local_mode(ray_start_regular):
|
||||
def f(self):
|
||||
return RETURN_VALUE
|
||||
|
||||
Y.options(name="test").remote()
|
||||
Y.options(lifetime="detached", name="test").remote()
|
||||
y = ray.get_actor("test")
|
||||
assert ray.get(y.f.remote()) == RETURN_VALUE
|
||||
|
||||
@@ -799,7 +827,8 @@ def test_detached_actor_cleanup_due_to_failure(ray_start_cluster):
|
||||
if schedule_in_second_node\
|
||||
else {"first_node": 1}
|
||||
actor_handle = DetachedActor.options(
|
||||
name=actor_name, resources=resources).remote()
|
||||
lifetime="detached", name=actor_name,
|
||||
resources=resources).remote()
|
||||
# Wait for detached actor creation.
|
||||
assert ray.get(actor_handle.ping.remote()) == "pong"
|
||||
return actor_handle
|
||||
|
||||
@@ -241,7 +241,7 @@ def test_actor_restart_on_node_failure(ray_start_cluster):
|
||||
def ready(self):
|
||||
return
|
||||
|
||||
actor = RestartableActor.options(detached=True).remote()
|
||||
actor = RestartableActor.options(lifetime="detached").remote()
|
||||
ray.get(actor.ready.remote())
|
||||
results = [actor.increase.remote() for _ in range(100)]
|
||||
# Kill actor node, while the above task is still being executed.
|
||||
|
||||
@@ -66,7 +66,7 @@ class Actor:
|
||||
def value(self):
|
||||
return 1
|
||||
|
||||
_ = Actor.options(name="DetachedActor").remote()
|
||||
_ = Actor.options(lifetime="detached", name="DetachedActor").remote()
|
||||
""".format(address)
|
||||
|
||||
p = run_string_as_driver_nonblocking(driver)
|
||||
|
||||
@@ -472,15 +472,15 @@ def test_placement_group_reschedule_when_node_dead(ray_start_cluster):
|
||||
actor_1 = Actor.options(
|
||||
placement_group=placement_group,
|
||||
placement_group_bundle_index=0,
|
||||
detached=True).remote()
|
||||
lifetime="detached").remote()
|
||||
actor_2 = Actor.options(
|
||||
placement_group=placement_group,
|
||||
placement_group_bundle_index=1,
|
||||
detached=True).remote()
|
||||
lifetime="detached").remote()
|
||||
actor_3 = Actor.options(
|
||||
placement_group=placement_group,
|
||||
placement_group_bundle_index=2,
|
||||
detached=True).remote()
|
||||
lifetime="detached").remote()
|
||||
ray.get(actor_1.value.remote())
|
||||
ray.get(actor_2.value.remote())
|
||||
ray.get(actor_3.value.remote())
|
||||
@@ -491,15 +491,15 @@ def test_placement_group_reschedule_when_node_dead(ray_start_cluster):
|
||||
actor_4 = Actor.options(
|
||||
placement_group=placement_group,
|
||||
placement_group_bundle_index=0,
|
||||
detached=True).remote()
|
||||
lifetime="detached").remote()
|
||||
actor_5 = Actor.options(
|
||||
placement_group=placement_group,
|
||||
placement_group_bundle_index=1,
|
||||
detached=True).remote()
|
||||
lifetime="detached").remote()
|
||||
actor_6 = Actor.options(
|
||||
placement_group=placement_group,
|
||||
placement_group_bundle_index=2,
|
||||
detached=True).remote()
|
||||
lifetime="detached").remote()
|
||||
ray.get(actor_4.value.remote())
|
||||
ray.get(actor_5.value.remote())
|
||||
ray.get(actor_6.value.remote())
|
||||
|
||||
Reference in New Issue
Block a user