diff --git a/BUILD.bazel b/BUILD.bazel index b076ce82f..0805a6cb6 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -1076,6 +1076,20 @@ cc_test( ], ) +cc_test( + name = "gcs_placement_group_manager_test", + srcs = [ + "src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc", + "src/ray/gcs/gcs_server/test/gcs_server_test_util.h", + ], + copts = COPTS, + deps = [ + ":gcs_server_lib", + ":gcs_test_util_lib", + "@com_google_googletest//:gtest_main", + ], +) + cc_test( name = "gcs_placement_group_scheduler_test", srcs = [ diff --git a/src/ray/common/placement_group.cc b/src/ray/common/placement_group.cc index 8925aa7fe..15c1d825a 100644 --- a/src/ray/common/placement_group.cc +++ b/src/ray/common/placement_group.cc @@ -33,12 +33,8 @@ std::vector PlacementGroupSpecification::GetBundles() const return bundles_; } -Strategy PlacementGroupSpecification::GetStrategy() const { - if (message_->strategy() == rpc::PlacementStrategy::PACK) { - return Strategy::PACK; - } else { - return Strategy::SPREAD; - } +rpc::PlacementStrategy PlacementGroupSpecification::GetStrategy() const { + return message_->strategy(); } BundleSpecification PlacementGroupSpecification::GetBundle(int position) const { diff --git a/src/ray/common/placement_group.h b/src/ray/common/placement_group.h index 9ea0220db..44f340934 100644 --- a/src/ray/common/placement_group.h +++ b/src/ray/common/placement_group.h @@ -21,11 +21,6 @@ namespace ray { -enum class Strategy { - PACK = 0x0, - SPREAD = 0x1, -}; - class PlacementGroupSpecification : public MessageWrapper { public: /// Construct from a protobuf message object. @@ -48,7 +43,7 @@ class PlacementGroupSpecification : public MessageWrapper GetBundles() const; /// Return the strategy of the placement group. - Strategy GetStrategy() const; + rpc::PlacementStrategy GetStrategy() const; /// Return the bundle by given index. BundleSpecification GetBundle(int position) const; /// Return the name of this placement group. @@ -71,18 +66,19 @@ class PlacementGroupSpecBuilder { /// \return Reference to the builder object itself. PlacementGroupSpecBuilder &SetPlacementGroupSpec( const PlacementGroupID &placement_group_id, std::string name, - const std::vector &bundles, const rpc::PlacementStrategy strategy) { + const std::vector> &bundles, + const rpc::PlacementStrategy strategy) { message_->set_placement_group_id(placement_group_id.Binary()); message_->set_name(name); message_->set_strategy(strategy); for (int i = 0; i < bundles.size(); i++) { - rpc::Bundle bundle = bundles[i]; + auto resources = bundles[i]; auto message_bundle = message_->add_bundles(); auto mutable_bundle_id = message_bundle->mutable_bundle_id(); mutable_bundle_id->set_bundle_index(i); mutable_bundle_id->set_placement_group_id(placement_group_id.Binary()); - message_bundle->mutable_unit_resources()->insert(bundle.unit_resources().begin(), - bundle.unit_resources().end()); + message_bundle->mutable_unit_resources()->insert(resources.begin(), + resources.end()); } return *this; } diff --git a/src/ray/common/task/scheduling_resources.cc b/src/ray/common/task/scheduling_resources.cc index fe163eeaa..729dafddd 100644 --- a/src/ray/common/task/scheduling_resources.cc +++ b/src/ray/common/task/scheduling_resources.cc @@ -683,7 +683,6 @@ void ResourceIdSet::CancelResourceReserve(const std::string &resource_name) { available_resources_.erase(iter_bundle); } } - void ResourceIdSet::DeleteResource(const std::string &resource_name) { available_resources_.erase(resource_name); } diff --git a/src/ray/common/task/scheduling_resources.h b/src/ray/common/task/scheduling_resources.h index b3d1bb998..580ad0bbb 100644 --- a/src/ray/common/task/scheduling_resources.h +++ b/src/ray/common/task/scheduling_resources.h @@ -429,6 +429,7 @@ class ResourceIdSet { /// \brief remove a Bundle resource in the ResourceIdSet. /// /// \param resource_name the name of the resource to remove. + void CancelResourceReserve(const std::string &resource_name); /// \brief Deletes a resource in the ResourceIdSet. This does not raise an exception, /// just deletes the resource. Tasks with acquired resources keep running. diff --git a/src/ray/common/task/scheduling_resources_test.cc b/src/ray/common/task/scheduling_resources_test.cc index 6442d459f..46553330f 100644 --- a/src/ray/common/task/scheduling_resources_test.cc +++ b/src/ray/common/task/scheduling_resources_test.cc @@ -63,14 +63,7 @@ TEST_F(SchedulingResourcesTest, ReturnBundleResources) { resource_labels.push_back(bundle_id.Binary() + "_" + "CPU"); ResourceSet result_resource(resource_labels, resource_capacity); ASSERT_EQ(1, resource_set->IsEqual(result_resource)); - resource_set->ReturnBundleResources(bundle_id.Binary()); ASSERT_EQ(1, resource_set->IsEqual(resource)); } - } // namespace ray - -int main(int argc, char **argv) { - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index b8afe83ea..8f52907c7 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -25,6 +25,7 @@ namespace ray { using WorkerType = rpc::WorkerType; +using PlacementOptions = std::pair; // Return a string representation of the worker type. std::string WorkerTypeString(WorkerType type); @@ -65,12 +66,13 @@ struct TaskOptions { /// Options for actor creation tasks. struct ActorCreationOptions { ActorCreationOptions() {} - ActorCreationOptions(int64_t max_restarts, int64_t max_task_retries, - int max_concurrency, - const std::unordered_map &resources, - const std::unordered_map &placement_resources, - const std::vector &dynamic_worker_options, - bool is_detached, std::string &name, bool is_asyncio) + ActorCreationOptions( + int64_t max_restarts, int64_t max_task_retries, int max_concurrency, + const std::unordered_map &resources, + const std::unordered_map &placement_resources, + const std::vector &dynamic_worker_options, bool is_detached, + std::string &name, bool is_asyncio, + PlacementOptions placement_options = std::make_pair(PlacementGroupID::Nil(), -1)) : max_restarts(max_restarts), max_task_retries(max_task_retries), max_concurrency(max_concurrency), @@ -79,7 +81,8 @@ struct ActorCreationOptions { dynamic_worker_options(dynamic_worker_options), is_detached(is_detached), name(name), - is_asyncio(is_asyncio){}; + is_asyncio(is_asyncio), + placement_options(placement_options){}; /// Maximum number of times that the actor should be restarted if it dies /// unexpectedly. A value of -1 indicates infinite restarts. If it's 0, the @@ -107,6 +110,27 @@ struct ActorCreationOptions { const std::string name; /// Whether to use async mode of direct actor call. const bool is_asyncio = false; + /// The placement_options include placement_group_id and bundle_index. + /// If the actor doesn't belong to a placement group, the placement_group_id will be + /// nil, and the bundle_index will be -1. + PlacementOptions placement_options; +}; + +using PlacementStrategy = rpc::PlacementStrategy; + +struct PlacementGroupCreationOptions { + PlacementGroupCreationOptions() {} + PlacementGroupCreationOptions( + const std::string &name, PlacementStrategy strategy, + const std::vector> &bundles) + : name(name), strategy(strategy), bundles(bundles) {} + + /// The strategy to place the bundle in Placement Group. + const PlacementStrategy strategy = rpc::PACK; + /// The resource bundles in this placement group. + const std::vector> bundles; + /// The name of the placement group. + const std::string name; }; } // namespace ray diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 2c2bb9a5a..d4991b540 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1168,6 +1168,21 @@ void CoreWorker::SubmitTask(const RayFunction &function, } } +std::unordered_map AddPlacementGroupConstraint( + const std::unordered_map &resources, + PlacementGroupID placement_group_id, int64_t bundle_index) { + std::unordered_map new_resources; + if (placement_group_id != PlacementGroupID::Nil()) { + for (auto iter = resources.begin(); iter != resources.end(); iter++) { + auto new_name = + placement_group_id.Hex() + std::to_string(bundle_index) + "_" + iter->first; + new_resources[new_name] = iter->second; + } + return new_resources; + } + return resources; +} + Status CoreWorker::CreateActor(const RayFunction &function, const std::vector> &args, const ActorCreationOptions &actor_creation_options, @@ -1185,10 +1200,17 @@ Status CoreWorker::CreateActor(const RayFunction &function, const JobID job_id = worker_context_.GetCurrentJobID(); std::vector return_ids; TaskSpecBuilder builder; + auto new_placement_resources = + AddPlacementGroupConstraint(actor_creation_options.placement_resources, + actor_creation_options.placement_options.first, + actor_creation_options.placement_options.second); + auto new_resource = AddPlacementGroupConstraint( + actor_creation_options.resources, actor_creation_options.placement_options.first, + actor_creation_options.placement_options.second); BuildCommonTaskSpec(builder, job_id, actor_creation_task_id, worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), - rpc_address_, function, args, 1, actor_creation_options.resources, - actor_creation_options.placement_resources, &return_ids); + rpc_address_, function, args, 1, new_resource, + new_placement_resources, &return_ids); builder.SetActorCreationTaskSpec( actor_id, actor_creation_options.max_restarts, actor_creation_options.dynamic_worker_options, @@ -1227,6 +1249,22 @@ Status CoreWorker::CreateActor(const RayFunction &function, return status; } +Status CoreWorker::CreatePlacementGroup( + const PlacementGroupCreationOptions &placement_group_creation_options, + PlacementGroupID *return_placement_group_id) { + const PlacementGroupID placement_group_id = PlacementGroupID ::FromRandom(); + PlacementGroupSpecBuilder builder; + builder.SetPlacementGroupSpec(placement_group_id, placement_group_creation_options.name, + placement_group_creation_options.bundles, + placement_group_creation_options.strategy); + PlacementGroupSpecification placement_group_spec = builder.Build(); + *return_placement_group_id = placement_group_id; + RAY_LOG(INFO) << "Submitting Placement Group creation to GCS: " << placement_group_id; + RAY_CHECK_OK( + gcs_client_->PlacementGroups().AsyncCreatePlacementGroup(placement_group_spec)); + return Status::OK(); +} + void CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &function, const std::vector> &args, const TaskOptions &task_options, diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 0e57cb88e..877afb5cb 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -17,6 +17,7 @@ #include "absl/base/optimization.h" #include "absl/container/flat_hash_map.h" #include "ray/common/buffer.h" +#include "ray/common/placement_group.h" #include "ray/core_worker/actor_handle.h" #include "ray/core_worker/actor_manager.h" #include "ray/core_worker/actor_reporter.h" @@ -593,6 +594,17 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { const ActorCreationOptions &actor_creation_options, const std::string &extension_data, ActorID *actor_id); + /// Create a placement group. + /// + /// \param[in] function The remote function that generates the placement group object. + /// \param[in] placement_group_creation_options Options for this placement group + /// creation task. \param[out] placement_group_id ID of the created placement group. + /// This can be used to shedule actor in node \return Status error if placement group + /// creation fails, likely due to raylet failure. + Status CreatePlacementGroup( + const PlacementGroupCreationOptions &placement_group_creation_options, + PlacementGroupID *placement_group_id); + /// Submit an actor task. /// /// \param[in] caller_id ID of the task submitter. diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/accessor.h index 436512e02..cd87f0c79 100644 --- a/src/ray/gcs/accessor.h +++ b/src/ray/gcs/accessor.h @@ -15,6 +15,7 @@ #pragma once #include "ray/common/id.h" +#include "ray/common/placement_group.h" #include "ray/common/task/task_spec.h" #include "ray/gcs/callback.h" #include "ray/gcs/entry_change_notification.h" @@ -711,6 +712,21 @@ class WorkerInfoAccessor { WorkerInfoAccessor() = default; }; +class PlacementGroupInfoAccessor { + public: + // TODO(AlisaWu): fill the accessor. + /// Create an placement group to GCS asynchronously. + /// + /// \param placement_group_spec The specification for the placement group creation task. + /// \param callback Callback that will be called after the placement group info is + /// written to GCS. \return Status + virtual Status AsyncCreatePlacementGroup( + const PlacementGroupSpecification &placement_group_spec) = 0; + + protected: + PlacementGroupInfoAccessor() = default; +}; + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/gcs_client.h b/src/ray/gcs/gcs_client.h index 8e2a7248b..33cb0dd0d 100644 --- a/src/ray/gcs/gcs_client.h +++ b/src/ray/gcs/gcs_client.h @@ -134,6 +134,13 @@ class GcsClient : public std::enable_shared_from_this { return *worker_accessor_; } + /// Get the sub-interface for accessing worker information in GCS. + /// This function is thread safe. + PlacementGroupInfoAccessor &PlacementGroups() { + RAY_CHECK(placement_group_accessor_ != nullptr); + return *placement_group_accessor_; + } + protected: /// Constructor of GcsClient. /// @@ -153,6 +160,7 @@ class GcsClient : public std::enable_shared_from_this { std::unique_ptr error_accessor_; std::unique_ptr stats_accessor_; std::unique_ptr worker_accessor_; + std::unique_ptr placement_group_accessor_; }; } // namespace gcs diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index 9c27d1dee..3cf17b91b 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -1383,5 +1383,28 @@ Status ServiceBasedWorkerInfoAccessor::AsyncAdd( return Status::OK(); } +ServiceBasedPlacementGroupInfoAccessor::ServiceBasedPlacementGroupInfoAccessor( + ServiceBasedGcsClient *client_impl) + : client_impl_(client_impl) {} + +Status ServiceBasedPlacementGroupInfoAccessor::AsyncCreatePlacementGroup( + const ray::PlacementGroupSpecification &placement_group_spec) { + rpc::CreatePlacementGroupRequest request; + request.mutable_placement_group_spec()->CopyFrom(placement_group_spec.GetMessage()); + client_impl_->GetGcsRpcClient().CreatePlacementGroup( + request, [placement_group_spec](const Status &, + const rpc::CreatePlacementGroupReply &reply) { + auto status = + reply.status().code() == (int)StatusCode::OK + ? Status() + : Status(StatusCode(reply.status().code()), reply.status().message()); + if (status.ok()) { + RAY_LOG(DEBUG) << "Finished registering placement group. placement group id = " + << placement_group_spec.PlacementGroupId(); + } + }); + return Status::OK(); +} + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/gcs_client/service_based_accessor.h b/src/ray/gcs/gcs_client/service_based_accessor.h index 8cda97f50..d7af72c2f 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.h +++ b/src/ray/gcs/gcs_client/service_based_accessor.h @@ -409,5 +409,23 @@ class ServiceBasedWorkerInfoAccessor : public WorkerInfoAccessor { ServiceBasedGcsClient *client_impl_; }; +/// \class ServiceBasedPlacementGroupInfoAccessor +/// ServiceBasedPlacementGroupInfoAccessor is an implementation of +/// `PlacementGroupInfoAccessor` that uses GCS Service as the backend. + +class ServiceBasedPlacementGroupInfoAccessor : public PlacementGroupInfoAccessor { + // TODO(AlisaWu):fill the ServiceAccessor. + public: + explicit ServiceBasedPlacementGroupInfoAccessor(ServiceBasedGcsClient *client_impl); + + virtual ~ServiceBasedPlacementGroupInfoAccessor() = default; + + Status AsyncCreatePlacementGroup( + const PlacementGroupSpecification &placement_group_spec) override; + + private: + ServiceBasedGcsClient *client_impl_; +}; + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/gcs_client/service_based_gcs_client.cc b/src/ray/gcs/gcs_client/service_based_gcs_client.cc index 5bc7cfa9a..e21cc863c 100644 --- a/src/ray/gcs/gcs_client/service_based_gcs_client.cc +++ b/src/ray/gcs/gcs_client/service_based_gcs_client.cc @@ -75,6 +75,7 @@ Status ServiceBasedGcsClient::Connect(boost::asio::io_service &io_service) { stats_accessor_.reset(new ServiceBasedStatsInfoAccessor(this)); error_accessor_.reset(new ServiceBasedErrorInfoAccessor(this)); worker_accessor_.reset(new ServiceBasedWorkerInfoAccessor(this)); + placement_group_accessor_.reset(new ServiceBasedPlacementGroupInfoAccessor(this)); // Init gcs service address check timer. detect_timer_.reset(new boost::asio::deadline_timer(io_service)); diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc index dc25c6a20..a38291cba 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc @@ -54,5 +54,131 @@ const rpc::PlacementGroupTableData &GcsPlacementGroup::GetPlacementGroupTableDat return placement_group_table_data_; } +///////////////////////////////////////////////////////////////////////////////////////// + +GcsPlacementGroupManager::GcsPlacementGroupManager( + boost::asio::io_context &io_context, + std::shared_ptr scheduler, + std::shared_ptr gcs_table_storage) + : gcs_placement_group_scheduler_(std::move(scheduler)), + gcs_table_storage_(gcs_table_storage), + reschedule_timer_(io_context) {} + +void GcsPlacementGroupManager::RegisterPlacementGroup( + const ray::rpc::CreatePlacementGroupRequest &request, + std::function)> callback) { + RAY_CHECK(callback); + const auto &placement_group_spec = request.placement_group_spec(); + auto placement_group_id = + PlacementGroupID::FromBinary(placement_group_spec.placement_group_id()); + + auto placement_group = std::make_shared(request); + // Mark the callback as pending and invoke it after the placement_group has been + // successfully created. + placement_group_to_register_callbacks_[placement_group_id].emplace_back( + std::move(callback)); + RAY_CHECK(registered_placement_groups_ + .emplace(placement_group->GetPlacementGroupID(), placement_group) + .second); + pending_placement_groups_.emplace_back(std::move(placement_group)); + SchedulePendingPlacementGroups(); +} + +PlacementGroupID GcsPlacementGroupManager::GetPlacementGroupIDByName( + const std::string &name) { + PlacementGroupID placement_group_id = PlacementGroupID::Nil(); + for (auto placement_group_pair : registered_placement_groups_) { + auto placement_group = placement_group_pair.second; + if (placement_group->GetName() == name) { + placement_group_id = placement_group_pair.first; + break; + } + } + return placement_group_id; +} + +void GcsPlacementGroupManager::OnPlacementGroupCreationFailed( + std::shared_ptr placement_group) { + // We will attempt to schedule this placement_group once an eligible node is + // registered. + pending_placement_groups_.emplace_back(std::move(placement_group)); + is_creating_ = false; + ScheduleTick(); +} + +void GcsPlacementGroupManager::OnPlacementGroupCreationSuccess( + std::shared_ptr placement_group) { + auto placement_group_id = placement_group->GetPlacementGroupID(); + RAY_CHECK(registered_placement_groups_.count(placement_group_id) > 0); + placement_group->UpdateState(rpc::PlacementGroupTableData::ALIVE); + + auto placement_group_table_data = placement_group->GetPlacementGroupTableData(); + // The backend storage is reliable in the future, so the status must be ok. + RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().Put( + placement_group_id, placement_group_table_data, [](Status status) {})); + + // Invoke all callbacks for all registration requests of this placement_group + // (duplicated requests are included) and remove all of them from + // placement_group_to_register_callbacks_. + auto iter = placement_group_to_register_callbacks_.find(placement_group_id); + if (iter != placement_group_to_register_callbacks_.end()) { + for (auto &callback : iter->second) { + callback(placement_group); + } + placement_group_to_register_callbacks_.erase(iter); + } + is_creating_ = false; + SchedulePendingPlacementGroups(); +} + +void GcsPlacementGroupManager::SchedulePendingPlacementGroups() { + if (pending_placement_groups_.empty() || is_creating_) { + return; + } + is_creating_ = true; + gcs_placement_group_scheduler_->Schedule( + *pending_placement_groups_.begin(), + [this](std::shared_ptr placement_group) { + OnPlacementGroupCreationFailed(std::move(placement_group)); + }, + [this](std::shared_ptr placement_group) { + OnPlacementGroupCreationSuccess(std::move(placement_group)); + }); + pending_placement_groups_.pop_front(); +} + +void GcsPlacementGroupManager::HandleCreatePlacementGroup( + const ray::rpc::CreatePlacementGroupRequest &request, + ray::rpc::CreatePlacementGroupReply *reply, + ray::rpc::SendReplyCallback send_reply_callback) { + auto placement_group_id = + PlacementGroupID::FromBinary(request.placement_group_spec().placement_group_id()); + + RAY_LOG(INFO) << "Registering placement group, placement group id = " + << placement_group_id; + RegisterPlacementGroup( + request, [reply, send_reply_callback, placement_group_id]( + std::shared_ptr placement_group) { + RAY_LOG(INFO) << "Registered placement group, placement group id = " + << placement_group_id; + GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); + }); + auto placement_group = std::make_shared(request); + auto placement_group_table_data = placement_group->GetPlacementGroupTableData(); + RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().Put( + placement_group_id, placement_group_table_data, [](Status status) {})); +} + +void GcsPlacementGroupManager::ScheduleTick() { + reschedule_timer_.expires_from_now(boost::posix_time::milliseconds(5)); + reschedule_timer_.async_wait([this](const boost::system::error_code &error) { + if (error == boost::system::errc::operation_canceled) { + return; + } else { + SchedulePendingPlacementGroups(); + } + }); +} + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h index 19aebf15e..3c8bc6968 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h @@ -47,7 +47,7 @@ class GcsPlacementGroup { const auto &placement_group_spec = request.placement_group_spec(); placement_group_table_data_.set_placement_group_id( placement_group_spec.placement_group_id()); - + placement_group_table_data_.set_name(placement_group_spec.name()); placement_group_table_data_.set_state(rpc::PlacementGroupTableData::PENDING); placement_group_table_data_.mutable_bundles()->CopyFrom( placement_group_spec.bundles()); @@ -78,5 +78,100 @@ class GcsPlacementGroup { /// state of the gcs placement_group and so on (see gcs.proto). rpc::PlacementGroupTableData placement_group_table_data_; }; + +using RegisterPlacementGroupCallback = + std::function)>; +/// GcsPlacementGroupManager is responsible for managing the lifecycle of all placement +/// group. This class is not thread-safe. +/// The placementGroup will be added into queue and set the status as pending first and +/// use SchedulePendingPlacementGroups(). The SchedulePendingPlacementGroups() will get +/// the head of the queue and schedule it. If schedule success, using the +/// SchedulePendingPlacementGroups() Immediately. else wait for a short time beforw using +/// SchedulePendingPlacementGroups() next time. +class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler { + public: + /// Create a GcsPlacementGroupManager + /// + /// \param io_context The event loop to run the monitor on. + /// \param scheduler Used to schedule placement group creation tasks. + /// \param gcs_table_storage Used to flush placement group data to storage. + explicit GcsPlacementGroupManager( + boost::asio::io_context &io_context, + std::shared_ptr scheduler, + std::shared_ptr gcs_table_storage); + + ~GcsPlacementGroupManager() = default; + + void HandleCreatePlacementGroup(const rpc::CreatePlacementGroupRequest &request, + rpc::CreatePlacementGroupReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + + /// Register placement_group asynchronously. + /// + /// \param request Contains the meta info to create the placement_group. + /// \param callback Will be invoked after the placement_group is created successfully or + /// be invoked immediately if the placement_group is already registered to + /// `registered_placement_groups_` and its state is `ALIVE`. The callback will not be + /// called in this case. + void RegisterPlacementGroup(const rpc::CreatePlacementGroupRequest &request, + RegisterPlacementGroupCallback callback); + + /// Schedule placement_groups in the `pending_placement_groups_` queue. + /// This function is exposed for testing only. + void SchedulePendingPlacementGroups(); + + /// Get the placement_group ID for the named placement_group. Returns nil if the + /// placement_group was not found. + /// \param name The name of the placement_group to look up. + /// \returns PlacementGroupID The ID of the placement_group. Nil if the + /// placement_group was not found. + PlacementGroupID GetPlacementGroupIDByName(const std::string &name); + + /// Handle placement_group creation task failure. This should be called when scheduling + /// an placement_group creation task is infeasible. + /// + /// \param placement_group The placement_group whose creation task is infeasible. + void OnPlacementGroupCreationFailed(std::shared_ptr placement_group); + + /// Handle placement_group creation task success. This should be called when the + /// placement_group creation task has been scheduled successfully. + /// + /// \param placement_group The placement_group that has been created. + void OnPlacementGroupCreationSuccess( + std::shared_ptr placement_group); + + private: + /// Schedule another tick after a short time. + void ScheduleTick(); + + /// Callbacks of placement_group registration requests that are not yet flushed. + /// This map is used to filter duplicated messages from a Driver/Worker caused by some + /// network problems. + /// + /// Since the GRPC message received by the GCS side is out of order, it can not be + /// determined that the last callback is the valid one. Therefore, the repeated + /// callbacks are recorded in the form of vector without distinction. When the operation + /// is successful, all callbacks will be triggered. One of them must be valid, and the + /// rest invalid callbacks will not have any effect even if they are called. + absl::flat_hash_map> + placement_group_to_register_callbacks_; + /// All registered placement_groups (pending placement_groups are also included). + absl::flat_hash_map> + registered_placement_groups_; + /// The pending placement_groups which will not be scheduled until there's a resource + /// change. + std::deque> pending_placement_groups_; + /// The scheduler to schedule all registered placement_groups. + std::shared_ptr + gcs_placement_group_scheduler_; + /// Used to update placement group information upon creation, deletion, etc. + std::shared_ptr gcs_table_storage_; + /// If a placement group is creating + bool is_creating_ = false; + + /// A timer that ticks every schedule failure milliseconds. + boost::asio::deadline_timer reschedule_timer_; +}; + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc index 031102891..9fac025ee 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc @@ -188,8 +188,8 @@ void GcsPlacementGroupScheduler::ReserveResourceFromNode( if (iter->second.empty()) { node_to_bundles_when_leasing_.erase(iter); } + callback(status, reply); } - callback(status, reply); } }); if (!status.ok()) { @@ -229,7 +229,7 @@ void GcsPlacementGroupScheduler::CancelResourceReserve( }); } }); -} // namespace gcs +} std::shared_ptr GcsPlacementGroupScheduler::GetOrConnectLeaseClient(const rpc::Address &raylet_address) { diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 0617881a5..c745e045e 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -19,6 +19,7 @@ #include "gcs_job_manager.h" #include "gcs_node_manager.h" #include "gcs_object_manager.h" +#include "gcs_placement_group_manager.h" #include "gcs_worker_manager.h" #include "ray/common/network_util.h" #include "ray/common/ray_config.h" @@ -60,6 +61,9 @@ void GcsServer::Start() { // Init gcs actor manager. InitGcsActorManager(); + // Init gcs placement group manager. + InitGcsPlacementGroupManager(); + // Register rpc service. gcs_object_manager_ = InitObjectManager(); object_info_service_.reset( @@ -79,6 +83,10 @@ void GcsServer::Start() { new rpc::ActorInfoGrpcService(main_service_, *gcs_actor_manager_)); rpc_server_.RegisterService(*actor_info_service_); + placement_group_info_service_.reset(new rpc::PlacementGroupInfoGrpcService( + main_service_, *gcs_placement_group_manager_)); + rpc_server_.RegisterService(*placement_group_info_service_); + node_info_service_.reset( new rpc::NodeInfoGrpcService(main_service_, *gcs_node_manager_)); rpc_server_.RegisterService(*node_info_service_); @@ -210,6 +218,22 @@ void GcsServer::InitGcsJobManager() { }); } +void GcsServer::InitGcsPlacementGroupManager() { + RAY_CHECK(gcs_table_storage_ != nullptr && gcs_node_manager_ != nullptr); + auto scheduler = std::make_shared( + main_service_, gcs_table_storage_, *gcs_node_manager_, + /*lease_client_factory=*/ + [this](const rpc::Address &address) { + auto node_manager_worker_client = rpc::NodeManagerWorkerClient::make( + address.ip_address(), address.port(), client_call_manager_); + return std::make_shared( + std::move(node_manager_worker_client)); + }); + + gcs_placement_group_manager_ = std::make_shared( + main_service_, scheduler, gcs_table_storage_); +} + std::unique_ptr GcsServer::InitObjectManager() { return std::unique_ptr( new GcsObjectManager(gcs_table_storage_, gcs_pub_sub_, *gcs_node_manager_)); diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h index 2eecc9276..3c8b782cf 100644 --- a/src/ray/gcs/gcs_server/gcs_server.h +++ b/src/ray/gcs/gcs_server/gcs_server.h @@ -40,6 +40,7 @@ class GcsNodeManager; class GcsActorManager; class GcsJobManager; class GcsWorkerManager; +class GcsPlacementGroupManager; /// The GcsServer will take over all requests from ServiceBasedGcsClient and transparent /// transmit the command to the backend reliable storage for the time being. @@ -85,6 +86,9 @@ class GcsServer { /// Initialize the gcs job manager. virtual void InitGcsJobManager(); + /// Initialize the gcs placement group manager. + virtual void InitGcsPlacementGroupManager(); + /// The object manager virtual std::unique_ptr InitObjectManager(); @@ -122,6 +126,8 @@ class GcsServer { std::shared_ptr gcs_redis_failure_detector_; /// The gcs actor manager std::shared_ptr gcs_actor_manager_; + /// The gcs placement group manager + std::shared_ptr gcs_placement_group_manager_; /// Job info handler and service std::unique_ptr gcs_job_manager_; std::unique_ptr job_info_service_; @@ -145,6 +151,9 @@ class GcsServer { std::unique_ptr gcs_worker_manager_; /// Worker info service std::unique_ptr worker_info_service_; + /// Placement Group info handler and service + std::unique_ptr placement_group_info_handler_; + std::unique_ptr placement_group_info_service_; /// Backend client std::shared_ptr redis_gcs_client_; /// A publisher for publishing gcs messages. diff --git a/src/ray/gcs/gcs_server/gcs_table_storage.cc b/src/ray/gcs/gcs_server/gcs_table_storage.cc index 956235842..4da49f71d 100644 --- a/src/ray/gcs/gcs_server/gcs_table_storage.cc +++ b/src/ray/gcs/gcs_server/gcs_table_storage.cc @@ -128,6 +128,7 @@ template class GcsTableWithJobId; template class GcsTableWithJobId; template class GcsTableWithJobId; template class GcsTableWithJobId; +template class GcsTable; template class GcsTable; } // namespace gcs diff --git a/src/ray/gcs/gcs_server/gcs_table_storage.h b/src/ray/gcs/gcs_server/gcs_table_storage.h index 573157846..2b605f748 100644 --- a/src/ray/gcs/gcs_server/gcs_table_storage.h +++ b/src/ray/gcs/gcs_server/gcs_table_storage.h @@ -33,6 +33,7 @@ using rpc::HeartbeatTableData; using rpc::JobTableData; using rpc::ObjectTableData; using rpc::ObjectTableDataList; +using rpc::PlacementGroupTableData; using rpc::ProfileTableData; using rpc::ResourceMap; using rpc::ResourceTableData; @@ -154,6 +155,15 @@ class GcsActorTable : public GcsTableWithJobId { JobID GetJobIdFromKey(const ActorID &key) override { return key.JobId(); } }; +class GcsPlacementGroupTable + : public GcsTable { + public: + explicit GcsPlacementGroupTable(std::shared_ptr &store_client) + : GcsTable(store_client) { + table_name_ = TablePrefix_Name(TablePrefix::PLACEMENT_GROUP); + } +}; + class GcsActorCheckpointTable : public GcsTable { public: explicit GcsActorCheckpointTable(std::shared_ptr &store_client) @@ -307,6 +317,11 @@ class GcsTableStorage { return *actor_table_; } + GcsPlacementGroupTable &PlacementGroupTable() { + RAY_CHECK(placement_group_table_ != nullptr); + return *placement_group_table_; + } + GcsActorCheckpointTable &ActorCheckpointTable() { RAY_CHECK(actor_checkpoint_table_ != nullptr); return *actor_checkpoint_table_; @@ -386,6 +401,7 @@ class GcsTableStorage { std::shared_ptr store_client_; std::unique_ptr job_table_; std::unique_ptr actor_table_; + std::unique_ptr placement_group_table_; std::unique_ptr actor_checkpoint_table_; std::unique_ptr actor_checkpoint_id_table_; std::unique_ptr task_table_; @@ -394,8 +410,8 @@ class GcsTableStorage { std::unique_ptr object_table_; std::unique_ptr node_table_; std::unique_ptr node_resource_table_; - std::unique_ptr placement_group_schedule_table_; std::unique_ptr heartbeat_table_; + std::unique_ptr placement_group_schedule_table_; std::unique_ptr heartbeat_batch_table_; std::unique_ptr error_info_table_; std::unique_ptr profile_table_; @@ -412,6 +428,7 @@ class RedisGcsTableStorage : public GcsTableStorage { store_client_ = std::make_shared(redis_client); job_table_.reset(new GcsJobTable(store_client_)); actor_table_.reset(new GcsActorTable(store_client_)); + placement_group_table_.reset(new GcsPlacementGroupTable(store_client_)); actor_checkpoint_table_.reset(new GcsActorCheckpointTable(store_client_)); actor_checkpoint_id_table_.reset(new GcsActorCheckpointIdTable(store_client_)); task_table_.reset(new GcsTaskTable(store_client_)); @@ -442,6 +459,7 @@ class InMemoryGcsTableStorage : public GcsTableStorage { store_client_ = std::make_shared(main_io_service); job_table_.reset(new GcsJobTable(store_client_)); actor_table_.reset(new GcsActorTable(store_client_)); + placement_group_table_.reset(new GcsPlacementGroupTable(store_client_)); actor_checkpoint_table_.reset(new GcsActorCheckpointTable(store_client_)); actor_checkpoint_id_table_.reset(new GcsActorCheckpointIdTable(store_client_)); task_table_.reset(new GcsTaskTable(store_client_)); diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc new file mode 100644 index 000000000..91ade0b2e --- /dev/null +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc @@ -0,0 +1,132 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include + +#include "gtest/gtest.h" + +namespace ray { + +using ::testing::_; + +class MockPlacementGroupScheduler : public gcs::GcsPlacementGroupSchedulerInterface { + public: + MockPlacementGroupScheduler() {} + + void Schedule(std::shared_ptr placement_group, + std::function)> + schedule_failure_handler = nullptr, + std::function)> + schedule_success_handler = nullptr) { + placement_groups.push_back(placement_group); + } + + std::vector> placement_groups; +}; + +class GcsPlacementGroupManagerTest : public ::testing::Test { + public: + GcsPlacementGroupManagerTest() + : mock_placement_group_scheduler_(new MockPlacementGroupScheduler()) { + gcs_table_storage_ = std::make_shared(io_service_); + gcs_placement_group_manager_.reset(new gcs::GcsPlacementGroupManager( + io_service_, mock_placement_group_scheduler_, gcs_table_storage_)); + } + + boost::asio::io_service io_service_; + std::shared_ptr gcs_table_storage_; + std::shared_ptr mock_placement_group_scheduler_; + std::unique_ptr gcs_placement_group_manager_; +}; + +TEST_F(GcsPlacementGroupManagerTest, TestBasic) { + auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest(); + std::vector> finished_placement_groups; + gcs_placement_group_manager_->RegisterPlacementGroup( + create_placement_group_request, + [&finished_placement_groups]( + std::shared_ptr placement_group) { + finished_placement_groups.emplace_back(placement_group); + }); + ASSERT_EQ(finished_placement_groups.size(), 0); + ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 1); + auto placement_group = mock_placement_group_scheduler_->placement_groups.back(); + mock_placement_group_scheduler_->placement_groups.pop_back(); + + gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group); + ASSERT_EQ(finished_placement_groups.size(), 1); + ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::ALIVE); +} + +TEST_F(GcsPlacementGroupManagerTest, TestSchedulingFailed) { + auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest(); + std::vector> finished_placement_groups; + gcs_placement_group_manager_->RegisterPlacementGroup( + create_placement_group_request, + [&finished_placement_groups]( + std::shared_ptr placement_group) { + finished_placement_groups.emplace_back(placement_group); + }); + + ASSERT_EQ(finished_placement_groups.size(), 0); + ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 1); + auto placement_group = mock_placement_group_scheduler_->placement_groups.back(); + mock_placement_group_scheduler_->placement_groups.clear(); + + gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group); + gcs_placement_group_manager_->SchedulePendingPlacementGroups(); + ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 1); + mock_placement_group_scheduler_->placement_groups.clear(); + ASSERT_EQ(finished_placement_groups.size(), 0); + + // Check that the placement_group is in state `ALIVE`. + gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group); + ASSERT_EQ(finished_placement_groups.size(), 1); + ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::ALIVE); +} + +TEST_F(GcsPlacementGroupManagerTest, TestGetPlacementGroupIDByName) { + auto create_placement_group_request = + Mocker::GenCreatePlacementGroupRequest("test_name"); + std::vector> finished_placement_groups; + gcs_placement_group_manager_->RegisterPlacementGroup( + create_placement_group_request, + [&finished_placement_groups]( + std::shared_ptr placement_group) { + finished_placement_groups.emplace_back(placement_group); + }); + + ASSERT_EQ(finished_placement_groups.size(), 0); + ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 1); + auto placement_group = mock_placement_group_scheduler_->placement_groups.back(); + mock_placement_group_scheduler_->placement_groups.pop_back(); + + gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group); + ASSERT_EQ(finished_placement_groups.size(), 1); + ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::ALIVE); + ASSERT_EQ( + gcs_placement_group_manager_->GetPlacementGroupIDByName("test_name"), + PlacementGroupID::FromBinary( + create_placement_group_request.placement_group_spec().placement_group_id())); +} + +} // namespace ray + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc index 2b773fa0e..42be7a5be 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc @@ -163,8 +163,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReturnResource) ASSERT_TRUE(raylet_client_->GrantResourceReserve()); ASSERT_TRUE(raylet_client_->GrantResourceReserve(false)); ASSERT_EQ(1, raylet_client_->num_return_requested); - // // Reply the placement_group creation request, then the placement_group should be - // scheduled successfully. + // Reply the placement_group creation request, then the placement_group should be + // scheduled successfully. ASSERT_EQ(1, failure_placement_groups_.size()); ASSERT_EQ(0, success_placement_groups_.size()); ASSERT_EQ(placement_group, failure_placement_groups_.front()); diff --git a/src/ray/gcs/test/gcs_test_util.h b/src/ray/gcs/test/gcs_test_util.h index f6d8d72c4..c60c1200e 100644 --- a/src/ray/gcs/test/gcs_test_util.h +++ b/src/ray/gcs/test/gcs_test_util.h @@ -64,7 +64,8 @@ struct Mocker { } static PlacementGroupSpecification GenPlacementGroupCreation( - const std::string &name, std::vector &bundles, + const std::string &name, + std::vector> &bundles, rpc::PlacementStrategy strategy) { PlacementGroupSpecBuilder builder; @@ -76,9 +77,10 @@ struct Mocker { static rpc::CreatePlacementGroupRequest GenCreatePlacementGroupRequest( const std::string name = "") { rpc::CreatePlacementGroupRequest request; - std::vector bundles; + std::vector> bundles; rpc::PlacementStrategy strategy = rpc::PlacementStrategy::SPREAD; - rpc::Bundle bundle; + std::unordered_map bundle; + bundle["CPU"] = 1.0; bundles.push_back(bundle); bundles.push_back(bundle); auto placement_group_creation_spec = @@ -87,7 +89,6 @@ struct Mocker { placement_group_creation_spec.GetMessage()); return request; } - static std::shared_ptr GenNodeInfo( uint16_t port = 0, const std::string address = "127.0.0.1") { auto node = std::make_shared(); diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 7633f6559..5db1ebb99 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -46,6 +46,7 @@ enum TablePrefix { INTERNAL_CONFIG = 20; TABLE_PREFIX_MAX = 21; PLACEMENT_GROUP_SCHEDULE = 22; + PLACEMENT_GROUP = 23; } // The channel that Add operations to the Table should be published on, if any. diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index 9fac12af7..bd137759e 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -162,6 +162,13 @@ service ActorInfoGcsService { returns (GetActorCheckpointIDReply); } +// Service for placement group info access. +service PlacementGroupInfoGcsService { + // Create placement group via gcs service + rpc CreatePlacementGroup(CreatePlacementGroupRequest) + returns (CreatePlacementGroupReply); +} + message RegisterNodeRequest { // Info of node. GcsNodeInfo node_info = 1; diff --git a/src/ray/rpc/gcs_server/gcs_rpc_client.h b/src/ray/rpc/gcs_server/gcs_rpc_client.h index 95e54b18c..fcf5f4c7e 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_client.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_client.h @@ -247,6 +247,10 @@ class GcsRpcClient { VOID_GCS_RPC_CLIENT_METHOD(WorkerInfoGcsService, AddWorkerInfo, worker_info_grpc_client_, ) + /// Create placement group via GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(PlacementGroupInfoGcsService, CreatePlacementGroup, + placement_group_info_grpc_client_, ) + private: std::function gcs_service_failure_detected_; @@ -259,6 +263,8 @@ class GcsRpcClient { std::unique_ptr> stats_grpc_client_; std::unique_ptr> error_info_grpc_client_; std::unique_ptr> worker_info_grpc_client_; + std::unique_ptr> + placement_group_info_grpc_client_; }; } // namespace rpc diff --git a/src/ray/rpc/gcs_server/gcs_rpc_server.h b/src/ray/rpc/gcs_server/gcs_rpc_server.h index a8551f5d6..3b5d1a5f3 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_server.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_server.h @@ -44,6 +44,9 @@ namespace rpc { #define WORKER_INFO_SERVICE_RPC_HANDLER(HANDLER) \ RPC_SERVICE_HANDLER(WorkerInfoGcsService, HANDLER) +#define PLACEMENT_GROUP_INFO_SERVICE_RPC_HANDLER(HANDLER) \ + RPC_SERVICE_HANDLER(PlacementGroupInfoGcsService, HANDLER) + #define GCS_RPC_SEND_REPLY(send_reply_callback, reply, status) \ reply->mutable_status()->set_code((int)status.code()); \ reply->mutable_status()->set_message(status.message()); \ @@ -479,6 +482,41 @@ class WorkerInfoGrpcService : public GrpcService { WorkerInfoGcsServiceHandler &service_handler_; }; +class PlacementGroupInfoGcsServiceHandler { + public: + virtual ~PlacementGroupInfoGcsServiceHandler() = default; + + virtual void HandleCreatePlacementGroup(const CreatePlacementGroupRequest &request, + CreatePlacementGroupReply *reply, + SendReplyCallback send_reply_callback) = 0; +}; + +/// The `GrpcService` for `PlacementGroupInfoGcsService`. +class PlacementGroupInfoGrpcService : public GrpcService { + public: + /// Constructor. + /// + /// \param[in] handler The service handler that actually handle the requests. + explicit PlacementGroupInfoGrpcService(boost::asio::io_service &io_service, + PlacementGroupInfoGcsServiceHandler &handler) + : GrpcService(io_service), service_handler_(handler){}; + + protected: + grpc::Service &GetGrpcService() override { return service_; } + + void InitServerCallFactories( + const std::unique_ptr &cq, + std::vector> *server_call_factories) override { + PLACEMENT_GROUP_INFO_SERVICE_RPC_HANDLER(CreatePlacementGroup); + } + + private: + /// The grpc async service object. + PlacementGroupInfoGcsService::AsyncService service_; + /// The service handler that actually handle the requests. + PlacementGroupInfoGcsServiceHandler &service_handler_; +}; + using JobInfoHandler = JobInfoGcsServiceHandler; using ActorInfoHandler = ActorInfoGcsServiceHandler; using NodeInfoHandler = NodeInfoGcsServiceHandler; @@ -487,6 +525,7 @@ using TaskInfoHandler = TaskInfoGcsServiceHandler; using StatsHandler = StatsGcsServiceHandler; using ErrorInfoHandler = ErrorInfoGcsServiceHandler; using WorkerInfoHandler = WorkerInfoGcsServiceHandler; +using PlacementGroupInfoHandler = PlacementGroupInfoGcsServiceHandler; } // namespace rpc } // namespace ray