class PlacementGroup:
    """A handle to a placement group.

    Args:
        id: Placement group id.
        bundles: List of bundles. Each bundle maps a resource name to the
            amount of that resource requested by the bundle.
    """

    @staticmethod
    def empty():
        """Return a handle with the nil placement group id and no bundles."""
        return PlacementGroup(PlacementGroupID.nil(), [])

    def __init__(self, id: "PlacementGroupID",
                 bundles: List[Dict[str, float]]):
        self.id = id
        self.bundles = bundles

    def ready(self) -> "ObjectRef":
        """Return an object ref that can be waited on to check ready status.

        A trivial remote task is submitted against the first bundle of this
        placement group; the task returns the placement group handle it was
        given.

        Raises:
            AssertionError: If this handle has no bundles.
        """
        worker = ray.worker.global_worker
        worker.check_connected()

        @ray.remote(num_cpus=0, max_calls=0)
        def bundle_reservation_check(placement_group):
            return placement_group

        # Raised explicitly (instead of a bare ``assert``) so that the check
        # still runs when Python is started with -O.
        if not self.bundles:
            raise AssertionError(
                "ready() cannot be called on placement group object with a "
                "bundle length == 0, current bundle length: "
                f"{len(self.bundles)}")

        # Select the first bundle to schedule a dummy task.
        # Since the placement group creation will be atomic, it is sufficient
        # to schedule a single task.
        bundle_index = 0
        bundle = self.bundles[bundle_index]
        num_cpus, num_gpus, resources = self._reservation_check_options(
            bundle)

        return bundle_reservation_check.options(
            num_cpus=num_cpus,
            num_gpus=num_gpus,
            placement_group=self,
            placement_group_bundle_index=bundle_index,
            resources=resources).remote(self)

    @property
    def bundle_count(self):
        """Number of bundles held by this handle."""
        return len(self.bundles)

    def _reservation_check_options(self, bundle: Dict[str, float]):
        """Translate a bundle into options for the reservation-check task.

        Returns:
            A ``(num_cpus, num_gpus, resources)`` tuple. Exactly one entry
            carries the requested amount: ``num_cpus`` for "CPU",
            ``num_gpus`` for "GPU", otherwise ``resources`` is a one-item
            dict. ``resources`` stays ``None`` for CPU/GPU bundles.
        """
        resource_name, value = self._get_none_zero_resource(bundle)
        num_cpus = 0
        num_gpus = 0
        resources = None
        if resource_name == "CPU":
            num_cpus = value
        elif resource_name == "GPU":
            num_gpus = value
        else:
            # Build a fresh dict here: item assignment on the ``None``
            # default would raise TypeError for custom resources.
            resources = {resource_name: value}
        return num_cpus, num_gpus, resources

    def _get_none_zero_resource(self, bundle: Dict[str, float]):
        """Return the first resource in ``bundle`` with a positive amount.

        Returns:
            A ``(name, amount)`` tuple where the amount is capped at 0.001.

        Raises:
            AssertionError: If no resource in the bundle is positive.
        """
        for key, value in bundle.items():
            if value > 0:
                return key, min(value, 0.001)
        # Explicit raise keeps the invariant check alive under -O.
        raise AssertionError("This code should be unreachable.")
