Allow tasks to be used with placement groups (#9738)

This commit is contained in:
Eric Liang
2020-07-31 10:51:37 -07:00
committed by GitHub
parent 78995d085f
commit b73080c85f
11 changed files with 177 additions and 120 deletions
+11 -4
View File
@@ -878,13 +878,17 @@ cdef class CoreWorker:
args,
int num_return_vals,
resources,
int max_retries):
int max_retries,
PlacementGroupID placement_group_id,
int64_t placement_group_bundle_index):
cdef:
unordered_map[c_string, double] c_resources
CTaskOptions task_options
CRayFunction ray_function
c_vector[unique_ptr[CTaskArg]] args_vector
c_vector[CObjectID] return_ids
CPlacementGroupID c_placement_group_id = \
placement_group_id.native()
with self.profile_event(b"submit_task"):
prepare_resources(resources, &c_resources)
@@ -897,7 +901,8 @@ cdef class CoreWorker:
with nogil:
CCoreWorkerProcess.GetCoreWorker().SubmitTask(
ray_function, args_vector, task_options, &return_ids,
max_retries)
max_retries, c_pair[CPlacementGroupID, int64_t](
c_placement_group_id, placement_group_bundle_index))
return VectorToObjectRefs(return_ids)
@@ -941,7 +946,9 @@ cdef class CoreWorker:
max_restarts, max_task_retries, max_concurrency,
c_resources, c_placement_resources,
dynamic_worker_options, is_detached, name, is_asyncio,
c_pair[CPlacementGroupID, int64_t](c_placement_group_id, placement_group_bundle_index)),
c_pair[CPlacementGroupID, int64_t](
c_placement_group_id,
placement_group_bundle_index)),
extension_data,
&c_actor_id))
@@ -955,7 +962,7 @@ cdef class CoreWorker:
cdef:
CPlacementGroupID c_placement_group_id
CPlacementStrategy c_strategy
if strategy == b"PACK":
c_strategy = PLACEMENT_STRATEGY_PACK
else:
+19 -23
View File
@@ -398,22 +398,24 @@ class ActorClass:
return ActorOptionWrapper()
def _remote(self,
args=None,
kwargs=None,
num_cpus=None,
num_gpus=None,
memory=None,
object_store_memory=None,
resources=None,
is_direct_call=None,
max_concurrency=None,
max_restarts=None,
max_task_retries=None,
name=None,
detached=False,
placement_group_id=None,
placement_group_bundle_index=None):
def _remote(
self,
args=None,
kwargs=None,
num_cpus=None,
num_gpus=None,
memory=None,
object_store_memory=None,
resources=None,
is_direct_call=None,
max_concurrency=None,
max_restarts=None,
max_task_retries=None,
name=None,
detached=False,
placement_group_id=None,
# TODO(ekl) set default to -1 once we support -1 as "any index"
placement_group_bundle_index=0):
"""Create an actor.
This method allows more flexibility than the remote method because
@@ -503,11 +505,6 @@ class ActorClass:
else:
detached = False
if placement_group_id is not None and placement_group_bundle_index is \
None:
raise ValueError("The placement_group_id is set."
"But the bundle_index is not set.")
# Set the actor's default resources if not already set. First three
# conditions are to check that no resources were specified in the
# decorator. Last three conditions are to check that no resources were
@@ -580,8 +577,7 @@ class ActorClass:
is_asyncio,
placement_group_id
if placement_group_id is not None else ray.PlacementGroupID.nil(),
placement_group_bundle_index
if placement_group_bundle_index is not None else -1,
placement_group_bundle_index,
# Store actor_method_cpu in actor handle's extension data.
extension_data=str(actor_method_cpu))
+8 -1
View File
@@ -4,7 +4,7 @@ from typing import (List, Dict)
def placement_group(bundles: List[Dict[str, float]],
strategy: str = "PACK",
name: str = None):
name: str = "unnamed_group"):
"""
Create a placement group.
@@ -21,6 +21,13 @@ def placement_group(bundles: List[Dict[str, float]],
name: The name of the placement group.
"""
worker = ray.worker.global_worker
worker.check_connected()
if not isinstance(bundles, list):
raise ValueError(
"The type of bundles must be list, got {}".format(bundles))
placement_group_id = worker.core_worker.create_placement_group(
name, bundles, strategy)
return placement_group_id
+4 -2
View File
@@ -5,6 +5,7 @@
from libc.stdint cimport int64_t
from libcpp cimport bool as c_bool
from libcpp.memory cimport shared_ptr, unique_ptr
from libcpp.pair cimport pair as c_pair
from libcpp.string cimport string as c_string
from libcpp.unordered_map cimport unordered_map
from libcpp.utility cimport pair
@@ -87,14 +88,15 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const CRayFunction &function,
const c_vector[unique_ptr[CTaskArg]] &args,
const CTaskOptions &options, c_vector[CObjectID] *return_ids,
int max_retries)
int max_retries,
c_pair[CPlacementGroupID, int64_t] placement_options)
CRayStatus CreateActor(
const CRayFunction &function,
const c_vector[unique_ptr[CTaskArg]] &args,
const CActorCreationOptions &options,
const c_string &extension_data, CActorID *actor_id)
CRayStatus CreatePlacementGroup(
const CPlacementGroupCreationOptions &options,
const CPlacementGroupCreationOptions &options,
CPlacementGroupID *placement_group_id)
void SubmitActorTask(
const CActorID &actor_id, const CRayFunction &function,
+21 -13
View File
@@ -60,7 +60,8 @@ class RemoteFunction:
def __init__(self, language, function, function_descriptor, num_cpus,
num_gpus, memory, object_store_memory, resources,
num_return_vals, max_calls, max_retries):
num_return_vals, max_calls, max_retries, placement_group_id,
placement_group_bundle_index):
self._language = language
self._function = function
self._function_name = (
@@ -138,17 +139,21 @@ class RemoteFunction:
return FuncWrapper()
def _remote(self,
args=None,
kwargs=None,
num_return_vals=None,
is_direct_call=None,
num_cpus=None,
num_gpus=None,
memory=None,
object_store_memory=None,
resources=None,
max_retries=None):
def _remote(
self,
args=None,
kwargs=None,
num_return_vals=None,
is_direct_call=None,
num_cpus=None,
num_gpus=None,
memory=None,
object_store_memory=None,
resources=None,
max_retries=None,
placement_group_id=None,
# TODO(ekl) set default to -1 once we support -1 as "any index"
placement_group_bundle_index=0):
"""Submit the remote function for execution."""
worker = ray.worker.global_worker
worker.check_connected()
@@ -184,6 +189,8 @@ class RemoteFunction:
raise ValueError("Non-direct call tasks are no longer supported.")
if max_retries is None:
max_retries = self._max_retries
if placement_group_id is None:
placement_group_id = ray.PlacementGroupID.nil()
resources = ray.utils.resources_from_resource_arguments(
self._num_cpus, self._num_gpus, self._memory,
@@ -205,7 +212,8 @@ class RemoteFunction:
"cannot be executed locally."
object_refs = worker.core_worker.submit_task(
self._language, self._function_descriptor, list_args,
num_return_vals, resources, max_retries)
num_return_vals, resources, max_retries, placement_group_id,
placement_group_bundle_index)
if len(object_refs) == 1:
return object_refs[0]
+61 -45
View File
@@ -4,18 +4,12 @@ try:
except ImportError:
pytest_timeout = None
import sys
import os
import ray
import ray.test_utils
import ray.cluster_utils
@pytest.mark.skipif(
os.environ.get("RAY_GCS_ACTOR_SERVICE_ENABLED") != "true",
reason=("This edge case is not handled when GCS actor management is off. "
"We won't fix this because GCS actor management "
"will be on by default anyway."))
def test_placement_group_pack(ray_start_cluster):
@ray.remote(num_cpus=2)
class Actor(object):
@@ -61,29 +55,6 @@ def test_placement_group_pack(ray_start_cluster):
assert node_of_actor_1 == node_of_actor_2
@pytest.mark.skipif(
os.environ.get("RAY_GCS_ACTOR_SERVICE_ENABLED") != "true",
reason=("This edge case is not handled when GCS actor management is off. "
"We won't fix this because GCS actor management "
"will be on by default anyway."))
def test_placement_group_pack_best_effort(ray_start_cluster):
@ray.remote(num_cpus=2)
class Actor(object):
def __init__(self):
self.n = 0
def value(self):
return self.n
# TODO(Shanly):
pass
@pytest.mark.skipif(
os.environ.get("RAY_GCS_ACTOR_SERVICE_ENABLED") != "true",
reason=("This edge case is not handled when GCS actor management is off. "
"We won't fix this because GCS actor management "
"will be on by default anyway."))
def test_placement_group_spread(ray_start_cluster):
@ray.remote(num_cpus=2)
class Actor(object):
@@ -106,9 +77,11 @@ def test_placement_group_spread(ray_start_cluster):
"CPU": 2
}])
actor_1 = Actor.options(
placement_group_id=placement_group_id, bundle_index=0).remote()
placement_group_id=placement_group_id,
placement_group_bundle_index=0).remote()
actor_2 = Actor.options(
placement_group_id=placement_group_id, bundle_index=1).remote()
placement_group_id=placement_group_id,
placement_group_bundle_index=1).remote()
print(ray.get(actor_1.value.remote()))
print(ray.get(actor_2.value.remote()))
@@ -127,22 +100,65 @@ def test_placement_group_spread(ray_start_cluster):
assert node_of_actor_1 != node_of_actor_2
@pytest.mark.skipif(
os.environ.get("RAY_GCS_ACTOR_SERVICE_ENABLED") != "true",
reason=("This edge case is not handled when GCS actor management is off. "
"We won't fix this because GCS actor management "
"will be on by default anyway."))
def test_placement_group_spread_best_effort(ray_start_cluster):
@ray.remote(num_cpus=2)
class Actor(object):
def __init__(self):
self.n = 0
def test_placement_group_actor_resource_ids(ray_start_cluster):
@ray.remote(num_cpus=1)
class F:
def f(self):
return ray.get_resource_ids()
def value(self):
return self.n
cluster = ray_start_cluster
num_nodes = 1
for _ in range(num_nodes):
cluster.add_node(num_cpus=4)
ray.init(address=cluster.address)
# TODO(Shanly):
pass
g1 = ray.experimental.placement_group([{"CPU": 2}])
a1 = F.options(placement_group_id=g1).remote()
resources = ray.get(a1.f.remote())
assert len(resources) == 1, resources
assert "CPU_group_" in list(resources.keys())[0], resources
def test_placement_group_task_resource_ids(ray_start_cluster):
@ray.remote(num_cpus=1)
def f():
return ray.get_resource_ids()
cluster = ray_start_cluster
num_nodes = 1
for _ in range(num_nodes):
cluster.add_node(num_cpus=4)
ray.init(address=cluster.address)
g1 = ray.experimental.placement_group([{"CPU": 2}])
o1 = f.options(placement_group_id=g1).remote()
resources = ray.get(o1)
assert len(resources) == 1, resources
assert "CPU_group_" in list(resources.keys())[0], resources
def test_placement_group_hang(ray_start_cluster):
@ray.remote(num_cpus=1)
def f():
return ray.get_resource_ids()
cluster = ray_start_cluster
num_nodes = 1
for _ in range(num_nodes):
cluster.add_node(num_cpus=4)
ray.init(address=cluster.address)
# Warm workers up, so that this triggers the hang rice.
ray.get(f.remote())
g1 = ray.experimental.placement_group([{"CPU": 2}])
# This will start out infeasible. The placement group will then be created
# and it transitions to feasible.
o1 = f.options(placement_group_id=g1).remote()
resources = ray.get(o1)
assert len(resources) == 1, resources
assert "CPU_group_" in list(resources.keys())[0], resources
if __name__ == "__main__":
+11 -2
View File
@@ -1768,7 +1768,9 @@ def make_decorator(num_return_vals=None,
max_retries=None,
max_restarts=None,
max_task_retries=None,
worker=None):
worker=None,
placement_group_id=None,
placement_group_bundle_index=0):
def decorator(function_or_class):
if (inspect.isfunction(function_or_class)
or is_cython(function_or_class)):
@@ -1783,7 +1785,8 @@ def make_decorator(num_return_vals=None,
return ray.remote_function.RemoteFunction(
Language.PYTHON, function_or_class, None, num_cpus, num_gpus,
memory, object_store_memory, resources, num_return_vals,
max_calls, max_retries)
max_calls, max_retries, placement_group_id,
placement_group_bundle_index)
if inspect.isclass(function_or_class):
if num_return_vals is not None:
@@ -1854,6 +1857,10 @@ def remote(*args, **kwargs):
number of times that the remote function should be rerun when the worker
process executing it crashes unexpectedly. The minimum valid value is 0,
the default is 4 (default), and a value of -1 indicates infinite retries.
* **placement_group_id**: the placement group this task belongs to,
or None if it doesn't belong to any group.
* **placement_group_bundle_index**: the index of the bundle
if the task belongs to a placement group.
This can be done as follows:
@@ -1918,6 +1925,8 @@ def remote(*args, **kwargs):
"max_restarts",
"max_task_retries",
"max_retries",
"placement_group_id",
"placement_group_bundle_index",
], error_string
num_cpus = kwargs["num_cpus"] if "num_cpus" in kwargs else None