[Placement Group] Wait (#10259)

* Initial progress done.

* Fix wrong test.

* Improve tests.

* Update code.

* Addressed code review and merge conflict.

* Addressed code review.
This commit is contained in:
SangBin Cho
2020-08-24 20:14:48 -07:00
committed by GitHub
parent 6dc22a6d68
commit 3b3ca96a4e
4 changed files with 147 additions and 21 deletions
+1 -1
View File
@@ -507,7 +507,7 @@ class ActorClass:
detached = False
if placement_group is None:
placement_group = PlacementGroup(ray.PlacementGroupID.nil(), -1)
placement_group = PlacementGroup.empty()
check_placement_group_index(placement_group,
placement_group_bundle_index)
+68 -3
View File
@@ -1,15 +1,72 @@
from typing import (List, Dict)
import ray
from ray._raylet import PlacementGroupID, ObjectRef
class PlacementGroup:
"""A handle to a placement group.
Args:
id: Placement group id.
bundles: List of bundles.
"""
def __init__(self, id, bundle_count):
@staticmethod
def empty():
return PlacementGroup(PlacementGroupID.nil(), [])
def __init__(self, id: PlacementGroupID, bundles: List[Dict[str, float]]):
self.id = id
self.bundle_count = bundle_count
self.bundles = bundles
def ready(self) -> ObjectRef:
"""Returns an object ID to check ready status."""
worker = ray.worker.global_worker
worker.check_connected()
@ray.remote(num_cpus=0, max_calls=0)
def bundle_reservation_check(placement_group):
return placement_group
assert len(self.bundles) != 0, (
"ready() cannot be called on placement group object with a "
f"bundle length == 0, current bundle length: {len(self.bundles)}")
# Select the first bundle to schedule a dummy task.
# Since the placement group creation will be atomic, it is sufficient
# to schedule a single task.
bundle_index = 0
bundle = self.bundles[bundle_index]
resource_name, value = self._get_none_zero_resource(bundle)
num_cpus = 0
num_gpus = 0
resources = None
if resource_name == "CPU":
num_cpus = value
elif resource_name == "GPU":
num_gpus = value
else:
resources[resource_name] = value
return bundle_reservation_check.options(
num_cpus=num_cpus,
num_gpus=num_gpus,
placement_group=self,
placement_group_bundle_index=bundle_index,
resources=resources).remote(self)
@property
def bundle_count(self):
return len(self.bundles)
def _get_none_zero_resource(self, bundle: List[Dict]):
for key, value in bundle.items():
if value > 0:
value = min(value, 0.001)
return key, value
assert False, "This code should be unreachable."
def placement_group(bundles: List[Dict[str, float]],
@@ -37,10 +94,18 @@ def placement_group(bundles: List[Dict[str, float]],
raise ValueError(
"The type of bundles must be list, got {}".format(bundles))
# Validate bundles
for bundle in bundles:
if (len(bundle) == 0 or all(resource_value == 0
for resource_value in bundle.values())):
raise ValueError(
"Bundles cannot be an empty dictionary or "
f"resources with only 0 values. Bundles: {bundles}")
placement_group_id = worker.core_worker.create_placement_group(
name, bundles, strategy)
return PlacementGroup(placement_group_id, len(bundles))
return PlacementGroup(placement_group_id, bundles)
def remove_placement_group(placement_group):
+1 -1
View File
@@ -191,7 +191,7 @@ class RemoteFunction:
max_retries = self._max_retries
if placement_group is None:
placement_group = PlacementGroup(ray.PlacementGroupID.nil(), -1)
placement_group = PlacementGroup.empty()
check_placement_group_index(placement_group,
placement_group_bundle_index)
+77 -16
View File
@@ -35,6 +35,7 @@ def test_placement_group_pack(ray_start_cluster):
}, {
"CPU": 2
}])
ray.get(placement_group.ready())
actor_1 = Actor.options(
placement_group=placement_group,
placement_group_bundle_index=0).remote()
@@ -42,8 +43,8 @@ def test_placement_group_pack(ray_start_cluster):
placement_group=placement_group,
placement_group_bundle_index=1).remote()
print(ray.get(actor_1.value.remote()))
print(ray.get(actor_2.value.remote()))
ray.get(actor_1.value.remote())
ray.get(actor_2.value.remote())
# Get all actors.
actor_infos = ray.actors()
@@ -80,6 +81,7 @@ def test_placement_group_strict_pack(ray_start_cluster):
}, {
"CPU": 2
}])
ray.get(placement_group.ready())
actor_1 = Actor.options(
placement_group=placement_group,
placement_group_bundle_index=0).remote()
@@ -87,8 +89,8 @@ def test_placement_group_strict_pack(ray_start_cluster):
placement_group=placement_group,
placement_group_bundle_index=1).remote()
print(ray.get(actor_1.value.remote()))
print(ray.get(actor_2.value.remote()))
ray.get(actor_1.value.remote())
ray.get(actor_2.value.remote())
# Get all actors.
actor_infos = ray.actors()
@@ -125,6 +127,7 @@ def test_placement_group_spread(ray_start_cluster):
}, {
"CPU": 2
}])
ray.get(placement_group.ready())
actor_1 = Actor.options(
placement_group=placement_group,
placement_group_bundle_index=0).remote()
@@ -132,8 +135,8 @@ def test_placement_group_spread(ray_start_cluster):
placement_group=placement_group,
placement_group_bundle_index=1).remote()
print(ray.get(actor_1.value.remote()))
print(ray.get(actor_2.value.remote()))
ray.get(actor_1.value.remote())
ray.get(actor_2.value.remote())
# Get all actors.
actor_infos = ray.actors()
@@ -174,6 +177,7 @@ def test_placement_group_strict_spread(ray_start_cluster):
}, {
"CPU": 2
}])
ray.get(placement_group.ready())
actor_1 = Actor.options(
placement_group=placement_group,
placement_group_bundle_index=0).remote()
@@ -184,9 +188,9 @@ def test_placement_group_strict_spread(ray_start_cluster):
placement_group=placement_group,
placement_group_bundle_index=2).remote()
print(ray.get(actor_1.value.remote()))
print(ray.get(actor_2.value.remote()))
print(ray.get(actor_3.value.remote()))
ray.get(actor_1.value.remote())
ray.get(actor_2.value.remote())
ray.get(actor_3.value.remote())
# Get all actors.
actor_infos = ray.actors()
@@ -284,7 +288,7 @@ def test_remove_placement_group(ray_start_cluster):
# First try to remove a placement group that doesn't
# exist. This should not do anything.
random_group_id = PlacementGroupID.from_random()
random_placement_group = PlacementGroup(random_group_id, -1)
random_placement_group = PlacementGroup(random_group_id, [{"CPU": 1}])
for _ in range(3):
ray.experimental.remove_placement_group(random_placement_group)
@@ -477,9 +481,9 @@ def test_placement_group_reschedule_when_node_dead(ray_start_cluster):
placement_group=placement_group,
placement_group_bundle_index=2,
detached=True).remote()
print(ray.get(actor_1.value.remote()))
print(ray.get(actor_2.value.remote()))
print(ray.get(actor_3.value.remote()))
ray.get(actor_1.value.remote())
ray.get(actor_2.value.remote())
ray.get(actor_3.value.remote())
cluster.remove_node(get_other_nodes(cluster, exclude_head=True)[-1])
cluster.wait_for_nodes()
@@ -496,9 +500,9 @@ def test_placement_group_reschedule_when_node_dead(ray_start_cluster):
placement_group=placement_group,
placement_group_bundle_index=2,
detached=True).remote()
print(ray.get(actor_4.value.remote()))
print(ray.get(actor_5.value.remote()))
print(ray.get(actor_6.value.remote()))
ray.get(actor_4.value.remote())
ray.get(actor_5.value.remote())
ray.get(actor_6.value.remote())
ray.shutdown()
@@ -546,6 +550,63 @@ def test_check_bundle_index(ray_start_cluster):
assert error_count == 3
def test_pending_placement_group_wait(ray_start_cluster):
cluster = ray_start_cluster
[cluster.add_node(num_cpus=2) for _ in range(1)]
ray.init(address=cluster.address)
cluster.wait_for_nodes()
# Wait on placement group that cannot be created.
placement_group = ray.experimental.placement_group(
name="name",
strategy="SPREAD",
bundles=[
{
"CPU": 2
},
{
"CPU": 2
},
{
"GPU": 2
},
])
ready, unready = ray.wait([placement_group.ready()], timeout=0.1)
assert len(unready) == 1
assert len(ready) == 0
table = ray.experimental.placement_group_table(placement_group)
assert table["state"] == "PENDING"
with pytest.raises(ray.exceptions.RayTimeoutError):
ray.get(placement_group.ready(), timeout=0.1)
def test_placement_group_wait(ray_start_cluster):
cluster = ray_start_cluster
[cluster.add_node(num_cpus=2) for _ in range(2)]
ray.init(address=cluster.address)
cluster.wait_for_nodes()
# Wait on placement group that cannot be created.
placement_group = ray.experimental.placement_group(
name="name", strategy="SPREAD", bundles=[
{
"CPU": 2
},
{
"CPU": 2
},
])
ready, unready = ray.wait([placement_group.ready()])
assert len(unready) == 0
assert len(ready) == 1
table = ray.experimental.placement_group_table(placement_group)
assert table["state"] == "CREATED"
pg = ray.get(placement_group.ready())
assert pg.bundles == placement_group.bundles
assert pg.id.binary() == placement_group.id.binary()
def test_schedule_placement_group_when_node_add(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(num_cpus=4)