Bundles: {bundles}") + placement_group_id = worker.core_worker.create_placement_group( name, bundles, strategy) - return PlacementGroup(placement_group_id, len(bundles)) + return PlacementGroup(placement_group_id, bundles) def remove_placement_group(placement_group): diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index aba2215b7..1696212aa 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -191,7 +191,7 @@ class RemoteFunction: max_retries = self._max_retries if placement_group is None: - placement_group = PlacementGroup(ray.PlacementGroupID.nil(), -1) + placement_group = PlacementGroup.empty() check_placement_group_index(placement_group, placement_group_bundle_index) diff --git a/python/ray/tests/test_placement_group.py b/python/ray/tests/test_placement_group.py index 4efef0611..0d69e75da 100644 --- a/python/ray/tests/test_placement_group.py +++ b/python/ray/tests/test_placement_group.py @@ -35,6 +35,7 @@ def test_placement_group_pack(ray_start_cluster): }, { "CPU": 2 }]) + ray.get(placement_group.ready()) actor_1 = Actor.options( placement_group=placement_group, placement_group_bundle_index=0).remote() @@ -42,8 +43,8 @@ def test_placement_group_pack(ray_start_cluster): placement_group=placement_group, placement_group_bundle_index=1).remote() - print(ray.get(actor_1.value.remote())) - print(ray.get(actor_2.value.remote())) + ray.get(actor_1.value.remote()) + ray.get(actor_2.value.remote()) # Get all actors. 
actor_infos = ray.actors() @@ -80,6 +81,7 @@ def test_placement_group_strict_pack(ray_start_cluster): }, { "CPU": 2 }]) + ray.get(placement_group.ready()) actor_1 = Actor.options( placement_group=placement_group, placement_group_bundle_index=0).remote() @@ -87,8 +89,8 @@ def test_placement_group_strict_pack(ray_start_cluster): placement_group=placement_group, placement_group_bundle_index=1).remote() - print(ray.get(actor_1.value.remote())) - print(ray.get(actor_2.value.remote())) + ray.get(actor_1.value.remote()) + ray.get(actor_2.value.remote()) # Get all actors. actor_infos = ray.actors() @@ -125,6 +127,7 @@ def test_placement_group_spread(ray_start_cluster): }, { "CPU": 2 }]) + ray.get(placement_group.ready()) actor_1 = Actor.options( placement_group=placement_group, placement_group_bundle_index=0).remote() @@ -132,8 +135,8 @@ def test_placement_group_spread(ray_start_cluster): placement_group=placement_group, placement_group_bundle_index=1).remote() - print(ray.get(actor_1.value.remote())) - print(ray.get(actor_2.value.remote())) + ray.get(actor_1.value.remote()) + ray.get(actor_2.value.remote()) # Get all actors. actor_infos = ray.actors() @@ -174,6 +177,7 @@ def test_placement_group_strict_spread(ray_start_cluster): }, { "CPU": 2 }]) + ray.get(placement_group.ready()) actor_1 = Actor.options( placement_group=placement_group, placement_group_bundle_index=0).remote() @@ -184,9 +188,9 @@ def test_placement_group_strict_spread(ray_start_cluster): placement_group=placement_group, placement_group_bundle_index=2).remote() - print(ray.get(actor_1.value.remote())) - print(ray.get(actor_2.value.remote())) - print(ray.get(actor_3.value.remote())) + ray.get(actor_1.value.remote()) + ray.get(actor_2.value.remote()) + ray.get(actor_3.value.remote()) # Get all actors. actor_infos = ray.actors() @@ -284,7 +288,7 @@ def test_remove_placement_group(ray_start_cluster): # First try to remove a placement group that doesn't # exist. This should not do anything. 
def test_pending_placement_group_wait(ray_start_cluster):
    """ready() must not resolve while the placement group is pending."""
    cluster = ray_start_cluster
    # A single node started with 2 CPUs; the bundles below ask for more CPUs
    # than that, plus GPUs the node was not started with.
    cluster.add_node(num_cpus=2)
    ray.init(address=cluster.address)
    cluster.wait_for_nodes()

    # Wait on placement group that cannot be created.
    placement_group = ray.experimental.placement_group(
        name="name",
        strategy="SPREAD",
        bundles=[
            {
                "CPU": 2
            },
            {
                "CPU": 2
            },
            {
                "GPU": 2
            },
        ])
    ready, unready = ray.wait([placement_group.ready()], timeout=0.1)
    assert len(unready) == 1
    assert len(ready) == 0
    table = ray.experimental.placement_group_table(placement_group)
    assert table["state"] == "PENDING"
    with pytest.raises(ray.exceptions.RayTimeoutError):
        ray.get(placement_group.ready(), timeout=0.1)


def test_placement_group_wait(ray_start_cluster):
    """ready() must resolve to the handle once the group is created."""
    cluster = ray_start_cluster
    # Two 2-CPU nodes, one per bundle requested below.
    for _ in range(2):
        cluster.add_node(num_cpus=2)
    ray.init(address=cluster.address)
    cluster.wait_for_nodes()

    # Wait on placement group that can be created.
    placement_group = ray.experimental.placement_group(
        name="name", strategy="SPREAD", bundles=[
            {
                "CPU": 2
            },
            {
                "CPU": 2
            },
        ])
    ready, unready = ray.wait([placement_group.ready()])
    assert len(unready) == 0
    assert len(ready) == 1
    table = ray.experimental.placement_group_table(placement_group)
    assert table["state"] == "CREATED"

    # The check task returns the handle it was given.
    pg = ray.get(placement_group.ready())
    assert pg.bundles == placement_group.bundles
    assert pg.id.binary() == placement_group.id.binary()