[Placement group] Check if placement group bundle index is valid (#10194)

* add part code

* rebase master

* add java testcase

* fix review comments

* fix lint error

* rebase master

* fix lint error

Co-authored-by: 灵洵 <fengbin.ffb@antfin.com>
This commit is contained in:
fangfengbin
2020-08-22 02:04:56 +08:00
committed by GitHub
parent 17f801dc69
commit 36c6c4b298
5 changed files with 177 additions and 60 deletions
+12 -4
View File
@@ -8,6 +8,9 @@ import ray.ray_constants as ray_constants
import ray._raylet
import ray.signature as signature
import ray.worker
from ray.experimental.placement_group import PlacementGroup, \
check_placement_group_index
from ray import ActorClassID, Language
from ray._raylet import PythonFunctionDescriptor
from ray import cross_language
@@ -412,7 +415,7 @@ class ActorClass:
max_task_retries=None,
name=None,
detached=False,
placement_group_id=None,
placement_group=None,
placement_group_bundle_index=-1):
"""Create an actor.
@@ -438,7 +441,7 @@ class ActorClass:
guaranteed when max_concurrency > 1.
name: The globally unique name for the actor.
detached: DEPRECATED.
placement_group_id: the placement group this actor belongs to,
placement_group: the placement group this actor belongs to,
or None if it doesn't belong to any group.
placement_group_bundle_index: the index of the bundle
if the actor belongs to a placement group, which may be -1 to
@@ -504,6 +507,12 @@ class ActorClass:
else:
detached = False
if placement_group is None:
placement_group = PlacementGroup(ray.PlacementGroupID.nil(), -1)
check_placement_group_index(placement_group,
placement_group_bundle_index)
# Set the actor's default resources if not already set. First three
# conditions are to check that no resources were specified in the
# decorator. Last three conditions are to check that no resources were
@@ -574,8 +583,7 @@ class ActorClass:
detached,
name if name is not None else "",
is_asyncio,
placement_group_id
if placement_group_id is not None else ray.PlacementGroupID.nil(),
placement_group.id,
placement_group_bundle_index,
# Store actor_method_cpu in actor handle's extension data.
extension_data=str(actor_method_cpu))
+29 -9
View File
@@ -1,8 +1,15 @@
from typing import (List, Dict)
import ray
from ray._raylet import (
PlacementGroupID, )
class PlacementGroup:
"""A handle to a placement group.
"""
def __init__(self, id, bundle_count):
self.id = id
self.bundle_count = bundle_count
def placement_group(bundles: List[Dict[str, float]],
@@ -33,19 +40,32 @@ def placement_group(bundles: List[Dict[str, float]],
placement_group_id = worker.core_worker.create_placement_group(
name, bundles, strategy)
return placement_group_id
return PlacementGroup(placement_group_id, len(bundles))
def remove_placement_group(placement_group_id: PlacementGroupID):
assert type(placement_group_id) == PlacementGroupID
def remove_placement_group(placement_group):
assert placement_group is not None
worker = ray.worker.global_worker
worker.check_connected()
worker.core_worker.remove_placement_group(placement_group_id)
worker.core_worker.remove_placement_group(placement_group.id)
def placement_group_table(placement_group_id):
assert placement_group_id is not None
def placement_group_table(placement_group):
assert placement_group is not None
worker = ray.worker.global_worker
worker.check_connected()
return ray.state.state.placement_group_table(placement_group_id)
return ray.state.state.placement_group_table(placement_group.id)
def check_placement_group_index(placement_group, bundle_index):
assert placement_group is not None
if placement_group.id.is_nil():
if bundle_index != -1:
raise ValueError("If placement group is not set, "
"the value of bundle index must be -1.")
elif bundle_index >= placement_group.bundle_count \
or bundle_index < -1:
raise ValueError(f"placement group bundle index {bundle_index} "
f"is invalid. Valid placement group indexes: "
f"0-{placement_group.bundle_count}")
+11 -5
View File
@@ -4,6 +4,8 @@ from functools import wraps
from ray import cloudpickle as pickle
from ray._raylet import PythonFunctionDescriptor
from ray import cross_language, Language
from ray.experimental.placement_group import PlacementGroup, \
check_placement_group_index
import ray.signature
# Default parameters for remote functions.
@@ -60,7 +62,7 @@ class RemoteFunction:
def __init__(self, language, function, function_descriptor, num_cpus,
num_gpus, memory, object_store_memory, resources,
num_return_vals, max_calls, max_retries, placement_group_id,
num_return_vals, max_calls, max_retries, placement_group,
placement_group_bundle_index):
self._language = language
self._function = function
@@ -150,7 +152,7 @@ class RemoteFunction:
object_store_memory=None,
resources=None,
max_retries=None,
placement_group_id=None,
placement_group=None,
placement_group_bundle_index=-1):
"""Submit the remote function for execution."""
worker = ray.worker.global_worker
@@ -187,8 +189,12 @@ class RemoteFunction:
raise ValueError("Non-direct call tasks are no longer supported.")
if max_retries is None:
max_retries = self._max_retries
if placement_group_id is None:
placement_group_id = ray.PlacementGroupID.nil()
if placement_group is None:
placement_group = PlacementGroup(ray.PlacementGroupID.nil(), -1)
check_placement_group_index(placement_group,
placement_group_bundle_index)
resources = ray.utils.resources_from_resource_arguments(
self._num_cpus, self._num_gpus, self._memory,
@@ -210,7 +216,7 @@ class RemoteFunction:
"cannot be executed locally."
object_refs = worker.core_worker.submit_task(
self._language, self._function_descriptor, list_args,
num_return_vals, resources, max_retries, placement_group_id,
num_return_vals, resources, max_retries, placement_group.id,
placement_group_bundle_index)
if len(object_refs) == 1:
+100 -42
View File
@@ -11,6 +11,7 @@ import ray
from ray.test_utils import get_other_nodes, wait_for_condition
import ray.cluster_utils
from ray._raylet import PlacementGroupID
from ray.experimental.placement_group import PlacementGroup
def test_placement_group_pack(ray_start_cluster):
@@ -28,17 +29,17 @@ def test_placement_group_pack(ray_start_cluster):
cluster.add_node(num_cpus=4)
ray.init(address=cluster.address)
placement_group_id = ray.experimental.placement_group(
placement_group = ray.experimental.placement_group(
name="name", strategy="PACK", bundles=[{
"CPU": 2
}, {
"CPU": 2
}])
actor_1 = Actor.options(
placement_group_id=placement_group_id,
placement_group=placement_group,
placement_group_bundle_index=0).remote()
actor_2 = Actor.options(
placement_group_id=placement_group_id,
placement_group=placement_group,
placement_group_bundle_index=1).remote()
print(ray.get(actor_1.value.remote()))
@@ -73,17 +74,17 @@ def test_placement_group_strict_pack(ray_start_cluster):
cluster.add_node(num_cpus=4)
ray.init(address=cluster.address)
placement_group_id = ray.experimental.placement_group(
placement_group = ray.experimental.placement_group(
name="name", strategy="STRICT_PACK", bundles=[{
"CPU": 2
}, {
"CPU": 2
}])
actor_1 = Actor.options(
placement_group_id=placement_group_id,
placement_group=placement_group,
placement_group_bundle_index=0).remote()
actor_2 = Actor.options(
placement_group_id=placement_group_id,
placement_group=placement_group,
placement_group_bundle_index=1).remote()
print(ray.get(actor_1.value.remote()))
@@ -118,17 +119,17 @@ def test_placement_group_spread(ray_start_cluster):
cluster.add_node(num_cpus=4)
ray.init(address=cluster.address)
placement_group_id = ray.experimental.placement_group(
placement_group = ray.experimental.placement_group(
name="name", strategy="SPREAD", bundles=[{
"CPU": 2
}, {
"CPU": 2
}])
actor_1 = Actor.options(
placement_group_id=placement_group_id,
placement_group=placement_group,
placement_group_bundle_index=0).remote()
actor_2 = Actor.options(
placement_group_id=placement_group_id,
placement_group=placement_group,
placement_group_bundle_index=1).remote()
print(ray.get(actor_1.value.remote()))
@@ -163,7 +164,7 @@ def test_placement_group_strict_spread(ray_start_cluster):
cluster.add_node(num_cpus=4)
ray.init(address=cluster.address)
placement_group_id = ray.experimental.placement_group(
placement_group = ray.experimental.placement_group(
name="name",
strategy="STRICT_SPREAD",
bundles=[{
@@ -174,13 +175,13 @@ def test_placement_group_strict_spread(ray_start_cluster):
"CPU": 2
}])
actor_1 = Actor.options(
placement_group_id=placement_group_id,
placement_group=placement_group,
placement_group_bundle_index=0).remote()
actor_2 = Actor.options(
placement_group_id=placement_group_id,
placement_group=placement_group,
placement_group_bundle_index=1).remote()
actor_3 = Actor.options(
placement_group_id=placement_group_id,
placement_group=placement_group,
placement_group_bundle_index=2).remote()
print(ray.get(actor_1.value.remote()))
@@ -218,7 +219,7 @@ def test_placement_group_actor_resource_ids(ray_start_cluster):
ray.init(address=cluster.address)
g1 = ray.experimental.placement_group([{"CPU": 2}])
a1 = F.options(placement_group_id=g1).remote()
a1 = F.options(placement_group=g1).remote()
resources = ray.get(a1.f.remote())
assert len(resources) == 1, resources
assert "CPU_group_" in list(resources.keys())[0], resources
@@ -236,15 +237,14 @@ def test_placement_group_task_resource_ids(ray_start_cluster):
ray.init(address=cluster.address)
g1 = ray.experimental.placement_group([{"CPU": 2}])
o1 = f.options(placement_group_id=g1).remote()
o1 = f.options(placement_group=g1).remote()
resources = ray.get(o1)
assert len(resources) == 1, resources
assert "CPU_group_" in list(resources.keys())[0], resources
assert "CPU_group_0_" not in list(resources.keys())[0], resources
# Now retry with a bundle index constraint.
o1 = f.options(
placement_group_id=g1, placement_group_bundle_index=0).remote()
o1 = f.options(placement_group=g1, placement_group_bundle_index=0).remote()
resources = ray.get(o1)
assert len(resources) == 2, resources
keys = list(resources.keys())
@@ -270,7 +270,7 @@ def test_placement_group_hang(ray_start_cluster):
g1 = ray.experimental.placement_group([{"CPU": 2}])
# This will start out infeasible. The placement group will then be created
# and it transitions to feasible.
o1 = f.options(placement_group_id=g1).remote()
o1 = f.options(placement_group=g1).remote()
resources = ray.get(o1)
assert len(resources) == 1, resources
@@ -283,17 +283,22 @@ def test_remove_placement_group(ray_start_cluster):
ray.init(address=cluster.address)
# First try to remove a placement group that doesn't
# exist. This should not do anything.
random_placement_group_id = PlacementGroupID.from_random()
random_group_id = PlacementGroupID.from_random()
random_placement_group = PlacementGroup(random_group_id, -1)
for _ in range(3):
ray.experimental.remove_placement_group(random_placement_group_id)
ray.experimental.remove_placement_group(random_placement_group)
# Creating a placement group as soon as it is
# created should work.
pid = ray.experimental.placement_group([{"CPU": 2}, {"CPU": 2}])
ray.experimental.remove_placement_group(pid)
placement_group = ray.experimental.placement_group([{
"CPU": 2
}, {
"CPU": 2
}])
ray.experimental.remove_placement_group(placement_group)
def is_placement_group_removed():
table = ray.experimental.placement_group_table(pid)
table = ray.experimental.placement_group_table(placement_group)
if "state" not in table:
return False
return table["state"] == "REMOVED"
@@ -301,7 +306,11 @@ def test_remove_placement_group(ray_start_cluster):
wait_for_condition(is_placement_group_removed)
# # Now let's create a placement group.
pid = ray.experimental.placement_group([{"CPU": 2}, {"CPU": 2}])
placement_group = ray.experimental.placement_group([{
"CPU": 2
}, {
"CPU": 2
}])
# Create an actor that occupies resources.
@ray.remote(num_cpus=2)
@@ -320,14 +329,15 @@ def test_remove_placement_group(ray_start_cluster):
time.sleep(50)
# Schedule a long running task and actor.
task_ref = long_running_task.options(placement_group_id=pid).remote()
a = A.options(placement_group_id=pid).remote()
task_ref = long_running_task.options(
placement_group=placement_group).remote()
a = A.options(placement_group=placement_group).remote()
assert ray.get(a.f.remote()) == 3
ray.experimental.remove_placement_group(pid)
ray.experimental.remove_placement_group(placement_group)
# Subsequent remove request shouldn't do anything.
for _ in range(3):
ray.experimental.remove_placement_group(pid)
ray.experimental.remove_placement_group(placement_group)
# Make sure placement group resources are
# released and we can schedule this task.
@@ -350,8 +360,12 @@ def test_remove_pending_placement_group(ray_start_cluster):
cluster.add_node(num_cpus=4)
ray.init(address=cluster.address)
# Create a placement group that cannot be scheduled now.
pid = ray.experimental.placement_group([{"GPU": 2}, {"CPU": 2}])
ray.experimental.remove_placement_group(pid)
placement_group = ray.experimental.placement_group([{
"GPU": 2
}, {
"CPU": 2
}])
ray.experimental.remove_placement_group(placement_group)
# TODO(sang): Add state check here.
@ray.remote(num_cpus=4)
def f():
@@ -381,9 +395,9 @@ def test_placement_group_table(ray_start_cluster):
name = "name"
strategy = "PACK"
bundles = [{"CPU": 2, "GPU": 1}, {"CPU": 2}]
placement_group_id = ray.experimental.placement_group(
placement_group = ray.experimental.placement_group(
name=name, strategy=strategy, bundles=bundles)
result = ray.experimental.placement_group_table(placement_group_id)
result = ray.experimental.placement_group_table(placement_group)
assert result["name"] == name
assert result["strategy"] == strategy
for i in range(len(bundles)):
@@ -394,11 +408,11 @@ def test_placement_group_table(ray_start_cluster):
cluster.add_node(num_cpus=5, num_gpus=1)
cluster.wait_for_nodes()
actor_1 = Actor.options(
placement_group_id=placement_group_id,
placement_group=placement_group,
placement_group_bundle_index=0).remote()
ray.get(actor_1.value.remote())
result = ray.experimental.placement_group_table(placement_group_id)
result = ray.experimental.placement_group_table(placement_group)
assert result["state"] == "CREATED"
@@ -414,7 +428,7 @@ def test_cuda_visible_devices(ray_start_cluster):
ray.init(address=cluster.address)
g1 = ray.experimental.placement_group([{"CPU": 1, "GPU": 1}])
o1 = f.options(placement_group_id=g1).remote()
o1 = f.options(placement_group=g1).remote()
devices = ray.get(o1)
assert devices == "0", devices
@@ -441,7 +455,7 @@ def test_placement_group_reschedule_when_node_dead(ray_start_cluster):
assert len(nodes) == 3
assert nodes[0]["alive"] and nodes[1]["alive"] and nodes[2]["alive"]
placement_group_id = ray.experimental.placement_group(
placement_group = ray.experimental.placement_group(
name="name",
strategy="SPREAD",
bundles=[{
@@ -452,15 +466,15 @@ def test_placement_group_reschedule_when_node_dead(ray_start_cluster):
"CPU": 2
}])
actor_1 = Actor.options(
placement_group_id=placement_group_id,
placement_group=placement_group,
placement_group_bundle_index=0,
detached=True).remote()
actor_2 = Actor.options(
placement_group_id=placement_group_id,
placement_group=placement_group,
placement_group_bundle_index=1,
detached=True).remote()
actor_3 = Actor.options(
placement_group_id=placement_group_id,
placement_group=placement_group,
placement_group_bundle_index=2,
detached=True).remote()
print(ray.get(actor_1.value.remote()))
@@ -471,15 +485,15 @@ def test_placement_group_reschedule_when_node_dead(ray_start_cluster):
cluster.wait_for_nodes()
actor_4 = Actor.options(
placement_group_id=placement_group_id,
placement_group=placement_group,
placement_group_bundle_index=0,
detached=True).remote()
actor_5 = Actor.options(
placement_group_id=placement_group_id,
placement_group=placement_group,
placement_group_bundle_index=1,
detached=True).remote()
actor_6 = Actor.options(
placement_group_id=placement_group_id,
placement_group=placement_group,
placement_group_bundle_index=2,
detached=True).remote()
print(ray.get(actor_4.value.remote()))
@@ -488,5 +502,49 @@ def test_placement_group_reschedule_when_node_dead(ray_start_cluster):
ray.shutdown()
def test_check_bundle_index(ray_start_cluster):
@ray.remote(num_cpus=2)
class Actor(object):
def __init__(self):
self.n = 0
def value(self):
return self.n
cluster = ray_start_cluster
cluster.add_node(num_cpus=4)
ray.init(address=cluster.address)
placement_group = ray.experimental.placement_group(
name="name", strategy="SPREAD", bundles=[{
"CPU": 2
}, {
"CPU": 2
}])
error_count = 0
try:
Actor.options(
placement_group=placement_group,
placement_group_bundle_index=3).remote()
except ValueError:
error_count = error_count + 1
assert error_count == 1
try:
Actor.options(
placement_group=placement_group,
placement_group_bundle_index=-2).remote()
except ValueError:
error_count = error_count + 1
assert error_count == 2
try:
Actor.options(placement_group_bundle_index=0).remote()
except ValueError:
error_count = error_count + 1
assert error_count == 3
if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))