diff --git a/BUILD.bazel b/BUILD.bazel index 126dc6b45..a863727ec 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -1224,6 +1224,19 @@ cc_test( ], ) +cc_test( + name = "gcs_resource_scheduler_test", + srcs = [ + "src/ray/gcs/gcs_server/test/gcs_resource_scheduler_test.cc", + ], + copts = COPTS, + deps = [ + ":gcs_server_lib", + ":gcs_test_util_lib", + "@com_google_googletest//:gtest_main", + ], +) + cc_library( name = "service_based_gcs_client_lib", srcs = glob( diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc index a77a32db2..b56f6b1d3 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc @@ -43,7 +43,7 @@ std::string GcsPlacementGroup::GetName() const { std::vector> GcsPlacementGroup::GetBundles() const { const auto &bundles = placement_group_table_data_.bundles(); std::vector> ret_bundles; - for (auto &bundle : bundles) { + for (const auto &bundle : bundles) { ret_bundles.push_back(std::make_shared(bundle)); } return ret_bundles; @@ -53,7 +53,7 @@ std::vector> GcsPlacementGroup::GetUnplaced const { const auto &bundles = placement_group_table_data_.bundles(); std::vector> unplaced_bundles; - for (auto &bundle : bundles) { + for (const auto &bundle : bundles) { if (NodeID::FromBinary(bundle.node_id()).IsNil()) { unplaced_bundles.push_back(std::make_shared(bundle)); } @@ -555,7 +555,7 @@ void GcsPlacementGroupManager::Initialize(const GcsInitData &gcs_init_data) { if (item.second.state() == rpc::PlacementGroupTableData::CREATED || item.second.state() == rpc::PlacementGroupTableData::RESCHEDULING) { const auto &bundles = item.second.bundles(); - for (auto &bundle : bundles) { + for (const auto &bundle : bundles) { if (!NodeID::FromBinary(bundle.node_id()).IsNil()) { node_to_bundles[NodeID::FromBinary(bundle.node_id())].emplace_back(bundle); } diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc index 13a7f38e6..9afe3d9e8 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc @@ -24,11 +24,13 @@ GcsPlacementGroupScheduler::GcsPlacementGroupScheduler( boost::asio::io_context &io_context, std::shared_ptr gcs_table_storage, const gcs::GcsNodeManager &gcs_node_manager, GcsResourceManager &gcs_resource_manager, + GcsResourceScheduler &gcs_resource_scheduler, std::shared_ptr raylet_client_pool) : return_timer_(io_context), gcs_table_storage_(std::move(gcs_table_storage)), gcs_node_manager_(gcs_node_manager), gcs_resource_manager_(gcs_resource_manager), + gcs_resource_scheduler_(gcs_resource_scheduler), raylet_client_pool_(raylet_client_pool) { scheduler_strategies_.push_back(std::make_shared()); scheduler_strategies_.push_back(std::make_shared()); @@ -36,199 +38,86 @@ GcsPlacementGroupScheduler::GcsPlacementGroupScheduler( scheduler_strategies_.push_back(std::make_shared()); } -bool GcsScheduleStrategy::IsAvailableResourceSufficient( - const ResourceSet &available_resources, const ResourceSet &allocated_resources, - const ResourceSet &to_allocate_resources) const { - const auto &to_allocate_resource_capacity = - to_allocate_resources.GetResourceAmountMap(); - for (const auto &resource_pair : to_allocate_resource_capacity) { - const auto &resource_name = resource_pair.first; - const auto &lhs_quantity = resource_pair.second; - const auto &rhs_quantity = available_resources.GetResource(resource_name) - - allocated_resources.GetResource(resource_name); - if (lhs_quantity > rhs_quantity) { - // lhs capacity exceeds rhs capacity. - return false; +std::vector GcsScheduleStrategy::GetRequiredResourcesFromBundles( + const std::vector> &bundles) { + std::vector required_resources; + for (const auto &bundle : bundles) { + required_resources.push_back(bundle->GetRequiredResources()); + } + return required_resources; +} + +ScheduleMap GcsScheduleStrategy::GenerateScheduleMap( + const std::vector> &bundles, + const std::vector &selected_nodes) { + ScheduleMap schedule_map; + if (!selected_nodes.empty()) { + RAY_CHECK(bundles.size() == selected_nodes.size()); + int index = 0; + for (const auto &bundle : bundles) { + schedule_map[bundle->BundleId()] = selected_nodes[index++]; } } - return true; + return schedule_map; } ScheduleMap GcsStrictPackStrategy::Schedule( std::vector> &bundles, - const std::unique_ptr &context) { - // Aggregate required resources. - ResourceSet required_resources; - for (const auto &bundle : bundles) { - required_resources.AddResources(bundle->GetRequiredResources()); - } - - // Filter candidate nodes. - std::vector> candidate_nodes; - for (auto &node : context->cluster_resources_) { - if (required_resources.IsSubset(node.second.GetAvailableResources())) { - candidate_nodes.emplace_back((*context->node_to_bundles_)[node.first], node.first); - } - } - - // Select the node with the least number of bundles. - ScheduleMap schedule_map; - if (candidate_nodes.empty()) { - return schedule_map; - } - - std::sort( - std::begin(candidate_nodes), std::end(candidate_nodes), - [](const std::pair &left, - const std::pair &right) { return left.first < right.first; }); - - for (auto &bundle : bundles) { - schedule_map[bundle->BundleId()] = candidate_nodes.front().second; - } - return schedule_map; + const std::unique_ptr &context, + GcsResourceScheduler &gcs_resource_scheduler) { + const auto &required_resources = GetRequiredResourcesFromBundles(bundles); + const auto &selected_nodes = + gcs_resource_scheduler.Schedule(required_resources, SchedulingType::STRICT_PACK); + return GenerateScheduleMap(bundles, selected_nodes); } ScheduleMap GcsPackStrategy::Schedule( std::vector> &bundles, - const std::unique_ptr &context) { + const std::unique_ptr &context, + GcsResourceScheduler &gcs_resource_scheduler) { // The current algorithm is to select a node and deploy as many bundles as possible. // First fill up a node. If the node resource is insufficient, select a new node. // TODO(ffbin): We will speed this up in next PR. Currently it is a double for loop. - ScheduleMap schedule_map; - absl::flat_hash_map allocated_resources; - for (const auto &bundle : bundles) { - const auto &required_resources = bundle->GetRequiredResources(); - for (const auto &node : context->cluster_resources_) { - if (IsAvailableResourceSufficient(node.second.GetAvailableResources(), - allocated_resources[node.first], - required_resources)) { - schedule_map[bundle->BundleId()] = node.first; - allocated_resources[node.first].AddResources(required_resources); - break; - } - } - } - - if (schedule_map.size() != bundles.size()) { - schedule_map.clear(); - } - return schedule_map; + const auto &required_resources = GetRequiredResourcesFromBundles(bundles); + const auto &selected_nodes = + gcs_resource_scheduler.Schedule(required_resources, SchedulingType::PACK); + return GenerateScheduleMap(bundles, selected_nodes); } ScheduleMap GcsSpreadStrategy::Schedule( std::vector> &bundles, - const std::unique_ptr &context) { - // When selecting nodes, if you traverse from the beginning each time, a large number of - // bundles will be deployed to the previous nodes. So we start with the next node of the - // last selected node. - ScheduleMap schedule_map; - const auto &candidate_nodes = context->cluster_resources_; - if (candidate_nodes.empty()) { - return schedule_map; - } - - absl::flat_hash_map allocated_resources; - auto iter = candidate_nodes.begin(); - auto iter_begin = iter; - for (const auto &bundle : bundles) { - const auto &required_resources = bundle->GetRequiredResources(); - // Traverse all nodes from `iter_begin` to `candidate_nodes.end()` to find a node - // that meets the resource requirements. `iter_begin` is the next node of the last - // selected node. - for (; iter != candidate_nodes.end(); ++iter) { - if (IsAvailableResourceSufficient(iter->second.GetAvailableResources(), - allocated_resources[iter->first], - required_resources)) { - schedule_map[bundle->BundleId()] = iter->first; - allocated_resources[iter->first].AddResources(required_resources); - break; - } - } - - // We've traversed all the nodes from `iter_begin` to `candidate_nodes.end()`, but we - // haven't found one that meets the requirements. - // If `iter_begin` is `candidate_nodes.begin()`, it means that all nodes are not - // satisfied, we will return directly. Otherwise, we will traverse the nodes from - // `candidate_nodes.begin()` to `iter_begin` to find the nodes that meet the - // requirements. - if (iter == candidate_nodes.end()) { - if (iter_begin != candidate_nodes.begin()) { - // Traverse all the nodes from `candidate_nodes.begin()` to `iter_begin`. - for (iter = candidate_nodes.begin(); iter != iter_begin; ++iter) { - if (IsAvailableResourceSufficient(iter->second.GetAvailableResources(), - allocated_resources[iter->first], - required_resources)) { - schedule_map[bundle->BundleId()] = iter->first; - allocated_resources[iter->first].AddResources(required_resources); - break; - } - } - if (iter == iter_begin) { - // We have traversed all the nodes, so return directly. - break; - } - } else { - // We have traversed all the nodes, so return directly. - break; - } - } - // NOTE: If `iter == candidate_nodes.end()`, ++iter causes crash. - iter_begin = ++iter; - } - - if (schedule_map.size() != bundles.size()) { - schedule_map.clear(); - } - return schedule_map; + const std::unique_ptr &context, + GcsResourceScheduler &gcs_resource_scheduler) { + const auto &required_resources = GetRequiredResourcesFromBundles(bundles); + const auto &selected_nodes = + gcs_resource_scheduler.Schedule(required_resources, SchedulingType::SPREAD); + return GenerateScheduleMap(bundles, selected_nodes); } ScheduleMap GcsStrictSpreadStrategy::Schedule( std::vector> &bundles, - const std::unique_ptr &context) { + const std::unique_ptr &context, + GcsResourceScheduler &gcs_resource_scheduler) { // TODO(ffbin): A bundle may require special resources, such as GPU. We need to // schedule bundles with special resource requirements first, which will be implemented // in the next pr. - ScheduleMap schedule_map; - const auto &candidate_nodes = context->cluster_resources_; - - // The number of bundles is more than the number of nodes, scheduling fails. - if (bundles.size() > candidate_nodes.size()) { - return schedule_map; - } // Filter out the nodes already scheduled by this placement group. - absl::flat_hash_map allocated_resources; + absl::flat_hash_set nodes_in_use; if (context->bundle_locations_.has_value()) { const auto &bundle_locations = context->bundle_locations_.value(); for (auto &bundle : *bundle_locations) { - allocated_resources[bundle.second.first] = ResourceSet(); + nodes_in_use.insert(bundle.second.first); } } - for (const auto &bundle : bundles) { - const auto &required_resources = bundle->GetRequiredResources(); - auto iter = candidate_nodes.begin(); - for (; iter != candidate_nodes.end(); ++iter) { - if (!allocated_resources.contains(iter->first) && - IsAvailableResourceSufficient(iter->second.GetAvailableResources(), - allocated_resources[iter->first], - required_resources)) { - schedule_map[bundle->BundleId()] = iter->first; - allocated_resources[iter->first].AddResources(required_resources); - break; - } - } - - // Node resource is not satisfied, scheduling failed. - if (iter == candidate_nodes.end()) { - break; - } - } - - if (schedule_map.size() != bundles.size()) { - schedule_map.clear(); - } - return schedule_map; + const auto &required_resources = GetRequiredResourcesFromBundles(bundles); + const auto &selected_nodes = gcs_resource_scheduler.Schedule( + required_resources, SchedulingType::STRICT_SPREAD, + /*node_filter_func=*/[&nodes_in_use](const NodeID &node_id) { + return nodes_in_use.count(node_id) == 0; + }); + return GenerateScheduleMap(bundles, selected_nodes); } void GcsPlacementGroupScheduler::ScheduleUnplacedBundles( @@ -253,7 +142,8 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles( << ", id: " << placement_group->GetPlacementGroupID() << ", bundles size = " << bundles.size(); auto selected_nodes = scheduler_strategies_[strategy]->Schedule( - bundles, GetScheduleContext(placement_group->GetPlacementGroupID())); + bundles, GetScheduleContext(placement_group->GetPlacementGroupID()), + gcs_resource_scheduler_); // If no nodes are available, scheduling fails. if (selected_nodes.empty()) { @@ -271,7 +161,7 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles( .second); /// TODO(AlisaWu): Change the strategy when reserve resource failed. - for (auto &bundle : bundles) { + for (const auto &bundle : bundles) { const auto &bundle_id = bundle->BundleId(); const auto &node_id = selected_nodes[bundle_id]; lease_status_tracker->MarkPreparePhaseStarted(node_id, bundle); @@ -564,8 +454,7 @@ std::unique_ptr GcsPlacementGroupScheduler::GetScheduleContext( auto &bundle_locations = committed_bundle_location_index_.GetBundleLocations(placement_group_id); return std::unique_ptr( - new ScheduleContext(std::move(node_to_bundles), bundle_locations, - gcs_resource_manager_.GetClusterResources())); + new ScheduleContext(std::move(node_to_bundles), bundle_locations)); } absl::flat_hash_map> diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h index a604513a7..eb568b205 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h @@ -19,6 +19,7 @@ #include "ray/gcs/accessor.h" #include "ray/gcs/gcs_server/gcs_node_manager.h" #include "ray/gcs/gcs_server/gcs_resource_manager.h" +#include "ray/gcs/gcs_server/gcs_resource_scheduler.h" #include "ray/gcs/gcs_server/gcs_table_storage.h" #include "ray/raylet_client/raylet_client.h" #include "ray/rpc/node_manager/node_manager_client.h" @@ -89,20 +90,15 @@ class GcsPlacementGroupSchedulerInterface { /// ScheduleContext provides information that are needed for bundle scheduling decision. class ScheduleContext { public: - ScheduleContext( - std::shared_ptr> node_to_bundles, - const absl::optional> bundle_locations, - const absl::flat_hash_map &cluster_resources) + ScheduleContext(std::shared_ptr> node_to_bundles, + const absl::optional> bundle_locations) : node_to_bundles_(std::move(node_to_bundles)), - bundle_locations_(bundle_locations), - cluster_resources_(cluster_resources) {} + bundle_locations_(bundle_locations) {} // Key is node id, value is the number of bundles on the node. const std::shared_ptr> node_to_bundles_; // The locations of existing bundles for this placement group. const absl::optional> bundle_locations_; - // The available resources of all nodes. - const absl::flat_hash_map &cluster_resources_; }; class GcsScheduleStrategy { @@ -110,19 +106,25 @@ class GcsScheduleStrategy { virtual ~GcsScheduleStrategy() {} virtual ScheduleMap Schedule( std::vector> &bundles, - const std::unique_ptr &context) = 0; + const std::unique_ptr &context, + GcsResourceScheduler &gcs_resource_scheduler) = 0; protected: - /// Judge whether the remaining resources are sufficient for allocate. + /// Get required resources from bundles. /// - /// \param available_resources Total available resources. - /// \param allocated_resources Allocated resources. - /// \param to_allocate_resources Resources to be allocated. - /// \return True if allocated_resources + to_allocate_resources > available_resources. - /// False otherwise. - bool IsAvailableResourceSufficient(const ResourceSet &available_resources, - const ResourceSet &allocated_resources, - const ResourceSet &to_allocate_resources) const; + /// \param bundles Bundles to be scheduled. + /// \return Required resources. + std::vector GetRequiredResourcesFromBundles( + const std::vector> &bundles); + + /// Generate `ScheduleMap` from bundles and nodes . + /// + /// \param bundles Bundles to be scheduled. + /// \param selected_nodes selected_nodes to be scheduled. + /// \return Required resources. + ScheduleMap GenerateScheduleMap( + const std::vector> &bundles, + const std::vector &selected_nodes); }; /// The `GcsPackStrategy` is that pack all bundles in one node as much as possible. @@ -131,14 +133,16 @@ class GcsScheduleStrategy { class GcsPackStrategy : public GcsScheduleStrategy { public: ScheduleMap Schedule(std::vector> &bundles, - const std::unique_ptr &context) override; + const std::unique_ptr &context, + GcsResourceScheduler &gcs_resource_scheduler) override; }; /// The `GcsSpreadStrategy` is that spread all bundles in different nodes. class GcsSpreadStrategy : public GcsScheduleStrategy { public: ScheduleMap Schedule(std::vector> &bundles, - const std::unique_ptr &context) override; + const std::unique_ptr &context, + GcsResourceScheduler &gcs_resource_scheduler) override; }; /// The `GcsStrictPackStrategy` is that all bundles must be scheduled to one node. If one @@ -146,7 +150,8 @@ class GcsSpreadStrategy : public GcsScheduleStrategy { class GcsStrictPackStrategy : public GcsScheduleStrategy { public: ScheduleMap Schedule(std::vector> &bundles, - const std::unique_ptr &context) override; + const std::unique_ptr &context, + GcsResourceScheduler &gcs_resource_scheduler) override; }; /// The `GcsStrictSpreadStrategy` is that spread all bundles in different nodes. @@ -155,7 +160,8 @@ class GcsStrictPackStrategy : public GcsScheduleStrategy { class GcsStrictSpreadStrategy : public GcsScheduleStrategy { public: ScheduleMap Schedule(std::vector> &bundles, - const std::unique_ptr &context) override; + const std::unique_ptr &context, + GcsResourceScheduler &gcs_resource_scheduler) override; }; enum class LeasingState { @@ -381,11 +387,13 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { /// \param placement_group_info_accessor Used to flush placement_group info to storage. /// \param gcs_node_manager The node manager which is used when scheduling. /// \param gcs_resource_manager The resource manager which is used when scheduling. + /// \param gcs_resource_scheduler The resource scheduler which is used when scheduling. /// \param lease_client_factory Factory to create remote lease client. GcsPlacementGroupScheduler( boost::asio::io_context &io_context, std::shared_ptr gcs_table_storage, const GcsNodeManager &gcs_node_manager, GcsResourceManager &gcs_resource_manager, + GcsResourceScheduler &gcs_resource_scheduler, std::shared_ptr raylet_client_pool); virtual ~GcsPlacementGroupScheduler() = default; @@ -536,6 +544,9 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { /// Reference of GcsResourceManager. GcsResourceManager &gcs_resource_manager_; + /// Reference of GcsResourceScheduler. + GcsResourceScheduler &gcs_resource_scheduler_; + /// A vector to store all the schedule strategy. std::vector> scheduler_strategies_; diff --git a/src/ray/gcs/gcs_server/gcs_resource_scheduler.cc b/src/ray/gcs/gcs_server/gcs_resource_scheduler.cc new file mode 100644 index 000000000..26f83084b --- /dev/null +++ b/src/ray/gcs/gcs_server/gcs_resource_scheduler.cc @@ -0,0 +1,303 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/gcs/gcs_server/gcs_resource_scheduler.h" + +namespace ray { +namespace gcs { + +double LeastResourceScorer::Score(const ResourceSet &required_resources, + const SchedulingResources &node_resources) { + const auto &available_resources = node_resources.GetAvailableResources(); + const auto &available_resource_amount_map = available_resources.GetResourceAmountMap(); + + double node_score = 0.0; + for (const auto &entry : required_resources.GetResourceAmountMap()) { + auto available_resource_amount_iter = available_resource_amount_map.find(entry.first); + if (available_resource_amount_iter == available_resource_amount_map.end()) { + return -1; + } + + auto calculated_score = + Calculate(entry.second, available_resource_amount_iter->second); + if (calculated_score < 0) { + return -1; + } + node_score += calculated_score; + } + + // TODO(ffbin): We always want to choose the node with the least matching resources. We + // will solve it in next pr. + return node_score; +} + +double LeastResourceScorer::Calculate(const FractionalResourceQuantity &requested, + const FractionalResourceQuantity &available) { + RAY_CHECK(available >= 0) << "Available resource " << available.ToDouble() + << " should be nonnegative."; + if (requested > available) { + return -1; + } + return (available - requested).ToDouble() / available.ToDouble(); +} + +///////////////////////////////////////////////////////////////////////////////////////// + +std::vector GcsResourceScheduler::Schedule( + const std::vector &required_resources_list, + const SchedulingType &scheduling_type, + const std::function &node_filter_func) { + const auto &cluster_resources = gcs_resource_manager_.GetClusterResources(); + + // Filter candidate nodes. + absl::flat_hash_set candidate_nodes = + FilterCandidateNodes(cluster_resources, node_filter_func); + if (candidate_nodes.empty()) { + RAY_LOG(DEBUG) << "The candidate nodes is empty, return directly."; + return {}; + } + + // First schedule scarce resources (such as GPU) and large capacity resources to improve + // the scheduling success rate. + const auto &to_schedule_resources = SortRequiredResources(required_resources_list); + + // Score and rank nodes. + std::vector result; + switch (scheduling_type) { + case SPREAD: + result = SpreadSchedule(to_schedule_resources, candidate_nodes); + break; + case STRICT_SPREAD: + result = StrictSpreadSchedule(to_schedule_resources, candidate_nodes); + break; + case PACK: + result = PackSchedule(to_schedule_resources, candidate_nodes); + break; + case STRICT_PACK: + result = StrictPackSchedule(to_schedule_resources, candidate_nodes); + break; + default: + RAY_LOG(FATAL) << "Unsupported scheduling type: " << scheduling_type; + break; + } + return result; +} + +absl::flat_hash_set GcsResourceScheduler::FilterCandidateNodes( + const absl::flat_hash_map &cluster_resources, + const std::function &node_filter_func) { + absl::flat_hash_set result; + for (const auto &iter : cluster_resources) { + const auto &node_id = iter.first; + if (node_filter_func == nullptr || node_filter_func(node_id)) { + result.emplace(node_id); + } + } + return result; +} + +const std::vector &GcsResourceScheduler::SortRequiredResources( + const std::vector &required_resources) { + // TODO(ffbin): A bundle may require special resources, such as GPU. We need to + // schedule bundles with special resource requirements first, which will be implemented + // in the next pr. + return required_resources; +} + +std::vector GcsResourceScheduler::StrictSpreadSchedule( + const std::vector &required_resources_list, + const absl::flat_hash_set &candidate_nodes) { + std::vector result; + if (required_resources_list.size() > candidate_nodes.size()) { + RAY_LOG(DEBUG) << "The number of required resources " + << required_resources_list.size() + << " is greater than the number of candidate nodes " + << candidate_nodes.size() << ", scheduling fails."; + return result; + } + + absl::flat_hash_set candidate_nodes_copy(candidate_nodes); + for (const auto &iter : required_resources_list) { + // Score and sort nodes. + const auto &node_scores = ScoreNodes(iter, candidate_nodes_copy); + + // There are nodes to meet the scheduling requirements. + if (!node_scores.empty() && node_scores.front().second >= 0) { + const auto &highest_score_node_id = node_scores.front().first; + result.push_back(highest_score_node_id); + candidate_nodes_copy.erase(highest_score_node_id); + } else { + // There is no node to meet the scheduling requirements. + break; + } + } + + if (result.size() != required_resources_list.size()) { + // Unable to meet the resources required for scheduling, scheduling failed. + result.clear(); + } + return result; +} + +std::vector GcsResourceScheduler::SpreadSchedule( + const std::vector &required_resources_list, + const absl::flat_hash_set &candidate_nodes) { + std::vector result; + absl::flat_hash_set candidate_nodes_copy(candidate_nodes); + absl::flat_hash_set selected_nodes; + for (const auto &iter : required_resources_list) { + // Score and sort nodes. + const auto &node_scores = ScoreNodes(iter, candidate_nodes_copy); + + // There are nodes to meet the scheduling requirements. + if (!node_scores.empty() && node_scores.front().second >= 0) { + const auto &highest_score_node_id = node_scores.front().first; + result.push_back(highest_score_node_id); + RAY_CHECK(gcs_resource_manager_.AcquireResources(highest_score_node_id, iter)); + candidate_nodes_copy.erase(highest_score_node_id); + selected_nodes.insert(highest_score_node_id); + } else { + // Scheduling from selected nodes. + const auto &node_scores = ScoreNodes(iter, selected_nodes); + if (!node_scores.empty() && node_scores.front().second >= 0) { + const auto &highest_score_node_id = node_scores.front().first; + result.push_back(highest_score_node_id); + RAY_CHECK(gcs_resource_manager_.AcquireResources(highest_score_node_id, iter)); + } else { + break; + } + } + } + + // Releasing the resources temporarily deducted from `gcs_resource_manager_`. + ReleaseTemporarilyDeductedResources(required_resources_list, result); + + if (result.size() != required_resources_list.size()) { + // Unable to meet the resources required for scheduling, scheduling failed. + result.clear(); + } + return result; +} + +std::vector GcsResourceScheduler::StrictPackSchedule( + const std::vector &required_resources_list, + const absl::flat_hash_set &candidate_nodes) { + std::vector result; + + // Aggregate required resources. + ResourceSet required_resources; + for (const auto &iter : required_resources_list) { + required_resources.AddResources(iter); + } + + // Score and sort nodes. + const auto &node_scores = ScoreNodes(required_resources, candidate_nodes); + + // Select the node with the highest score. + // `StrictPackSchedule` does not need to consider the scheduling context, because it + // only schedules to a node and triggers rescheduling when node dead. + if (!node_scores.empty() && node_scores.front().second >= 0) { + for (int index = 0; index < (int)required_resources_list.size(); ++index) { + result.push_back(node_scores.front().first); + } + } + return result; +} + +std::vector GcsResourceScheduler::PackSchedule( + const std::vector &required_resources_list, + const absl::flat_hash_set &candidate_nodes) { + std::vector result; + result.resize(required_resources_list.size()); + absl::flat_hash_set candidate_nodes_copy(candidate_nodes); + std::list> required_resources_list_copy; + int index = 0; + for (const auto &iter : required_resources_list) { + required_resources_list_copy.emplace_back(index++, iter); + } + + while (!required_resources_list_copy.empty()) { + // Score and sort nodes. + const auto &required_resources_index = required_resources_list_copy.front().first; + const auto &required_resources = required_resources_list_copy.front().second; + const auto &node_scores = ScoreNodes(required_resources, candidate_nodes_copy); + if (node_scores.empty() || node_scores.front().second < 0) { + // There is no node to meet the scheduling requirements. + break; + } + + const auto &highest_score_node_id = node_scores.front().first; + RAY_CHECK(gcs_resource_manager_.AcquireResources(highest_score_node_id, + required_resources)); + result[required_resources_index] = highest_score_node_id; + required_resources_list_copy.pop_front(); + + // We try to schedule more resources on one node. + for (auto iter = required_resources_list_copy.begin(); + iter != required_resources_list_copy.end();) { + if (gcs_resource_manager_.AcquireResources(highest_score_node_id, iter->second)) { + result[iter->first] = highest_score_node_id; + required_resources_list_copy.erase(iter++); + } else { + ++iter; + } + } + candidate_nodes_copy.erase(highest_score_node_id); + } + + // Releasing the resources temporarily deducted from `gcs_resource_manager_`. + ReleaseTemporarilyDeductedResources(required_resources_list, result); + + if (!required_resources_list_copy.empty()) { + // Unable to meet the resources required for scheduling, scheduling failed. + result.clear(); + } + return result; +} + +std::list GcsResourceScheduler::ScoreNodes( + const ResourceSet &required_resources, + const absl::flat_hash_set &candidate_nodes) { + std::list node_scores; + const auto &cluster_resources = gcs_resource_manager_.GetClusterResources(); + + // Score the nodes. + for (const auto &node_id : candidate_nodes) { + const auto &iter = cluster_resources.find(node_id); + RAY_CHECK(iter != cluster_resources.end()); + double node_score = node_scorer_->Score(required_resources, iter->second); + node_scores.emplace_back(node_id, node_score); + } + + // Sort node scores, the large score is in the front. + node_scores.sort([](const NodeScore &left, const NodeScore &right) { + return right.second < left.second; + }); + return node_scores; +} + +void GcsResourceScheduler::ReleaseTemporarilyDeductedResources( + const std::vector &required_resources_list, + const std::vector &nodes) { + for (int index = 0; index < (int)nodes.size(); index++) { + // If `PackSchedule` fails, the id of some nodes may be nil. + if (!nodes[index].IsNil()) { + RAY_CHECK(gcs_resource_manager_.ReleaseResources(nodes[index], + required_resources_list[index])); + } + } +} + +} // namespace gcs +} // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_resource_scheduler.h b/src/ray/gcs/gcs_server/gcs_resource_scheduler.h new file mode 100644 index 000000000..4ad1b42aa --- /dev/null +++ b/src/ray/gcs/gcs_server/gcs_resource_scheduler.h @@ -0,0 +1,183 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "absl/container/flat_hash_set.h" +#include "ray/common/task/scheduling_resources.h" +#include "ray/gcs/gcs_server/gcs_resource_manager.h" + +namespace ray { +namespace gcs { + +// Type of resource scheduling strategy. +enum SchedulingType { + // Places resources across distinct nodes as even as possible. + SPREAD = 0, + // Places resources across distinct nodes. + // It is not allowed to deploy more than one resource on a node. + STRICT_SPREAD = 1, + // Packs resources into as few nodes as possible. + PACK = 2, + // Packs resources within one node. It is not allowed to span multiple nodes. + STRICT_PACK = 3, + SchedulingType_MAX = 4, +}; + +typedef std::pair NodeScore; + +/// NodeScorer is a scorer to make a grade to the node, which is used for scheduling +/// decision. +class NodeScorer { + public: + virtual ~NodeScorer() = default; + + /// \brief Score according to node resources. + /// + /// \param required_resources The required resources. + /// \param node_resources The node resources which contains available and total + /// resources. + /// \return Score of the node. + virtual double Score(const ResourceSet &required_resources, + const SchedulingResources &node_resources) = 0; +}; + +/// LeastResourceScorer is a score plugin that favors nodes with fewer allocation +/// requested resources based on requested resources. +class LeastResourceScorer : public NodeScorer { + public: + double Score(const ResourceSet &required_resources, + const SchedulingResources &node_resources) override; + + private: + /// \brief Calculate one of the resource scores. + /// + /// \param requested Quantity of one of the required resources. + /// \param available Quantity of one of the available resources. + /// \return Score of the node. + double Calculate(const FractionalResourceQuantity &requested, + const FractionalResourceQuantity &available); +}; + +/// Gcs resource scheduler implementation. +/// Non-thread safe. +class GcsResourceScheduler { + public: + GcsResourceScheduler(GcsResourceManager &gcs_resource_manager) + : gcs_resource_manager_(gcs_resource_manager), + node_scorer_(new LeastResourceScorer()) {} + + virtual ~GcsResourceScheduler() = default; + + /// Schedule the specified resources to the cluster nodes. + /// + /// \param required_resources_list The resources to be scheduled. + /// \param scheduling_type This scheduling strategy. + /// \param node_filter_func This function is used to filter candidate nodes. If a node + /// returns true, it can be used for scheduling. By default, all nodes in the cluster + /// can be used for scheduling. + /// \return Scheduling selected nodes, it corresponds to `required_resources_list` one + /// by one. If the scheduling fails, an empty vector is returned. + std::vector Schedule( + const std::vector &required_resources_list, + const SchedulingType &scheduling_type, + const std::function &node_filter_func = nullptr); + + private: + /// Filter out candidate nodes which can be used for scheduling. + /// + /// \param cluster_resources The cluster node resources. + /// \param node_filter_func This function is used to filter candidate nodes. If a node + /// returns true, it can be used for scheduling. By default, all nodes in the cluster + /// can be used for scheduling. + /// \return The candidate nodes which can be used for scheduling. + absl::flat_hash_set FilterCandidateNodes( + const absl::flat_hash_map &cluster_resources, + const std::function &node_filter_func); + + /// Sort required resources according to the scarcity and capacity of resources. + /// We will first schedule scarce resources (such as GPU) and large capacity resources + /// to improve the scheduling success rate. + /// + /// \param required_resources The resources to be scheduled. + /// \return The Sorted resources. + const std::vector &SortRequiredResources( + const std::vector &required_resources); + + /// Schedule resources according to `STRICT_SPREAD` strategy. + /// + /// \param required_resources_list The resources to be scheduled. + /// \param candidate_nodes The nodes can be used for scheduling. + /// \return Scheduling selected nodes, it corresponds to `required_resources_list` one + /// by one. If the scheduling fails, an empty vector is returned. + std::vector StrictSpreadSchedule( + const std::vector &required_resources_list, + const absl::flat_hash_set &candidate_nodes); + + /// Schedule resources according to `SPREAD` strategy. + /// + /// \param required_resources_list The resources to be scheduled. + /// \param candidate_nodes The nodes can be used for scheduling. + /// \return Scheduling selected nodes, it corresponds to `required_resources_list` one + /// by one. If the scheduling fails, an empty vector is returned. + std::vector SpreadSchedule( + const std::vector &required_resources_list, + const absl::flat_hash_set &candidate_nodes); + + /// Schedule resources according to `STRICT_PACK` strategy. + /// + /// \param required_resources_list The resources to be scheduled. + /// \param candidate_nodes The nodes can be used for scheduling. + /// \return Scheduling selected nodes, it corresponds to `required_resources_list` one + /// by one. If the scheduling fails, an empty vector is returned. + std::vector StrictPackSchedule( + const std::vector &required_resources_list, + const absl::flat_hash_set &candidate_nodes); + + /// Schedule resources according to `PACK` strategy. + /// + /// \param required_resources_list The resources to be scheduled. + /// \param candidate_nodes The nodes can be used for scheduling. + /// \return Scheduling selected nodes, it corresponds to `required_resources_list` one + /// by one. If the scheduling fails, an empty vector is returned. + std::vector PackSchedule( + const std::vector &required_resources_list, + const absl::flat_hash_set &candidate_nodes); + + /// Score all nodes according to the specified resources. + /// + /// \param required_resources The resources to be scheduled. + /// \param candidate_nodes The nodes can be used for scheduling. + /// \return Score of all nodes. + std::list ScoreNodes(const ResourceSet &required_resources, + const absl::flat_hash_set &candidate_nodes); + + /// Return the resources temporarily deducted from gcs resource manager. + /// + /// \param required_resources_list The resources to be scheduled. + /// \param nodes Scheduling selected nodes, it corresponds to `required_resources_list` + /// one by one. + void ReleaseTemporarilyDeductedResources( + const std::vector &required_resources_list, + const std::vector &nodes); + + /// Reference of GcsResourceManager. + GcsResourceManager &gcs_resource_manager_; + + /// Scorer to make a grade to the node. + std::unique_ptr node_scorer_; +}; + +} // namespace gcs +} // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index dba927143..c574565c5 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -69,6 +69,9 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) { // Init gcs resource manager. InitGcsResourceManager(gcs_init_data); + // Init gcs resource scheduler. + InitGcsResourceScheduler(); + // Init gcs node manager. InitGcsNodeManager(gcs_init_data); @@ -171,6 +174,12 @@ void GcsServer::InitGcsResourceManager(const GcsInitData &gcs_init_data) { rpc_server_.RegisterService(*node_resource_info_service_); } +void GcsServer::InitGcsResourceScheduler() { + RAY_CHECK(gcs_resource_manager_); + gcs_resource_scheduler_ = + std::make_shared(*gcs_resource_manager_); +} + void GcsServer::InitGcsJobManager() { RAY_CHECK(gcs_table_storage_ && gcs_pub_sub_); gcs_job_manager_.reset(new GcsJobManager(gcs_table_storage_, gcs_pub_sub_)); @@ -223,7 +232,7 @@ void GcsServer::InitGcsPlacementGroupManager(const GcsInitData &gcs_init_data) { RAY_CHECK(gcs_table_storage_ && gcs_node_manager_); auto scheduler = std::make_shared( main_service_, gcs_table_storage_, *gcs_node_manager_, *gcs_resource_manager_, - raylet_client_pool_); + *gcs_resource_scheduler_, raylet_client_pool_); gcs_placement_group_manager_ = std::make_shared( main_service_, scheduler, gcs_table_storage_, *gcs_resource_manager_); diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h index 9a34afc3a..643e2bb5b 100644 --- a/src/ray/gcs/gcs_server/gcs_server.h +++ b/src/ray/gcs/gcs_server/gcs_server.h @@ -19,6 +19,7 @@ #include "ray/gcs/gcs_server/gcs_object_manager.h" #include "ray/gcs/gcs_server/gcs_redis_failure_detector.h" #include "ray/gcs/gcs_server/gcs_resource_manager.h" +#include "ray/gcs/gcs_server/gcs_resource_scheduler.h" #include "ray/gcs/gcs_server/gcs_table_storage.h" #include "ray/gcs/pubsub/gcs_pub_sub.h" #include "ray/gcs/redis_client.h" @@ -86,6 +87,9 @@ class GcsServer { /// Initialize gcs resource manager. void InitGcsResourceManager(const GcsInitData &gcs_init_data); + /// Initialize gcs resource scheduler. + void InitGcsResourceScheduler(); + /// Initialize gcs job manager. void InitGcsJobManager(); @@ -139,6 +143,8 @@ class GcsServer { std::shared_ptr raylet_client_pool_; /// The gcs resource manager. std::shared_ptr gcs_resource_manager_; + /// The gcs resource scheduler. + std::shared_ptr gcs_resource_scheduler_; /// The gcs node manager. std::shared_ptr gcs_node_manager_; /// The heartbeat manager. diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc index 0703d9497..ca08e85a6 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc @@ -41,6 +41,8 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { gcs_pub_sub_ = std::make_shared(redis_client_); gcs_resource_manager_ = std::make_shared(io_service_, nullptr, nullptr); + gcs_resource_scheduler_ = + std::make_shared(*gcs_resource_manager_); gcs_node_manager_ = std::make_shared(gcs_pub_sub_, gcs_table_storage_); gcs_table_storage_ = std::make_shared(io_service_); @@ -49,7 +51,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { [this](const rpc::Address &addr) { return raylet_clients_[addr.port()]; }); scheduler_ = std::make_shared( io_service_, gcs_table_storage_, *gcs_node_manager_, *gcs_resource_manager_, - raylet_client_pool_); + *gcs_resource_scheduler_, raylet_client_pool_); } void TearDown() override { @@ -208,6 +210,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { std::vector> raylet_clients_; std::shared_ptr gcs_resource_manager_; + std::shared_ptr gcs_resource_scheduler_; std::shared_ptr gcs_node_manager_; std::shared_ptr scheduler_; std::vector> success_placement_groups_ diff --git a/src/ray/gcs/gcs_server/test/gcs_resource_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_resource_scheduler_test.cc new file mode 100644 index 000000000..191f33cdc --- /dev/null +++ b/src/ray/gcs/gcs_server/test/gcs_resource_scheduler_test.cc @@ -0,0 +1,132 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "gtest/gtest.h" +#include "ray/gcs/gcs_server/gcs_resource_scheduler.h" +#include "ray/gcs/test/gcs_test_util.h" + +namespace ray { + +using ::testing::_; + +class GcsResourceSchedulerTest : public ::testing::Test { + public: + void SetUp() override { + gcs_resource_manager_ = + std::make_shared(io_service_, nullptr, nullptr); + gcs_resource_scheduler_ = + std::make_shared(*gcs_resource_manager_); + } + + void TearDown() override { + gcs_resource_scheduler_.reset(); + gcs_resource_manager_.reset(); + } + + void AddClusterResources(const NodeID &node_id, const std::string &resource_name, + double resource_value) { + std::unordered_map resource_map; + resource_map[resource_name] = resource_value; + gcs_resource_manager_->UpdateResourceCapacity(node_id, resource_map); + } + + void CheckClusterAvailableResources(const NodeID &node_id, + const std::string &resource_name, + double resource_value) { + const auto &cluster_resource = gcs_resource_manager_->GetClusterResources(); + auto iter = cluster_resource.find(node_id); + ASSERT_TRUE(iter != cluster_resource.end()); + ASSERT_EQ(iter->second.GetAvailableResources().GetResource(resource_name).ToDouble(), + resource_value); + } + + void TestResourceLeaks(const gcs::SchedulingType &scheduling_type) { + // Add node resources. + const auto &node_id = NodeID::FromRandom(); + const std::string cpu_resource = "CPU"; + const double node_cpu_num = 6.0; + AddClusterResources(node_id, cpu_resource, node_cpu_num); + + // Scheduling succeeded and node resources are used up. + std::vector required_resources_list; + std::unordered_map resource_map; + for (int bundle_cpu_num = 1; bundle_cpu_num <= 3; ++bundle_cpu_num) { + resource_map[cpu_resource] = bundle_cpu_num; + required_resources_list.emplace_back(resource_map); + } + const auto &result1 = + gcs_resource_scheduler_->Schedule(required_resources_list, scheduling_type); + ASSERT_EQ(result1.size(), 3); + + // Check for resource leaks. + CheckClusterAvailableResources(node_id, cpu_resource, node_cpu_num); + + // Scheduling failure. + resource_map[cpu_resource] = 5; + required_resources_list.emplace_back(resource_map); + const auto &result2 = + gcs_resource_scheduler_->Schedule(required_resources_list, scheduling_type); + ASSERT_EQ(result2.size(), 0); + + // Check for resource leaks. + CheckClusterAvailableResources(node_id, cpu_resource, node_cpu_num); + } + + std::shared_ptr gcs_resource_manager_; + std::shared_ptr gcs_resource_scheduler_; + + private: + boost::asio::io_service io_service_; +}; + +TEST_F(GcsResourceSchedulerTest, TestPackScheduleResourceLeaks) { + TestResourceLeaks(gcs::SchedulingType::PACK); +} + +TEST_F(GcsResourceSchedulerTest, TestSpreadScheduleResourceLeaks) { + TestResourceLeaks(gcs::SchedulingType::SPREAD); +} + +TEST_F(GcsResourceSchedulerTest, TestNodeFilter) { + // Add node resources. + const auto &node_id = NodeID::FromRandom(); + const std::string cpu_resource = "CPU"; + const double node_cpu_num = 10.0; + AddClusterResources(node_id, cpu_resource, node_cpu_num); + + // Scheduling failure. + std::vector required_resources_list; + std::unordered_map resource_map; + resource_map[cpu_resource] = 1; + required_resources_list.emplace_back(resource_map); + const auto &result1 = gcs_resource_scheduler_->Schedule( + required_resources_list, gcs::SchedulingType::STRICT_SPREAD, + [](const NodeID &) { return false; }); + ASSERT_EQ(result1.size(), 0); + + // Scheduling succeeded. + const auto &result2 = gcs_resource_scheduler_->Schedule( + required_resources_list, gcs::SchedulingType::STRICT_SPREAD, + [](const NodeID &) { return true; }); + ASSERT_EQ(result2.size(), 1); +} + +} // namespace ray + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}