[PlacementGroup] Introduce GcsResourceManager and avoid copying resources when scheduling placement groups (#12253)

This commit is contained in:
fangfengbin
2020-11-26 11:21:58 +08:00
committed by GitHub
parent 90d7863eb3
commit d5215745e4
16 changed files with 364 additions and 95 deletions
+13
View File
@@ -1189,6 +1189,19 @@ cc_test(
],
)
# Unit test target for the GCS resource manager; builds
# src/ray/gcs/gcs_server/test/gcs_resource_manager_test.cc against the GCS server
# library and the shared GCS test utilities.
cc_test(
name = "gcs_resource_manager_test",
srcs = [
"src/ray/gcs/gcs_server/test/gcs_resource_manager_test.cc",
],
copts = COPTS,
deps = [
":gcs_server_lib",
":gcs_test_util_lib",
"@com_google_googletest//:gtest_main",
],
)
cc_library(
name = "service_based_gcs_client_lib",
srcs = glob(
+12 -16
View File
@@ -94,10 +94,12 @@ void GcsNodeManager::NodeFailureDetector::ScheduleTick() {
}
//////////////////////////////////////////////////////////////////////////////////////////
GcsNodeManager::GcsNodeManager(boost::asio::io_service &main_io_service,
boost::asio::io_service &node_failure_detector_io_service,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage)
GcsNodeManager::GcsNodeManager(
boost::asio::io_service &main_io_service,
boost::asio::io_service &node_failure_detector_io_service,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<gcs::GcsResourceManager> gcs_resource_manager)
: main_io_service_(main_io_service),
node_failure_detector_(new NodeFailureDetector(
node_failure_detector_io_service, gcs_table_storage, gcs_pub_sub,
@@ -124,7 +126,8 @@ GcsNodeManager::GcsNodeManager(boost::asio::io_service &main_io_service,
node_failure_detector_service_(node_failure_detector_io_service),
heartbeat_timer_(main_io_service),
gcs_pub_sub_(gcs_pub_sub),
gcs_table_storage_(gcs_table_storage) {
gcs_table_storage_(gcs_table_storage),
gcs_resource_manager_(gcs_resource_manager) {
SendBatchedHeartbeat();
}
@@ -349,7 +352,7 @@ void GcsNodeManager::HandleGetAllAvailableResources(
const rpc::GetAllAvailableResourcesRequest &request,
rpc::GetAllAvailableResourcesReply *reply,
rpc::SendReplyCallback send_reply_callback) {
for (const auto &iter : GetClusterRealtimeResources()) {
for (const auto &iter : gcs_resource_manager_->GetClusterResources()) {
rpc::AvailableResources resource;
resource.set_node_id(iter.first.Binary());
for (const auto &res : iter.second.GetResourceAmountMap()) {
@@ -477,8 +480,6 @@ std::shared_ptr<rpc::GcsNodeInfo> GcsNodeManager::RemoveNode(
alive_nodes_.erase(iter);
// Remove from cluster resources.
cluster_resources_.erase(node_id);
// Remove from cluster realtime resources.
cluster_realtime_resources_.erase(node_id);
heartbeat_buffer_.erase(node_id);
if (!is_intended) {
// Broadcast a warning to all of the drivers indicating that the node
@@ -535,18 +536,13 @@ void GcsNodeManager::StartNodeFailureDetector() {
void GcsNodeManager::UpdateNodeRealtimeResources(
const NodeID &node_id, const rpc::HeartbeatTableData &heartbeat) {
if (!RayConfig::instance().light_heartbeat_enabled() ||
cluster_realtime_resources_.count(node_id) == 0 ||
gcs_resource_manager_->GetClusterResources().count(node_id) == 0 ||
heartbeat.resources_available_changed()) {
cluster_realtime_resources_[node_id] =
ResourceSet(MapFromProtobuf(heartbeat.resources_available()));
gcs_resource_manager_->UpdateResources(
node_id, ResourceSet(MapFromProtobuf(heartbeat.resources_available())));
}
}
const absl::flat_hash_map<NodeID, ResourceSet>
&GcsNodeManager::GetClusterRealtimeResources() const {
return cluster_realtime_resources_;
}
void GcsNodeManager::UpdatePlacementGroupLoad(
const std::shared_ptr<rpc::PlacementGroupLoad> placement_group_load) {
placement_group_load_ = absl::make_optional(placement_group_load);
+6 -7
View File
@@ -19,6 +19,7 @@
#include "ray/common/id.h"
#include "ray/gcs/accessor.h"
#include "ray/gcs/gcs_server/gcs_init_data.h"
#include "ray/gcs/gcs_server/gcs_resource_manager.h"
#include "ray/gcs/gcs_server/gcs_table_storage.h"
#include "ray/gcs/pubsub/gcs_pub_sub.h"
#include "ray/rpc/client_call.h"
@@ -39,11 +40,12 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
/// \param node_failure_detector_io_service The event loop of node failure detector.
/// \param gcs_pub_sub GCS message publisher.
/// \param gcs_table_storage GCS table external storage accessor.
/// when detecting the death of nodes.
/// \param gcs_resource_manager GCS resource manager.
explicit GcsNodeManager(boost::asio::io_service &main_io_service,
boost::asio::io_service &node_failure_detector_io_service,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage);
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<gcs::GcsResourceManager> gcs_resource_manager);
/// Handle register rpc request come from raylet.
void HandleRegisterNode(const rpc::RegisterNodeRequest &request,
@@ -176,9 +178,6 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
void UpdateNodeRealtimeResources(const NodeID &node_id,
const rpc::HeartbeatTableData &heartbeat);
/// Get cluster realtime resources.
const absl::flat_hash_map<NodeID, ResourceSet> &GetClusterRealtimeResources() const;
/// Update the placement group load information so that it will be reported through
/// heartbeat.
///
@@ -295,8 +294,8 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
/// Storage for GCS tables.
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
/// Cluster realtime resources.
absl::flat_hash_map<NodeID, ResourceSet> cluster_realtime_resources_;
/// Gcs resource manager.
std::shared_ptr<gcs::GcsResourceManager> gcs_resource_manager_;
/// Placement group load information that is used for autoscaler.
absl::optional<std::shared_ptr<rpc::PlacementGroupLoad>> placement_group_load_;
@@ -23,11 +23,12 @@ namespace gcs {
GcsPlacementGroupScheduler::GcsPlacementGroupScheduler(
boost::asio::io_context &io_context,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
const gcs::GcsNodeManager &gcs_node_manager,
const gcs::GcsNodeManager &gcs_node_manager, GcsResourceManager &gcs_resource_manager,
ReserveResourceClientFactoryFn lease_client_factory)
: return_timer_(io_context),
gcs_table_storage_(std::move(gcs_table_storage)),
gcs_node_manager_(gcs_node_manager),
gcs_resource_manager_(gcs_resource_manager),
lease_client_factory_(std::move(lease_client_factory)) {
scheduler_strategies_.push_back(std::make_shared<GcsPackStrategy>());
scheduler_strategies_.push_back(std::make_shared<GcsSpreadStrategy>());
@@ -35,6 +36,24 @@ GcsPlacementGroupScheduler::GcsPlacementGroupScheduler(
scheduler_strategies_.push_back(std::make_shared<GcsStrictSpreadStrategy>());
}
bool GcsScheduleStrategy::IsAvailableResourceSufficient(
    const ResourceSet &available_resources, const ResourceSet &allocated_resources,
    const ResourceSet &to_allocate_resources) const {
  // The request fits only if, for every resource it names, the requested amount does
  // not exceed what is left on the node once the already allocated amount has been
  // subtracted from the available amount.
  for (const auto &entry : to_allocate_resources.GetResourceAmountMap()) {
    const auto &name = entry.first;
    const auto &requested = entry.second;
    const auto &remaining =
        available_resources.GetResource(name) - allocated_resources.GetResource(name);
    if (requested > remaining) {
      // At least one resource cannot be satisfied.
      return false;
    }
  }
  return true;
}
ScheduleMap GcsStrictPackStrategy::Schedule(
std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context) {
@@ -45,9 +64,8 @@ ScheduleMap GcsStrictPackStrategy::Schedule(
}
// Filter candidate nodes.
const auto &alive_nodes = context->node_manager_.GetClusterRealtimeResources();
std::vector<std::pair<int64_t, NodeID>> candidate_nodes;
for (auto &node : alive_nodes) {
for (auto &node : context->cluster_resources_) {
if (required_resources.IsSubset(node.second)) {
candidate_nodes.emplace_back((*context->node_to_bundles_)[node.first], node.first);
}
@@ -77,16 +95,14 @@ ScheduleMap GcsPackStrategy::Schedule(
// First fill up a node. If the node resource is insufficient, select a new node.
// TODO(ffbin): We will speed this up in next PR. Currently it is a double for loop.
ScheduleMap schedule_map;
// TODO(WangTao): This copy might take too much space once cluster grows very large.
// Would find better solution.
absl::flat_hash_map<NodeID, ResourceSet> alive_nodes(
context->node_manager_.GetClusterRealtimeResources());
absl::flat_hash_map<NodeID, ResourceSet> allocated_resources;
for (const auto &bundle : bundles) {
const auto &required_resources = bundle->GetRequiredResources();
for (auto &node : alive_nodes) {
if (required_resources.IsSubset(node.second)) {
node.second.SubtractResourcesStrict(required_resources);
for (const auto &node : context->cluster_resources_) {
if (IsAvailableResourceSufficient(node.second, allocated_resources[node.first],
required_resources)) {
schedule_map[bundle->BundleId()] = node.first;
allocated_resources[node.first].AddResources(required_resources);
break;
}
}
@@ -105,25 +121,24 @@ ScheduleMap GcsSpreadStrategy::Schedule(
// bundles will be deployed to the previous nodes. So we start with the next node of the
// last selected node.
ScheduleMap schedule_map;
// TODO(WangTao): This copy might take too much space once cluster grows very large.
// Would find better solution.
absl::flat_hash_map<NodeID, ResourceSet> candidate_nodes(
context->node_manager_.GetClusterRealtimeResources());
const auto &candidate_nodes = context->cluster_resources_;
if (candidate_nodes.empty()) {
return schedule_map;
}
absl::flat_hash_map<NodeID, ResourceSet> allocated_resources;
auto iter = candidate_nodes.begin();
auto iter_begin = iter;
for (const auto &bundle : bundles) {
const auto &required_resources = bundle->GetRequiredResources();
// Traverse all nodes from `iter_begin` to `candidate_nodes.end()` to find a node that
// meets the resource requirements. `iter_begin` is the next node of the last selected
// node.
// Traverse all nodes from `iter_begin` to `candidate_nodes.end()` to find a node
// that meets the resource requirements. `iter_begin` is the next node of the last
// selected node.
for (; iter != candidate_nodes.end(); ++iter) {
if (required_resources.IsSubset(iter->second)) {
iter->second.SubtractResourcesStrict(required_resources);
if (IsAvailableResourceSufficient(iter->second, allocated_resources[iter->first],
required_resources)) {
schedule_map[bundle->BundleId()] = iter->first;
allocated_resources[iter->first].AddResources(required_resources);
break;
}
}
@@ -138,9 +153,10 @@ ScheduleMap GcsSpreadStrategy::Schedule(
if (iter_begin != candidate_nodes.begin()) {
// Traverse all the nodes from `candidate_nodes.begin()` to `iter_begin`.
for (iter = candidate_nodes.begin(); iter != iter_begin; ++iter) {
if (required_resources.IsSubset(iter->second)) {
iter->second.SubtractResourcesStrict(required_resources);
if (IsAvailableResourceSufficient(
iter->second, allocated_resources[iter->first], required_resources)) {
schedule_map[bundle->BundleId()] = iter->first;
allocated_resources[iter->first].AddResources(required_resources);
break;
}
}
@@ -170,20 +186,23 @@ ScheduleMap GcsStrictSpreadStrategy::Schedule(
// schedule bundles with special resource requirements first, which will be implemented
// in the next pr.
ScheduleMap schedule_map;
auto candidate_nodes = context->node_manager_.GetClusterRealtimeResources();
const auto &candidate_nodes = context->cluster_resources_;
// The number of bundles is more than the number of nodes, scheduling fails.
if (bundles.size() > candidate_nodes.size()) {
return schedule_map;
}
absl::flat_hash_map<NodeID, ResourceSet> allocated_resources;
for (const auto &bundle : bundles) {
const auto &required_resources = bundle->GetRequiredResources();
auto iter = candidate_nodes.begin();
for (; iter != candidate_nodes.end(); ++iter) {
if (required_resources.IsSubset(iter->second)) {
if (!allocated_resources.contains(iter->first) &&
IsAvailableResourceSufficient(iter->second, allocated_resources[iter->first],
required_resources)) {
schedule_map[bundle->BundleId()] = iter->first;
candidate_nodes.erase(iter);
allocated_resources[iter->first].AddResources(required_resources);
break;
}
}
@@ -518,8 +537,9 @@ std::unique_ptr<ScheduleContext> GcsPlacementGroupScheduler::GetScheduleContext(
auto &bundle_locations =
committed_bundle_location_index_.GetBundleLocations(placement_group_id);
return std::unique_ptr<ScheduleContext>(new ScheduleContext(
std::move(node_to_bundles), bundle_locations, gcs_node_manager_));
return std::unique_ptr<ScheduleContext>(
new ScheduleContext(std::move(node_to_bundles), bundle_locations,
gcs_resource_manager_.GetClusterResources()));
}
absl::flat_hash_map<PlacementGroupID, std::vector<int64_t>>
@@ -609,7 +629,17 @@ void GcsPlacementGroupScheduler::DestroyPlacementGroupCommittedBundleResources(
void BundleLocationIndex::AddBundleLocations(
const PlacementGroupID &placement_group_id,
std::shared_ptr<BundleLocations> bundle_locations) {
placement_group_to_bundle_locations_.emplace(placement_group_id, bundle_locations);
// Update `placement_group_to_bundle_locations_`.
// A placement group may need several scheduling rounds before it succeeds, so merge
// the new `bundle_locations` into the existing entry instead of overwriting it.
auto iter = placement_group_to_bundle_locations_.find(placement_group_id);
if (iter == placement_group_to_bundle_locations_.end()) {
placement_group_to_bundle_locations_.emplace(placement_group_id, bundle_locations);
} else {
iter->second->insert(bundle_locations->begin(), bundle_locations->end());
}
// Update `node_to_leased_bundles_`.
for (auto iter : *bundle_locations) {
const auto &node_id = iter.second.first;
if (!node_to_leased_bundles_.contains(node_id)) {
@@ -18,6 +18,7 @@
#include "ray/common/id.h"
#include "ray/gcs/accessor.h"
#include "ray/gcs/gcs_server/gcs_node_manager.h"
#include "ray/gcs/gcs_server/gcs_resource_manager.h"
#include "ray/gcs/gcs_server/gcs_table_storage.h"
#include "ray/raylet_client/raylet_client.h"
#include "ray/rpc/node_manager/node_manager_client.h"
@@ -87,17 +88,17 @@ class ScheduleContext {
public:
ScheduleContext(std::shared_ptr<absl::flat_hash_map<NodeID, int64_t>> node_to_bundles,
const absl::optional<std::shared_ptr<BundleLocations>> bundle_locations,
const GcsNodeManager &node_manager)
const absl::flat_hash_map<NodeID, ResourceSet> &cluster_resources)
: node_to_bundles_(std::move(node_to_bundles)),
bundle_locations_(bundle_locations),
node_manager_(node_manager) {}
cluster_resources_(cluster_resources) {}
// Key is node id, value is the number of bundles on the node.
const std::shared_ptr<absl::flat_hash_map<NodeID, int64_t>> node_to_bundles_;
// The locations of existing bundles for this placement group.
const absl::optional<std::shared_ptr<BundleLocations>> bundle_locations_;
const GcsNodeManager &node_manager_;
// The available resources of all nodes.
const absl::flat_hash_map<NodeID, ResourceSet> &cluster_resources_;
};
class GcsScheduleStrategy {
@@ -106,6 +107,18 @@ class GcsScheduleStrategy {
virtual ScheduleMap Schedule(
std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context) = 0;
protected:
/// Check whether the remaining resources are sufficient for a new allocation.
///
/// \param available_resources Total available resources.
/// \param allocated_resources Allocated resources.
/// \param to_allocate_resources Resources to be allocated.
/// \return True if allocated_resources + to_allocate_resources <= available_resources.
/// False otherwise.
bool IsAvailableResourceSufficient(const ResourceSet &available_resources,
const ResourceSet &allocated_resources,
const ResourceSet &to_allocate_resources) const;
};
/// The `GcsPackStrategy` is that pack all bundles in one node as much as possible.
@@ -221,9 +234,9 @@ class LeaseStatusTracker {
/// \return Location of bundles that succeed to prepare resources on a node.
const std::shared_ptr<BundleLocations> &GetPreparedBundleLocations() const;
/// This method returns bundle locations that succeed to commit resources.
/// This method returns bundle locations that failed to commit resources.
///
/// \return Location of bundles that succeed to commit resources on a node.
/// \return Location of bundles that failed to commit resources on a node.
const std::shared_ptr<BundleLocations> &GetUnCommittedBundleLocations() const;
/// Return the leasing state.
@@ -345,10 +358,12 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
/// \param io_context The main event loop.
/// \param gcs_table_storage Used to flush placement_group info to storage.
/// \param gcs_node_manager The node manager which is used when scheduling.
/// \param gcs_resource_manager The resource manager which is used when scheduling.
/// \param lease_client_factory Factory to create remote lease client.
GcsPlacementGroupScheduler(
boost::asio::io_context &io_context,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
const GcsNodeManager &gcs_node_manager,
const GcsNodeManager &gcs_node_manager, GcsResourceManager &gcs_resource_manager,
ReserveResourceClientFactoryFn lease_client_factory = nullptr);
virtual ~GcsPlacementGroupScheduler() = default;
@@ -490,6 +505,9 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
/// Reference of GcsNodeManager.
const GcsNodeManager &gcs_node_manager_;
/// Reference of GcsResourceManager.
GcsResourceManager &gcs_resource_manager_;
/// The cached node clients which are used to communicate with raylet to lease workers.
absl::flat_hash_map<NodeID, std::shared_ptr<ResourceReserveInterface>>
remote_lease_clients_;
@@ -0,0 +1,58 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/gcs/gcs_server/gcs_resource_manager.h"
namespace ray {
namespace gcs {
// Returns a read-only reference to the map from node id to that node's resources.
// No copy is made; the reference points at the manager's own `cluster_resources_`.
const absl::flat_hash_map<NodeID, ResourceSet> &GcsResourceManager::GetClusterResources()
const {
return cluster_resources_;
}
// Records `resources` as the resource set of `node_id`. A new entry is created if the
// node is not tracked yet; otherwise the previous value is replaced.
void GcsResourceManager::UpdateResources(const NodeID &node_id,
const ResourceSet &resources) {
cluster_resources_[node_id] = resources;
}
// Stops tracking `node_id` by erasing its entry from `cluster_resources_`.
void GcsResourceManager::RemoveResources(const NodeID &node_id) {
cluster_resources_.erase(node_id);
}
bool GcsResourceManager::AcquireResources(const NodeID &node_id,
                                          const ResourceSet &required_resources) {
  auto node_iter = cluster_resources_.find(node_id);
  // Acquiring from a node that is not tracked is treated as a fatal error.
  RAY_CHECK(node_iter != cluster_resources_.end()) << "Node " << node_id << " not exist.";

  auto &node_resources = node_iter->second;
  if (required_resources.IsSubset(node_resources)) {
    // The node can satisfy the request: deduct it from the node's resources.
    node_resources.SubtractResourcesStrict(required_resources);
    return true;
  }
  // Not enough resources on the node; leave its resources untouched.
  return false;
}
bool GcsResourceManager::ReleaseResources(const NodeID &node_id,
                                          const ResourceSet &acquired_resources) {
  auto node_iter = cluster_resources_.find(node_id);
  if (node_iter == cluster_resources_.end()) {
    // The node is no longer tracked (e.g. it died). This is a normal scenario, so the
    // release is still reported as successful.
    return true;
  }
  // Give the resources back to the node.
  node_iter->second.AddResources(acquired_resources);
  return true;
}
} // namespace gcs
} // namespace ray
@@ -0,0 +1,86 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "absl/container/flat_hash_map.h"
#include "ray/common/task/scheduling_resources.h"
namespace ray {
namespace gcs {
/// Abstract interface of the GCS resource manager, used for actor and placement
/// group scheduling. Implementations are not required to be thread safe.
class GcsResourceManagerInterface {
 public:
  virtual ~GcsResourceManagerInterface() = default;

  /// Get the per-node resources of the whole cluster.
  ///
  /// \return Map from node id to the resources of that node.
  virtual const absl::flat_hash_map<NodeID, ResourceSet> &GetClusterResources() const = 0;

  /// Set the resources of a node.
  ///
  /// \param node_id Id of the node to update.
  /// \param resources The resources to record for the node.
  virtual void UpdateResources(const NodeID &node_id, const ResourceSet &resources) = 0;

  /// Forget the resources of a node.
  ///
  /// \param node_id Id of the node to remove.
  virtual void RemoveResources(const NodeID &node_id) = 0;

  /// Acquire resources from a node. On success the amount is deducted directly from
  /// the resources of the node.
  ///
  /// \param node_id Id of the node to acquire from.
  /// \param required_resources The resources being requested.
  /// \return True if the resources were acquired. False otherwise.
  virtual bool AcquireResources(const NodeID &node_id,
                                const ResourceSet &required_resources) = 0;

  /// Release resources back to a node. The amount is added directly to the resources
  /// of the node.
  ///
  /// \param node_id Id of the node to release to.
  /// \param acquired_resources The resources being returned.
  /// \return True if the resources were released. False otherwise.
  virtual bool ReleaseResources(const NodeID &node_id,
                                const ResourceSet &acquired_resources) = 0;
};
/// Gcs resource manager implementation. It obtains the available resources of nodes
/// through heartbeat reporting. Non-thread safe.
class GcsResourceManager : public GcsResourceManagerInterface {
public:
virtual ~GcsResourceManager() = default;
const absl::flat_hash_map<NodeID, ResourceSet> &GetClusterResources() const;
void UpdateResources(const NodeID &node_id, const ResourceSet &resources);
void RemoveResources(const NodeID &node_id);
bool AcquireResources(const NodeID &node_id, const ResourceSet &required_resources);
bool ReleaseResources(const NodeID &node_id, const ResourceSet &acquired_resources);
private:
/// Map from node id to the resources of the node.
absl::flat_hash_map<NodeID, ResourceSet> cluster_resources_;
};
} // namespace gcs
} // namespace ray
+12 -3
View File
@@ -65,7 +65,10 @@ void GcsServer::Start() {
}
void GcsServer::DoStart(const GcsInitData &gcs_init_data) {
// Init gcs node_manager.
// Init gcs resource manager.
InitGcsResourceManager();
// Init gcs node manager.
InitGcsNodeManager(gcs_init_data);
// Init gcs job manager.
@@ -134,7 +137,8 @@ void GcsServer::InitGcsNodeManager(const GcsInitData &gcs_init_data) {
node_manager_io_service_.run();
}));
gcs_node_manager_ = std::make_shared<GcsNodeManager>(
main_service_, node_manager_io_service_, gcs_pub_sub_, gcs_table_storage_);
main_service_, node_manager_io_service_, gcs_pub_sub_, gcs_table_storage_,
gcs_resource_manager_);
// Initialize by gcs tables data.
gcs_node_manager_->Initialize(gcs_init_data);
// Register service.
@@ -143,6 +147,10 @@ void GcsServer::InitGcsNodeManager(const GcsInitData &gcs_init_data) {
rpc_server_.RegisterService(*node_info_service_);
}
void GcsServer::InitGcsResourceManager() {
gcs_resource_manager_ = std::make_shared<GcsResourceManager>();
}
void GcsServer::InitGcsJobManager() {
RAY_CHECK(gcs_table_storage_ && gcs_pub_sub_);
gcs_job_manager_.reset(new GcsJobManager(gcs_table_storage_, gcs_pub_sub_));
@@ -197,7 +205,7 @@ void GcsServer::InitGcsActorManager(const GcsInitData &gcs_init_data) {
void GcsServer::InitGcsPlacementGroupManager(const GcsInitData &gcs_init_data) {
RAY_CHECK(gcs_table_storage_ && gcs_node_manager_);
auto scheduler = std::make_shared<GcsPlacementGroupScheduler>(
main_service_, gcs_table_storage_, *gcs_node_manager_,
main_service_, gcs_table_storage_, *gcs_node_manager_, *gcs_resource_manager_,
/*lease_client_factory=*/
[this](const rpc::Address &address) {
auto node_manager_worker_client = rpc::NodeManagerWorkerClient::make(
@@ -285,6 +293,7 @@ void GcsServer::InstallEventListeners() {
// node is removed from the GCS.
gcs_placement_group_manager_->OnNodeDead(node_id);
gcs_actor_manager_->OnNodeDead(node_id);
gcs_resource_manager_->RemoveResources(node_id);
});
// Install worker event listener.
+6
View File
@@ -17,6 +17,7 @@
#include "ray/gcs/gcs_server/gcs_init_data.h"
#include "ray/gcs/gcs_server/gcs_object_manager.h"
#include "ray/gcs/gcs_server/gcs_redis_failure_detector.h"
#include "ray/gcs/gcs_server/gcs_resource_manager.h"
#include "ray/gcs/gcs_server/gcs_table_storage.h"
#include "ray/gcs/pubsub/gcs_pub_sub.h"
#include "ray/gcs/redis_gcs_client.h"
@@ -77,6 +78,9 @@ class GcsServer {
/// Initialize gcs node manager.
void InitGcsNodeManager(const GcsInitData &gcs_init_data);
/// Initialize gcs resource manager.
void InitGcsResourceManager();
/// Initialize gcs job manager.
void InitGcsJobManager();
@@ -127,6 +131,8 @@ class GcsServer {
rpc::GrpcServer rpc_server_;
/// The `ClientCallManager` object that is shared by all `NodeManagerWorkerClient`s.
rpc::ClientCallManager client_call_manager_;
/// The gcs resource manager.
std::shared_ptr<GcsResourceManager> gcs_resource_manager_;
/// The gcs node manager.
std::shared_ptr<GcsNodeManager> gcs_node_manager_;
/// The gcs redis failure detector.
@@ -27,8 +27,10 @@ class GcsActorSchedulerTest : public ::testing::Test {
worker_client_ = std::make_shared<GcsServerMocker::MockWorkerClient>();
gcs_pub_sub_ = std::make_shared<GcsServerMocker::MockGcsPubSub>(redis_client_);
gcs_table_storage_ = std::make_shared<gcs::RedisGcsTableStorage>(redis_client_);
gcs_node_manager_ = std::make_shared<gcs::GcsNodeManager>(
io_service_, io_service_, gcs_pub_sub_, gcs_table_storage_);
gcs_resource_manager_ = std::make_shared<gcs::GcsResourceManager>();
gcs_node_manager_ =
std::make_shared<gcs::GcsNodeManager>(io_service_, io_service_, gcs_pub_sub_,
gcs_table_storage_, gcs_resource_manager_);
store_client_ = std::make_shared<gcs::InMemoryStoreClient>(io_service_);
gcs_actor_table_ =
std::make_shared<GcsServerMocker::MockedGcsActorTable>(store_client_);
@@ -54,6 +56,7 @@ class GcsActorSchedulerTest : public ::testing::Test {
std::shared_ptr<GcsServerMocker::MockedGcsActorTable> gcs_actor_table_;
std::shared_ptr<GcsServerMocker::MockRayletClient> raylet_client_;
std::shared_ptr<GcsServerMocker::MockWorkerClient> worker_client_;
std::shared_ptr<gcs::GcsResourceManager> gcs_resource_manager_;
std::shared_ptr<gcs::GcsNodeManager> gcs_node_manager_;
std::shared_ptr<GcsServerMocker::MockedGcsActorScheduler> gcs_actor_scheduler_;
std::vector<std::shared_ptr<gcs::GcsActor>> success_actors_;
@@ -29,12 +29,13 @@ class GcsNodeManagerTest : public ::testing::Test {
std::shared_ptr<GcsServerMocker::MockGcsPubSub> gcs_pub_sub_;
std::shared_ptr<gcs::RedisClient> redis_client_;
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
std::shared_ptr<gcs::GcsResourceManager> gcs_resource_manager_;
};
TEST_F(GcsNodeManagerTest, TestManagement) {
boost::asio::io_service io_service;
gcs::GcsNodeManager node_manager(io_service, io_service, gcs_pub_sub_,
gcs_table_storage_);
gcs_table_storage_, gcs_resource_manager_);
// Test Add/Get/Remove functionality.
auto node = Mocker::GenNodeInfo();
auto node_id = NodeID::FromBinary(node->node_id());
@@ -49,7 +50,7 @@ TEST_F(GcsNodeManagerTest, TestManagement) {
TEST_F(GcsNodeManagerTest, TestListener) {
boost::asio::io_service io_service;
gcs::GcsNodeManager node_manager(io_service, io_service, gcs_pub_sub_,
gcs_table_storage_);
gcs_table_storage_, gcs_resource_manager_);
// Test AddNodeAddedListener.
int node_count = 1000;
std::vector<std::shared_ptr<rpc::GcsNodeInfo>> added_nodes;
@@ -86,30 +87,6 @@ TEST_F(GcsNodeManagerTest, TestListener) {
}
}
TEST_F(GcsNodeManagerTest, TestGetClusterRealtimeResources) {
boost::asio::io_service io_service;
gcs::GcsNodeManager node_manager(io_service, io_service, gcs_pub_sub_,
gcs_table_storage_);
auto node_id = NodeID::FromRandom();
rpc::HeartbeatTableData heartbeat;
const std::string cpu_resource = "CPU";
(*heartbeat.mutable_resources_available())[cpu_resource] = 10;
node_manager.UpdateNodeRealtimeResources(node_id, heartbeat);
auto node_resources = node_manager.GetClusterRealtimeResources();
ResourceSet required_resources;
required_resources.AddOrUpdateResource(cpu_resource, 9);
ASSERT_TRUE(required_resources.IsSubset(node_resources[node_id]));
required_resources.AddOrUpdateResource(cpu_resource, 10);
ASSERT_TRUE(required_resources.IsSubset(node_resources[node_id]));
required_resources.AddOrUpdateResource(cpu_resource, 10.1);
ASSERT_FALSE(required_resources.IsSubset(node_resources[node_id]));
required_resources.DeleteResource(cpu_resource);
required_resources.AddOrUpdateResource("GPU", 9);
ASSERT_FALSE(required_resources.IsSubset(node_resources[node_id]));
}
} // namespace ray
int main(int argc, char **argv) {
@@ -54,8 +54,10 @@ class GcsObjectManagerTest : public ::testing::Test {
public:
void SetUp() override {
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
gcs_node_manager_ = std::make_shared<gcs::GcsNodeManager>(
io_service_, io_service_, gcs_pub_sub_, gcs_table_storage_);
gcs_resource_manager_ = std::make_shared<gcs::GcsResourceManager>();
gcs_node_manager_ =
std::make_shared<gcs::GcsNodeManager>(io_service_, io_service_, gcs_pub_sub_,
gcs_table_storage_, gcs_resource_manager_);
gcs_object_manager_ = std::make_shared<MockedGcsObjectManager>(
gcs_table_storage_, gcs_pub_sub_, *gcs_node_manager_);
GenTestData();
@@ -83,6 +85,7 @@ class GcsObjectManagerTest : public ::testing::Test {
protected:
boost::asio::io_service io_service_;
std::shared_ptr<gcs::GcsResourceManager> gcs_resource_manager_;
std::shared_ptr<gcs::GcsNodeManager> gcs_node_manager_;
std::shared_ptr<gcs::RedisGcsClient> gcs_client_;
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
@@ -68,8 +68,10 @@ class GcsPlacementGroupManagerTest : public ::testing::Test {
: mock_placement_group_scheduler_(new MockPlacementGroupScheduler()) {
gcs_pub_sub_ = std::make_shared<GcsServerMocker::MockGcsPubSub>(redis_client_);
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
gcs_node_manager_ = std::make_shared<gcs::GcsNodeManager>(
io_service_, io_service_, gcs_pub_sub_, gcs_table_storage_);
gcs_resource_manager_ = std::make_shared<gcs::GcsResourceManager>();
gcs_node_manager_ =
std::make_shared<gcs::GcsNodeManager>(io_service_, io_service_, gcs_pub_sub_,
gcs_table_storage_, gcs_resource_manager_);
gcs_placement_group_manager_.reset(
new gcs::GcsPlacementGroupManager(io_service_, mock_placement_group_scheduler_,
gcs_table_storage_, *gcs_node_manager_));
@@ -103,6 +105,7 @@ class GcsPlacementGroupManagerTest : public ::testing::Test {
std::unique_ptr<std::thread> thread_io_service_;
boost::asio::io_service io_service_;
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
std::shared_ptr<gcs::GcsResourceManager> gcs_resource_manager_;
std::shared_ptr<gcs::GcsNodeManager> gcs_node_manager_;
std::shared_ptr<GcsServerMocker::MockGcsPubSub> gcs_pub_sub_;
std::shared_ptr<gcs::RedisClient> redis_client_;
@@ -40,12 +40,14 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
}
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
gcs_pub_sub_ = std::make_shared<GcsServerMocker::MockGcsPubSub>(redis_client_);
gcs_node_manager_ = std::make_shared<gcs::GcsNodeManager>(
io_service_, io_service_, gcs_pub_sub_, gcs_table_storage_);
gcs_resource_manager_ = std::make_shared<gcs::GcsResourceManager>();
gcs_node_manager_ =
std::make_shared<gcs::GcsNodeManager>(io_service_, io_service_, gcs_pub_sub_,
gcs_table_storage_, gcs_resource_manager_);
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
store_client_ = std::make_shared<gcs::InMemoryStoreClient>(io_service_);
scheduler_ = std::make_shared<GcsServerMocker::MockedGcsPlacementGroupScheduler>(
io_service_, gcs_table_storage_, *gcs_node_manager_,
io_service_, gcs_table_storage_, *gcs_node_manager_, *gcs_resource_manager_,
/*lease_client_factory=*/
[this](const rpc::Address &address) { return raylet_clients_[address.port()]; });
}
@@ -98,6 +100,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
void AddNode(const std::shared_ptr<rpc::GcsNodeInfo> &node, int cpu_num = 10) {
gcs_node_manager_->AddNode(node);
rpc::HeartbeatTableData heartbeat;
heartbeat.set_client_id(node->node_id());
(*heartbeat.mutable_resources_available())["CPU"] = cpu_num;
gcs_node_manager_->UpdateNodeRealtimeResources(NodeID::FromBinary(node->node_id()),
heartbeat);
@@ -202,6 +205,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
std::shared_ptr<gcs::StoreClient> store_client_;
std::vector<std::shared_ptr<GcsServerMocker::MockRayletResourceClient>> raylet_clients_;
std::shared_ptr<gcs::GcsResourceManager> gcs_resource_manager_;
std::shared_ptr<gcs::GcsNodeManager> gcs_node_manager_;
std::shared_ptr<GcsServerMocker::MockedGcsPlacementGroupScheduler> scheduler_;
std::vector<std::shared_ptr<gcs::GcsPlacementGroup>> success_placement_groups_
@@ -0,0 +1,63 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <memory>
#include "gtest/gtest.h"
#include "ray/gcs/gcs_server/gcs_resource_manager.h"
#include "ray/gcs/test/gcs_test_util.h"
namespace ray {
using ::testing::_;
/// Test fixture that owns a fresh `GcsResourceManager`, accessed through the
/// `GcsResourceManagerInterface` so the tests exercise the public interface.
class GcsResourceManagerTest : public ::testing::Test {
 public:
  GcsResourceManagerTest()
      : gcs_resource_manager_(std::make_shared<gcs::GcsResourceManager>()) {}

  /// The resource manager under test.
  std::shared_ptr<gcs::GcsResourceManagerInterface> gcs_resource_manager_;
};
TEST_F(GcsResourceManagerTest, TestBasic) {
  // Register one node that owns 10 CPUs.
  auto node_id = NodeID::FromRandom();
  const std::string cpu_resource = "CPU";
  std::unordered_map<std::string, double> cpu_capacity;
  cpu_capacity[cpu_resource] = 10;
  ResourceSet node_resources(cpu_capacity);
  gcs_resource_manager_->UpdateResources(node_id, node_resources);

  // The cluster view must now contain exactly that node.
  const auto &cluster_view = gcs_resource_manager_->GetClusterResources();
  ASSERT_EQ(1, cluster_view.size());

  // `AcquireResources`: the first request takes everything, so the second must fail.
  ASSERT_TRUE(gcs_resource_manager_->AcquireResources(node_id, node_resources));
  ASSERT_FALSE(gcs_resource_manager_->AcquireResources(node_id, node_resources));

  // `ReleaseResources`: releasing to an unknown node still succeeds, and releasing to
  // the known node makes its resources acquirable again.
  const auto unknown_node_id = NodeID::FromRandom();
  ASSERT_TRUE(gcs_resource_manager_->ReleaseResources(unknown_node_id, node_resources));
  ASSERT_TRUE(gcs_resource_manager_->ReleaseResources(node_id, node_resources));
  ASSERT_TRUE(gcs_resource_manager_->AcquireResources(node_id, node_resources));
}
} // namespace ray
// Test entry point: initializes GoogleTest with the command-line arguments and runs
// every registered test, returning the framework's result as the exit code.
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
@@ -25,6 +25,7 @@
#include "ray/gcs/gcs_server/gcs_node_manager.h"
#include "ray/gcs/gcs_server/gcs_placement_group_manager.h"
#include "ray/gcs/gcs_server/gcs_placement_group_scheduler.h"
#include "ray/gcs/gcs_server/gcs_resource_manager.h"
#include "ray/util/asio_util.h"
namespace ray {