diff --git a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc index 3e7864f63..280a642c6 100644 --- a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc +++ b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc @@ -34,7 +34,8 @@ ObjectID LocalModeTaskSubmitter::Submit(const InvocationSpec &invocation, TaskTy local_mode_ray_tuntime_.GetCurrentJobID(), local_mode_ray_tuntime_.GetCurrentTaskId(), 0, local_mode_ray_tuntime_.GetCurrentTaskId(), address, 1, - required_resources, required_placement_resources); + required_resources, required_placement_resources, + PlacementGroupID::Nil()); if (type == TaskType::NORMAL_TASK) { } else if (type == TaskType::ACTOR_CREATION_TASK) { builder.SetActorCreationTaskSpec(invocation.actor_id); diff --git a/python/ray/state.py b/python/ray/state.py index 1d6c44104..b2b6e5792 100644 --- a/python/ray/state.py +++ b/python/ray/state.py @@ -377,7 +377,6 @@ class GlobalState: return dict(result) - # SANG-TODO Add functions. def placement_group_table(self, placement_group_id=None): self._check_connected() diff --git a/python/ray/tests/test_placement_group.py b/python/ray/tests/test_placement_group.py index 1b6280556..1bb963c44 100644 --- a/python/ray/tests/test_placement_group.py +++ b/python/ray/tests/test_placement_group.py @@ -246,33 +246,46 @@ def test_remove_placement_group(ray_start_cluster): # # Now let's create a placement group. pid = ray.experimental.placement_group([{"CPU": 2}, {"CPU": 2}]) - # # This is a hack to wait for placement group creation. - # # TODO(sang): Remove it when wait is implemented. - @ray.remote(num_cpus=0) + # Create an actor that occupies resources. + @ray.remote(num_cpus=2) class A: def f(self): return 3 + # Currently, there's no way to prevent + # tasks from being retried for a removed placement group. + # Set max_retries=0 for testing. + # TODO(sang): Handle this edge case. 
+ @ray.remote(num_cpus=2, max_retries=0) + def long_running_task(): + print(os.getpid()) + import time + time.sleep(50) + + # Schedule a long running task and actor. + task_ref = long_running_task.options(placement_group_id=pid).remote() a = A.options(placement_group_id=pid).remote() assert ray.get(a.f.remote()) == 3 + ray.experimental.remove_placement_group(pid) - # # Subsequent remove request shouldn't do anything + # Subsequent remove request shouldn't do anything. for _ in range(3): ray.experimental.remove_placement_group(pid) - # # Make sure placement group resources are - # # released and we can schedule this task. + # Make sure placement group resources are + # released and we can schedule this task. @ray.remote(num_cpus=4) def f(): return 3 assert ray.get(f.remote()) == 3 - # Since the placement group is removed, # the actor should've been killed. # That means this request should fail. - # TODO(sang): Turn it on. - # ray.get(a.f.remote()) + with pytest.raises(ray.exceptions.RayActorError, match="actor died"): + ray.get(a.f.remote(), timeout=3.0) + with pytest.raises(ray.exceptions.RayWorkerError): + ray.get(task_ref) def test_remove_pending_placement_group(ray_start_cluster): diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index e7de6cb3a..ee8785713 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -43,6 +43,10 @@ SchedulingClass TaskSpecification::GetSchedulingClass(const ResourceSet &sched_c return sched_cls_id; } +const PlacementGroupID TaskSpecification::PlacementGroupId() const { + return PlacementGroupID::FromBinary(message_->placement_group_id()); +} + void TaskSpecification::ComputeResources() { auto required_resources = MapFromProtobuf(message_->required_resources()); auto required_placement_resources = diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index 8e963f433..e300c7624 100644 --- a/src/ray/common/task/task_spec.h +++ 
b/src/ray/common/task/task_spec.h @@ -189,6 +189,9 @@ class TaskSpecification : public MessageWrapper { // Compute a static key that represents the given resource shape. static SchedulingClass GetSchedulingClass(const ResourceSet &sched_cls); + // Placement Group ID that this task or actor creation is associated with. + const PlacementGroupID PlacementGroupId() const; + private: void ComputeResources(); diff --git a/src/ray/common/task/task_util.h b/src/ray/common/task/task_util.h index c562d6d25..c0670fd2b 100644 --- a/src/ray/common/task/task_util.h +++ b/src/ray/common/task/task_util.h @@ -85,7 +85,8 @@ class TaskSpecBuilder { const TaskID &parent_task_id, uint64_t parent_counter, const TaskID &caller_id, const rpc::Address &caller_address, uint64_t num_returns, const std::unordered_map &required_resources, - const std::unordered_map &required_placement_resources) { + const std::unordered_map &required_placement_resources, + const PlacementGroupID &placement_group_id) { message_->set_type(TaskType::NORMAL_TASK); message_->set_language(language); *message_->mutable_function_descriptor() = function_descriptor->GetMessage(); @@ -100,6 +101,7 @@ class TaskSpecBuilder { required_resources.end()); message_->mutable_required_placement_resources()->insert( required_placement_resources.begin(), required_placement_resources.end()); + message_->set_placement_group_id(placement_group_id.Binary()); return *this; } diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 2f8dd59a5..48282e486 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -37,12 +37,12 @@ void BuildCommonTaskSpec( const std::vector> &args, uint64_t num_returns, const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources, - std::vector *return_ids) { + std::vector *return_ids, const ray::PlacementGroupID &placement_group_id) { // Build common task spec. 
- builder.SetCommonTaskSpec(task_id, function.GetLanguage(), - function.GetFunctionDescriptor(), job_id, current_task_id, - task_index, caller_id, address, num_returns, - required_resources, required_placement_resources); + builder.SetCommonTaskSpec( + task_id, function.GetLanguage(), function.GetFunctionDescriptor(), job_id, + current_task_id, task_index, caller_id, address, num_returns, required_resources, + required_placement_resources, placement_group_id); // Set task arguments. for (const auto &arg : args) { builder.AddArg(*arg); @@ -1222,7 +1222,8 @@ void CoreWorker::SubmitTask(const RayFunction &function, BuildCommonTaskSpec(builder, worker_context_.GetCurrentJobID(), task_id, worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_, function, args, task_options.num_returns, - constrained_resources, required_resources, return_ids); + constrained_resources, required_resources, return_ids, + placement_options.first); TaskSpecification task_spec = builder.Build(); if (options_.is_local_mode) { ExecuteTaskLocalMode(task_spec); @@ -1262,7 +1263,8 @@ Status CoreWorker::CreateActor(const RayFunction &function, BuildCommonTaskSpec(builder, job_id, actor_creation_task_id, worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_, function, args, 1, new_resource, - new_placement_resources, &return_ids); + new_placement_resources, &return_ids, + actor_creation_options.placement_options.first); builder.SetActorCreationTaskSpec( actor_id, actor_creation_options.max_restarts, actor_creation_options.dynamic_worker_options, @@ -1363,7 +1365,7 @@ void CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &fun BuildCommonTaskSpec(builder, actor_handle->CreationJobID(), actor_task_id, worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_, function, args, num_returns, task_options.resources, - required_resources, return_ids); + required_resources, return_ids, PlacementGroupID::Nil()); 
const ObjectID new_cursor = return_ids->back(); actor_handle->SetActorTaskSpec(builder, new_cursor); diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index af87dd5ef..f28c5c486 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -546,7 +546,8 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { TaskSpecBuilder builder; builder.SetCommonTaskSpec(RandomTaskId(), function.GetLanguage(), function.GetFunctionDescriptor(), job_id, RandomTaskId(), 0, - RandomTaskId(), address, num_returns, resources, resources); + RandomTaskId(), address, num_returns, resources, resources, + PlacementGroupID::Nil()); // Set task arguments. for (const auto &arg : args) { builder.AddArg(*arg); diff --git a/src/ray/core_worker/test/direct_task_transport_test.cc b/src/ray/core_worker/test/direct_task_transport_test.cc index 3f36170ad..6d2ad0444 100644 --- a/src/ray/core_worker/test/direct_task_transport_test.cc +++ b/src/ray/core_worker/test/direct_task_transport_test.cc @@ -326,7 +326,7 @@ TaskSpecification BuildTaskSpec(const std::unordered_map &r rpc::Address empty_address; builder.SetCommonTaskSpec(TaskID::Nil(), Language::PYTHON, function_descriptor, JobID::Nil(), TaskID::Nil(), 0, TaskID::Nil(), empty_address, - 1, resources, resources); + 1, resources, resources, PlacementGroupID::Nil()); return builder.Build(); } diff --git a/src/ray/gcs/test/gcs_test_util.h b/src/ray/gcs/test/gcs_test_util.h index 467321b0e..eaa97a1ca 100644 --- a/src/ray/gcs/test/gcs_test_util.h +++ b/src/ray/gcs/test/gcs_test_util.h @@ -40,7 +40,7 @@ struct Mocker { auto resource = std::unordered_map(); builder.SetCommonTaskSpec(task_id, Language::PYTHON, empty_descriptor, job_id, TaskID::Nil(), 0, TaskID::Nil(), owner_address, 1, resource, - resource); + resource, PlacementGroupID::Nil()); builder.SetActorCreationTaskSpec(actor_id, max_restarts, {}, 1, detached, name); return builder.Build(); } diff --git 
a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index b776ec5ef..6c449275e 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -137,6 +137,8 @@ message TaskSpec { ActorTaskSpec actor_task_spec = 15; // Number of times this task may be retried on worker failure. int32 max_retries = 16; + // Placement group that is associated with this task. + bytes placement_group_id = 17; } message Bundle { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 2001a310b..55917b457 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1843,11 +1843,33 @@ void NodeManager::HandleCancelResourceReserve( rpc::CancelResourceReserveReply *reply, rpc::SendReplyCallback send_reply_callback) { RAY_CHECK(!new_scheduler_enabled_) << "Not implemented"; auto bundle_spec = BundleSpecification(request.bundle_spec()); - RAY_LOG(DEBUG) << "bundle return resource request " << bundle_spec.BundleId().first - << bundle_spec.BundleId().second; + RAY_LOG(INFO) << "bundle return resource request " << bundle_spec.BundleId().first + << bundle_spec.BundleId().second; auto resource_set = bundle_spec.GetRequiredResources(); - // TODO(ekl) doesn't this not return in-use resources? We need to be able to - // reclaim those somehow (i.e., destroy the workers allocated in the bundle). + + // Kill all workers that are currently associated with the placement group. + std::vector> workers_associated_with_pg; + for (const auto &worker_it : leased_workers_) { + auto &worker = worker_it.second; + if (worker->GetPlacementGroupId() == bundle_spec.PlacementGroupId()) { + workers_associated_with_pg.push_back(worker); + } + } + for (const auto &worker : workers_associated_with_pg) { + RAY_LOG(DEBUG) + << "Destroying worker since its placement group was removed. 
Placement group id: " + << worker->GetPlacementGroupId() + << ", bundle index: " << bundle_spec.BundleId().second + << ", task id: " << worker->GetAssignedTaskId() + << ", actor id: " << worker->GetActorId() + << ", worker id: " << worker->WorkerId(); + // We should disconnect the client first. Otherwise, we'll remove bundle resources + // before actual resources are returned. Subsequent disconnect requests that arrive + // because the worker died will be ignored. + ProcessDisconnectClientMessage(worker->Connection(), /* intentional exit */ true); + worker->MarkDead(); + KillWorker(worker); + } for (auto resource : resource_set.GetResourceMap()) { local_available_resources_.ReturnBundleResources(bundle_spec.PlacementGroupId(), bundle_spec.Index(), resource.first); @@ -2507,6 +2529,7 @@ void NodeManager::AssignTask(const std::shared_ptr &worker, if (task.GetTaskSpecification().IsDetachedActor()) { worker->MarkDetachedActor(); } + worker->SetPlacementGroupId(spec.PlacementGroupId()); const auto owner_worker_id = WorkerID::FromBinary(spec.CallerAddress().worker_id()); const auto owner_node_id = ClientID::FromBinary(spec.CallerAddress().raylet_id()); diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc index 27dbd5b24..031cfa335 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc @@ -207,6 +207,15 @@ class MockWorker : public WorkerInterface { RAY_CHECK(false) << "Method unused"; } + const PlacementGroupID &GetPlacementGroupId() const { + RAY_CHECK(false) << "Method unused"; + return PlacementGroupID::Nil(); + } + + void SetPlacementGroupId(const PlacementGroupID &placement_group_id) { + RAY_CHECK(false) << "Method unused"; + } + std::vector &GetBorrowedCPUInstances() { RAY_CHECK(false) << "Method unused"; auto *t = new std::vector(); @@ -258,9 +267,10 @@ Task CreateTask(const std::unordered_map &required_resource TaskID id = 
RandomTaskId(); JobID job_id = RandomJobId(); rpc::Address address; - spec_builder.SetCommonTaskSpec( - id, Language::PYTHON, FunctionDescriptorBuilder::BuildPython("", "", "", ""), - job_id, TaskID::Nil(), 0, TaskID::Nil(), address, 0, required_resources, {}); + spec_builder.SetCommonTaskSpec(id, Language::PYTHON, + FunctionDescriptorBuilder::BuildPython("", "", "", ""), + job_id, TaskID::Nil(), 0, TaskID::Nil(), address, 0, + required_resources, {}, PlacementGroupID::Nil()); for (int i = 0; i < num_args; i++) { ObjectID put_id = ObjectID::ForPut(TaskID::Nil(), /*index=*/i + 1); diff --git a/src/ray/raylet/task_dependency_manager_test.cc b/src/ray/raylet/task_dependency_manager_test.cc index a4e9fa1fa..213c96732 100644 --- a/src/ray/raylet/task_dependency_manager_test.cc +++ b/src/ray/raylet/task_dependency_manager_test.cc @@ -112,7 +112,7 @@ static inline Task ExampleTask(const std::vector &arguments, builder.SetCommonTaskSpec(RandomTaskId(), Language::PYTHON, FunctionDescriptorBuilder::BuildPython("", "", "", ""), JobID::Nil(), RandomTaskId(), 0, RandomTaskId(), address, - num_returns, {}, {}); + num_returns, {}, {}, PlacementGroupID::Nil()); builder.SetActorCreationTaskSpec(ActorID::Nil(), 1, {}, 1, false, "", false); for (const auto &arg : arguments) { builder.AddArg(TaskArgByReference(arg, rpc::Address())); diff --git a/src/ray/raylet/worker.cc b/src/ray/raylet/worker.cc index d062f49ff..4ddc252bb 100644 --- a/src/ray/raylet/worker.cc +++ b/src/ray/raylet/worker.cc @@ -37,6 +37,7 @@ Worker::Worker(const WorkerID &worker_id, const Language &language, assigned_port_(-1), port_(-1), connection_(connection), + placement_group_id_(PlacementGroupID::Nil()), dead_(false), blocked_(false), client_call_manager_(client_call_manager), @@ -190,6 +191,14 @@ void Worker::DirectActorCallArgWaitComplete(int64_t tag) { }); } +const PlacementGroupID &Worker::GetPlacementGroupId() const { + return placement_group_id_; +} + +void Worker::SetPlacementGroupId(const 
PlacementGroupID &placement_group_id) { + placement_group_id_ = placement_group_id; +} + } // namespace raylet } // end namespace ray diff --git a/src/ray/raylet/worker.h b/src/ray/raylet/worker.h index 61d811d4f..856991e86 100644 --- a/src/ray/raylet/worker.h +++ b/src/ray/raylet/worker.h @@ -82,6 +82,9 @@ class WorkerInterface { virtual void DirectActorCallArgWaitComplete(int64_t tag) = 0; + virtual const PlacementGroupID &GetPlacementGroupId() const = 0; + virtual void SetPlacementGroupId(const PlacementGroupID &placement_group_id) = 0; + // Setter, geter, and clear methods for allocated_instances_. virtual void SetAllocatedInstances( std::shared_ptr &allocated_instances) = 0; @@ -168,6 +171,9 @@ class Worker : public WorkerInterface { void DirectActorCallArgWaitComplete(int64_t tag); + const PlacementGroupID &GetPlacementGroupId() const; + void SetPlacementGroupId(const PlacementGroupID &placement_group_id); + // Setter, geter, and clear methods for allocated_instances_. void SetAllocatedInstances( std::shared_ptr &allocated_instances) { @@ -236,6 +242,9 @@ class Worker : public WorkerInterface { JobID assigned_job_id_; /// The worker's actor ID. If this is nil, then the worker is not an actor. ActorID actor_id_; + /// The worker's placement group ID. It is used to detect if the worker is + /// associated with a placement group. + PlacementGroupID placement_group_id_; /// Whether the worker is dead. bool dead_; /// Whether the worker is blocked. Workers become blocked in a `ray.get`, if