diff --git a/BUILD.bazel b/BUILD.bazel index 2ea04ad60..ec103528e 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -1189,6 +1189,19 @@ cc_test( ], ) +cc_test( + name = "gcs_resource_manager_test", + srcs = [ + "src/ray/gcs/gcs_server/test/gcs_resource_manager_test.cc", + ], + copts = COPTS, + deps = [ + ":gcs_server_lib", + ":gcs_test_util_lib", + "@com_google_googletest//:gtest_main", + ], +) + cc_library( name = "service_based_gcs_client_lib", srcs = glob( diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index 88884cb68..2105bc900 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -94,10 +94,12 @@ void GcsNodeManager::NodeFailureDetector::ScheduleTick() { } ////////////////////////////////////////////////////////////////////////////////////////// -GcsNodeManager::GcsNodeManager(boost::asio::io_service &main_io_service, - boost::asio::io_service &node_failure_detector_io_service, - std::shared_ptr gcs_pub_sub, - std::shared_ptr gcs_table_storage) +GcsNodeManager::GcsNodeManager( + boost::asio::io_service &main_io_service, + boost::asio::io_service &node_failure_detector_io_service, + std::shared_ptr gcs_pub_sub, + std::shared_ptr gcs_table_storage, + std::shared_ptr gcs_resource_manager) : main_io_service_(main_io_service), node_failure_detector_(new NodeFailureDetector( node_failure_detector_io_service, gcs_table_storage, gcs_pub_sub, @@ -124,7 +126,8 @@ GcsNodeManager::GcsNodeManager(boost::asio::io_service &main_io_service, node_failure_detector_service_(node_failure_detector_io_service), heartbeat_timer_(main_io_service), gcs_pub_sub_(gcs_pub_sub), - gcs_table_storage_(gcs_table_storage) { + gcs_table_storage_(gcs_table_storage), + gcs_resource_manager_(gcs_resource_manager) { SendBatchedHeartbeat(); } @@ -349,7 +352,7 @@ void GcsNodeManager::HandleGetAllAvailableResources( const rpc::GetAllAvailableResourcesRequest &request, rpc::GetAllAvailableResourcesReply *reply, rpc::SendReplyCallback send_reply_callback) { - for (const auto &iter : GetClusterRealtimeResources()) { + for (const auto &iter : gcs_resource_manager_->GetClusterResources()) { rpc::AvailableResources resource; resource.set_node_id(iter.first.Binary()); for (const auto &res : iter.second.GetResourceAmountMap()) { @@ -477,8 +480,6 @@ std::shared_ptr GcsNodeManager::RemoveNode( alive_nodes_.erase(iter); // Remove from cluster resources. cluster_resources_.erase(node_id); - // Remove from cluster realtime resources. - cluster_realtime_resources_.erase(node_id); heartbeat_buffer_.erase(node_id); if (!is_intended) { // Broadcast a warning to all of the drivers indicating that the node @@ -535,18 +536,13 @@ void GcsNodeManager::StartNodeFailureDetector() { void GcsNodeManager::UpdateNodeRealtimeResources( const NodeID &node_id, const rpc::HeartbeatTableData &heartbeat) { if (!RayConfig::instance().light_heartbeat_enabled() || - cluster_realtime_resources_.count(node_id) == 0 || + gcs_resource_manager_->GetClusterResources().count(node_id) == 0 || heartbeat.resources_available_changed()) { - cluster_realtime_resources_[node_id] = - ResourceSet(MapFromProtobuf(heartbeat.resources_available())); + gcs_resource_manager_->UpdateResources( + node_id, ResourceSet(MapFromProtobuf(heartbeat.resources_available()))); } } -const absl::flat_hash_map - &GcsNodeManager::GetClusterRealtimeResources() const { - return cluster_realtime_resources_; -} - void GcsNodeManager::UpdatePlacementGroupLoad( const std::shared_ptr placement_group_load) { placement_group_load_ = absl::make_optional(placement_group_load); diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index 2bad2a0fb..41b70b957 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -19,6 +19,7 @@ #include "ray/common/id.h" #include "ray/gcs/accessor.h" #include "ray/gcs/gcs_server/gcs_init_data.h" +#include "ray/gcs/gcs_server/gcs_resource_manager.h" #include "ray/gcs/gcs_server/gcs_table_storage.h" #include "ray/gcs/pubsub/gcs_pub_sub.h" #include "ray/rpc/client_call.h" @@ -39,11 +40,12 @@ class GcsNodeManager : public rpc::NodeInfoHandler { /// \param node_failure_detector_io_service The event loop of node failure detector. /// \param gcs_pub_sub GCS message publisher. /// \param gcs_table_storage GCS table external storage accessor. - /// when detecting the death of nodes. + /// \param gcs_resource_manager GCS resource manager. explicit GcsNodeManager(boost::asio::io_service &main_io_service, boost::asio::io_service &node_failure_detector_io_service, std::shared_ptr gcs_pub_sub, - std::shared_ptr gcs_table_storage); + std::shared_ptr gcs_table_storage, + std::shared_ptr gcs_resource_manager); /// Handle register rpc request come from raylet. void HandleRegisterNode(const rpc::RegisterNodeRequest &request, @@ -176,9 +178,6 @@ class GcsNodeManager : public rpc::NodeInfoHandler { void UpdateNodeRealtimeResources(const NodeID &node_id, const rpc::HeartbeatTableData &heartbeat); - /// Get cluster realtime resources. - const absl::flat_hash_map &GetClusterRealtimeResources() const; - /// Update the placement group load information so that it will be reported through /// heartbeat. /// @@ -295,8 +294,8 @@ class GcsNodeManager : public rpc::NodeInfoHandler { std::shared_ptr gcs_pub_sub_; /// Storage for GCS tables. std::shared_ptr gcs_table_storage_; - /// Cluster realtime resources. - absl::flat_hash_map cluster_realtime_resources_; + /// Gcs resource manager. + std::shared_ptr gcs_resource_manager_; /// Placement group load information that is used for autoscaler. absl::optional> placement_group_load_; diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc index 56169a3b8..3c6b1ead2 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc @@ -23,11 +23,12 @@ namespace gcs { GcsPlacementGroupScheduler::GcsPlacementGroupScheduler( boost::asio::io_context &io_context, std::shared_ptr gcs_table_storage, - const gcs::GcsNodeManager &gcs_node_manager, + const gcs::GcsNodeManager &gcs_node_manager, GcsResourceManager &gcs_resource_manager, ReserveResourceClientFactoryFn lease_client_factory) : return_timer_(io_context), gcs_table_storage_(std::move(gcs_table_storage)), gcs_node_manager_(gcs_node_manager), + gcs_resource_manager_(gcs_resource_manager), lease_client_factory_(std::move(lease_client_factory)) { scheduler_strategies_.push_back(std::make_shared()); scheduler_strategies_.push_back(std::make_shared()); @@ -35,6 +36,24 @@ GcsPlacementGroupScheduler::GcsPlacementGroupScheduler( scheduler_strategies_.push_back(std::make_shared()); } +bool GcsScheduleStrategy::IsAvailableResourceSufficient( + const ResourceSet &available_resources, const ResourceSet &allocated_resources, + const ResourceSet &to_allocate_resources) const { + const auto &to_allocate_resource_capacity = + to_allocate_resources.GetResourceAmountMap(); + for (const auto &resource_pair : to_allocate_resource_capacity) { + const auto &resource_name = resource_pair.first; + const auto &lhs_quantity = resource_pair.second; + const auto &rhs_quantity = available_resources.GetResource(resource_name) - + allocated_resources.GetResource(resource_name); + if (lhs_quantity > rhs_quantity) { + // lhs capacity exceeds rhs capacity. + return false; + } + } + return true; +} + ScheduleMap GcsStrictPackStrategy::Schedule( std::vector> &bundles, const std::unique_ptr &context) { @@ -45,9 +64,8 @@ ScheduleMap GcsStrictPackStrategy::Schedule( } // Filter candidate nodes. - const auto &alive_nodes = context->node_manager_.GetClusterRealtimeResources(); std::vector> candidate_nodes; - for (auto &node : alive_nodes) { + for (auto &node : context->cluster_resources_) { if (required_resources.IsSubset(node.second)) { candidate_nodes.emplace_back((*context->node_to_bundles_)[node.first], node.first); } @@ -77,16 +95,14 @@ ScheduleMap GcsPackStrategy::Schedule( // First fill up a node. If the node resource is insufficient, select a new node. // TODO(ffbin): We will speed this up in next PR. Currently it is a double for loop. ScheduleMap schedule_map; - // TODO(WangTao): This copy might take too much space once cluster grows very large. - // Would find better solution. - absl::flat_hash_map alive_nodes( - context->node_manager_.GetClusterRealtimeResources()); + absl::flat_hash_map allocated_resources; for (const auto &bundle : bundles) { const auto &required_resources = bundle->GetRequiredResources(); - for (auto &node : alive_nodes) { - if (required_resources.IsSubset(node.second)) { - node.second.SubtractResourcesStrict(required_resources); + for (const auto &node : context->cluster_resources_) { + if (IsAvailableResourceSufficient(node.second, allocated_resources[node.first], + required_resources)) { schedule_map[bundle->BundleId()] = node.first; + allocated_resources[node.first].AddResources(required_resources); break; } } @@ -105,25 +121,24 @@ ScheduleMap GcsSpreadStrategy::Schedule( // bundles will be deployed to the previous nodes. So we start with the next node of the // last selected node. ScheduleMap schedule_map; - // TODO(WangTao): This copy might take too much space once cluster grows very large. - // Would find better solution. - absl::flat_hash_map candidate_nodes( - context->node_manager_.GetClusterRealtimeResources()); + const auto &candidate_nodes = context->cluster_resources_; if (candidate_nodes.empty()) { return schedule_map; } + absl::flat_hash_map allocated_resources; auto iter = candidate_nodes.begin(); auto iter_begin = iter; for (const auto &bundle : bundles) { const auto &required_resources = bundle->GetRequiredResources(); - // Traverse all nodes from `iter_begin` to `candidate_nodes.end()` to find a node that - // meets the resource requirements. `iter_begin` is the next node of the last selected - // node. + // Traverse all nodes from `iter_begin` to `candidate_nodes.end()` to find a node + // that meets the resource requirements. `iter_begin` is the next node of the last + // selected node. for (; iter != candidate_nodes.end(); ++iter) { - if (required_resources.IsSubset(iter->second)) { - iter->second.SubtractResourcesStrict(required_resources); + if (IsAvailableResourceSufficient(iter->second, allocated_resources[iter->first], + required_resources)) { schedule_map[bundle->BundleId()] = iter->first; + allocated_resources[iter->first].AddResources(required_resources); break; } } @@ -138,9 +153,10 @@ ScheduleMap GcsSpreadStrategy::Schedule( if (iter_begin != candidate_nodes.begin()) { // Traverse all the nodes from `candidate_nodes.begin()` to `iter_begin`. for (iter = candidate_nodes.begin(); iter != iter_begin; ++iter) { - if (required_resources.IsSubset(iter->second)) { - iter->second.SubtractResourcesStrict(required_resources); + if (IsAvailableResourceSufficient( + iter->second, allocated_resources[iter->first], required_resources)) { schedule_map[bundle->BundleId()] = iter->first; + allocated_resources[iter->first].AddResources(required_resources); break; } } @@ -170,20 +186,23 @@ ScheduleMap GcsStrictSpreadStrategy::Schedule( // schedule bundles with special resource requirements first, which will be implemented // in the next pr. ScheduleMap schedule_map; - auto candidate_nodes = context->node_manager_.GetClusterRealtimeResources(); + const auto &candidate_nodes = context->cluster_resources_; // The number of bundles is more than the number of nodes, scheduling fails. if (bundles.size() > candidate_nodes.size()) { return schedule_map; } + absl::flat_hash_map allocated_resources; for (const auto &bundle : bundles) { const auto &required_resources = bundle->GetRequiredResources(); auto iter = candidate_nodes.begin(); for (; iter != candidate_nodes.end(); ++iter) { - if (required_resources.IsSubset(iter->second)) { + if (!allocated_resources.contains(iter->first) && + IsAvailableResourceSufficient(iter->second, allocated_resources[iter->first], + required_resources)) { schedule_map[bundle->BundleId()] = iter->first; - candidate_nodes.erase(iter); + allocated_resources[iter->first].AddResources(required_resources); break; } } @@ -518,8 +537,9 @@ std::unique_ptr GcsPlacementGroupScheduler::GetScheduleContext( auto &bundle_locations = committed_bundle_location_index_.GetBundleLocations(placement_group_id); - return std::unique_ptr(new ScheduleContext( - std::move(node_to_bundles), bundle_locations, gcs_node_manager_)); + return std::unique_ptr( + new ScheduleContext(std::move(node_to_bundles), bundle_locations, + gcs_resource_manager_.GetClusterResources())); } absl::flat_hash_map> @@ -609,7 +629,17 @@ void GcsPlacementGroupScheduler::DestroyPlacementGroupCommittedBundleResources( void BundleLocationIndex::AddBundleLocations( const PlacementGroupID &placement_group_id, std::shared_ptr bundle_locations) { - placement_group_to_bundle_locations_.emplace(placement_group_id, bundle_locations); + // Update `placement_group_to_bundle_locations_`. + // The placement group may be scheduled several times to succeed, so we need to merge + // `bundle_locations` instead of covering it directly. + auto iter = placement_group_to_bundle_locations_.find(placement_group_id); + if (iter == placement_group_to_bundle_locations_.end()) { + placement_group_to_bundle_locations_.emplace(placement_group_id, bundle_locations); + } else { + iter->second->insert(bundle_locations->begin(), bundle_locations->end()); + } + + // Update `node_to_leased_bundles_`. for (auto iter : *bundle_locations) { const auto &node_id = iter.second.first; if (!node_to_leased_bundles_.contains(node_id)) { diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h index 2444c4804..a57441f89 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h @@ -18,6 +18,7 @@ #include "ray/common/id.h" #include "ray/gcs/accessor.h" #include "ray/gcs/gcs_server/gcs_node_manager.h" +#include "ray/gcs/gcs_server/gcs_resource_manager.h" #include "ray/gcs/gcs_server/gcs_table_storage.h" #include "ray/raylet_client/raylet_client.h" #include "ray/rpc/node_manager/node_manager_client.h" @@ -87,17 +88,17 @@ class ScheduleContext { public: ScheduleContext(std::shared_ptr> node_to_bundles, const absl::optional> bundle_locations, - const GcsNodeManager &node_manager) + const absl::flat_hash_map &cluster_resources) : node_to_bundles_(std::move(node_to_bundles)), bundle_locations_(bundle_locations), - node_manager_(node_manager) {} + cluster_resources_(cluster_resources) {} // Key is node id, value is the number of bundles on the node. const std::shared_ptr> node_to_bundles_; // The locations of existing bundles for this placement group. const absl::optional> bundle_locations_; - - const GcsNodeManager &node_manager_; + // The available resources of all nodes. + const absl::flat_hash_map &cluster_resources_; }; class GcsScheduleStrategy { @@ -106,6 +107,18 @@ class GcsScheduleStrategy { virtual ScheduleMap Schedule( std::vector> &bundles, const std::unique_ptr &context) = 0; + + protected: + /// Judge whether the remaining resources are sufficient for allocate. + /// + /// \param available_resources Total available resources. + /// \param allocated_resources Allocated resources. + /// \param to_allocate_resources Resources to be allocated. + /// \return True if allocated_resources + to_allocate_resources > available_resources. + /// False otherwise. + bool IsAvailableResourceSufficient(const ResourceSet &available_resources, + const ResourceSet &allocated_resources, + const ResourceSet &to_allocate_resources) const; }; /// The `GcsPackStrategy` is that pack all bundles in one node as much as possible. @@ -221,9 +234,9 @@ class LeaseStatusTracker { /// \return Location of bundles that succeed to prepare resources on a node. const std::shared_ptr &GetPreparedBundleLocations() const; - /// This method returns bundle locations that succeed to commit resources. + /// This method returns bundle locations that failed to commit resources. /// - /// \return Location of bundles that succeed to commit resources on a node. + /// \return Location of bundles that failed to commit resources on a node. const std::shared_ptr &GetUnCommittedBundleLocations() const; /// Return the leasing state. @@ -345,10 +358,12 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { /// \param io_context The main event loop. /// \param placement_group_info_accessor Used to flush placement_group info to storage. /// \param gcs_node_manager The node manager which is used when scheduling. + /// \param gcs_resource_manager The resource manager which is used when scheduling. + /// \param lease_client_factory Factory to create remote lease client. GcsPlacementGroupScheduler( boost::asio::io_context &io_context, std::shared_ptr gcs_table_storage, - const GcsNodeManager &gcs_node_manager, + const GcsNodeManager &gcs_node_manager, GcsResourceManager &gcs_resource_manager, ReserveResourceClientFactoryFn lease_client_factory = nullptr); virtual ~GcsPlacementGroupScheduler() = default; @@ -490,6 +505,9 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { /// Reference of GcsNodeManager. const GcsNodeManager &gcs_node_manager_; + /// Reference of GcsResourceManager. + GcsResourceManager &gcs_resource_manager_; + /// The cached node clients which are used to communicate with raylet to lease workers. absl::flat_hash_map> remote_lease_clients_; diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.cc b/src/ray/gcs/gcs_server/gcs_resource_manager.cc new file mode 100644 index 000000000..eeb5630f7 --- /dev/null +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.cc @@ -0,0 +1,58 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/gcs/gcs_server/gcs_resource_manager.h" + +namespace ray { +namespace gcs { + +const absl::flat_hash_map &GcsResourceManager::GetClusterResources() + const { + return cluster_resources_; +} + +void GcsResourceManager::UpdateResources(const NodeID &node_id, + const ResourceSet &resources) { + cluster_resources_[node_id] = resources; +} + +void GcsResourceManager::RemoveResources(const NodeID &node_id) { + cluster_resources_.erase(node_id); +} + +bool GcsResourceManager::AcquireResources(const NodeID &node_id, + const ResourceSet &required_resources) { + auto iter = cluster_resources_.find(node_id); + // + RAY_CHECK(iter != cluster_resources_.end()) << "Node " << node_id << " not exist."; + if (!required_resources.IsSubset(iter->second)) { + return false; + } + iter->second.SubtractResourcesStrict(required_resources); + return true; +} + +bool GcsResourceManager::ReleaseResources(const NodeID &node_id, + const ResourceSet &acquired_resources) { + auto iter = cluster_resources_.find(node_id); + if (iter != cluster_resources_.end()) { + iter->second.AddResources(acquired_resources); + } + // If node dead, we will not find the node. This is a normal scenario, so it returns + // true. + return true; +} + +} // namespace gcs +} // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.h b/src/ray/gcs/gcs_server/gcs_resource_manager.h new file mode 100644 index 000000000..0a38d07ac --- /dev/null +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.h @@ -0,0 +1,86 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#pragma once + +#include "absl/container/flat_hash_map.h" +#include "ray/common/task/scheduling_resources.h" + +namespace ray { +namespace gcs { + +/// Gcs resource manager interface. +/// It is used for actor and placement group scheduling. +/// Non-thread safe. +class GcsResourceManagerInterface { + public: + virtual ~GcsResourceManagerInterface() {} + + /// Get the resources of all nodes in the cluster. + /// + /// \return The resources of all nodes in the cluster. + virtual const absl::flat_hash_map &GetClusterResources() const = 0; + + /// Update the resources of the specified node. + /// + /// \param node_id Id of a node. + /// \param resources Resources of a node. + virtual void UpdateResources(const NodeID &node_id, const ResourceSet &resources) = 0; + + /// Remove the resources of the specified node. + /// + /// \param node_id Id of a node. + virtual void RemoveResources(const NodeID &node_id) = 0; + + /// Acquire resources from the specified node. It will deduct directly from the node + /// resources. + /// + /// \param node_id Id of a node. + /// \param required_resources Resources to apply for. + /// \return True if acquire resources successfully. False otherwise. + virtual bool AcquireResources(const NodeID &node_id, + const ResourceSet &required_resources) = 0; + + /// Release the resources of the specified node. It will be added directly to the node + /// resources. + /// + /// \param node_id Id of a node. + /// \param acquired_resources Resources to release. + /// \return True if release resources successfully. False otherwise. + virtual bool ReleaseResources(const NodeID &node_id, + const ResourceSet &acquired_resources) = 0; +}; + +/// Gcs resource manager implementation. It obtains the available resources of nodes +/// through heartbeat reporting. Non-thread safe. +class GcsResourceManager : public GcsResourceManagerInterface { + public: + virtual ~GcsResourceManager() = default; + + const absl::flat_hash_map &GetClusterResources() const; + + void UpdateResources(const NodeID &node_id, const ResourceSet &resources); + + void RemoveResources(const NodeID &node_id); + + bool AcquireResources(const NodeID &node_id, const ResourceSet &required_resources); + + bool ReleaseResources(const NodeID &node_id, const ResourceSet &acquired_resources); + + private: + /// Map from node id to the resources of the node. + absl::flat_hash_map cluster_resources_; +}; + +} // namespace gcs +} // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 3d634fe60..a5874ceef 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -65,7 +65,10 @@ void GcsServer::Start() { } void GcsServer::DoStart(const GcsInitData &gcs_init_data) { - // Init gcs node_manager. + // Init gcs resource manager. + InitGcsResourceManager(); + + // Init gcs node manager. InitGcsNodeManager(gcs_init_data); // Init gcs job manager. @@ -134,7 +137,8 @@ void GcsServer::InitGcsNodeManager(const GcsInitData &gcs_init_data) { node_manager_io_service_.run(); })); gcs_node_manager_ = std::make_shared( - main_service_, node_manager_io_service_, gcs_pub_sub_, gcs_table_storage_); + main_service_, node_manager_io_service_, gcs_pub_sub_, gcs_table_storage_, + gcs_resource_manager_); // Initialize by gcs tables data. gcs_node_manager_->Initialize(gcs_init_data); // Register service. @@ -143,6 +147,10 @@ void GcsServer::InitGcsNodeManager(const GcsInitData &gcs_init_data) { rpc_server_.RegisterService(*node_info_service_); } +void GcsServer::InitGcsResourceManager() { + gcs_resource_manager_ = std::make_shared(); +} + void GcsServer::InitGcsJobManager() { RAY_CHECK(gcs_table_storage_ && gcs_pub_sub_); gcs_job_manager_.reset(new GcsJobManager(gcs_table_storage_, gcs_pub_sub_)); @@ -197,7 +205,7 @@ void GcsServer::InitGcsActorManager(const GcsInitData &gcs_init_data) { void GcsServer::InitGcsPlacementGroupManager(const GcsInitData &gcs_init_data) { RAY_CHECK(gcs_table_storage_ && gcs_node_manager_); auto scheduler = std::make_shared( - main_service_, gcs_table_storage_, *gcs_node_manager_, + main_service_, gcs_table_storage_, *gcs_node_manager_, *gcs_resource_manager_, /*lease_client_factory=*/ [this](const rpc::Address &address) { auto node_manager_worker_client = rpc::NodeManagerWorkerClient::make( @@ -285,6 +293,7 @@ void GcsServer::InstallEventListeners() { // node is removed from the GCS. gcs_placement_group_manager_->OnNodeDead(node_id); gcs_actor_manager_->OnNodeDead(node_id); + gcs_resource_manager_->RemoveResources(node_id); }); // Install worker event listener. diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h index ef916151f..8d9d55c4b 100644 --- a/src/ray/gcs/gcs_server/gcs_server.h +++ b/src/ray/gcs/gcs_server/gcs_server.h @@ -17,6 +17,7 @@ #include "ray/gcs/gcs_server/gcs_init_data.h" #include "ray/gcs/gcs_server/gcs_object_manager.h" #include "ray/gcs/gcs_server/gcs_redis_failure_detector.h" +#include "ray/gcs/gcs_server/gcs_resource_manager.h" #include "ray/gcs/gcs_server/gcs_table_storage.h" #include "ray/gcs/pubsub/gcs_pub_sub.h" #include "ray/gcs/redis_gcs_client.h" @@ -77,6 +78,9 @@ class GcsServer { /// Initialize gcs node manager. void InitGcsNodeManager(const GcsInitData &gcs_init_data); + /// Initialize gcs resource manager. + void InitGcsResourceManager(); + /// Initialize gcs job manager. void InitGcsJobManager(); @@ -127,6 +131,8 @@ class GcsServer { rpc::GrpcServer rpc_server_; /// The `ClientCallManager` object that is shared by all `NodeManagerWorkerClient`s. rpc::ClientCallManager client_call_manager_; + /// The gcs resource manager. + std::shared_ptr gcs_resource_manager_; /// The gcs node manager. std::shared_ptr gcs_node_manager_; /// The gcs redis failure detector. diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc index 00460a8b2..6be265678 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc @@ -27,8 +27,10 @@ class GcsActorSchedulerTest : public ::testing::Test { worker_client_ = std::make_shared(); gcs_pub_sub_ = std::make_shared(redis_client_); gcs_table_storage_ = std::make_shared(redis_client_); - gcs_node_manager_ = std::make_shared( - io_service_, io_service_, gcs_pub_sub_, gcs_table_storage_); + gcs_resource_manager_ = std::make_shared(); + gcs_node_manager_ = + std::make_shared(io_service_, io_service_, gcs_pub_sub_, + gcs_table_storage_, gcs_resource_manager_); store_client_ = std::make_shared(io_service_); gcs_actor_table_ = std::make_shared(store_client_); @@ -54,6 +56,7 @@ class GcsActorSchedulerTest : public ::testing::Test { std::shared_ptr gcs_actor_table_; std::shared_ptr raylet_client_; std::shared_ptr worker_client_; + std::shared_ptr gcs_resource_manager_; std::shared_ptr gcs_node_manager_; std::shared_ptr gcs_actor_scheduler_; std::vector> success_actors_; diff --git a/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc index a9dfaa9a0..17ea70e8d 100644 --- a/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc @@ -29,12 +29,13 @@ class GcsNodeManagerTest : public ::testing::Test { std::shared_ptr gcs_pub_sub_; std::shared_ptr redis_client_; std::shared_ptr gcs_table_storage_; + std::shared_ptr gcs_resource_manager_; }; TEST_F(GcsNodeManagerTest, TestManagement) { boost::asio::io_service io_service; gcs::GcsNodeManager node_manager(io_service, io_service, gcs_pub_sub_, - gcs_table_storage_); + gcs_table_storage_, gcs_resource_manager_); // Test Add/Get/Remove functionality. auto node = Mocker::GenNodeInfo(); auto node_id = NodeID::FromBinary(node->node_id()); @@ -49,7 +50,7 @@ TEST_F(GcsNodeManagerTest, TestManagement) { TEST_F(GcsNodeManagerTest, TestListener) { boost::asio::io_service io_service; gcs::GcsNodeManager node_manager(io_service, io_service, gcs_pub_sub_, - gcs_table_storage_); + gcs_table_storage_, gcs_resource_manager_); // Test AddNodeAddedListener. int node_count = 1000; std::vector> added_nodes; @@ -86,30 +87,6 @@ TEST_F(GcsNodeManagerTest, TestListener) { } } -TEST_F(GcsNodeManagerTest, TestGetClusterRealtimeResources) { - boost::asio::io_service io_service; - gcs::GcsNodeManager node_manager(io_service, io_service, gcs_pub_sub_, - gcs_table_storage_); - - auto node_id = NodeID::FromRandom(); - rpc::HeartbeatTableData heartbeat; - const std::string cpu_resource = "CPU"; - (*heartbeat.mutable_resources_available())[cpu_resource] = 10; - node_manager.UpdateNodeRealtimeResources(node_id, heartbeat); - auto node_resources = node_manager.GetClusterRealtimeResources(); - - ResourceSet required_resources; - required_resources.AddOrUpdateResource(cpu_resource, 9); - ASSERT_TRUE(required_resources.IsSubset(node_resources[node_id])); - required_resources.AddOrUpdateResource(cpu_resource, 10); - ASSERT_TRUE(required_resources.IsSubset(node_resources[node_id])); - required_resources.AddOrUpdateResource(cpu_resource, 10.1); - ASSERT_FALSE(required_resources.IsSubset(node_resources[node_id])); - required_resources.DeleteResource(cpu_resource); - required_resources.AddOrUpdateResource("GPU", 9); - ASSERT_FALSE(required_resources.IsSubset(node_resources[node_id])); -} - } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/gcs/gcs_server/test/gcs_object_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_object_manager_test.cc index cd6f143fe..6d4484c7d 100644 --- a/src/ray/gcs/gcs_server/test/gcs_object_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_object_manager_test.cc @@ -54,8 +54,10 @@ class GcsObjectManagerTest : public ::testing::Test { public: void SetUp() override { gcs_table_storage_ = std::make_shared(io_service_); - gcs_node_manager_ = std::make_shared( - io_service_, io_service_, gcs_pub_sub_, gcs_table_storage_); + gcs_resource_manager_ = std::make_shared(); + gcs_node_manager_ = + std::make_shared(io_service_, io_service_, gcs_pub_sub_, + gcs_table_storage_, gcs_resource_manager_); gcs_object_manager_ = std::make_shared( gcs_table_storage_, gcs_pub_sub_, *gcs_node_manager_); GenTestData(); @@ -83,6 +85,7 @@ class GcsObjectManagerTest : public ::testing::Test { protected: boost::asio::io_service io_service_; + std::shared_ptr gcs_resource_manager_; std::shared_ptr gcs_node_manager_; std::shared_ptr gcs_client_; std::shared_ptr gcs_pub_sub_; diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc index 9f6050ace..80b83ea81 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc @@ -68,8 +68,10 @@ class GcsPlacementGroupManagerTest : public ::testing::Test { : mock_placement_group_scheduler_(new MockPlacementGroupScheduler()) { gcs_pub_sub_ = std::make_shared(redis_client_); gcs_table_storage_ = std::make_shared(io_service_); - gcs_node_manager_ = std::make_shared( - io_service_, io_service_, gcs_pub_sub_, gcs_table_storage_); + gcs_resource_manager_ = std::make_shared(); + gcs_node_manager_ = + std::make_shared(io_service_, io_service_, gcs_pub_sub_, + gcs_table_storage_, gcs_resource_manager_); gcs_placement_group_manager_.reset( new gcs::GcsPlacementGroupManager(io_service_, mock_placement_group_scheduler_, gcs_table_storage_, *gcs_node_manager_)); @@ -103,6 +105,7 @@ class GcsPlacementGroupManagerTest : public ::testing::Test { std::unique_ptr thread_io_service_; boost::asio::io_service io_service_; std::shared_ptr gcs_table_storage_; + std::shared_ptr gcs_resource_manager_; std::shared_ptr gcs_node_manager_; std::shared_ptr gcs_pub_sub_; std::shared_ptr redis_client_; diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc index 021ccdef4..7ecb34fe0 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc @@ -40,12 +40,14 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { } gcs_table_storage_ = std::make_shared(io_service_); gcs_pub_sub_ = std::make_shared(redis_client_); - gcs_node_manager_ = std::make_shared( - io_service_, io_service_, gcs_pub_sub_, gcs_table_storage_); + gcs_resource_manager_ = std::make_shared(); + gcs_node_manager_ = + std::make_shared(io_service_, io_service_, gcs_pub_sub_, + gcs_table_storage_, gcs_resource_manager_); gcs_table_storage_ = std::make_shared(io_service_); store_client_ = std::make_shared(io_service_); scheduler_ = std::make_shared( - io_service_, gcs_table_storage_, *gcs_node_manager_, + io_service_, gcs_table_storage_, *gcs_node_manager_, *gcs_resource_manager_, /*lease_client_fplacement_groupy=*/ [this](const rpc::Address &address) { return raylet_clients_[address.port()]; }); } @@ -98,6 +100,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { void AddNode(const std::shared_ptr &node, int cpu_num = 10) { gcs_node_manager_->AddNode(node); rpc::HeartbeatTableData heartbeat; + heartbeat.set_client_id(node->node_id()); (*heartbeat.mutable_resources_available())["CPU"] = cpu_num; gcs_node_manager_->UpdateNodeRealtimeResources(NodeID::FromBinary(node->node_id()), heartbeat); @@ -202,6 +205,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { std::shared_ptr store_client_; std::vector> raylet_clients_; + std::shared_ptr gcs_resource_manager_; std::shared_ptr gcs_node_manager_; std::shared_ptr scheduler_; std::vector> success_placement_groups_ diff --git a/src/ray/gcs/gcs_server/test/gcs_resource_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_resource_manager_test.cc new file mode 100644 index 000000000..5e3343d3e --- /dev/null +++ b/src/ray/gcs/gcs_server/test/gcs_resource_manager_test.cc @@ -0,0 +1,63 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "gtest/gtest.h" +#include "ray/gcs/gcs_server/gcs_resource_manager.h" +#include "ray/gcs/test/gcs_test_util.h" + +namespace ray { + +using ::testing::_; + +class GcsResourceManagerTest : public ::testing::Test { + public: + GcsResourceManagerTest() { + gcs_resource_manager_ = std::make_shared(); + } + + std::shared_ptr gcs_resource_manager_; +}; + +TEST_F(GcsResourceManagerTest, TestBasic) { + // Add node resources. + auto node_id = NodeID::FromRandom(); + const std::string cpu_resource = "CPU"; + std::unordered_map resource_map; + resource_map[cpu_resource] = 10; + ResourceSet resource_set(resource_map); + gcs_resource_manager_->UpdateResources(node_id, resource_set); + + // Get and check cluster resources. + const auto &cluster_resource = gcs_resource_manager_->GetClusterResources(); + ASSERT_EQ(1, cluster_resource.size()); + + // Test `AcquireResources`. + ASSERT_TRUE(gcs_resource_manager_->AcquireResources(node_id, resource_set)); + ASSERT_FALSE(gcs_resource_manager_->AcquireResources(node_id, resource_set)); + + // Test `ReleaseResources`. + ASSERT_TRUE( + gcs_resource_manager_->ReleaseResources(NodeID::FromRandom(), resource_set)); + ASSERT_TRUE(gcs_resource_manager_->ReleaseResources(node_id, resource_set)); + ASSERT_TRUE(gcs_resource_manager_->AcquireResources(node_id, resource_set)); +} + +} // namespace ray + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h index 4e482b55a..4477a1ef5 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h +++ b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h @@ -25,6 +25,7 @@ #include "ray/gcs/gcs_server/gcs_node_manager.h" #include "ray/gcs/gcs_server/gcs_placement_group_manager.h" #include "ray/gcs/gcs_server/gcs_placement_group_scheduler.h" +#include "ray/gcs/gcs_server/gcs_resource_manager.h" #include "ray/util/asio_util.h" namespace ray {