[Placement Group]Add detached support for placement group. (#13582)

This commit is contained in:
DK.Pino
2021-01-27 18:51:26 +08:00
committed by GitHub
parent d2963f4ee1
commit 7f6d326ad8
15 changed files with 209 additions and 21 deletions
+4 -2
View File
@@ -1184,7 +1184,8 @@ cdef class CoreWorker:
self,
c_string name,
c_vector[unordered_map[c_string, double]] bundles,
c_string strategy):
c_string strategy,
c_bool is_detached):
cdef:
CPlacementGroupID c_placement_group_id
CPlacementStrategy c_strategy
@@ -1208,7 +1209,8 @@ cdef class CoreWorker:
CPlacementGroupCreationOptions(
name,
c_strategy,
bundles
bundles,
is_detached
),
&c_placement_group_id))
+3 -1
View File
@@ -584,7 +584,9 @@ class ActorClass:
elif lifetime == "detached":
detached = True
else:
raise ValueError("lifetime must be either `None` or 'detached'")
raise ValueError(
"actor `lifetime` argument must be either `None` or 'detached'"
)
if placement_group_capture_child_tasks is None:
placement_group_capture_child_tasks = (
+2 -1
View File
@@ -270,7 +270,8 @@ cdef extern from "ray/core_worker/common.h" nogil:
CPlacementGroupCreationOptions(
const c_string &name,
CPlacementStrategy strategy,
const c_vector[unordered_map[c_string, double]] &bundles
const c_vector[unordered_map[c_string, double]] &bundles,
c_bool is_detached
)
cdef extern from "ray/gcs/gcs_client.h" nogil:
+113
View File
@@ -1309,6 +1309,119 @@ def test_schedule_placement_groups_at_the_same_time():
wait_for_condition(is_all_placement_group_removed)
ray.shutdown()
def test_detached_placement_group(ray_start_cluster):
    """Check that a detached placement group outlives its creator.

    Two scenarios are exercised:

    1. A separate driver creates a detached placement group, schedules
       detached actors into it, and then shuts down.
    2. A detached actor creates a detached placement group, schedules
       detached nested actors into it, and is then killed.

    After each scenario the test asserts how many placement groups are in
    the "CREATED" state and how many actors are in the ALIVE state.

    Args:
        ray_start_cluster: Cluster fixture; two nodes with 3 CPUs each are
            added to it before the test connects.
    """
    cluster = ray_start_cluster
    for _ in range(2):
        cluster.add_node(num_cpus=3)
    cluster.wait_for_nodes()
    info = ray.init(address=cluster.address)

    # Make sure the detached placement group stays alive after the job that
    # created it is dead.
    # NOTE: the script below is run as a standalone driver, so its body has
    # to start at column 0 regardless of this function's indentation.
    driver_code = f"""
import ray

ray.init(address="{info["redis_address"]}")

pg = ray.util.placement_group(
    [{{"CPU": 1}} for _ in range(2)],
    strategy="STRICT_SPREAD", lifetime="detached")
ray.get(pg.ready())

@ray.remote(num_cpus=1)
class Actor:
    def ready(self):
        return True

for bundle_index in range(2):
    actor = Actor.options(lifetime="detached", placement_group=pg,
                          placement_group_bundle_index=bundle_index).remote()
    ray.get(actor.ready.remote())

ray.shutdown()
"""

    run_string_as_driver(driver_code)

    def is_job_done():
        """Return True once any job reports a "StopTime" entry."""
        return any("StopTime" in job for job in ray.jobs())

    def alive_pg_count_is(expected_num_pg):
        """Return True if exactly `expected_num_pg` groups are "CREATED"."""
        pg_table = ray.util.placement_group_table()
        alive_num_pg = sum(1 for pg_info in pg_table.values()
                           if pg_info["state"] == "CREATED")
        return alive_num_pg == expected_num_pg

    def alive_actor_count_is(expected_num_actor):
        """Return True if exactly `expected_num_actor` actors are ALIVE."""
        alive_state = ray.gcs_utils.ActorTableData.ALIVE
        alive_num_actor = sum(1 for actor_info in ray.actors().values()
                              if actor_info["State"] == alive_state)
        return alive_num_actor == expected_num_actor

    # Wait until the driver is reported as dead by GCS.
    wait_for_condition(is_job_done)

    assert alive_pg_count_is(1)
    assert alive_actor_count_is(2)

    # Make sure the detached placement group stays alive after its creator,
    # which is a detached actor, is dead.
    # Test actors first.
    @ray.remote(num_cpus=1)
    class NestedActor:
        def ready(self):
            return True

    @ray.remote(num_cpus=1)
    class Actor:
        def __init__(self):
            # Keep handles to the nested actors created by this actor.
            self.actors = []

        def ready(self):
            return True

        def schedule_nested_actor_with_detached_pg(self):
            # Create a placement group which is detached.
            pg = ray.util.placement_group(
                [{"CPU": 1} for _ in range(2)],
                strategy="STRICT_SPREAD",
                lifetime="detached",
                name="detached_pg")
            ray.get(pg.ready())

            # Schedule one nested actor into each bundle of the group.
            for bundle_index in range(2):
                actor = NestedActor.options(
                    placement_group=pg,
                    placement_group_bundle_index=bundle_index,
                    lifetime="detached").remote()
                ray.get(actor.ready.remote())
                self.actors.append(actor)

    a = Actor.options(lifetime="detached").remote()
    ray.get(a.ready.remote())
    # 1 parent actor and 2 child actors.
    ray.get(a.schedule_nested_actor_with_detached_pg.remote())

    # Kill the parent actor and wait until it is killed.
    ray.kill(a)
    with pytest.raises(ray.exceptions.RayActorError):
        ray.get(a.ready.remote())

    # We should have 2 alive pgs and 4 alive actors.
    assert alive_pg_count_is(2)
    assert alive_actor_count_is(4)
if __name__ == "__main__":
    # Run this file's tests verbosely and propagate pytest's exit status to
    # the calling process.
    exit_code = pytest.main(["-v", __file__])
    sys.exit(exit_code)
+15 -2
View File
@@ -145,7 +145,8 @@ class PlacementGroup:
def placement_group(bundles: List[Dict[str, float]],
strategy: str = "PACK",
name: str = "unnamed_group") -> PlacementGroup:
name: str = "unnamed_group",
lifetime=None) -> PlacementGroup:
"""Asynchronously creates a PlacementGroup.
Args:
@@ -160,6 +161,10 @@ def placement_group(bundles: List[Dict[str, float]],
- "STRICT_SPREAD": Packs Bundles across distinct nodes.
name(str): The name of the placement group.
lifetime(str): Either `None`, which means the placement group will
fate share with its creator and be deleted once its creator is
dead, or "detached", which means the placement group will live as
a global object independent of the creator.
Return:
PlacementGroup: Placement group object.
@@ -179,8 +184,16 @@ def placement_group(bundles: List[Dict[str, float]],
"Bundles cannot be an empty dictionary or "
f"resources with only 0 values. Bundles: {bundles}")
if lifetime is None:
detached = False
elif lifetime == "detached":
detached = True
else:
raise ValueError("placement group `lifetime` argument must be either"
" `None` or 'detached'")
placement_group_id = worker.core_worker.create_placement_group(
name, bundles, strategy)
name, bundles, strategy, detached)
return PlacementGroup(placement_group_id)