[GCS]Add gcs resource scheduler (#13072)

This commit is contained in:
fangfengbin
2021-01-14 20:05:55 +08:00
committed by GitHub
parent b296642646
commit 33b092de28
10 changed files with 741 additions and 192 deletions
+13
View File
@@ -1224,6 +1224,19 @@ cc_test(
],
)
cc_test(
name = "gcs_resource_scheduler_test",
srcs = [
"src/ray/gcs/gcs_server/test/gcs_resource_scheduler_test.cc",
],
copts = COPTS,
deps = [
":gcs_server_lib",
":gcs_test_util_lib",
"@com_google_googletest//:gtest_main",
],
)
cc_library(
name = "service_based_gcs_client_lib",
srcs = glob(
@@ -43,7 +43,7 @@ std::string GcsPlacementGroup::GetName() const {
std::vector<std::shared_ptr<BundleSpecification>> GcsPlacementGroup::GetBundles() const {
const auto &bundles = placement_group_table_data_.bundles();
std::vector<std::shared_ptr<BundleSpecification>> ret_bundles;
for (auto &bundle : bundles) {
for (const auto &bundle : bundles) {
ret_bundles.push_back(std::make_shared<BundleSpecification>(bundle));
}
return ret_bundles;
@@ -53,7 +53,7 @@ std::vector<std::shared_ptr<BundleSpecification>> GcsPlacementGroup::GetUnplaced
const {
const auto &bundles = placement_group_table_data_.bundles();
std::vector<std::shared_ptr<BundleSpecification>> unplaced_bundles;
for (auto &bundle : bundles) {
for (const auto &bundle : bundles) {
if (NodeID::FromBinary(bundle.node_id()).IsNil()) {
unplaced_bundles.push_back(std::make_shared<BundleSpecification>(bundle));
}
@@ -555,7 +555,7 @@ void GcsPlacementGroupManager::Initialize(const GcsInitData &gcs_init_data) {
if (item.second.state() == rpc::PlacementGroupTableData::CREATED ||
item.second.state() == rpc::PlacementGroupTableData::RESCHEDULING) {
const auto &bundles = item.second.bundles();
for (auto &bundle : bundles) {
for (const auto &bundle : bundles) {
if (!NodeID::FromBinary(bundle.node_id()).IsNil()) {
node_to_bundles[NodeID::FromBinary(bundle.node_id())].emplace_back(bundle);
}
@@ -24,11 +24,13 @@ GcsPlacementGroupScheduler::GcsPlacementGroupScheduler(
boost::asio::io_context &io_context,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
const gcs::GcsNodeManager &gcs_node_manager, GcsResourceManager &gcs_resource_manager,
GcsResourceScheduler &gcs_resource_scheduler,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool)
: return_timer_(io_context),
gcs_table_storage_(std::move(gcs_table_storage)),
gcs_node_manager_(gcs_node_manager),
gcs_resource_manager_(gcs_resource_manager),
gcs_resource_scheduler_(gcs_resource_scheduler),
raylet_client_pool_(raylet_client_pool) {
scheduler_strategies_.push_back(std::make_shared<GcsPackStrategy>());
scheduler_strategies_.push_back(std::make_shared<GcsSpreadStrategy>());
@@ -36,199 +38,86 @@ GcsPlacementGroupScheduler::GcsPlacementGroupScheduler(
scheduler_strategies_.push_back(std::make_shared<GcsStrictSpreadStrategy>());
}
bool GcsScheduleStrategy::IsAvailableResourceSufficient(
const ResourceSet &available_resources, const ResourceSet &allocated_resources,
const ResourceSet &to_allocate_resources) const {
const auto &to_allocate_resource_capacity =
to_allocate_resources.GetResourceAmountMap();
for (const auto &resource_pair : to_allocate_resource_capacity) {
const auto &resource_name = resource_pair.first;
const auto &lhs_quantity = resource_pair.second;
const auto &rhs_quantity = available_resources.GetResource(resource_name) -
allocated_resources.GetResource(resource_name);
if (lhs_quantity > rhs_quantity) {
// lhs capacity exceeds rhs capacity.
return false;
std::vector<ResourceSet> GcsScheduleStrategy::GetRequiredResourcesFromBundles(
const std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles) {
std::vector<ResourceSet> required_resources;
for (const auto &bundle : bundles) {
required_resources.push_back(bundle->GetRequiredResources());
}
return required_resources;
}
ScheduleMap GcsScheduleStrategy::GenerateScheduleMap(
const std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::vector<NodeID> &selected_nodes) {
ScheduleMap schedule_map;
if (!selected_nodes.empty()) {
RAY_CHECK(bundles.size() == selected_nodes.size());
int index = 0;
for (const auto &bundle : bundles) {
schedule_map[bundle->BundleId()] = selected_nodes[index++];
}
}
return true;
return schedule_map;
}
ScheduleMap GcsStrictPackStrategy::Schedule(
std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context) {
// Aggregate required resources.
ResourceSet required_resources;
for (const auto &bundle : bundles) {
required_resources.AddResources(bundle->GetRequiredResources());
}
// Filter candidate nodes.
std::vector<std::pair<int64_t, NodeID>> candidate_nodes;
for (auto &node : context->cluster_resources_) {
if (required_resources.IsSubset(node.second.GetAvailableResources())) {
candidate_nodes.emplace_back((*context->node_to_bundles_)[node.first], node.first);
}
}
// Select the node with the least number of bundles.
ScheduleMap schedule_map;
if (candidate_nodes.empty()) {
return schedule_map;
}
std::sort(
std::begin(candidate_nodes), std::end(candidate_nodes),
[](const std::pair<int64_t, NodeID> &left,
const std::pair<int64_t, NodeID> &right) { return left.first < right.first; });
for (auto &bundle : bundles) {
schedule_map[bundle->BundleId()] = candidate_nodes.front().second;
}
return schedule_map;
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) {
const auto &required_resources = GetRequiredResourcesFromBundles(bundles);
const auto &selected_nodes =
gcs_resource_scheduler.Schedule(required_resources, SchedulingType::STRICT_PACK);
return GenerateScheduleMap(bundles, selected_nodes);
}
ScheduleMap GcsPackStrategy::Schedule(
std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context) {
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) {
// The current algorithm is to select a node and deploy as many bundles as possible.
// First fill up a node. If the node resource is insufficient, select a new node.
// TODO(ffbin): We will speed this up in next PR. Currently it is a double for loop.
ScheduleMap schedule_map;
absl::flat_hash_map<NodeID, ResourceSet> allocated_resources;
for (const auto &bundle : bundles) {
const auto &required_resources = bundle->GetRequiredResources();
for (const auto &node : context->cluster_resources_) {
if (IsAvailableResourceSufficient(node.second.GetAvailableResources(),
allocated_resources[node.first],
required_resources)) {
schedule_map[bundle->BundleId()] = node.first;
allocated_resources[node.first].AddResources(required_resources);
break;
}
}
}
if (schedule_map.size() != bundles.size()) {
schedule_map.clear();
}
return schedule_map;
const auto &required_resources = GetRequiredResourcesFromBundles(bundles);
const auto &selected_nodes =
gcs_resource_scheduler.Schedule(required_resources, SchedulingType::PACK);
return GenerateScheduleMap(bundles, selected_nodes);
}
ScheduleMap GcsSpreadStrategy::Schedule(
std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context) {
// When selecting nodes, if you traverse from the beginning each time, a large number of
// bundles will be deployed to the previous nodes. So we start with the next node of the
// last selected node.
ScheduleMap schedule_map;
const auto &candidate_nodes = context->cluster_resources_;
if (candidate_nodes.empty()) {
return schedule_map;
}
absl::flat_hash_map<NodeID, ResourceSet> allocated_resources;
auto iter = candidate_nodes.begin();
auto iter_begin = iter;
for (const auto &bundle : bundles) {
const auto &required_resources = bundle->GetRequiredResources();
// Traverse all nodes from `iter_begin` to `candidate_nodes.end()` to find a node
// that meets the resource requirements. `iter_begin` is the next node of the last
// selected node.
for (; iter != candidate_nodes.end(); ++iter) {
if (IsAvailableResourceSufficient(iter->second.GetAvailableResources(),
allocated_resources[iter->first],
required_resources)) {
schedule_map[bundle->BundleId()] = iter->first;
allocated_resources[iter->first].AddResources(required_resources);
break;
}
}
// We've traversed all the nodes from `iter_begin` to `candidate_nodes.end()`, but we
// haven't found one that meets the requirements.
// If `iter_begin` is `candidate_nodes.begin()`, it means that all nodes are not
// satisfied, we will return directly. Otherwise, we will traverse the nodes from
// `candidate_nodes.begin()` to `iter_begin` to find the nodes that meet the
// requirements.
if (iter == candidate_nodes.end()) {
if (iter_begin != candidate_nodes.begin()) {
// Traverse all the nodes from `candidate_nodes.begin()` to `iter_begin`.
for (iter = candidate_nodes.begin(); iter != iter_begin; ++iter) {
if (IsAvailableResourceSufficient(iter->second.GetAvailableResources(),
allocated_resources[iter->first],
required_resources)) {
schedule_map[bundle->BundleId()] = iter->first;
allocated_resources[iter->first].AddResources(required_resources);
break;
}
}
if (iter == iter_begin) {
// We have traversed all the nodes, so return directly.
break;
}
} else {
// We have traversed all the nodes, so return directly.
break;
}
}
// NOTE: If `iter == candidate_nodes.end()`, ++iter causes crash.
iter_begin = ++iter;
}
if (schedule_map.size() != bundles.size()) {
schedule_map.clear();
}
return schedule_map;
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) {
const auto &required_resources = GetRequiredResourcesFromBundles(bundles);
const auto &selected_nodes =
gcs_resource_scheduler.Schedule(required_resources, SchedulingType::SPREAD);
return GenerateScheduleMap(bundles, selected_nodes);
}
ScheduleMap GcsStrictSpreadStrategy::Schedule(
std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context) {
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) {
// TODO(ffbin): A bundle may require special resources, such as GPU. We need to
// schedule bundles with special resource requirements first, which will be implemented
// in the next pr.
ScheduleMap schedule_map;
const auto &candidate_nodes = context->cluster_resources_;
// The number of bundles is more than the number of nodes, scheduling fails.
if (bundles.size() > candidate_nodes.size()) {
return schedule_map;
}
// Filter out the nodes already scheduled by this placement group.
absl::flat_hash_map<NodeID, ResourceSet> allocated_resources;
absl::flat_hash_set<NodeID> nodes_in_use;
if (context->bundle_locations_.has_value()) {
const auto &bundle_locations = context->bundle_locations_.value();
for (auto &bundle : *bundle_locations) {
allocated_resources[bundle.second.first] = ResourceSet();
nodes_in_use.insert(bundle.second.first);
}
}
for (const auto &bundle : bundles) {
const auto &required_resources = bundle->GetRequiredResources();
auto iter = candidate_nodes.begin();
for (; iter != candidate_nodes.end(); ++iter) {
if (!allocated_resources.contains(iter->first) &&
IsAvailableResourceSufficient(iter->second.GetAvailableResources(),
allocated_resources[iter->first],
required_resources)) {
schedule_map[bundle->BundleId()] = iter->first;
allocated_resources[iter->first].AddResources(required_resources);
break;
}
}
// Node resource is not satisfied, scheduling failed.
if (iter == candidate_nodes.end()) {
break;
}
}
if (schedule_map.size() != bundles.size()) {
schedule_map.clear();
}
return schedule_map;
const auto &required_resources = GetRequiredResourcesFromBundles(bundles);
const auto &selected_nodes = gcs_resource_scheduler.Schedule(
required_resources, SchedulingType::STRICT_SPREAD,
/*node_filter_func=*/[&nodes_in_use](const NodeID &node_id) {
return nodes_in_use.count(node_id) == 0;
});
return GenerateScheduleMap(bundles, selected_nodes);
}
void GcsPlacementGroupScheduler::ScheduleUnplacedBundles(
@@ -253,7 +142,8 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles(
<< ", id: " << placement_group->GetPlacementGroupID()
<< ", bundles size = " << bundles.size();
auto selected_nodes = scheduler_strategies_[strategy]->Schedule(
bundles, GetScheduleContext(placement_group->GetPlacementGroupID()));
bundles, GetScheduleContext(placement_group->GetPlacementGroupID()),
gcs_resource_scheduler_);
// If no nodes are available, scheduling fails.
if (selected_nodes.empty()) {
@@ -271,7 +161,7 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles(
.second);
/// TODO(AlisaWu): Change the strategy when reserve resource failed.
for (auto &bundle : bundles) {
for (const auto &bundle : bundles) {
const auto &bundle_id = bundle->BundleId();
const auto &node_id = selected_nodes[bundle_id];
lease_status_tracker->MarkPreparePhaseStarted(node_id, bundle);
@@ -564,8 +454,7 @@ std::unique_ptr<ScheduleContext> GcsPlacementGroupScheduler::GetScheduleContext(
auto &bundle_locations =
committed_bundle_location_index_.GetBundleLocations(placement_group_id);
return std::unique_ptr<ScheduleContext>(
new ScheduleContext(std::move(node_to_bundles), bundle_locations,
gcs_resource_manager_.GetClusterResources()));
new ScheduleContext(std::move(node_to_bundles), bundle_locations));
}
absl::flat_hash_map<PlacementGroupID, std::vector<int64_t>>
@@ -19,6 +19,7 @@
#include "ray/gcs/accessor.h"
#include "ray/gcs/gcs_server/gcs_node_manager.h"
#include "ray/gcs/gcs_server/gcs_resource_manager.h"
#include "ray/gcs/gcs_server/gcs_resource_scheduler.h"
#include "ray/gcs/gcs_server/gcs_table_storage.h"
#include "ray/raylet_client/raylet_client.h"
#include "ray/rpc/node_manager/node_manager_client.h"
@@ -89,20 +90,15 @@ class GcsPlacementGroupSchedulerInterface {
/// ScheduleContext provides information that are needed for bundle scheduling decision.
class ScheduleContext {
public:
ScheduleContext(
std::shared_ptr<absl::flat_hash_map<NodeID, int64_t>> node_to_bundles,
const absl::optional<std::shared_ptr<BundleLocations>> bundle_locations,
const absl::flat_hash_map<NodeID, SchedulingResources> &cluster_resources)
ScheduleContext(std::shared_ptr<absl::flat_hash_map<NodeID, int64_t>> node_to_bundles,
const absl::optional<std::shared_ptr<BundleLocations>> bundle_locations)
: node_to_bundles_(std::move(node_to_bundles)),
bundle_locations_(bundle_locations),
cluster_resources_(cluster_resources) {}
bundle_locations_(bundle_locations) {}
// Key is node id, value is the number of bundles on the node.
const std::shared_ptr<absl::flat_hash_map<NodeID, int64_t>> node_to_bundles_;
// The locations of existing bundles for this placement group.
const absl::optional<std::shared_ptr<BundleLocations>> bundle_locations_;
// The available resources of all nodes.
const absl::flat_hash_map<NodeID, SchedulingResources> &cluster_resources_;
};
class GcsScheduleStrategy {
@@ -110,19 +106,25 @@ class GcsScheduleStrategy {
virtual ~GcsScheduleStrategy() {}
virtual ScheduleMap Schedule(
std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context) = 0;
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) = 0;
protected:
/// Judge whether the remaining resources are sufficient for allocate.
/// Get required resources from bundles.
///
/// \param available_resources Total available resources.
/// \param allocated_resources Allocated resources.
/// \param to_allocate_resources Resources to be allocated.
/// \return True if allocated_resources + to_allocate_resources > available_resources.
/// False otherwise.
bool IsAvailableResourceSufficient(const ResourceSet &available_resources,
const ResourceSet &allocated_resources,
const ResourceSet &to_allocate_resources) const;
/// \param bundles Bundles to be scheduled.
/// \return Required resources.
std::vector<ResourceSet> GetRequiredResourcesFromBundles(
const std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles);
/// Generate `ScheduleMap` from bundles and nodes .
///
/// \param bundles Bundles to be scheduled.
/// \param selected_nodes selected_nodes to be scheduled.
/// \return Required resources.
ScheduleMap GenerateScheduleMap(
const std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::vector<NodeID> &selected_nodes);
};
/// The `GcsPackStrategy` is that pack all bundles in one node as much as possible.
@@ -131,14 +133,16 @@ class GcsScheduleStrategy {
class GcsPackStrategy : public GcsScheduleStrategy {
public:
ScheduleMap Schedule(std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context) override;
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) override;
};
/// The `GcsSpreadStrategy` is that spread all bundles in different nodes.
class GcsSpreadStrategy : public GcsScheduleStrategy {
public:
ScheduleMap Schedule(std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context) override;
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) override;
};
/// The `GcsStrictPackStrategy` is that all bundles must be scheduled to one node. If one
@@ -146,7 +150,8 @@ class GcsSpreadStrategy : public GcsScheduleStrategy {
class GcsStrictPackStrategy : public GcsScheduleStrategy {
public:
ScheduleMap Schedule(std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context) override;
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) override;
};
/// The `GcsStrictSpreadStrategy` is that spread all bundles in different nodes.
@@ -155,7 +160,8 @@ class GcsStrictPackStrategy : public GcsScheduleStrategy {
class GcsStrictSpreadStrategy : public GcsScheduleStrategy {
public:
ScheduleMap Schedule(std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context) override;
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) override;
};
enum class LeasingState {
@@ -381,11 +387,13 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
/// \param placement_group_info_accessor Used to flush placement_group info to storage.
/// \param gcs_node_manager The node manager which is used when scheduling.
/// \param gcs_resource_manager The resource manager which is used when scheduling.
/// \param gcs_resource_scheduler The resource scheduler which is used when scheduling.
/// \param lease_client_factory Factory to create remote lease client.
GcsPlacementGroupScheduler(
boost::asio::io_context &io_context,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
const GcsNodeManager &gcs_node_manager, GcsResourceManager &gcs_resource_manager,
GcsResourceScheduler &gcs_resource_scheduler,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool);
virtual ~GcsPlacementGroupScheduler() = default;
@@ -536,6 +544,9 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
/// Reference of GcsResourceManager.
GcsResourceManager &gcs_resource_manager_;
/// Reference of GcsResourceScheduler.
GcsResourceScheduler &gcs_resource_scheduler_;
/// A vector to store all the schedule strategy.
std::vector<std::shared_ptr<GcsScheduleStrategy>> scheduler_strategies_;
@@ -0,0 +1,303 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/gcs/gcs_server/gcs_resource_scheduler.h"
namespace ray {
namespace gcs {
double LeastResourceScorer::Score(const ResourceSet &required_resources,
const SchedulingResources &node_resources) {
const auto &available_resources = node_resources.GetAvailableResources();
const auto &available_resource_amount_map = available_resources.GetResourceAmountMap();
double node_score = 0.0;
for (const auto &entry : required_resources.GetResourceAmountMap()) {
auto available_resource_amount_iter = available_resource_amount_map.find(entry.first);
if (available_resource_amount_iter == available_resource_amount_map.end()) {
return -1;
}
auto calculated_score =
Calculate(entry.second, available_resource_amount_iter->second);
if (calculated_score < 0) {
return -1;
}
node_score += calculated_score;
}
// TODO(ffbin): We always want to choose the node with the least matching resources. We
// will solve it in next pr.
return node_score;
}
double LeastResourceScorer::Calculate(const FractionalResourceQuantity &requested,
const FractionalResourceQuantity &available) {
RAY_CHECK(available >= 0) << "Available resource " << available.ToDouble()
<< " should be nonnegative.";
if (requested > available) {
return -1;
}
return (available - requested).ToDouble() / available.ToDouble();
}
/////////////////////////////////////////////////////////////////////////////////////////
std::vector<NodeID> GcsResourceScheduler::Schedule(
const std::vector<ResourceSet> &required_resources_list,
const SchedulingType &scheduling_type,
const std::function<bool(const NodeID &)> &node_filter_func) {
const auto &cluster_resources = gcs_resource_manager_.GetClusterResources();
// Filter candidate nodes.
absl::flat_hash_set<NodeID> candidate_nodes =
FilterCandidateNodes(cluster_resources, node_filter_func);
if (candidate_nodes.empty()) {
RAY_LOG(DEBUG) << "The candidate nodes is empty, return directly.";
return {};
}
// First schedule scarce resources (such as GPU) and large capacity resources to improve
// the scheduling success rate.
const auto &to_schedule_resources = SortRequiredResources(required_resources_list);
// Score and rank nodes.
std::vector<NodeID> result;
switch (scheduling_type) {
case SPREAD:
result = SpreadSchedule(to_schedule_resources, candidate_nodes);
break;
case STRICT_SPREAD:
result = StrictSpreadSchedule(to_schedule_resources, candidate_nodes);
break;
case PACK:
result = PackSchedule(to_schedule_resources, candidate_nodes);
break;
case STRICT_PACK:
result = StrictPackSchedule(to_schedule_resources, candidate_nodes);
break;
default:
RAY_LOG(FATAL) << "Unsupported scheduling type: " << scheduling_type;
break;
}
return result;
}
absl::flat_hash_set<NodeID> GcsResourceScheduler::FilterCandidateNodes(
const absl::flat_hash_map<NodeID, SchedulingResources> &cluster_resources,
const std::function<bool(const NodeID &)> &node_filter_func) {
absl::flat_hash_set<NodeID> result;
for (const auto &iter : cluster_resources) {
const auto &node_id = iter.first;
if (node_filter_func == nullptr || node_filter_func(node_id)) {
result.emplace(node_id);
}
}
return result;
}
const std::vector<ResourceSet> &GcsResourceScheduler::SortRequiredResources(
const std::vector<ResourceSet> &required_resources) {
// TODO(ffbin): A bundle may require special resources, such as GPU. We need to
// schedule bundles with special resource requirements first, which will be implemented
// in the next pr.
return required_resources;
}
std::vector<NodeID> GcsResourceScheduler::StrictSpreadSchedule(
const std::vector<ResourceSet> &required_resources_list,
const absl::flat_hash_set<NodeID> &candidate_nodes) {
std::vector<NodeID> result;
if (required_resources_list.size() > candidate_nodes.size()) {
RAY_LOG(DEBUG) << "The number of required resources "
<< required_resources_list.size()
<< " is greater than the number of candidate nodes "
<< candidate_nodes.size() << ", scheduling fails.";
return result;
}
absl::flat_hash_set<NodeID> candidate_nodes_copy(candidate_nodes);
for (const auto &iter : required_resources_list) {
// Score and sort nodes.
const auto &node_scores = ScoreNodes(iter, candidate_nodes_copy);
// There are nodes to meet the scheduling requirements.
if (!node_scores.empty() && node_scores.front().second >= 0) {
const auto &highest_score_node_id = node_scores.front().first;
result.push_back(highest_score_node_id);
candidate_nodes_copy.erase(highest_score_node_id);
} else {
// There is no node to meet the scheduling requirements.
break;
}
}
if (result.size() != required_resources_list.size()) {
// Unable to meet the resources required for scheduling, scheduling failed.
result.clear();
}
return result;
}
std::vector<NodeID> GcsResourceScheduler::SpreadSchedule(
const std::vector<ResourceSet> &required_resources_list,
const absl::flat_hash_set<NodeID> &candidate_nodes) {
std::vector<NodeID> result;
absl::flat_hash_set<NodeID> candidate_nodes_copy(candidate_nodes);
absl::flat_hash_set<NodeID> selected_nodes;
for (const auto &iter : required_resources_list) {
// Score and sort nodes.
const auto &node_scores = ScoreNodes(iter, candidate_nodes_copy);
// There are nodes to meet the scheduling requirements.
if (!node_scores.empty() && node_scores.front().second >= 0) {
const auto &highest_score_node_id = node_scores.front().first;
result.push_back(highest_score_node_id);
RAY_CHECK(gcs_resource_manager_.AcquireResources(highest_score_node_id, iter));
candidate_nodes_copy.erase(highest_score_node_id);
selected_nodes.insert(highest_score_node_id);
} else {
// Scheduling from selected nodes.
const auto &node_scores = ScoreNodes(iter, selected_nodes);
if (!node_scores.empty() && node_scores.front().second >= 0) {
const auto &highest_score_node_id = node_scores.front().first;
result.push_back(highest_score_node_id);
RAY_CHECK(gcs_resource_manager_.AcquireResources(highest_score_node_id, iter));
} else {
break;
}
}
}
// Releasing the resources temporarily deducted from `gcs_resource_manager_`.
ReleaseTemporarilyDeductedResources(required_resources_list, result);
if (result.size() != required_resources_list.size()) {
// Unable to meet the resources required for scheduling, scheduling failed.
result.clear();
}
return result;
}
std::vector<NodeID> GcsResourceScheduler::StrictPackSchedule(
const std::vector<ResourceSet> &required_resources_list,
const absl::flat_hash_set<NodeID> &candidate_nodes) {
std::vector<NodeID> result;
// Aggregate required resources.
ResourceSet required_resources;
for (const auto &iter : required_resources_list) {
required_resources.AddResources(iter);
}
// Score and sort nodes.
const auto &node_scores = ScoreNodes(required_resources, candidate_nodes);
// Select the node with the highest score.
// `StrictPackSchedule` does not need to consider the scheduling context, because it
// only schedules to a node and triggers rescheduling when node dead.
if (!node_scores.empty() && node_scores.front().second >= 0) {
for (int index = 0; index < (int)required_resources_list.size(); ++index) {
result.push_back(node_scores.front().first);
}
}
return result;
}
std::vector<NodeID> GcsResourceScheduler::PackSchedule(
const std::vector<ResourceSet> &required_resources_list,
const absl::flat_hash_set<NodeID> &candidate_nodes) {
std::vector<NodeID> result;
result.resize(required_resources_list.size());
absl::flat_hash_set<NodeID> candidate_nodes_copy(candidate_nodes);
std::list<std::pair<int, ResourceSet>> required_resources_list_copy;
int index = 0;
for (const auto &iter : required_resources_list) {
required_resources_list_copy.emplace_back(index++, iter);
}
while (!required_resources_list_copy.empty()) {
// Score and sort nodes.
const auto &required_resources_index = required_resources_list_copy.front().first;
const auto &required_resources = required_resources_list_copy.front().second;
const auto &node_scores = ScoreNodes(required_resources, candidate_nodes_copy);
if (node_scores.empty() || node_scores.front().second < 0) {
// There is no node to meet the scheduling requirements.
break;
}
const auto &highest_score_node_id = node_scores.front().first;
RAY_CHECK(gcs_resource_manager_.AcquireResources(highest_score_node_id,
required_resources));
result[required_resources_index] = highest_score_node_id;
required_resources_list_copy.pop_front();
// We try to schedule more resources on one node.
for (auto iter = required_resources_list_copy.begin();
iter != required_resources_list_copy.end();) {
if (gcs_resource_manager_.AcquireResources(highest_score_node_id, iter->second)) {
result[iter->first] = highest_score_node_id;
required_resources_list_copy.erase(iter++);
} else {
++iter;
}
}
candidate_nodes_copy.erase(highest_score_node_id);
}
// Releasing the resources temporarily deducted from `gcs_resource_manager_`.
ReleaseTemporarilyDeductedResources(required_resources_list, result);
if (!required_resources_list_copy.empty()) {
// Unable to meet the resources required for scheduling, scheduling failed.
result.clear();
}
return result;
}
std::list<NodeScore> GcsResourceScheduler::ScoreNodes(
const ResourceSet &required_resources,
const absl::flat_hash_set<NodeID> &candidate_nodes) {
std::list<NodeScore> node_scores;
const auto &cluster_resources = gcs_resource_manager_.GetClusterResources();
// Score the nodes.
for (const auto &node_id : candidate_nodes) {
const auto &iter = cluster_resources.find(node_id);
RAY_CHECK(iter != cluster_resources.end());
double node_score = node_scorer_->Score(required_resources, iter->second);
node_scores.emplace_back(node_id, node_score);
}
// Sort node scores, the large score is in the front.
node_scores.sort([](const NodeScore &left, const NodeScore &right) {
return right.second < left.second;
});
return node_scores;
}
void GcsResourceScheduler::ReleaseTemporarilyDeductedResources(
const std::vector<ResourceSet> &required_resources_list,
const std::vector<NodeID> &nodes) {
for (int index = 0; index < (int)nodes.size(); index++) {
// If `PackSchedule` fails, the id of some nodes may be nil.
if (!nodes[index].IsNil()) {
RAY_CHECK(gcs_resource_manager_.ReleaseResources(nodes[index],
required_resources_list[index]));
}
}
}
} // namespace gcs
} // namespace ray
@@ -0,0 +1,183 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "absl/container/flat_hash_set.h"
#include "ray/common/task/scheduling_resources.h"
#include "ray/gcs/gcs_server/gcs_resource_manager.h"
namespace ray {
namespace gcs {
// Type of resource scheduling strategy.
enum SchedulingType {
// Places resources across distinct nodes as even as possible.
SPREAD = 0,
// Places resources across distinct nodes.
// It is not allowed to deploy more than one resource on a node.
STRICT_SPREAD = 1,
// Packs resources into as few nodes as possible.
PACK = 2,
// Packs resources within one node. It is not allowed to span multiple nodes.
STRICT_PACK = 3,
SchedulingType_MAX = 4,
};
typedef std::pair<NodeID, double> NodeScore;
/// NodeScorer is a scorer to make a grade to the node, which is used for scheduling
/// decision.
class NodeScorer {
public:
virtual ~NodeScorer() = default;
/// \brief Score according to node resources.
///
/// \param required_resources The required resources.
/// \param node_resources The node resources which contains available and total
/// resources.
/// \return Score of the node.
virtual double Score(const ResourceSet &required_resources,
const SchedulingResources &node_resources) = 0;
};
/// LeastResourceScorer is a score plugin that favors nodes with fewer allocation
/// requested resources based on requested resources.
class LeastResourceScorer : public NodeScorer {
public:
double Score(const ResourceSet &required_resources,
const SchedulingResources &node_resources) override;
private:
/// \brief Calculate one of the resource scores.
///
/// \param requested Quantity of one of the required resources.
/// \param available Quantity of one of the available resources.
/// \return Score of the node.
double Calculate(const FractionalResourceQuantity &requested,
const FractionalResourceQuantity &available);
};
/// Gcs resource scheduler implementation.
/// Non-thread safe.
class GcsResourceScheduler {
public:
GcsResourceScheduler(GcsResourceManager &gcs_resource_manager)
: gcs_resource_manager_(gcs_resource_manager),
node_scorer_(new LeastResourceScorer()) {}
virtual ~GcsResourceScheduler() = default;
/// Schedule the specified resources to the cluster nodes.
///
/// \param required_resources_list The resources to be scheduled.
/// \param scheduling_type This scheduling strategy.
/// \param node_filter_func This function is used to filter candidate nodes. If a node
/// returns true, it can be used for scheduling. By default, all nodes in the cluster
/// can be used for scheduling.
/// \return Scheduling selected nodes, it corresponds to `required_resources_list` one
/// by one. If the scheduling fails, an empty vector is returned.
std::vector<NodeID> Schedule(
const std::vector<ResourceSet> &required_resources_list,
const SchedulingType &scheduling_type,
const std::function<bool(const NodeID &)> &node_filter_func = nullptr);
private:
/// Filter out candidate nodes which can be used for scheduling.
///
/// \param cluster_resources The cluster node resources.
/// \param node_filter_func This function is used to filter candidate nodes. If a node
/// returns true, it can be used for scheduling. By default, all nodes in the cluster
/// can be used for scheduling.
/// \return The candidate nodes which can be used for scheduling.
absl::flat_hash_set<NodeID> FilterCandidateNodes(
const absl::flat_hash_map<NodeID, SchedulingResources> &cluster_resources,
const std::function<bool(const NodeID &)> &node_filter_func);
/// Sort required resources according to the scarcity and capacity of resources.
/// We will first schedule scarce resources (such as GPU) and large capacity resources
/// to improve the scheduling success rate.
///
/// \param required_resources The resources to be scheduled.
/// \return The Sorted resources.
const std::vector<ResourceSet> &SortRequiredResources(
const std::vector<ResourceSet> &required_resources);
/// Schedule resources according to `STRICT_SPREAD` strategy.
///
/// \param required_resources_list The resources to be scheduled.
/// \param candidate_nodes The nodes can be used for scheduling.
/// \return Scheduling selected nodes, it corresponds to `required_resources_list` one
/// by one. If the scheduling fails, an empty vector is returned.
std::vector<NodeID> StrictSpreadSchedule(
const std::vector<ResourceSet> &required_resources_list,
const absl::flat_hash_set<NodeID> &candidate_nodes);
/// Schedule resources according to `SPREAD` strategy.
///
/// \param required_resources_list The resources to be scheduled.
/// \param candidate_nodes The nodes can be used for scheduling.
/// \return Scheduling selected nodes, it corresponds to `required_resources_list` one
/// by one. If the scheduling fails, an empty vector is returned.
std::vector<NodeID> SpreadSchedule(
const std::vector<ResourceSet> &required_resources_list,
const absl::flat_hash_set<NodeID> &candidate_nodes);
/// Schedule resources according to `STRICT_PACK` strategy.
///
/// \param required_resources_list The resources to be scheduled.
/// \param candidate_nodes The nodes can be used for scheduling.
/// \return Scheduling selected nodes, it corresponds to `required_resources_list` one
/// by one. If the scheduling fails, an empty vector is returned.
std::vector<NodeID> StrictPackSchedule(
const std::vector<ResourceSet> &required_resources_list,
const absl::flat_hash_set<NodeID> &candidate_nodes);
/// Schedule resources according to `PACK` strategy.
///
/// \param required_resources_list The resources to be scheduled.
/// \param candidate_nodes The nodes can be used for scheduling.
/// \return Scheduling selected nodes, it corresponds to `required_resources_list` one
/// by one. If the scheduling fails, an empty vector is returned.
std::vector<NodeID> PackSchedule(
const std::vector<ResourceSet> &required_resources_list,
const absl::flat_hash_set<NodeID> &candidate_nodes);
/// Score all nodes according to the specified resources.
///
/// \param required_resources The resources to be scheduled.
/// \param candidate_nodes The nodes can be used for scheduling.
/// \return Score of all nodes.
std::list<NodeScore> ScoreNodes(const ResourceSet &required_resources,
const absl::flat_hash_set<NodeID> &candidate_nodes);
/// Return the resources temporarily deducted from gcs resource manager.
///
/// \param required_resources_list The resources to be scheduled.
/// \param nodes Scheduling selected nodes, it corresponds to `required_resources_list`
/// one by one.
void ReleaseTemporarilyDeductedResources(
const std::vector<ResourceSet> &required_resources_list,
const std::vector<NodeID> &nodes);
/// Reference of GcsResourceManager.
GcsResourceManager &gcs_resource_manager_;
/// Scorer to make a grade to the node.
std::unique_ptr<NodeScorer> node_scorer_;
};
} // namespace gcs
} // namespace ray
+10 -1
View File
@@ -69,6 +69,9 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) {
// Init gcs resource manager.
InitGcsResourceManager(gcs_init_data);
// Init gcs resource scheduler.
InitGcsResourceScheduler();
// Init gcs node manager.
InitGcsNodeManager(gcs_init_data);
@@ -171,6 +174,12 @@ void GcsServer::InitGcsResourceManager(const GcsInitData &gcs_init_data) {
rpc_server_.RegisterService(*node_resource_info_service_);
}
void GcsServer::InitGcsResourceScheduler() {
RAY_CHECK(gcs_resource_manager_);
gcs_resource_scheduler_ =
std::make_shared<GcsResourceScheduler>(*gcs_resource_manager_);
}
void GcsServer::InitGcsJobManager() {
RAY_CHECK(gcs_table_storage_ && gcs_pub_sub_);
gcs_job_manager_.reset(new GcsJobManager(gcs_table_storage_, gcs_pub_sub_));
@@ -223,7 +232,7 @@ void GcsServer::InitGcsPlacementGroupManager(const GcsInitData &gcs_init_data) {
RAY_CHECK(gcs_table_storage_ && gcs_node_manager_);
auto scheduler = std::make_shared<GcsPlacementGroupScheduler>(
main_service_, gcs_table_storage_, *gcs_node_manager_, *gcs_resource_manager_,
raylet_client_pool_);
*gcs_resource_scheduler_, raylet_client_pool_);
gcs_placement_group_manager_ = std::make_shared<GcsPlacementGroupManager>(
main_service_, scheduler, gcs_table_storage_, *gcs_resource_manager_);
+6
View File
@@ -19,6 +19,7 @@
#include "ray/gcs/gcs_server/gcs_object_manager.h"
#include "ray/gcs/gcs_server/gcs_redis_failure_detector.h"
#include "ray/gcs/gcs_server/gcs_resource_manager.h"
#include "ray/gcs/gcs_server/gcs_resource_scheduler.h"
#include "ray/gcs/gcs_server/gcs_table_storage.h"
#include "ray/gcs/pubsub/gcs_pub_sub.h"
#include "ray/gcs/redis_client.h"
@@ -86,6 +87,9 @@ class GcsServer {
/// Initialize gcs resource manager.
void InitGcsResourceManager(const GcsInitData &gcs_init_data);
/// Initialize gcs resource scheduler.
void InitGcsResourceScheduler();
/// Initialize gcs job manager.
void InitGcsJobManager();
@@ -139,6 +143,8 @@ class GcsServer {
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool_;
/// The gcs resource manager.
std::shared_ptr<GcsResourceManager> gcs_resource_manager_;
/// The gcs resource scheduler.
std::shared_ptr<GcsResourceScheduler> gcs_resource_scheduler_;
/// The gcs node manager.
std::shared_ptr<GcsNodeManager> gcs_node_manager_;
/// The heartbeat manager.
@@ -41,6 +41,8 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
gcs_pub_sub_ = std::make_shared<GcsServerMocker::MockGcsPubSub>(redis_client_);
gcs_resource_manager_ =
std::make_shared<gcs::GcsResourceManager>(io_service_, nullptr, nullptr);
gcs_resource_scheduler_ =
std::make_shared<gcs::GcsResourceScheduler>(*gcs_resource_manager_);
gcs_node_manager_ =
std::make_shared<gcs::GcsNodeManager>(gcs_pub_sub_, gcs_table_storage_);
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
@@ -49,7 +51,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
[this](const rpc::Address &addr) { return raylet_clients_[addr.port()]; });
scheduler_ = std::make_shared<GcsServerMocker::MockedGcsPlacementGroupScheduler>(
io_service_, gcs_table_storage_, *gcs_node_manager_, *gcs_resource_manager_,
raylet_client_pool_);
*gcs_resource_scheduler_, raylet_client_pool_);
}
void TearDown() override {
@@ -208,6 +210,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
std::vector<std::shared_ptr<GcsServerMocker::MockRayletClient>> raylet_clients_;
std::shared_ptr<gcs::GcsResourceManager> gcs_resource_manager_;
std::shared_ptr<gcs::GcsResourceScheduler> gcs_resource_scheduler_;
std::shared_ptr<gcs::GcsNodeManager> gcs_node_manager_;
std::shared_ptr<GcsServerMocker::MockedGcsPlacementGroupScheduler> scheduler_;
std::vector<std::shared_ptr<gcs::GcsPlacementGroup>> success_placement_groups_
@@ -0,0 +1,132 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <memory>
#include "gtest/gtest.h"
#include "ray/gcs/gcs_server/gcs_resource_scheduler.h"
#include "ray/gcs/test/gcs_test_util.h"
namespace ray {
using ::testing::_;
class GcsResourceSchedulerTest : public ::testing::Test {
public:
void SetUp() override {
gcs_resource_manager_ =
std::make_shared<gcs::GcsResourceManager>(io_service_, nullptr, nullptr);
gcs_resource_scheduler_ =
std::make_shared<gcs::GcsResourceScheduler>(*gcs_resource_manager_);
}
void TearDown() override {
gcs_resource_scheduler_.reset();
gcs_resource_manager_.reset();
}
void AddClusterResources(const NodeID &node_id, const std::string &resource_name,
double resource_value) {
std::unordered_map<std::string, double> resource_map;
resource_map[resource_name] = resource_value;
gcs_resource_manager_->UpdateResourceCapacity(node_id, resource_map);
}
void CheckClusterAvailableResources(const NodeID &node_id,
const std::string &resource_name,
double resource_value) {
const auto &cluster_resource = gcs_resource_manager_->GetClusterResources();
auto iter = cluster_resource.find(node_id);
ASSERT_TRUE(iter != cluster_resource.end());
ASSERT_EQ(iter->second.GetAvailableResources().GetResource(resource_name).ToDouble(),
resource_value);
}
void TestResourceLeaks(const gcs::SchedulingType &scheduling_type) {
// Add node resources.
const auto &node_id = NodeID::FromRandom();
const std::string cpu_resource = "CPU";
const double node_cpu_num = 6.0;
AddClusterResources(node_id, cpu_resource, node_cpu_num);
// Scheduling succeeded and node resources are used up.
std::vector<ResourceSet> required_resources_list;
std::unordered_map<std::string, double> resource_map;
for (int bundle_cpu_num = 1; bundle_cpu_num <= 3; ++bundle_cpu_num) {
resource_map[cpu_resource] = bundle_cpu_num;
required_resources_list.emplace_back(resource_map);
}
const auto &result1 =
gcs_resource_scheduler_->Schedule(required_resources_list, scheduling_type);
ASSERT_EQ(result1.size(), 3);
// Check for resource leaks.
CheckClusterAvailableResources(node_id, cpu_resource, node_cpu_num);
// Scheduling failure.
resource_map[cpu_resource] = 5;
required_resources_list.emplace_back(resource_map);
const auto &result2 =
gcs_resource_scheduler_->Schedule(required_resources_list, scheduling_type);
ASSERT_EQ(result2.size(), 0);
// Check for resource leaks.
CheckClusterAvailableResources(node_id, cpu_resource, node_cpu_num);
}
std::shared_ptr<gcs::GcsResourceManager> gcs_resource_manager_;
std::shared_ptr<gcs::GcsResourceScheduler> gcs_resource_scheduler_;
private:
boost::asio::io_service io_service_;
};
TEST_F(GcsResourceSchedulerTest, TestPackScheduleResourceLeaks) {
TestResourceLeaks(gcs::SchedulingType::PACK);
}
TEST_F(GcsResourceSchedulerTest, TestSpreadScheduleResourceLeaks) {
TestResourceLeaks(gcs::SchedulingType::SPREAD);
}
TEST_F(GcsResourceSchedulerTest, TestNodeFilter) {
// Add node resources.
const auto &node_id = NodeID::FromRandom();
const std::string cpu_resource = "CPU";
const double node_cpu_num = 10.0;
AddClusterResources(node_id, cpu_resource, node_cpu_num);
// Scheduling failure.
std::vector<ResourceSet> required_resources_list;
std::unordered_map<std::string, double> resource_map;
resource_map[cpu_resource] = 1;
required_resources_list.emplace_back(resource_map);
const auto &result1 = gcs_resource_scheduler_->Schedule(
required_resources_list, gcs::SchedulingType::STRICT_SPREAD,
[](const NodeID &) { return false; });
ASSERT_EQ(result1.size(), 0);
// Scheduling succeeded.
const auto &result2 = gcs_resource_scheduler_->Schedule(
required_resources_list, gcs::SchedulingType::STRICT_SPREAD,
[](const NodeID &) { return true; });
ASSERT_EQ(result2.size(), 1);
}
} // namespace ray
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}