From 084f03797b7a88e4ba491b329eb6e1634905fb72 Mon Sep 17 00:00:00 2001 From: fangfengbin <869218239a@zju.edu.cn> Date: Mon, 23 Nov 2020 16:57:58 +0800 Subject: [PATCH] [Placement Group]Placement Group supports gcs failover(Part3) (#12036) --- .../runtime/task/local_mode_task_submitter.cc | 2 +- src/ray/common/bundle_spec.cc | 2 +- src/ray/common/bundle_spec.h | 2 +- src/ray/common/id.h | 2 + src/ray/common/task/task_spec.cc | 5 ++- src/ray/common/task/task_spec.h | 4 +- src/ray/common/task/task_util.h | 6 +-- src/ray/core_worker/common.h | 5 +-- src/ray/core_worker/context.cc | 4 +- src/ray/core_worker/core_worker.cc | 15 +++---- src/ray/core_worker/core_worker.h | 2 +- src/ray/core_worker/test/core_worker_test.cc | 2 +- .../test/direct_task_transport_test.cc | 2 +- .../gcs_server/gcs_placement_group_manager.cc | 4 +- .../gcs_placement_group_scheduler.h | 1 - src/ray/gcs/test/gcs_test_util.h | 2 +- src/ray/protobuf/common.proto | 6 ++- src/ray/raylet/node_manager.cc | 39 +++++++++++++------ src/ray/raylet/node_manager.h | 9 ++++- .../scheduling/cluster_task_manager_test.cc | 3 +- .../raylet/task_dependency_manager_test.cc | 3 +- src/ray/raylet/test/util.h | 9 ++--- src/ray/raylet/worker.cc | 10 ++--- src/ray/raylet/worker.h | 14 +++---- 24 files changed, 89 insertions(+), 64 deletions(-) diff --git a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc index 61acd9298..3239c44a9 100644 --- a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc +++ b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc @@ -44,7 +44,7 @@ ObjectID LocalModeTaskSubmitter::Submit(InvocationSpec &invocation) { local_mode_ray_tuntime_.GetCurrentTaskId(), 0, local_mode_ray_tuntime_.GetCurrentTaskId(), address, 1, required_resources, required_placement_resources, - PlacementGroupID::Nil(), true); + std::make_pair(PlacementGroupID::Nil(), -1), true); if (invocation.task_type == TaskType::NORMAL_TASK) { } else if (invocation.task_type == TaskType::ACTOR_CREATION_TASK) { invocation.actor_id = local_mode_ray_tuntime_.GetNextActorID(); diff --git a/src/ray/common/bundle_spec.cc b/src/ray/common/bundle_spec.cc index cb7f09061..d9dd610ec 100644 --- a/src/ray/common/bundle_spec.cc +++ b/src/ray/common/bundle_spec.cc @@ -31,7 +31,7 @@ const ResourceSet &BundleSpecification::GetRequiredResources() const { return *unit_resource_; } -std::pair BundleSpecification::BundleId() const { +BundleID BundleSpecification::BundleId() const { if (message_->bundle_id() .placement_group_id() .empty() /* e.g., empty proto default */) { diff --git a/src/ray/common/bundle_spec.h b/src/ray/common/bundle_spec.h index 7e0bd725c..cbdfa3049 100644 --- a/src/ray/common/bundle_spec.h +++ b/src/ray/common/bundle_spec.h @@ -49,7 +49,7 @@ class BundleSpecification : public MessageWrapper { ComputeResources(); } // Return the bundle_id - std::pair BundleId() const; + BundleID BundleId() const; // Return the Placement Group id which the Bundle belong to. PlacementGroupID PlacementGroupId() const; diff --git a/src/ray/common/id.h b/src/ray/common/id.h index 2eb6cd679..d12ba550d 100644 --- a/src/ray/common/id.h +++ b/src/ray/common/id.h @@ -325,6 +325,8 @@ class PlacementGroupID : public BaseID { uint8_t id_[kLength]; }; +typedef std::pair BundleID; + static_assert(sizeof(JobID) == JobID::kLength + sizeof(size_t), "JobID size is not as expected"); static_assert(sizeof(ActorID) == ActorID::kLength + sizeof(size_t), diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index c69ebde86..875b61276 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -43,8 +43,9 @@ SchedulingClass TaskSpecification::GetSchedulingClass(const ResourceSet &sched_c return sched_cls_id; } -const PlacementGroupID TaskSpecification::PlacementGroupId() const { - return PlacementGroupID::FromBinary(message_->placement_group_id()); +const BundleID TaskSpecification::PlacementGroupBundleId() const { + return std::make_pair(PlacementGroupID::FromBinary(message_->placement_group_id()), + message_->placement_group_bundle_index()); } bool TaskSpecification::PlacementGroupCaptureChildTasks() const { diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index 4399bf9c2..2dec30283 100644 --- a/src/ray/common/task/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -194,8 +194,8 @@ class TaskSpecification : public MessageWrapper { // Compute a static key that represents the given resource shape. static SchedulingClass GetSchedulingClass(const ResourceSet &sched_cls); - // Placement Group ID that this task or actor creation is associated with. - const PlacementGroupID PlacementGroupId() const; + // Placement Group bundle that this task or actor creation is associated with. + const BundleID PlacementGroupBundleId() const; // Whether or not we should capture parent's placement group implicitly. bool PlacementGroupCaptureChildTasks() const; diff --git a/src/ray/common/task/task_util.h b/src/ray/common/task/task_util.h index 6e06c5f1c..825ae659e 100644 --- a/src/ray/common/task/task_util.h +++ b/src/ray/common/task/task_util.h @@ -86,8 +86,7 @@ class TaskSpecBuilder { const rpc::Address &caller_address, uint64_t num_returns, const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources, - const PlacementGroupID &placement_group_id, - bool placement_group_capture_child_tasks, + const BundleID &bundle_id, bool placement_group_capture_child_tasks, const std::unordered_map &override_environment_variables = {}) { message_->set_type(TaskType::NORMAL_TASK); @@ -105,7 +104,8 @@ class TaskSpecBuilder { required_resources.end()); message_->mutable_required_placement_resources()->insert( required_placement_resources.begin(), required_placement_resources.end()); - message_->set_placement_group_id(placement_group_id.Binary()); + message_->set_placement_group_id(bundle_id.first.Binary()); + message_->set_placement_group_bundle_index(bundle_id.second); message_->set_placement_group_capture_child_tasks( placement_group_capture_child_tasks); for (const auto &env : override_environment_variables) { diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index a78f39722..1716fe606 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -25,7 +25,6 @@ namespace ray { using WorkerType = rpc::WorkerType; -using PlacementOptions = std::pair; // Return a string representation of the worker type. std::string WorkerTypeString(WorkerType type); @@ -84,7 +83,7 @@ struct ActorCreationOptions { const std::unordered_map &placement_resources, const std::vector &dynamic_worker_options, bool is_detached, std::string &name, bool is_asyncio, - PlacementOptions placement_options = std::make_pair(PlacementGroupID::Nil(), -1), + BundleID placement_options = std::make_pair(PlacementGroupID::Nil(), -1), bool placement_group_capture_child_tasks = true, const std::unordered_map &override_environment_variables = {}) @@ -130,7 +129,7 @@ struct ActorCreationOptions { /// The placement_options include placement_group_id and bundle_index. /// If the actor doesn't belong to a placement group, the placement_group_id will be /// nil, and the bundle_index will be -1. - PlacementOptions placement_options; + BundleID placement_options; /// When true, the child task will always scheduled on the same placement group /// specified in the PlacementOptions. bool placement_group_capture_child_tasks = true; diff --git a/src/ray/core_worker/context.cc b/src/ray/core_worker/context.cc index b62ce116e..b2a59ca41 100644 --- a/src/ray/core_worker/context.cc +++ b/src/ray/core_worker/context.cc @@ -68,7 +68,7 @@ struct WorkerThreadContext { RAY_CHECK(task_index_ == 0); RAY_CHECK(put_counter_ == 0); SetCurrentTaskId(task_spec.TaskId()); - SetCurrentPlacementGroupId(task_spec.PlacementGroupId()); + SetCurrentPlacementGroupId(task_spec.PlacementGroupBundleId().first); SetPlacementGroupCaptureChildTasks(task_spec.PlacementGroupCaptureChildTasks()); current_task_ = std::make_shared(task_spec); } @@ -192,7 +192,7 @@ void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) { current_actor_max_concurrency_ = task_spec.MaxActorConcurrency(); current_actor_is_asyncio_ = task_spec.IsAsyncioActor(); is_detached_actor_ = task_spec.IsDetachedActor(); - current_actor_placement_group_id_ = task_spec.PlacementGroupId(); + current_actor_placement_group_id_ = task_spec.PlacementGroupBundleId().first; placement_group_capture_child_tasks_ = task_spec.PlacementGroupCaptureChildTasks(); override_environment_variables_ = task_spec.OverrideEnvironmentVariables(); } else if (task_spec.IsActorTask()) { diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index a47d5c7c0..39aa6a147 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -38,15 +38,15 @@ void BuildCommonTaskSpec( const std::vector> &args, uint64_t num_returns, const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources, - std::vector *return_ids, const ray::PlacementGroupID &placement_group_id, + std::vector *return_ids, const ray::BundleID &bundle_id, bool placement_group_capture_child_tasks, const std::unordered_map &override_environment_variables) { // Build common task spec. builder.SetCommonTaskSpec( task_id, name, function.GetLanguage(), function.GetFunctionDescriptor(), job_id, current_task_id, task_index, caller_id, address, num_returns, required_resources, - required_placement_resources, placement_group_id, - placement_group_capture_child_tasks, override_environment_variables); + required_placement_resources, bundle_id, placement_group_capture_child_tasks, + override_environment_variables); // Set task arguments. for (const auto &arg : args) { builder.AddArg(*arg); @@ -1304,7 +1304,7 @@ void CoreWorker::SubmitTask(const RayFunction &function, const std::vector> &args, const TaskOptions &task_options, std::vector *return_ids, int max_retries, - PlacementOptions placement_options, + BundleID placement_options, bool placement_group_capture_child_tasks) { TaskSpecBuilder builder; const int next_task_index = worker_context_.GetNextTaskIndex(); @@ -1330,7 +1330,7 @@ void CoreWorker::SubmitTask(const RayFunction &function, worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_, function, args, task_options.num_returns, constrained_resources, required_resources, return_ids, - placement_options.first, placement_group_capture_child_tasks, + placement_options, placement_group_capture_child_tasks, override_environment_variables); TaskSpecification task_spec = builder.Build(); if (options_.is_local_mode) { @@ -1385,7 +1385,7 @@ Status CoreWorker::CreateActor(const RayFunction &function, worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_, function, args, 1, new_resource, new_placement_resources, &return_ids, - actor_creation_options.placement_options.first, + actor_creation_options.placement_options, actor_creation_options.placement_group_capture_child_tasks, override_environment_variables); builder.SetActorCreationTaskSpec(actor_id, actor_creation_options.max_restarts, @@ -1494,7 +1494,8 @@ void CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &fun BuildCommonTaskSpec(builder, actor_handle->CreationJobID(), actor_task_id, task_name, worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_, function, args, num_returns, task_options.resources, - required_resources, return_ids, PlacementGroupID::Nil(), + required_resources, return_ids, + std::make_pair(PlacementGroupID::Nil(), -1), true, /* placement_group_capture_child_tasks */ override_environment_variables); // NOTE: placement_group_capture_child_tasks and override_environment_variables will be diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 92eb88ddb..7612f1642 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -647,7 +647,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { void SubmitTask(const RayFunction &function, const std::vector> &args, const TaskOptions &task_options, std::vector *return_ids, - int max_retries, PlacementOptions placement_options, + int max_retries, BundleID placement_options, bool placement_group_capture_child_tasks); /// Create an actor. diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 48ce35abe..23d16890b 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -533,7 +533,7 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { builder.SetCommonTaskSpec(RandomTaskId(), options.name, function.GetLanguage(), function.GetFunctionDescriptor(), job_id, RandomTaskId(), 0, RandomTaskId(), address, num_returns, resources, resources, - PlacementGroupID::Nil(), true); + std::make_pair(PlacementGroupID::Nil(), -1), true); // Set task arguments. for (const auto &arg : args) { builder.AddArg(*arg); diff --git a/src/ray/core_worker/test/direct_task_transport_test.cc b/src/ray/core_worker/test/direct_task_transport_test.cc index 958aa4fd6..a6056d45a 100644 --- a/src/ray/core_worker/test/direct_task_transport_test.cc +++ b/src/ray/core_worker/test/direct_task_transport_test.cc @@ -328,7 +328,7 @@ TaskSpecification BuildTaskSpec(const std::unordered_map &r builder.SetCommonTaskSpec(TaskID::Nil(), "dummy_task", Language::PYTHON, function_descriptor, JobID::Nil(), TaskID::Nil(), 0, TaskID::Nil(), empty_address, 1, resources, resources, - PlacementGroupID::Nil(), true); + std::make_pair(PlacementGroupID::Nil(), -1), true); return builder.Build(); } diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc index cbe40b400..52519bd90 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc @@ -489,7 +489,9 @@ void GcsPlacementGroupManager::LoadInitialData(const EmptyCallback &done) { item.second.state() == rpc::PlacementGroupTableData::RESCHEDULING) { const auto &bundles = item.second.bundles(); for (auto &bundle : bundles) { - node_to_bundles[NodeID::FromBinary(bundle.node_id())].emplace_back(bundle); + if (!NodeID::FromBinary(bundle.node_id()).IsNil()) { + node_to_bundles[NodeID::FromBinary(bundle.node_id())].emplace_back(bundle); + } } } } diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h index 7ccabdc12..d64a7ce41 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h @@ -30,7 +30,6 @@ namespace gcs { using ReserveResourceClientFactoryFn = std::function(const rpc::Address &address)>; -typedef std::pair BundleID; struct pair_hash { template std::size_t operator()(const std::pair &pair) const { diff --git a/src/ray/gcs/test/gcs_test_util.h b/src/ray/gcs/test/gcs_test_util.h index d96f6ea1f..40c478c37 100644 --- a/src/ray/gcs/test/gcs_test_util.h +++ b/src/ray/gcs/test/gcs_test_util.h @@ -41,7 +41,7 @@ struct Mocker { builder.SetCommonTaskSpec(task_id, name + ":" + empty_descriptor->CallString(), Language::PYTHON, empty_descriptor, job_id, TaskID::Nil(), 0, TaskID::Nil(), owner_address, 1, resource, resource, - PlacementGroupID::Nil(), true); + std::make_pair(PlacementGroupID::Nil(), -1), true); builder.SetActorCreationTaskSpec(actor_id, max_restarts, {}, 1, detached, name); return builder.Build(); } diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 905c41c5d..dde765d22 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -190,10 +190,12 @@ message TaskSpec { int32 max_retries = 17; // Placement group that is associated with this task. bytes placement_group_id = 18; + // Placement group bundle that is associated with this task. + int64 placement_group_bundle_index = 19; // Whether or not this task should capture parent's placement group automatically. - bool placement_group_capture_child_tasks = 19; + bool placement_group_capture_child_tasks = 20; // Environment variables to override for this task - map override_environment_variables = 20; + map override_environment_variables = 21; } message Bundle { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 643eec0df..31fcb356c 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -341,6 +341,15 @@ void NodeManager::KillWorker(std::shared_ptr worker) { }); } +void NodeManager::DestroyWorker(std::shared_ptr worker) { + // We should disconnect the client first. Otherwise, we'll remove bundle resources + // before actual resources are returned. Subsequent disconnect request that comes + // due to worker dead will be ignored. + ProcessDisconnectClientMessage(worker->Connection(), /* intentional exit */ true); + worker->MarkDead(); + KillWorker(worker); +} + void NodeManager::HandleJobStarted(const JobID &job_id, const JobTableData &job_data) { RAY_LOG(DEBUG) << "HandleJobStarted " << job_id; RAY_CHECK(!job_data.is_dead()); @@ -567,9 +576,20 @@ void NodeManager::HandleReleaseUnusedBundles( bundle_id.bundle_index())); } - // TODO(ffbin): Kill all workers that are currently associated with the unused bundles. - // At present, the worker does not have a bundle ID, so we cannot filter out the workers - // used by unused bundles. We will solve it in next pr. + // Kill all workers that are currently associated with the unused bundles. + for (const auto &worker_it : leased_workers_) { + auto &worker = worker_it.second; + if (0 == in_use_bundles.count(worker->GetBundleId())) { + RAY_LOG(DEBUG) + << "Destroying worker since its bundle was unused. Placement group id: " + << worker->GetBundleId().first + << ", bundle index: " << worker->GetBundleId().second + << ", task id: " << worker->GetAssignedTaskId() + << ", actor id: " << worker->GetActorId() + << ", worker id: " << worker->WorkerId(); + DestroyWorker(worker); + } + } // Return unused bundle resources. for (auto iter = bundle_spec_map_.begin(); iter != bundle_spec_map_.end();) { @@ -1802,24 +1822,19 @@ void NodeManager::HandleCancelResourceReserve( std::vector> workers_associated_with_pg; for (const auto &worker_it : leased_workers_) { auto &worker = worker_it.second; - if (worker->GetPlacementGroupId() == bundle_spec.PlacementGroupId()) { + if (worker->GetBundleId().first == bundle_spec.PlacementGroupId()) { workers_associated_with_pg.push_back(worker); } } for (const auto &worker : workers_associated_with_pg) { RAY_LOG(DEBUG) << "Destroying worker since its placement group was removed. Placement group id: " - << worker->GetPlacementGroupId() + << worker->GetBundleId().first << ", bundle index: " << bundle_spec.BundleId().second << ", task id: " << worker->GetAssignedTaskId() << ", actor id: " << worker->GetActorId() << ", worker id: " << worker->WorkerId(); - // We should disconnect the client first. Otherwise, we'll remove bundle resources - // before actual resources are returned. Subsequent disconnect request that comes - // due to worker dead will be ignored. - ProcessDisconnectClientMessage(worker->Connection(), /* intentional exit */ true); - worker->MarkDead(); - KillWorker(worker); + DestroyWorker(worker); } // Return bundle resources. @@ -2511,7 +2526,7 @@ void NodeManager::AssignTask(const std::shared_ptr &worker, if (task.GetTaskSpecification().IsDetachedActor()) { worker->MarkDetachedActor(); } - worker->SetPlacementGroupId(spec.PlacementGroupId()); + worker->SetBundleId(spec.PlacementGroupBundleId()); const auto owner_worker_id = WorkerID::FromBinary(spec.CallerAddress().worker_id()); const auto owner_node_id = NodeID::FromBinary(spec.CallerAddress().raylet_id()); diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 9794d5d99..536892830 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -20,6 +20,7 @@ #include "ray/rpc/grpc_client.h" #include "ray/rpc/node_manager/node_manager_server.h" #include "ray/rpc/node_manager/node_manager_client.h" +#include "ray/common/id.h" #include "ray/common/task/task.h" #include "ray/common/ray_object.h" #include "ray/common/client_connection.h" @@ -103,7 +104,6 @@ struct NodeManagerConfig { uint64_t record_metrics_period_ms; }; -typedef std::pair BundleID; struct pair_hash { template std::size_t operator()(const std::pair &pair) const { @@ -425,6 +425,13 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// \return Void. void KillWorker(std::shared_ptr worker); + /// Destroy a worker. + /// We will disconnect the worker connection first and then kill the worker. + /// + /// \param worker The worker to destroy. + /// \return Void. + void DestroyWorker(std::shared_ptr worker); + /// The callback for handling an actor state transition (e.g., from ALIVE to /// DEAD), whether as a notification from the actor table or as a handler for /// a local actor's state transition. This method is idempotent and will ignore diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc index b46d8ca18..f20515353 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc @@ -79,7 +79,8 @@ Task CreateTask(const std::unordered_map &required_resource spec_builder.SetCommonTaskSpec(id, "dummy_task", Language::PYTHON, FunctionDescriptorBuilder::BuildPython("", "", "", ""), job_id, TaskID::Nil(), 0, TaskID::Nil(), address, 0, - required_resources, {}, PlacementGroupID::Nil(), true); + required_resources, {}, + std::make_pair(PlacementGroupID::Nil(), -1), true); for (int i = 0; i < num_args; i++) { ObjectID put_id = ObjectID::FromIndex(TaskID::Nil(), /*index=*/i + 1); diff --git a/src/ray/raylet/task_dependency_manager_test.cc b/src/ray/raylet/task_dependency_manager_test.cc index 05f162c00..3f53b5a09 100644 --- a/src/ray/raylet/task_dependency_manager_test.cc +++ b/src/ray/raylet/task_dependency_manager_test.cc @@ -68,7 +68,8 @@ static inline Task ExampleTask(const std::vector &arguments, builder.SetCommonTaskSpec(RandomTaskId(), "example_task", Language::PYTHON, FunctionDescriptorBuilder::BuildPython("", "", "", ""), JobID::Nil(), RandomTaskId(), 0, RandomTaskId(), address, - num_returns, {}, {}, PlacementGroupID::Nil(), true); + num_returns, {}, {}, + std::make_pair(PlacementGroupID::Nil(), -1), true); builder.SetActorCreationTaskSpec(ActorID::Nil(), 1, {}, 1, false, "", false); for (const auto &arg : arguments) { builder.AddArg(TaskArgByReference(arg, rpc::Address())); diff --git a/src/ray/raylet/test/util.h b/src/ray/raylet/test/util.h index 3552d8009..459a31e02 100644 --- a/src/ray/raylet/test/util.h +++ b/src/ray/raylet/test/util.h @@ -168,14 +168,12 @@ class MockWorker : public WorkerInterface { borrowed_cpu_instances_ = cpu_instances; } - const PlacementGroupID &GetPlacementGroupId() const { + const BundleID &GetBundleId() const { RAY_CHECK(false) << "Method unused"; - return PlacementGroupID::Nil(); + return bundle_id_; } - void SetPlacementGroupId(const PlacementGroupID &placement_group_id) { - RAY_CHECK(false) << "Method unused"; - } + void SetBundleId(const BundleID &bundle_id) { RAY_CHECK(false) << "Method unused"; } std::vector &GetBorrowedCPUInstances() { return borrowed_cpu_instances_; } @@ -205,6 +203,7 @@ class MockWorker : public WorkerInterface { std::shared_ptr lifetime_allocated_instances_; std::vector borrowed_cpu_instances_; bool is_detached_actor_; + BundleID bundle_id_; }; } // namespace raylet diff --git a/src/ray/raylet/worker.cc b/src/ray/raylet/worker.cc index a3298247b..acce4c984 100644 --- a/src/ray/raylet/worker.cc +++ b/src/ray/raylet/worker.cc @@ -38,7 +38,7 @@ Worker::Worker(const JobID &job_id, const WorkerID &worker_id, const Language &l port_(-1), connection_(connection), assigned_job_id_(job_id), - placement_group_id_(PlacementGroupID::Nil()), + bundle_id_(std::make_pair(PlacementGroupID::Nil(), -1)), dead_(false), blocked_(false), client_call_manager_(client_call_manager), @@ -182,13 +182,9 @@ void Worker::DirectActorCallArgWaitComplete(int64_t tag) { }); } -const PlacementGroupID &Worker::GetPlacementGroupId() const { - return placement_group_id_; -} +const BundleID &Worker::GetBundleId() const { return bundle_id_; } -void Worker::SetPlacementGroupId(const PlacementGroupID &placement_group_id) { - placement_group_id_ = placement_group_id; -} +void Worker::SetBundleId(const BundleID &bundle_id) { bundle_id_ = bundle_id; } } // namespace raylet diff --git a/src/ray/raylet/worker.h b/src/ray/raylet/worker.h index 830ae656d..a40855abe 100644 --- a/src/ray/raylet/worker.h +++ b/src/ray/raylet/worker.h @@ -83,8 +83,8 @@ class WorkerInterface { virtual void DirectActorCallArgWaitComplete(int64_t tag) = 0; - virtual const PlacementGroupID &GetPlacementGroupId() const = 0; - virtual void SetPlacementGroupId(const PlacementGroupID &placement_group_id) = 0; + virtual const BundleID &GetBundleId() const = 0; + virtual void SetBundleId(const BundleID &bundle_id) = 0; // Setter, geter, and clear methods for allocated_instances_. virtual void SetAllocatedInstances( @@ -174,8 +174,8 @@ class Worker : public WorkerInterface { void DirectActorCallArgWaitComplete(int64_t tag); - const PlacementGroupID &GetPlacementGroupId() const; - void SetPlacementGroupId(const PlacementGroupID &placement_group_id); + const BundleID &GetBundleId() const; + void SetBundleId(const BundleID &bundle_id); // Setter, geter, and clear methods for allocated_instances_. void SetAllocatedInstances( @@ -245,9 +245,9 @@ class Worker : public WorkerInterface { JobID assigned_job_id_; /// The worker's actor ID. If this is nil, then the worker is not an actor. ActorID actor_id_; - /// The worker's placement group ID. It is used to detect if the worker is - /// associated with a placement group. - PlacementGroupID placement_group_id_; + /// The worker's placement group bundle. It is used to detect if the worker is + /// associated with a placement group bundle. + BundleID bundle_id_; /// Whether the worker is dead. bool dead_; /// Whether the worker is blocked. Workers become blocked in a `ray.get`, if