diff --git a/java/test/src/main/java/io/ray/test/PlacementGroupTest.java b/java/test/src/main/java/io/ray/test/PlacementGroupTest.java index e3c6fa845..370a518c8 100644 --- a/java/test/src/main/java/io/ray/test/PlacementGroupTest.java +++ b/java/test/src/main/java/io/ray/test/PlacementGroupTest.java @@ -47,4 +47,29 @@ public class PlacementGroupTest extends BaseTest { // Test calling an actor. Assert.assertEquals(Integer.valueOf(1), actor.task(Counter::getValue).remote().get()); } + + public void testCheckBundleIndex() { + List> bundles = new ArrayList<>(); + Map bundle = new HashMap<>(); + bundle.put("CPU", 1.0); + bundles.add(bundle); + PlacementStrategy strategy = PlacementStrategy.PACK; + PlacementGroup placementGroup = Ray.createPlacementGroup(bundles, strategy); + + int exceptionCount = 0; + try { + Ray.actor(Counter::new, 1).setPlacementGroup(placementGroup, 1).remote(); + } catch (IllegalArgumentException e) { + ++exceptionCount; + } + Assert.assertEquals(1, exceptionCount); + + + try { + Ray.actor(Counter::new, 1).setPlacementGroup(placementGroup, -1).remote(); + } catch (IllegalArgumentException e) { + ++exceptionCount; + } + Assert.assertEquals(2, exceptionCount); + } } diff --git a/python/ray/actor.py b/python/ray/actor.py index b7a2a851a..c320b2c51 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -8,6 +8,9 @@ import ray.ray_constants as ray_constants import ray._raylet import ray.signature as signature import ray.worker +from ray.experimental.placement_group import PlacementGroup, \ + check_placement_group_index + from ray import ActorClassID, Language from ray._raylet import PythonFunctionDescriptor from ray import cross_language @@ -412,7 +415,7 @@ class ActorClass: max_task_retries=None, name=None, detached=False, - placement_group_id=None, + placement_group=None, placement_group_bundle_index=-1): """Create an actor. @@ -438,7 +441,7 @@ class ActorClass: guaranteed when max_concurrency > 1. name: The globally unique name for the actor. detached: DEPRECATED. - placement_group_id: the placement group this actor belongs to, + placement_group: the placement group this actor belongs to, or None if it doesn't belong to any group. placement_group_bundle_index: the index of the bundle if the actor belongs to a placement group, which may be -1 to @@ -504,6 +507,12 @@ class ActorClass: else: detached = False + if placement_group is None: + placement_group = PlacementGroup(ray.PlacementGroupID.nil(), -1) + + check_placement_group_index(placement_group, + placement_group_bundle_index) + # Set the actor's default resources if not already set. First three # conditions are to check that no resources were specified in the # decorator. Last three conditions are to check that no resources were @@ -574,8 +583,7 @@ class ActorClass: detached, name if name is not None else "", is_asyncio, - placement_group_id - if placement_group_id is not None else ray.PlacementGroupID.nil(), + placement_group.id, placement_group_bundle_index, # Store actor_method_cpu in actor handle's extension data. extension_data=str(actor_method_cpu)) diff --git a/python/ray/experimental/placement_group.py b/python/ray/experimental/placement_group.py index 190b45849..8e52b8f3c 100644 --- a/python/ray/experimental/placement_group.py +++ b/python/ray/experimental/placement_group.py @@ -1,8 +1,15 @@ from typing import (List, Dict) import ray -from ray._raylet import ( - PlacementGroupID, ) + + +class PlacementGroup: + """A handle to a placement group. + """ + + def __init__(self, id, bundle_count): + self.id = id + self.bundle_count = bundle_count def placement_group(bundles: List[Dict[str, float]], @@ -33,19 +40,32 @@ def placement_group(bundles: List[Dict[str, float]], placement_group_id = worker.core_worker.create_placement_group( name, bundles, strategy) - return placement_group_id + return PlacementGroup(placement_group_id, len(bundles)) -def remove_placement_group(placement_group_id: PlacementGroupID): - assert type(placement_group_id) == PlacementGroupID +def remove_placement_group(placement_group): + assert placement_group is not None worker = ray.worker.global_worker worker.check_connected() - worker.core_worker.remove_placement_group(placement_group_id) + worker.core_worker.remove_placement_group(placement_group.id) -def placement_group_table(placement_group_id): - assert placement_group_id is not None +def placement_group_table(placement_group): + assert placement_group is not None worker = ray.worker.global_worker worker.check_connected() - return ray.state.state.placement_group_table(placement_group_id) + return ray.state.state.placement_group_table(placement_group.id) + + +def check_placement_group_index(placement_group, bundle_index): + assert placement_group is not None + if placement_group.id.is_nil(): + if bundle_index != -1: + raise ValueError("If placement group is not set, " + "the value of bundle index must be -1.") + elif bundle_index >= placement_group.bundle_count \ + or bundle_index < -1: + raise ValueError(f"placement group bundle index {bundle_index} " + f"is invalid. Valid placement group indexes: " + f"0-{placement_group.bundle_count}") diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index 3fa62b9b8..aa53f0d51 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -4,6 +4,8 @@ from functools import wraps from ray import cloudpickle as pickle from ray._raylet import PythonFunctionDescriptor from ray import cross_language, Language +from ray.experimental.placement_group import PlacementGroup, \ + check_placement_group_index import ray.signature # Default parameters for remote functions. @@ -60,7 +62,7 @@ class RemoteFunction: def __init__(self, language, function, function_descriptor, num_cpus, num_gpus, memory, object_store_memory, resources, - num_return_vals, max_calls, max_retries, placement_group_id, + num_return_vals, max_calls, max_retries, placement_group, placement_group_bundle_index): self._language = language self._function = function @@ -150,7 +152,7 @@ class RemoteFunction: object_store_memory=None, resources=None, max_retries=None, - placement_group_id=None, + placement_group=None, placement_group_bundle_index=-1): """Submit the remote function for execution.""" worker = ray.worker.global_worker @@ -187,8 +189,12 @@ class RemoteFunction: raise ValueError("Non-direct call tasks are no longer supported.") if max_retries is None: max_retries = self._max_retries - if placement_group_id is None: - placement_group_id = ray.PlacementGroupID.nil() + + if placement_group is None: + placement_group = PlacementGroup(ray.PlacementGroupID.nil(), -1) + + check_placement_group_index(placement_group, + placement_group_bundle_index) resources = ray.utils.resources_from_resource_arguments( self._num_cpus, self._num_gpus, self._memory, @@ -210,7 +216,7 @@ class RemoteFunction: "cannot be executed locally." object_refs = worker.core_worker.submit_task( self._language, self._function_descriptor, list_args, - num_return_vals, resources, max_retries, placement_group_id, + num_return_vals, resources, max_retries, placement_group.id, placement_group_bundle_index) if len(object_refs) == 1: diff --git a/python/ray/tests/test_placement_group.py b/python/ray/tests/test_placement_group.py index cae741c1d..13fb0637d 100644 --- a/python/ray/tests/test_placement_group.py +++ b/python/ray/tests/test_placement_group.py @@ -11,6 +11,7 @@ import ray from ray.test_utils import get_other_nodes, wait_for_condition import ray.cluster_utils from ray._raylet import PlacementGroupID +from ray.experimental.placement_group import PlacementGroup def test_placement_group_pack(ray_start_cluster): @@ -28,17 +29,17 @@ def test_placement_group_pack(ray_start_cluster): cluster.add_node(num_cpus=4) ray.init(address=cluster.address) - placement_group_id = ray.experimental.placement_group( + placement_group = ray.experimental.placement_group( name="name", strategy="PACK", bundles=[{ "CPU": 2 }, { "CPU": 2 }]) actor_1 = Actor.options( - placement_group_id=placement_group_id, + placement_group=placement_group, placement_group_bundle_index=0).remote() actor_2 = Actor.options( - placement_group_id=placement_group_id, + placement_group=placement_group, placement_group_bundle_index=1).remote() print(ray.get(actor_1.value.remote())) @@ -73,17 +74,17 @@ def test_placement_group_strict_pack(ray_start_cluster): cluster.add_node(num_cpus=4) ray.init(address=cluster.address) - placement_group_id = ray.experimental.placement_group( + placement_group = ray.experimental.placement_group( name="name", strategy="STRICT_PACK", bundles=[{ "CPU": 2 }, { "CPU": 2 }]) actor_1 = Actor.options( - placement_group_id=placement_group_id, + placement_group=placement_group, placement_group_bundle_index=0).remote() actor_2 = Actor.options( - placement_group_id=placement_group_id, + placement_group=placement_group, placement_group_bundle_index=1).remote() print(ray.get(actor_1.value.remote())) @@ -118,17 +119,17 @@ def test_placement_group_spread(ray_start_cluster): cluster.add_node(num_cpus=4) ray.init(address=cluster.address) - placement_group_id = ray.experimental.placement_group( + placement_group = ray.experimental.placement_group( name="name", strategy="SPREAD", bundles=[{ "CPU": 2 }, { "CPU": 2 }]) actor_1 = Actor.options( - placement_group_id=placement_group_id, + placement_group=placement_group, placement_group_bundle_index=0).remote() actor_2 = Actor.options( - placement_group_id=placement_group_id, + placement_group=placement_group, placement_group_bundle_index=1).remote() print(ray.get(actor_1.value.remote())) @@ -163,7 +164,7 @@ def test_placement_group_strict_spread(ray_start_cluster): cluster.add_node(num_cpus=4) ray.init(address=cluster.address) - placement_group_id = ray.experimental.placement_group( + placement_group = ray.experimental.placement_group( name="name", strategy="STRICT_SPREAD", bundles=[{ @@ -174,13 +175,13 @@ def test_placement_group_strict_spread(ray_start_cluster): "CPU": 2 }]) actor_1 = Actor.options( - placement_group_id=placement_group_id, + placement_group=placement_group, placement_group_bundle_index=0).remote() actor_2 = Actor.options( - placement_group_id=placement_group_id, + placement_group=placement_group, placement_group_bundle_index=1).remote() actor_3 = Actor.options( - placement_group_id=placement_group_id, + placement_group=placement_group, placement_group_bundle_index=2).remote() print(ray.get(actor_1.value.remote())) @@ -218,7 +219,7 @@ def test_placement_group_actor_resource_ids(ray_start_cluster): ray.init(address=cluster.address) g1 = ray.experimental.placement_group([{"CPU": 2}]) - a1 = F.options(placement_group_id=g1).remote() + a1 = F.options(placement_group=g1).remote() resources = ray.get(a1.f.remote()) assert len(resources) == 1, resources assert "CPU_group_" in list(resources.keys())[0], resources @@ -236,15 +237,14 @@ def test_placement_group_task_resource_ids(ray_start_cluster): ray.init(address=cluster.address) g1 = ray.experimental.placement_group([{"CPU": 2}]) - o1 = f.options(placement_group_id=g1).remote() + o1 = f.options(placement_group=g1).remote() resources = ray.get(o1) assert len(resources) == 1, resources assert "CPU_group_" in list(resources.keys())[0], resources assert "CPU_group_0_" not in list(resources.keys())[0], resources # Now retry with a bundle index constraint. - o1 = f.options( - placement_group_id=g1, placement_group_bundle_index=0).remote() + o1 = f.options(placement_group=g1, placement_group_bundle_index=0).remote() resources = ray.get(o1) assert len(resources) == 2, resources keys = list(resources.keys()) @@ -270,7 +270,7 @@ def test_placement_group_hang(ray_start_cluster): g1 = ray.experimental.placement_group([{"CPU": 2}]) # This will start out infeasible. The placement group will then be created # and it transitions to feasible. - o1 = f.options(placement_group_id=g1).remote() + o1 = f.options(placement_group=g1).remote() resources = ray.get(o1) assert len(resources) == 1, resources @@ -283,17 +283,22 @@ def test_remove_placement_group(ray_start_cluster): ray.init(address=cluster.address) # First try to remove a placement group that doesn't # exist. This should not do anything. - random_placement_group_id = PlacementGroupID.from_random() + random_group_id = PlacementGroupID.from_random() + random_placement_group = PlacementGroup(random_group_id, -1) for _ in range(3): - ray.experimental.remove_placement_group(random_placement_group_id) + ray.experimental.remove_placement_group(random_placement_group) # Creating a placement group as soon as it is # created should work. - pid = ray.experimental.placement_group([{"CPU": 2}, {"CPU": 2}]) - ray.experimental.remove_placement_group(pid) + placement_group = ray.experimental.placement_group([{ + "CPU": 2 + }, { + "CPU": 2 + }]) + ray.experimental.remove_placement_group(placement_group) def is_placement_group_removed(): - table = ray.experimental.placement_group_table(pid) + table = ray.experimental.placement_group_table(placement_group) if "state" not in table: return False return table["state"] == "REMOVED" @@ -301,7 +306,11 @@ def test_remove_placement_group(ray_start_cluster): wait_for_condition(is_placement_group_removed) # # Now let's create a placement group. - pid = ray.experimental.placement_group([{"CPU": 2}, {"CPU": 2}]) + placement_group = ray.experimental.placement_group([{ + "CPU": 2 + }, { + "CPU": 2 + }]) # Create an actor that occupies resources. @ray.remote(num_cpus=2) @@ -320,14 +329,15 @@ def test_remove_placement_group(ray_start_cluster): time.sleep(50) # Schedule a long running task and actor. - task_ref = long_running_task.options(placement_group_id=pid).remote() - a = A.options(placement_group_id=pid).remote() + task_ref = long_running_task.options( + placement_group=placement_group).remote() + a = A.options(placement_group=placement_group).remote() assert ray.get(a.f.remote()) == 3 - ray.experimental.remove_placement_group(pid) + ray.experimental.remove_placement_group(placement_group) # Subsequent remove request shouldn't do anything. for _ in range(3): - ray.experimental.remove_placement_group(pid) + ray.experimental.remove_placement_group(placement_group) # Make sure placement group resources are # released and we can schedule this task. @@ -350,8 +360,12 @@ def test_remove_pending_placement_group(ray_start_cluster): cluster.add_node(num_cpus=4) ray.init(address=cluster.address) # Create a placement group that cannot be scheduled now. - pid = ray.experimental.placement_group([{"GPU": 2}, {"CPU": 2}]) - ray.experimental.remove_placement_group(pid) + placement_group = ray.experimental.placement_group([{ + "GPU": 2 + }, { + "CPU": 2 + }]) + ray.experimental.remove_placement_group(placement_group) # TODO(sang): Add state check here. @ray.remote(num_cpus=4) def f(): @@ -381,9 +395,9 @@ def test_placement_group_table(ray_start_cluster): name = "name" strategy = "PACK" bundles = [{"CPU": 2, "GPU": 1}, {"CPU": 2}] - placement_group_id = ray.experimental.placement_group( + placement_group = ray.experimental.placement_group( name=name, strategy=strategy, bundles=bundles) - result = ray.experimental.placement_group_table(placement_group_id) + result = ray.experimental.placement_group_table(placement_group) assert result["name"] == name assert result["strategy"] == strategy for i in range(len(bundles)): @@ -394,11 +408,11 @@ def test_placement_group_table(ray_start_cluster): cluster.add_node(num_cpus=5, num_gpus=1) cluster.wait_for_nodes() actor_1 = Actor.options( - placement_group_id=placement_group_id, + placement_group=placement_group, placement_group_bundle_index=0).remote() ray.get(actor_1.value.remote()) - result = ray.experimental.placement_group_table(placement_group_id) + result = ray.experimental.placement_group_table(placement_group) assert result["state"] == "CREATED" @@ -414,7 +428,7 @@ def test_cuda_visible_devices(ray_start_cluster): ray.init(address=cluster.address) g1 = ray.experimental.placement_group([{"CPU": 1, "GPU": 1}]) - o1 = f.options(placement_group_id=g1).remote() + o1 = f.options(placement_group=g1).remote() devices = ray.get(o1) assert devices == "0", devices @@ -441,7 +455,7 @@ def test_placement_group_reschedule_when_node_dead(ray_start_cluster): assert len(nodes) == 3 assert nodes[0]["alive"] and nodes[1]["alive"] and nodes[2]["alive"] - placement_group_id = ray.experimental.placement_group( + placement_group = ray.experimental.placement_group( name="name", strategy="SPREAD", bundles=[{ @@ -452,15 +466,15 @@ def test_placement_group_reschedule_when_node_dead(ray_start_cluster): "CPU": 2 }]) actor_1 = Actor.options( - placement_group_id=placement_group_id, + placement_group=placement_group, placement_group_bundle_index=0, detached=True).remote() actor_2 = Actor.options( - placement_group_id=placement_group_id, + placement_group=placement_group, placement_group_bundle_index=1, detached=True).remote() actor_3 = Actor.options( - placement_group_id=placement_group_id, + placement_group=placement_group, placement_group_bundle_index=2, detached=True).remote() print(ray.get(actor_1.value.remote())) @@ -471,15 +485,15 @@ def test_placement_group_reschedule_when_node_dead(ray_start_cluster): cluster.wait_for_nodes() actor_4 = Actor.options( - placement_group_id=placement_group_id, + placement_group=placement_group, placement_group_bundle_index=0, detached=True).remote() actor_5 = Actor.options( - placement_group_id=placement_group_id, + placement_group=placement_group, placement_group_bundle_index=1, detached=True).remote() actor_6 = Actor.options( - placement_group_id=placement_group_id, + placement_group=placement_group, placement_group_bundle_index=2, detached=True).remote() print(ray.get(actor_4.value.remote())) @@ -488,5 +502,49 @@ def test_placement_group_reschedule_when_node_dead(ray_start_cluster): ray.shutdown() +def test_check_bundle_index(ray_start_cluster): + @ray.remote(num_cpus=2) + class Actor(object): + def __init__(self): + self.n = 0 + + def value(self): + return self.n + + cluster = ray_start_cluster + cluster.add_node(num_cpus=4) + ray.init(address=cluster.address) + + placement_group = ray.experimental.placement_group( + name="name", strategy="SPREAD", bundles=[{ + "CPU": 2 + }, { + "CPU": 2 + }]) + + error_count = 0 + try: + Actor.options( + placement_group=placement_group, + placement_group_bundle_index=3).remote() + except ValueError: + error_count = error_count + 1 + assert error_count == 1 + + try: + Actor.options( + placement_group=placement_group, + placement_group_bundle_index=-2).remote() + except ValueError: + error_count = error_count + 1 + assert error_count == 2 + + try: + Actor.options(placement_group_bundle_index=0).remote() + except ValueError: + error_count = error_count + 1 + assert error_count == 3 + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__]))