From 401d34260287b8aac8ebaf8ed27a18c200b47910 Mon Sep 17 00:00:00 2001 From: fangfengbin <869218239a@zju.edu.cn> Date: Mon, 7 Dec 2020 13:53:49 +0800 Subject: [PATCH] [PlacementGroup]Add PlacementGroup wait python api (#12601) --- python/ray/tests/test_placement_group.py | 28 ++++++++++++++++++------ python/ray/util/placement_group.py | 15 +++++++++++++ 2 files changed, 36 insertions(+), 7 deletions(-) diff --git a/python/ray/tests/test_placement_group.py b/python/ray/tests/test_placement_group.py index 5acd0fc9e..0a87d9b8f 100644 --- a/python/ray/tests/test_placement_group.py +++ b/python/ray/tests/test_placement_group.py @@ -50,7 +50,7 @@ def test_placement_group_pack(ray_start_cluster): "CPU": 2 } ]) - ray.get(placement_group.ready()) + assert placement_group.wait(10000) actor_1 = Actor.options( placement_group=placement_group, placement_group_bundle_index=0).remote() @@ -96,7 +96,7 @@ def test_placement_group_strict_pack(ray_start_cluster): }, { "CPU": 2 }]) - ray.get(placement_group.ready()) + assert placement_group.wait(10000) actor_1 = Actor.options( placement_group=placement_group, placement_group_bundle_index=0).remote() @@ -142,7 +142,7 @@ def test_placement_group_spread(ray_start_cluster): }, { "CPU": 2 }]) - ray.get(placement_group.ready()) + assert placement_group.wait(10000) actor_1 = Actor.options( placement_group=placement_group, placement_group_bundle_index=0).remote() @@ -192,7 +192,7 @@ def test_placement_group_strict_spread(ray_start_cluster): }, { "CPU": 2 }]) - ray.get(placement_group.ready()) + assert placement_group.wait(10000) actor_1 = Actor.options( placement_group=placement_group, placement_group_bundle_index=0).remote() @@ -1177,7 +1177,7 @@ def test_create_placement_group_after_gcs_server_restart( # Create placement group 1 successfully. 
 placement_group1 = ray.util.placement_group([{"CPU": 1}, {"CPU": 1}]) - ray.get(placement_group1.ready(), timeout=10) + assert placement_group1.wait(10000) table = ray.util.placement_group_table(placement_group1) assert table["state"] == "CREATED" @@ -1187,7 +1187,7 @@ def test_create_placement_group_after_gcs_server_restart( # Create placement group 2 successfully. placement_group2 = ray.util.placement_group([{"CPU": 1}, {"CPU": 1}]) - ray.get(placement_group2.ready(), timeout=10) + assert placement_group2.wait(10000) table = ray.util.placement_group_table(placement_group2) assert table["state"] == "CREATED" @@ -1250,7 +1250,21 @@ def test_create_placement_group_during_gcs_server_restart( cluster.head_node.start_gcs_server() for i in range(0, 100): - ray.get(placement_groups[i].ready()) + assert placement_groups[i].wait(10000) + + +def test_placement_group_wait_api(ray_start_cluster): + cluster = ray_start_cluster + cluster.add_node(num_cpus=4) + ray.init(address=cluster.address) + + placement_group = ray.util.placement_group( + name="name", strategy="PACK", bundles=[{ + "CPU": 2, + }, { + "CPU": 2 + }]) + assert placement_group.wait(10000) if __name__ == "__main__": diff --git a/python/ray/util/placement_group.py b/python/ray/util/placement_group.py index 8a6b6c3d1..e0254fbe6 100644 --- a/python/ray/util/placement_group.py +++ b/python/ray/util/placement_group.py @@ -83,6 +83,21 @@ class PlacementGroup: placement_group_bundle_index=bundle_index, resources=resources).remote(self) + def wait(self, timeout_ms: int) -> bool: + """Wait for the placement group to be ready within the specified time. + + Args: + timeout_ms(int): Timeout in milliseconds. + + Returns: + True if the placement group is created. False otherwise. 
+ """ + worker = ray.worker.global_worker + worker.check_connected() + + return worker.core_worker.wait_placement_group_ready( + self.id, timeout_ms) + @property def bundle_specs(self) -> List[Dict]: """List[Dict]: Return bundles belonging to this placement group."""