From 3a6fa7d6225b2f6ecb251f8763fca9f4581a2e49 Mon Sep 17 00:00:00 2001 From: fangfengbin <869218239a@zju.edu.cn> Date: Fri, 14 Aug 2020 14:58:52 +0800 Subject: [PATCH] [Placement Group]Optimize placement group strict pack strategy (#9924) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add part code * add code * add part code * rm used import * add part code * add part code * add part code * add part code * add part code * add part code * fix review comment * add testcase * use ResourceSet * fix review comment * fix ut bug Co-authored-by: 灵洵 --- src/ray/gcs/gcs_server/gcs_node_manager.cc | 23 +--- src/ray/gcs/gcs_server/gcs_node_manager.h | 17 +-- .../gcs_placement_group_scheduler.cc | 90 ++++++++++--- .../gcs_placement_group_scheduler.h | 40 ++++-- .../gcs_server/test/gcs_node_manager_test.cc | 24 ++-- .../gcs_placement_group_scheduler_test.cc | 124 +++++++++++++++++- src/ray/gcs/test/gcs_test_util.h | 4 +- 7 files changed, 240 insertions(+), 82 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index a3180e128..679e01ff4 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -21,18 +21,6 @@ namespace ray { namespace gcs { -bool GcsNodeResource::IsSubset( - const std::unordered_map &request_resources) const { - for (const auto &request_resource : request_resources) { - auto iter = resources_available_.find(request_resource.first); - if (iter == resources_available_.end() || iter->second < request_resource.second) { - return false; - } - } - return true; -} - -////////////////////////////////////////////////////////////////////////////////////////// GcsNodeManager::NodeFailureDetector::NodeFailureDetector( boost::asio::io_service &io_service, std::shared_ptr gcs_table_storage, @@ -470,16 +458,11 @@ void GcsNodeManager::StartNodeFailureDetector() { void GcsNodeManager::UpdateNodeRealtimeResources( const ClientID &node_id, const rpc::HeartbeatTableData &heartbeat) { auto resources_available = MapFromProtobuf(heartbeat.resources_available()); - auto iter = cluster_realtime_resources_.find(node_id); - if (iter != cluster_realtime_resources_.end()) { - iter->second->resources_available_ = resources_available; - } else { - cluster_realtime_resources_[node_id] = - std::make_shared(resources_available); - } + cluster_realtime_resources_[node_id] = + std::make_shared(resources_available); } -const absl::flat_hash_map> +const absl::flat_hash_map> &GcsNodeManager::GetClusterRealtimeResources() const { return cluster_realtime_resources_; } diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index 3aa27094a..a818aa127 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -27,18 +27,6 @@ namespace ray { namespace gcs { -/// Node resource information. -class GcsNodeResource { - public: - explicit GcsNodeResource(std::unordered_map resources_available) - : resources_available_(std::move(resources_available)) {} - - bool IsSubset(const std::unordered_map &request_resources) const; - - /// Dynamic resource capacity. - std::unordered_map resources_available_; -}; - /// GcsNodeManager is responsible for managing and monitoring nodes as well as handing /// node and resource related rpc requests. /// This class is not thread-safe. @@ -160,7 +148,7 @@ class GcsNodeManager : public rpc::NodeInfoHandler { const rpc::HeartbeatTableData &heartbeat); /// Get cluster realtime resources. - const absl::flat_hash_map> + const absl::flat_hash_map> &GetClusterRealtimeResources() const; protected: @@ -260,8 +248,7 @@ class GcsNodeManager : public rpc::NodeInfoHandler { /// Storage for GCS tables. std::shared_ptr gcs_table_storage_; /// Cluster realtime resources. - absl::flat_hash_map> - cluster_realtime_resources_; + absl::flat_hash_map> cluster_realtime_resources_; }; } // namespace gcs diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc index 47ae66395..efe4f089f 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc @@ -33,17 +33,42 @@ GcsPlacementGroupScheduler::GcsPlacementGroupScheduler( scheduler_strategies_.push_back(std::make_shared()); } -/// This is an initial algorithm to respect pack algorithm. -/// In this algorithm, we try to pack all the bundle in the first node -/// and don't care the real resource. +/// In this algorithm, we try to pack all the bundles in the node which satisfies the +/// resource requirements and has the least number of bundles. +/// TODO(ffbin): At present, only one node will be scheduled. If one node does not have +/// enough resources, we need to divide bundles to multiple nodes. We will implement +/// it in the next pr. ScheduleMap GcsPackStrategy::Schedule( std::vector> &bundles, - const GcsNodeManager &node_manager) { + const std::unique_ptr &context) { + // Aggregate required resources. + ResourceSet required_resources; + for (const auto &bundle : bundles) { + required_resources.AddResources(bundle->GetRequiredResources()); + } + + // Filter candidate nodes. + const auto &alive_nodes = context->node_manager_.GetClusterRealtimeResources(); + std::vector> candidate_nodes; + for (auto &node : alive_nodes) { + if (required_resources.IsSubset(*node.second)) { + candidate_nodes.emplace_back((*context->node_to_bundles_)[node.first], node.first); + } + } + + // Select the node with the least number of bundles. ScheduleMap schedule_map; - auto &alive_nodes = node_manager.GetAllAliveNodes(); + if (candidate_nodes.empty()) { + return schedule_map; + } + + std::sort( + std::begin(candidate_nodes), std::end(candidate_nodes), + [](const std::pair &left, + const std::pair &right) { return left.first < right.first; }); + for (auto &bundle : bundles) { - schedule_map[bundle->BundleId()] = - ClientID::FromBinary(alive_nodes.begin()->second->node_id()); + schedule_map[bundle->BundleId()] = candidate_nodes.front().second; } return schedule_map; } @@ -53,9 +78,9 @@ ScheduleMap GcsPackStrategy::Schedule( /// and don't care the real resource. ScheduleMap GcsSpreadStrategy::Schedule( std::vector> &bundles, - const GcsNodeManager &node_manager) { + const std::unique_ptr &context) { ScheduleMap schedule_map; - auto &alive_nodes = node_manager.GetAllAliveNodes(); + auto &alive_nodes = context->node_manager_.GetClusterRealtimeResources(); auto iter = alive_nodes.begin(); size_t index = 0; size_t alive_nodes_size = alive_nodes.size(); @@ -64,8 +89,7 @@ ScheduleMap GcsSpreadStrategy::Schedule( if (index + base * alive_nodes_size >= bundles.size()) { break; } else { - schedule_map[bundles[index + base * alive_nodes_size]->BundleId()] = - ClientID::FromBinary(iter->second->node_id()); + schedule_map[bundles[index + base * alive_nodes_size]->BundleId()] = iter->first; } } } @@ -74,19 +98,19 @@ ScheduleMap GcsSpreadStrategy::Schedule( void GcsPlacementGroupScheduler::Schedule( std::shared_ptr placement_group, - std::function)> schedule_failure_handler, - std::function)> schedule_success_handler) { + std::function)> failure_callback, + std::function)> success_callback) { RAY_LOG(INFO) << "Scheduling placement group " << placement_group->GetName(); auto bundles = placement_group->GetBundles(); auto strategy = placement_group->GetStrategy(); auto selected_nodes = - scheduler_strategies_[strategy]->Schedule(bundles, gcs_node_manager_); + scheduler_strategies_[strategy]->Schedule(bundles, GetScheduleContext()); // If no nodes are available, scheduling fails. if (selected_nodes.empty()) { RAY_LOG(WARNING) << "Failed to schedule placement group " << placement_group->GetName() << ", because no nodes are available."; - schedule_failure_handler(placement_group); + failure_callback(placement_group); return; } @@ -104,8 +128,7 @@ void GcsPlacementGroupScheduler::Schedule( ReserveResourceFromNode( bundle, gcs_node_manager_.GetNode(node_id), [this, bundle_id, bundle, bundles, node_id, placement_group, bundle_locations, - finished_count, schedule_failure_handler, - schedule_success_handler](const Status &status) { + finished_count, failure_callback, success_callback](const Status &status) { if (status.ok()) { (*bundle_locations)[bundle_id] = std::make_pair(node_id, bundle); } @@ -120,17 +143,24 @@ void GcsPlacementGroupScheduler::Schedule( data.mutable_schedule_plan()->insert( {key, (*bundle_locations)[iter->BundleId()].first.Binary()}); } + + // Update `node_to_leased_bundles_`. + for (const auto &iter : *bundle_locations) { + const auto &location = iter.second; + node_to_leased_bundles_[location.first].push_back(location.second); + } + RAY_CHECK_OK(gcs_table_storage_->PlacementGroupScheduleTable().Put( placement_group->GetPlacementGroupID(), data, - [schedule_success_handler, placement_group](Status status) { - schedule_success_handler(placement_group); + [success_callback, placement_group](Status status) { + success_callback(placement_group); })); } else { - for (auto &iter : *bundle_locations) { + for (const auto &iter : *bundle_locations) { CancelResourceReserve(iter.second.second, gcs_node_manager_.GetNode(node_id)); } - schedule_failure_handler(placement_group); + failure_callback(placement_group); } } }); @@ -205,5 +235,23 @@ GcsPlacementGroupScheduler::GetOrConnectLeaseClient(const rpc::Address &raylet_a return iter->second; } +std::unique_ptr GcsPlacementGroupScheduler::GetScheduleContext() { + // TODO(ffbin): We will add listener to the GCS node manager to handle node deletion. + auto &alive_nodes = gcs_node_manager_.GetAllAliveNodes(); + for (const auto &iter : alive_nodes) { + if (!node_to_leased_bundles_.contains(iter.first)) { + node_to_leased_bundles_.emplace( + iter.first, std::vector>()); + } + } + + auto node_to_bundles = std::make_shared>(); + for (const auto &iter : node_to_leased_bundles_) { + node_to_bundles->emplace(iter.first, iter.second.size()); + } + return std::unique_ptr( + new ScheduleContext(node_to_bundles, gcs_node_manager_)); +} + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h index afd8acec6..7b440cfab 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h @@ -13,9 +13,6 @@ // limitations under the License. #pragma once -#include -#include - #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" #include "ray/common/id.h" @@ -57,24 +54,36 @@ class GcsPlacementGroupSchedulerInterface { virtual ~GcsPlacementGroupSchedulerInterface() {} }; +class ScheduleContext { + public: + ScheduleContext(std::shared_ptr> node_to_bundles, + const GcsNodeManager &node_manager) + : node_to_bundles_(std::move(node_to_bundles)), node_manager_(node_manager) {} + + // Key is node id, value is the number of bundles on the node. + std::shared_ptr> node_to_bundles_; + + const GcsNodeManager &node_manager_; +}; + class GcsScheduleStrategy { public: virtual ~GcsScheduleStrategy() {} virtual ScheduleMap Schedule( std::vector> &bundles, - const GcsNodeManager &node_manager) = 0; + const std::unique_ptr &context) = 0; }; class GcsPackStrategy : public GcsScheduleStrategy { public: ScheduleMap Schedule(std::vector> &bundles, - const GcsNodeManager &node_manager) override; + const std::unique_ptr &context) override; }; class GcsSpreadStrategy : public GcsScheduleStrategy { public: ScheduleMap Schedule(std::vector> &bundles, - const GcsNodeManager &node_manager) override; + const std::unique_ptr &context) override; }; /// GcsPlacementGroupScheduler is responsible for scheduling placement_groups registered @@ -91,18 +100,21 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { std::shared_ptr gcs_table_storage, const GcsNodeManager &gcs_node_manager, ReserveResourceClientFactoryFn lease_client_factory = nullptr); + virtual ~GcsPlacementGroupScheduler() = default; + /// Schedule the specified placement_group. /// If there is no available nodes then the `schedule_failed_handler` will be /// triggered, otherwise the bundle in placement_group will be add into a queue and /// schedule all bundle by calling ReserveResourceFromNode(). /// /// \param placement_group to be scheduled. + /// \param failure_callback This function is called if the schedule is failed. + /// \param success_callback This function is called if the schedule is successful. void Schedule( std::shared_ptr placement_group, - std::function)> schedule_failure_handler, - std::function)> schedule_success_handler) - override; + std::function)> failure_handler, + std::function)> success_handler) override; protected: /// Lease resource from the specified node for the specified bundle. @@ -120,6 +132,10 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { /// Get an existing lease client or connect a new one. std::shared_ptr GetOrConnectLeaseClient( const rpc::Address &raylet_address); + + /// Generate schedule conetext. + std::unique_ptr GetScheduleContext(); + /// A timer that ticks every cancel resource failure milliseconds. boost::asio::deadline_timer return_timer_; /// Used to update placement group information upon creation, deletion, etc. @@ -137,6 +153,12 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { /// until we receive a reply or the node is removed. absl::flat_hash_map> node_to_bundles_when_leasing_; + + /// Map from node ID to the set of bundles. This is needed so that we can reschedule + /// bundles when a node is dead. + absl::flat_hash_map>> + node_to_leased_bundles_; + /// A vector to store all the schedule strategy. std::vector> scheduler_strategies_; }; diff --git a/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc index 53ce725db..78deece92 100644 --- a/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc @@ -93,19 +93,21 @@ TEST_F(GcsNodeManagerTest, TestGetClusterRealtimeResources) { auto node_id = ClientID::FromRandom(); rpc::HeartbeatTableData heartbeat; - (*heartbeat.mutable_resources_available())["CPU"] = 10; + const std::string cpu_resource = "CPU"; + (*heartbeat.mutable_resources_available())[cpu_resource] = 10; node_manager.UpdateNodeRealtimeResources(node_id, heartbeat); auto node_resources = node_manager.GetClusterRealtimeResources(); - std::unordered_map request_resources; - request_resources["CPU"] = 9; - ASSERT_TRUE(node_resources[node_id]->IsSubset(request_resources)); - request_resources["CPU"] = 10; - ASSERT_TRUE(node_resources[node_id]->IsSubset(request_resources)); - request_resources["CPU"] = 10.1; - ASSERT_FALSE(node_resources[node_id]->IsSubset(request_resources)); - request_resources.clear(); - request_resources["GPU"] = 1; - ASSERT_FALSE(node_resources[node_id]->IsSubset(request_resources)); + + ResourceSet required_resources; + required_resources.AddOrUpdateResource(cpu_resource, 9); + ASSERT_TRUE(required_resources.IsSubset(*node_resources[node_id])); + required_resources.AddOrUpdateResource(cpu_resource, 10); + ASSERT_TRUE(required_resources.IsSubset(*node_resources[node_id])); + required_resources.AddOrUpdateResource(cpu_resource, 10.1); + ASSERT_FALSE(required_resources.IsSubset(*node_resources[node_id])); + required_resources.DeleteResource(cpu_resource); + required_resources.AddOrUpdateResource("GPU", 9); + ASSERT_FALSE(required_resources.IsSubset(*node_resources[node_id])); } } // namespace ray diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc index 70413007c..2b0356a00 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc @@ -30,6 +30,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { })); raylet_client_ = std::make_shared(); + raylet_client1_ = std::make_shared(); gcs_table_storage_ = std::make_shared(io_service_); gcs_pub_sub_ = std::make_shared(redis_client_); gcs_node_manager_ = std::make_shared( @@ -40,7 +41,13 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { std::make_shared( io_service_, gcs_table_storage_, *gcs_node_manager_, /*lease_client_fplacement_groupy=*/ - [this](const rpc::Address &address) { return raylet_client_; }); + [this](const rpc::Address &address) { + if (0 == address.port()) { + return raylet_client_; + } else { + return raylet_client1_; + } + }); } void TearDown() override { @@ -57,6 +64,14 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count())); } + void AddNode(const std::shared_ptr &node, int cpu_num = 10) { + gcs_node_manager_->AddNode(node); + rpc::HeartbeatTableData heartbeat; + (*heartbeat.mutable_resources_available())["CPU"] = cpu_num; + gcs_node_manager_->UpdateNodeRealtimeResources(ClientID::FromBinary(node->node_id()), + heartbeat); + } + protected: const std::chrono::milliseconds timeout_ms_{6000}; absl::Mutex vector_mutex_; @@ -65,6 +80,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { std::shared_ptr store_client_; std::shared_ptr raylet_client_; + std::shared_ptr raylet_client1_; std::shared_ptr gcs_node_manager_; std::shared_ptr gcs_placement_group_scheduler_; @@ -101,7 +117,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestScheduleFailedWithZeroNode) { TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupSuccess) { auto node = Mocker::GenNodeInfo(); - gcs_node_manager_->AddNode(node); + AddNode(node); ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size()); auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest(); @@ -132,7 +148,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupSuccess) { TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupFailed) { auto node = Mocker::GenNodeInfo(); - gcs_node_manager_->AddNode(node); + AddNode(node); ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size()); auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest(); @@ -165,7 +181,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupFailed) { TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReturnResource) { auto node = Mocker::GenNodeInfo(); - gcs_node_manager_->AddNode(node); + AddNode(node); ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size()); auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest(); @@ -198,6 +214,106 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReturnResource) ASSERT_EQ(placement_group, failure_placement_groups_.front()); } +TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyBalancedScheduling) { + AddNode(Mocker::GenNodeInfo(0)); + AddNode(Mocker::GenNodeInfo(1)); + auto failure_handler = [this](std::shared_ptr placement_group) { + absl::MutexLock lock(&vector_mutex_); + failure_placement_groups_.emplace_back(std::move(placement_group)); + }; + auto success_handler = [this](std::shared_ptr placement_group) { + absl::MutexLock lock(&vector_mutex_); + success_placement_groups_.emplace_back(std::move(placement_group)); + }; + + // Schedule placement group, it will be evenly distributed over the two nodes. + int select_node0_count = 0; + int select_node1_count = 0; + for (int index = 0; index < 10; ++index) { + auto create_placement_group_request = + Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::PACK); + auto placement_group = + std::make_shared(create_placement_group_request); + gcs_placement_group_scheduler_->Schedule(placement_group, failure_handler, + success_handler); + + if (!raylet_client_->lease_callbacks.empty()) { + ASSERT_TRUE(raylet_client_->GrantResourceReserve()); + ASSERT_TRUE(raylet_client_->GrantResourceReserve()); + ++select_node0_count; + } else { + ASSERT_TRUE(raylet_client1_->GrantResourceReserve()); + ASSERT_TRUE(raylet_client1_->GrantResourceReserve()); + ++select_node1_count; + } + } + WaitPendingDone(success_placement_groups_, 10); + ASSERT_EQ(select_node0_count, 5); + ASSERT_EQ(select_node1_count, 5); +} + +TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyReschedulingWhenNodeAdd) { + AddNode(Mocker::GenNodeInfo(0), 1); + auto failure_handler = [this](std::shared_ptr placement_group) { + absl::MutexLock lock(&vector_mutex_); + failure_placement_groups_.emplace_back(std::move(placement_group)); + }; + auto success_handler = [this](std::shared_ptr placement_group) { + absl::MutexLock lock(&vector_mutex_); + success_placement_groups_.emplace_back(std::move(placement_group)); + }; + + // Failed to schedule the placement group, because the node resources is not enough. + auto create_placement_group_request = + Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::PACK); + auto placement_group = + std::make_shared(create_placement_group_request); + gcs_placement_group_scheduler_->Schedule(placement_group, failure_handler, + success_handler); + WaitPendingDone(failure_placement_groups_, 1); + ASSERT_EQ(0, success_placement_groups_.size()); + + // A new node is added, and the rescheduling is successful. + AddNode(Mocker::GenNodeInfo(0), 2); + gcs_placement_group_scheduler_->Schedule(placement_group, failure_handler, + success_handler); + ASSERT_TRUE(raylet_client_->GrantResourceReserve()); + ASSERT_TRUE(raylet_client_->GrantResourceReserve()); + WaitPendingDone(success_placement_groups_, 1); +} + +TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyResourceCheck) { + auto node0 = Mocker::GenNodeInfo(0); + AddNode(node0); + auto failure_handler = [this](std::shared_ptr placement_group) { + absl::MutexLock lock(&vector_mutex_); + failure_placement_groups_.emplace_back(std::move(placement_group)); + }; + auto success_handler = [this](std::shared_ptr placement_group) { + absl::MutexLock lock(&vector_mutex_); + success_placement_groups_.emplace_back(std::move(placement_group)); + }; + auto create_placement_group_request = + Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::PACK); + auto placement_group = + std::make_shared(create_placement_group_request); + gcs_placement_group_scheduler_->Schedule(placement_group, failure_handler, + success_handler); + ASSERT_TRUE(raylet_client_->GrantResourceReserve()); + ASSERT_TRUE(raylet_client_->GrantResourceReserve()); + WaitPendingDone(success_placement_groups_, 1); + + // Node1 has less number of bundles, but it doesn't satisfy the resource + // requirement. In this case, the bundles should be scheduled on Node0. + auto node1 = Mocker::GenNodeInfo(1); + AddNode(node1, 1); + gcs_placement_group_scheduler_->Schedule(placement_group, failure_handler, + success_handler); + ASSERT_TRUE(raylet_client_->GrantResourceReserve()); + ASSERT_TRUE(raylet_client_->GrantResourceReserve()); + WaitPendingDone(success_placement_groups_, 2); +} + } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/gcs/test/gcs_test_util.h b/src/ray/gcs/test/gcs_test_util.h index c8b4d86fe..79d4116c8 100644 --- a/src/ray/gcs/test/gcs_test_util.h +++ b/src/ray/gcs/test/gcs_test_util.h @@ -89,10 +89,10 @@ struct Mocker { } static rpc::CreatePlacementGroupRequest GenCreatePlacementGroupRequest( - const std::string name = "") { + const std::string name = "", + rpc::PlacementStrategy strategy = rpc::PlacementStrategy::SPREAD) { rpc::CreatePlacementGroupRequest request; std::vector> bundles; - rpc::PlacementStrategy strategy = rpc::PlacementStrategy::SPREAD; std::unordered_map bundle; bundle["CPU"] = 1.0; bundles.push_back(bundle);