diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 3b106f5f2..501259373 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -878,13 +878,17 @@ cdef class CoreWorker: args, int num_return_vals, resources, - int max_retries): + int max_retries, + PlacementGroupID placement_group_id, + int64_t placement_group_bundle_index): cdef: unordered_map[c_string, double] c_resources CTaskOptions task_options CRayFunction ray_function c_vector[unique_ptr[CTaskArg]] args_vector c_vector[CObjectID] return_ids + CPlacementGroupID c_placement_group_id = \ + placement_group_id.native() with self.profile_event(b"submit_task"): prepare_resources(resources, &c_resources) @@ -897,7 +901,8 @@ cdef class CoreWorker: with nogil: CCoreWorkerProcess.GetCoreWorker().SubmitTask( ray_function, args_vector, task_options, &return_ids, - max_retries) + max_retries, c_pair[CPlacementGroupID, int64_t]( + c_placement_group_id, placement_group_bundle_index)) return VectorToObjectRefs(return_ids) @@ -941,7 +946,9 @@ cdef class CoreWorker: max_restarts, max_task_retries, max_concurrency, c_resources, c_placement_resources, dynamic_worker_options, is_detached, name, is_asyncio, - c_pair[CPlacementGroupID, int64_t](c_placement_group_id, placement_group_bundle_index)), + c_pair[CPlacementGroupID, int64_t]( + c_placement_group_id, + placement_group_bundle_index)), extension_data, &c_actor_id)) @@ -955,7 +962,7 @@ cdef class CoreWorker: cdef: CPlacementGroupID c_placement_group_id CPlacementStrategy c_strategy - + if strategy == b"PACK": c_strategy = PLACEMENT_STRATEGY_PACK else: diff --git a/python/ray/actor.py b/python/ray/actor.py index 095bb2fa1..7c59af12e 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -398,22 +398,24 @@ class ActorClass: return ActorOptionWrapper() - def _remote(self, - args=None, - kwargs=None, - num_cpus=None, - num_gpus=None, - memory=None, - object_store_memory=None, - resources=None, - is_direct_call=None, - max_concurrency=None, - max_restarts=None, - max_task_retries=None, - name=None, - detached=False, - placement_group_id=None, - placement_group_bundle_index=None): + def _remote( + self, + args=None, + kwargs=None, + num_cpus=None, + num_gpus=None, + memory=None, + object_store_memory=None, + resources=None, + is_direct_call=None, + max_concurrency=None, + max_restarts=None, + max_task_retries=None, + name=None, + detached=False, + placement_group_id=None, + # TODO(ekl) set default to -1 once we support -1 as "any index" + placement_group_bundle_index=0): """Create an actor. This method allows more flexibility than the remote method because @@ -503,11 +505,6 @@ class ActorClass: else: detached = False - if placement_group_id is not None and placement_group_bundle_index is \ - None: - raise ValueError("The placement_group_id is set." - "But the bundle_index is not set.") - # Set the actor's default resources if not already set. First three # conditions are to check that no resources were specified in the # decorator. Last three conditions are to check that no resources were @@ -580,8 +577,7 @@ class ActorClass: is_asyncio, placement_group_id if placement_group_id is not None else ray.PlacementGroupID.nil(), - placement_group_bundle_index - if placement_group_bundle_index is not None else -1, + placement_group_bundle_index, # Store actor_method_cpu in actor handle's extension data. extension_data=str(actor_method_cpu)) diff --git a/python/ray/experimental/placement_group.py b/python/ray/experimental/placement_group.py index 105a33672..a99be4690 100644 --- a/python/ray/experimental/placement_group.py +++ b/python/ray/experimental/placement_group.py @@ -4,7 +4,7 @@ from typing import (List, Dict) def placement_group(bundles: List[Dict[str, float]], strategy: str = "PACK", - name: str = None): + name: str = "unnamed_group"): """ Create a placement group. @@ -21,6 +21,13 @@ def placement_group(bundles: List[Dict[str, float]], name: The name of the placement group. """ worker = ray.worker.global_worker + worker.check_connected() + + if not isinstance(bundles, list): + raise ValueError( + "The type of bundles must be list, got {}".format(bundles)) + placement_group_id = worker.core_worker.create_placement_group( name, bundles, strategy) + return placement_group_id diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 3a419f6b2..03e918f25 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -5,6 +5,7 @@ from libc.stdint cimport int64_t from libcpp cimport bool as c_bool from libcpp.memory cimport shared_ptr, unique_ptr +from libcpp.pair cimport pair as c_pair from libcpp.string cimport string as c_string from libcpp.unordered_map cimport unordered_map from libcpp.utility cimport pair @@ -87,14 +88,15 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const CRayFunction &function, const c_vector[unique_ptr[CTaskArg]] &args, const CTaskOptions &options, c_vector[CObjectID] *return_ids, - int max_retries) + int max_retries, + c_pair[CPlacementGroupID, int64_t] placement_options) CRayStatus CreateActor( const CRayFunction &function, const c_vector[unique_ptr[CTaskArg]] &args, const CActorCreationOptions &options, const c_string &extension_data, CActorID *actor_id) CRayStatus CreatePlacementGroup( - const CPlacementGroupCreationOptions &options, + const CPlacementGroupCreationOptions &options, CPlacementGroupID *placement_group_id) void SubmitActorTask( const CActorID &actor_id, const CRayFunction &function, diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index 6ec9b7e51..85a02a446 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -60,7 +60,8 @@ class RemoteFunction: def __init__(self, language, function, function_descriptor, num_cpus, num_gpus, memory, object_store_memory, resources, - num_return_vals, max_calls, max_retries): + num_return_vals, max_calls, max_retries, placement_group_id, + placement_group_bundle_index): self._language = language self._function = function self._function_name = ( @@ -138,17 +139,21 @@ class RemoteFunction: return FuncWrapper() - def _remote(self, - args=None, - kwargs=None, - num_return_vals=None, - is_direct_call=None, - num_cpus=None, - num_gpus=None, - memory=None, - object_store_memory=None, - resources=None, - max_retries=None): + def _remote( + self, + args=None, + kwargs=None, + num_return_vals=None, + is_direct_call=None, + num_cpus=None, + num_gpus=None, + memory=None, + object_store_memory=None, + resources=None, + max_retries=None, + placement_group_id=None, + # TODO(ekl) set default to -1 once we support -1 as "any index" + placement_group_bundle_index=0): """Submit the remote function for execution.""" worker = ray.worker.global_worker worker.check_connected() @@ -184,6 +189,8 @@ class RemoteFunction: raise ValueError("Non-direct call tasks are no longer supported.") if max_retries is None: max_retries = self._max_retries + if placement_group_id is None: + placement_group_id = ray.PlacementGroupID.nil() resources = ray.utils.resources_from_resource_arguments( self._num_cpus, self._num_gpus, self._memory, @@ -205,7 +212,8 @@ class RemoteFunction: "cannot be executed locally." object_refs = worker.core_worker.submit_task( self._language, self._function_descriptor, list_args, - num_return_vals, resources, max_retries) + num_return_vals, resources, max_retries, placement_group_id, + placement_group_bundle_index) if len(object_refs) == 1: return object_refs[0] diff --git a/python/ray/tests/test_placement_group.py b/python/ray/tests/test_placement_group.py index 1e8de3199..b2b178553 100644 --- a/python/ray/tests/test_placement_group.py +++ b/python/ray/tests/test_placement_group.py @@ -4,18 +4,12 @@ try: except ImportError: pytest_timeout = None import sys -import os import ray import ray.test_utils import ray.cluster_utils -@pytest.mark.skipif( - os.environ.get("RAY_GCS_ACTOR_SERVICE_ENABLED") != "true", - reason=("This edge case is not handled when GCS actor management is off. " - "We won't fix this because GCS actor management " - "will be on by default anyway.")) def test_placement_group_pack(ray_start_cluster): @ray.remote(num_cpus=2) class Actor(object): @@ -61,29 +55,6 @@ def test_placement_group_pack(ray_start_cluster): assert node_of_actor_1 == node_of_actor_2 -@pytest.mark.skipif( - os.environ.get("RAY_GCS_ACTOR_SERVICE_ENABLED") != "true", - reason=("This edge case is not handled when GCS actor management is off. " - "We won't fix this because GCS actor management " - "will be on by default anyway.")) -def test_placement_group_pack_best_effort(ray_start_cluster): - @ray.remote(num_cpus=2) - class Actor(object): - def __init__(self): - self.n = 0 - - def value(self): - return self.n - - # TODO(Shanly): - pass - - -@pytest.mark.skipif( - os.environ.get("RAY_GCS_ACTOR_SERVICE_ENABLED") != "true", - reason=("This edge case is not handled when GCS actor management is off. " - "We won't fix this because GCS actor management " - "will be on by default anyway.")) def test_placement_group_spread(ray_start_cluster): @ray.remote(num_cpus=2) class Actor(object): @@ -106,9 +77,11 @@ def test_placement_group_spread(ray_start_cluster): "CPU": 2 }]) actor_1 = Actor.options( - placement_group_id=placement_group_id, bundle_index=0).remote() + placement_group_id=placement_group_id, + placement_group_bundle_index=0).remote() actor_2 = Actor.options( - placement_group_id=placement_group_id, bundle_index=1).remote() + placement_group_id=placement_group_id, + placement_group_bundle_index=1).remote() print(ray.get(actor_1.value.remote())) print(ray.get(actor_2.value.remote())) @@ -127,22 +100,65 @@ def test_placement_group_spread(ray_start_cluster): assert node_of_actor_1 != node_of_actor_2 -@pytest.mark.skipif( - os.environ.get("RAY_GCS_ACTOR_SERVICE_ENABLED") != "true", - reason=("This edge case is not handled when GCS actor management is off. " - "We won't fix this because GCS actor management " - "will be on by default anyway.")) -def test_placement_group_spread_best_effort(ray_start_cluster): - @ray.remote(num_cpus=2) - class Actor(object): - def __init__(self): - self.n = 0 +def test_placement_group_actor_resource_ids(ray_start_cluster): + @ray.remote(num_cpus=1) + class F: + def f(self): + return ray.get_resource_ids() - def value(self): - return self.n + cluster = ray_start_cluster + num_nodes = 1 + for _ in range(num_nodes): + cluster.add_node(num_cpus=4) + ray.init(address=cluster.address) - # TODO(Shanly): - pass + g1 = ray.experimental.placement_group([{"CPU": 2}]) + a1 = F.options(placement_group_id=g1).remote() + resources = ray.get(a1.f.remote()) + assert len(resources) == 1, resources + assert "CPU_group_" in list(resources.keys())[0], resources + + +def test_placement_group_task_resource_ids(ray_start_cluster): + @ray.remote(num_cpus=1) + def f(): + return ray.get_resource_ids() + + cluster = ray_start_cluster + num_nodes = 1 + for _ in range(num_nodes): + cluster.add_node(num_cpus=4) + ray.init(address=cluster.address) + + g1 = ray.experimental.placement_group([{"CPU": 2}]) + o1 = f.options(placement_group_id=g1).remote() + resources = ray.get(o1) + assert len(resources) == 1, resources + assert "CPU_group_" in list(resources.keys())[0], resources + + +def test_placement_group_hang(ray_start_cluster): + @ray.remote(num_cpus=1) + def f(): + return ray.get_resource_ids() + + cluster = ray_start_cluster + num_nodes = 1 + for _ in range(num_nodes): + cluster.add_node(num_cpus=4) + ray.init(address=cluster.address) + + # Warm workers up, so that this triggers the hang rice. + ray.get(f.remote()) + + g1 = ray.experimental.placement_group([{"CPU": 2}]) + # This will start out infeasible. The placement group will then be created + # and it transitions to feasible. + o1 = f.options(placement_group_id=g1).remote() + + resources = ray.get(o1) + assert len(resources) == 1, resources + assert "CPU_group_" in list(resources.keys())[0], resources if __name__ == "__main__": diff --git a/python/ray/worker.py b/python/ray/worker.py index 7c1256855..d2151fcbc 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1768,7 +1768,9 @@ def make_decorator(num_return_vals=None, max_retries=None, max_restarts=None, max_task_retries=None, - worker=None): + worker=None, + placement_group_id=None, + placement_group_bundle_index=0): def decorator(function_or_class): if (inspect.isfunction(function_or_class) or is_cython(function_or_class)): @@ -1783,7 +1785,8 @@ def make_decorator(num_return_vals=None, return ray.remote_function.RemoteFunction( Language.PYTHON, function_or_class, None, num_cpus, num_gpus, memory, object_store_memory, resources, num_return_vals, - max_calls, max_retries) + max_calls, max_retries, placement_group_id, + placement_group_bundle_index) if inspect.isclass(function_or_class): if num_return_vals is not None: @@ -1854,6 +1857,10 @@ def remote(*args, **kwargs): number of times that the remote function should be rerun when the worker process executing it crashes unexpectedly. The minimum valid value is 0, the default is 4 (default), and a value of -1 indicates infinite retries. + * **placement_group_id**: the placement group this task belongs to, + or None if it doesn't belong to any group. + * **placement_group_bundle_index**: the index of the bundle + if the task belongs to a placement group. This can be done as follows: @@ -1918,6 +1925,8 @@ def remote(*args, **kwargs): "max_restarts", "max_task_retries", "max_retries", + "placement_group_id", + "placement_group_bundle_index", ], error_string num_cpus = kwargs["num_cpus"] if "num_cpus" in kwargs else None diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index d3201cdb1..42c174bd9 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1172,34 +1172,6 @@ Status CoreWorker::SetResource(const std::string &resource_name, const double ca return local_raylet_client_->SetResource(resource_name, capacity, client_id); } -void CoreWorker::SubmitTask(const RayFunction &function, - const std::vector> &args, - const TaskOptions &task_options, - std::vector *return_ids, int max_retries) { - TaskSpecBuilder builder; - const int next_task_index = worker_context_.GetNextTaskIndex(); - const auto task_id = - TaskID::ForNormalTask(worker_context_.GetCurrentJobID(), - worker_context_.GetCurrentTaskID(), next_task_index); - - const std::unordered_map required_resources; - // TODO(ekl) offload task building onto a thread pool for performance - BuildCommonTaskSpec(builder, worker_context_.GetCurrentJobID(), task_id, - worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), - rpc_address_, function, args, task_options.num_returns, - task_options.resources, required_resources, return_ids); - TaskSpecification task_spec = builder.Build(); - if (options_.is_local_mode) { - ExecuteTaskLocalMode(task_spec); - } else { - task_manager_->AddPendingTask(task_spec.CallerAddress(), task_spec, CurrentCallSite(), - max_retries); - io_service_.post([this, task_spec]() { - RAY_UNUSED(direct_task_submitter_->SubmitTask(task_spec)); - }); - } -} - std::unordered_map AddPlacementGroupConstraint( const std::unordered_map &resources, PlacementGroupID placement_group_id, int64_t bundle_index) { @@ -1215,6 +1187,37 @@ std::unordered_map AddPlacementGroupConstraint( return resources; } +void CoreWorker::SubmitTask(const RayFunction &function, + const std::vector> &args, + const TaskOptions &task_options, + std::vector *return_ids, int max_retries, + PlacementOptions placement_options) { + TaskSpecBuilder builder; + const int next_task_index = worker_context_.GetNextTaskIndex(); + const auto task_id = + TaskID::ForNormalTask(worker_context_.GetCurrentJobID(), + worker_context_.GetCurrentTaskID(), next_task_index); + + auto constrained_resources = AddPlacementGroupConstraint( + task_options.resources, placement_options.first, placement_options.second); + const std::unordered_map required_resources; + // TODO(ekl) offload task building onto a thread pool for performance + BuildCommonTaskSpec(builder, worker_context_.GetCurrentJobID(), task_id, + worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), + rpc_address_, function, args, task_options.num_returns, + constrained_resources, required_resources, return_ids); + TaskSpecification task_spec = builder.Build(); + if (options_.is_local_mode) { + ExecuteTaskLocalMode(task_spec); + } else { + task_manager_->AddPendingTask(task_spec.CallerAddress(), task_spec, CurrentCallSite(), + max_retries); + io_service_.post([this, task_spec]() { + RAY_UNUSED(direct_task_submitter_->SubmitTask(task_spec)); + }); + } +} + Status CoreWorker::CreateActor(const RayFunction &function, const std::vector> &args, const ActorCreationOptions &actor_creation_options, diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 24e09463c..ad9d5fed0 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -584,7 +584,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { void SubmitTask(const RayFunction &function, const std::vector> &args, const TaskOptions &task_options, std::vector *return_ids, - int max_retries); + int max_retries, PlacementOptions placement_options); /// Create an actor. /// diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index c699120d7..4f3b0a7d7 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -264,7 +264,8 @@ void CoreWorkerTest::TestNormalTask(std::unordered_map &res "MergeInputArgsAsOutput", "", "", "")); TaskOptions options; std::vector return_ids; - driver.SubmitTask(func, args, options, &return_ids, /*max_retries=*/0); + driver.SubmitTask(func, args, options, &return_ids, /*max_retries=*/0, + std::make_pair(PlacementGroupID::Nil(), -1)); ASSERT_EQ(return_ids.size(), 1); diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 98a19f042..167d992c7 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1800,6 +1800,7 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest void NodeManager::HandleRequestResourceReserve( const rpc::RequestResourceReserveRequest &request, rpc::RequestResourceReserveReply *reply, rpc::SendReplyCallback send_reply_callback) { + RAY_CHECK(!new_scheduler_enabled_) << "Not implemented"; auto bundle_spec = BundleSpecification(request.bundle_spec()); RAY_LOG(DEBUG) << "bundle lease request " << bundle_spec.BundleId().first << bundle_spec.BundleId().second; @@ -1811,11 +1812,15 @@ void NodeManager::HandleRequestResourceReserve( reply->set_success(true); send_reply_callback(Status::OK(), nullptr, nullptr); } + // Call task dispatch to assign work to the new group. + TryLocalInfeasibleTaskScheduling(); + DispatchTasks(local_queues_.GetReadyTasksByClass()); } void NodeManager::HandleCancelResourceReserve( const rpc::CancelResourceReserveRequest &request, rpc::CancelResourceReserveReply *reply, rpc::SendReplyCallback send_reply_callback) { + RAY_CHECK(!new_scheduler_enabled_) << "Not implemented"; auto bundle_spec = BundleSpecification(request.bundle_spec()); RAY_LOG(DEBUG) << "bundle return resource request " << bundle_spec.BundleId().first << bundle_spec.BundleId().second; @@ -1827,6 +1832,9 @@ void NodeManager::HandleCancelResourceReserve( cluster_resource_map_[self_node_id_].ReturnBundleResource( bundle_spec.PlacementGroupId(), bundle_spec.Index()); send_reply_callback(Status::OK(), nullptr, nullptr); + // Call task dispatch to assign work to the released resources. + TryLocalInfeasibleTaskScheduling(); + DispatchTasks(local_queues_.GetReadyTasksByClass()); } void NodeManager::HandleReturnWorker(const rpc::ReturnWorkerRequest &request,