[Placement Group]Optimize placement group strict pack strategy (#9924)

* add part code

* add code

* add part code

* rm used import

* add part code

* add part code

* add part code

* add part code

* add part code

* add part code

* fix review comment

* add testcase

* use ResourceSet

* fix review comment

* fix ut bug

Co-authored-by: 灵洵 <fengbin.ffb@antfin.com>
This commit is contained in:
fangfengbin
2020-08-14 14:58:52 +08:00
committed by GitHub
parent 0fe5722744
commit 3a6fa7d622
7 changed files with 240 additions and 82 deletions
+3 -20
View File
@@ -21,18 +21,6 @@
namespace ray {
namespace gcs {
bool GcsNodeResource::IsSubset(
const std::unordered_map<std::string, double> &request_resources) const {
for (const auto &request_resource : request_resources) {
auto iter = resources_available_.find(request_resource.first);
if (iter == resources_available_.end() || iter->second < request_resource.second) {
return false;
}
}
return true;
}
//////////////////////////////////////////////////////////////////////////////////////////
GcsNodeManager::NodeFailureDetector::NodeFailureDetector(
boost::asio::io_service &io_service,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
@@ -470,16 +458,11 @@ void GcsNodeManager::StartNodeFailureDetector() {
void GcsNodeManager::UpdateNodeRealtimeResources(
const ClientID &node_id, const rpc::HeartbeatTableData &heartbeat) {
auto resources_available = MapFromProtobuf(heartbeat.resources_available());
auto iter = cluster_realtime_resources_.find(node_id);
if (iter != cluster_realtime_resources_.end()) {
iter->second->resources_available_ = resources_available;
} else {
cluster_realtime_resources_[node_id] =
std::make_shared<GcsNodeResource>(resources_available);
}
cluster_realtime_resources_[node_id] =
std::make_shared<ResourceSet>(resources_available);
}
const absl::flat_hash_map<ClientID, std::shared_ptr<GcsNodeResource>>
const absl::flat_hash_map<ClientID, std::shared_ptr<ResourceSet>>
&GcsNodeManager::GetClusterRealtimeResources() const {
return cluster_realtime_resources_;
}
+2 -15
View File
@@ -27,18 +27,6 @@
namespace ray {
namespace gcs {
/// Node resource information.
class GcsNodeResource {
public:
explicit GcsNodeResource(std::unordered_map<std::string, double> resources_available)
: resources_available_(std::move(resources_available)) {}
bool IsSubset(const std::unordered_map<std::string, double> &request_resources) const;
/// Dynamic resource capacity.
std::unordered_map<std::string, double> resources_available_;
};
/// GcsNodeManager is responsible for managing and monitoring nodes as well as handing
/// node and resource related rpc requests.
/// This class is not thread-safe.
@@ -160,7 +148,7 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
const rpc::HeartbeatTableData &heartbeat);
/// Get cluster realtime resources.
const absl::flat_hash_map<ClientID, std::shared_ptr<GcsNodeResource>>
const absl::flat_hash_map<ClientID, std::shared_ptr<ResourceSet>>
&GetClusterRealtimeResources() const;
protected:
@@ -260,8 +248,7 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
/// Storage for GCS tables.
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
/// Cluster realtime resources.
absl::flat_hash_map<ClientID, std::shared_ptr<GcsNodeResource>>
cluster_realtime_resources_;
absl::flat_hash_map<ClientID, std::shared_ptr<ResourceSet>> cluster_realtime_resources_;
};
} // namespace gcs
@@ -33,17 +33,42 @@ GcsPlacementGroupScheduler::GcsPlacementGroupScheduler(
scheduler_strategies_.push_back(std::make_shared<GcsSpreadStrategy>());
}
/// This is an initial algorithm to respect pack algorithm.
/// In this algorithm, we try to pack all the bundle in the first node
/// and don't care the real resource.
/// In this algorithm, we try to pack all the bundles in the node which satisfies the
/// resource requirements and has the least number of bundles.
/// TODO(ffbin): At present, only one node will be scheduled. If one node does not have
/// enough resources, we need to divide bundles to multiple nodes. We will implement
/// it in the next pr.
ScheduleMap GcsPackStrategy::Schedule(
std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const GcsNodeManager &node_manager) {
const std::unique_ptr<ScheduleContext> &context) {
// Aggregate required resources.
ResourceSet required_resources;
for (const auto &bundle : bundles) {
required_resources.AddResources(bundle->GetRequiredResources());
}
// Filter candidate nodes.
const auto &alive_nodes = context->node_manager_.GetClusterRealtimeResources();
std::vector<std::pair<int64_t, ClientID>> candidate_nodes;
for (auto &node : alive_nodes) {
if (required_resources.IsSubset(*node.second)) {
candidate_nodes.emplace_back((*context->node_to_bundles_)[node.first], node.first);
}
}
// Select the node with the least number of bundles.
ScheduleMap schedule_map;
auto &alive_nodes = node_manager.GetAllAliveNodes();
if (candidate_nodes.empty()) {
return schedule_map;
}
std::sort(
std::begin(candidate_nodes), std::end(candidate_nodes),
[](const std::pair<int64_t, ClientID> &left,
const std::pair<int64_t, ClientID> &right) { return left.first < right.first; });
for (auto &bundle : bundles) {
schedule_map[bundle->BundleId()] =
ClientID::FromBinary(alive_nodes.begin()->second->node_id());
schedule_map[bundle->BundleId()] = candidate_nodes.front().second;
}
return schedule_map;
}
@@ -53,9 +78,9 @@ ScheduleMap GcsPackStrategy::Schedule(
/// and don't care the real resource.
ScheduleMap GcsSpreadStrategy::Schedule(
std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const GcsNodeManager &node_manager) {
const std::unique_ptr<ScheduleContext> &context) {
ScheduleMap schedule_map;
auto &alive_nodes = node_manager.GetAllAliveNodes();
auto &alive_nodes = context->node_manager_.GetClusterRealtimeResources();
auto iter = alive_nodes.begin();
size_t index = 0;
size_t alive_nodes_size = alive_nodes.size();
@@ -64,8 +89,7 @@ ScheduleMap GcsSpreadStrategy::Schedule(
if (index + base * alive_nodes_size >= bundles.size()) {
break;
} else {
schedule_map[bundles[index + base * alive_nodes_size]->BundleId()] =
ClientID::FromBinary(iter->second->node_id());
schedule_map[bundles[index + base * alive_nodes_size]->BundleId()] = iter->first;
}
}
}
@@ -74,19 +98,19 @@ ScheduleMap GcsSpreadStrategy::Schedule(
void GcsPlacementGroupScheduler::Schedule(
std::shared_ptr<GcsPlacementGroup> placement_group,
std::function<void(std::shared_ptr<GcsPlacementGroup>)> schedule_failure_handler,
std::function<void(std::shared_ptr<GcsPlacementGroup>)> schedule_success_handler) {
std::function<void(std::shared_ptr<GcsPlacementGroup>)> failure_callback,
std::function<void(std::shared_ptr<GcsPlacementGroup>)> success_callback) {
RAY_LOG(INFO) << "Scheduling placement group " << placement_group->GetName();
auto bundles = placement_group->GetBundles();
auto strategy = placement_group->GetStrategy();
auto selected_nodes =
scheduler_strategies_[strategy]->Schedule(bundles, gcs_node_manager_);
scheduler_strategies_[strategy]->Schedule(bundles, GetScheduleContext());
// If no nodes are available, scheduling fails.
if (selected_nodes.empty()) {
RAY_LOG(WARNING) << "Failed to schedule placement group "
<< placement_group->GetName() << ", because no nodes are available.";
schedule_failure_handler(placement_group);
failure_callback(placement_group);
return;
}
@@ -104,8 +128,7 @@ void GcsPlacementGroupScheduler::Schedule(
ReserveResourceFromNode(
bundle, gcs_node_manager_.GetNode(node_id),
[this, bundle_id, bundle, bundles, node_id, placement_group, bundle_locations,
finished_count, schedule_failure_handler,
schedule_success_handler](const Status &status) {
finished_count, failure_callback, success_callback](const Status &status) {
if (status.ok()) {
(*bundle_locations)[bundle_id] = std::make_pair(node_id, bundle);
}
@@ -120,17 +143,24 @@ void GcsPlacementGroupScheduler::Schedule(
data.mutable_schedule_plan()->insert(
{key, (*bundle_locations)[iter->BundleId()].first.Binary()});
}
// Update `node_to_leased_bundles_`.
for (const auto &iter : *bundle_locations) {
const auto &location = iter.second;
node_to_leased_bundles_[location.first].push_back(location.second);
}
RAY_CHECK_OK(gcs_table_storage_->PlacementGroupScheduleTable().Put(
placement_group->GetPlacementGroupID(), data,
[schedule_success_handler, placement_group](Status status) {
schedule_success_handler(placement_group);
[success_callback, placement_group](Status status) {
success_callback(placement_group);
}));
} else {
for (auto &iter : *bundle_locations) {
for (const auto &iter : *bundle_locations) {
CancelResourceReserve(iter.second.second,
gcs_node_manager_.GetNode(node_id));
}
schedule_failure_handler(placement_group);
failure_callback(placement_group);
}
}
});
@@ -205,5 +235,23 @@ GcsPlacementGroupScheduler::GetOrConnectLeaseClient(const rpc::Address &raylet_a
return iter->second;
}
std::unique_ptr<ScheduleContext> GcsPlacementGroupScheduler::GetScheduleContext() {
// TODO(ffbin): We will add listener to the GCS node manager to handle node deletion.
auto &alive_nodes = gcs_node_manager_.GetAllAliveNodes();
for (const auto &iter : alive_nodes) {
if (!node_to_leased_bundles_.contains(iter.first)) {
node_to_leased_bundles_.emplace(
iter.first, std::vector<std::shared_ptr<BundleSpecification>>());
}
}
auto node_to_bundles = std::make_shared<absl::flat_hash_map<ClientID, int64_t>>();
for (const auto &iter : node_to_leased_bundles_) {
node_to_bundles->emplace(iter.first, iter.second.size());
}
return std::unique_ptr<ScheduleContext>(
new ScheduleContext(node_to_bundles, gcs_node_manager_));
}
} // namespace gcs
} // namespace ray
@@ -13,9 +13,6 @@
// limitations under the License.
#pragma once
#include <queue>
#include <tuple>
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "ray/common/id.h"
@@ -57,24 +54,36 @@ class GcsPlacementGroupSchedulerInterface {
virtual ~GcsPlacementGroupSchedulerInterface() {}
};
class ScheduleContext {
public:
ScheduleContext(std::shared_ptr<absl::flat_hash_map<ClientID, int64_t>> node_to_bundles,
const GcsNodeManager &node_manager)
: node_to_bundles_(std::move(node_to_bundles)), node_manager_(node_manager) {}
// Key is node id, value is the number of bundles on the node.
std::shared_ptr<absl::flat_hash_map<ClientID, int64_t>> node_to_bundles_;
const GcsNodeManager &node_manager_;
};
class GcsScheduleStrategy {
public:
virtual ~GcsScheduleStrategy() {}
virtual ScheduleMap Schedule(
std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const GcsNodeManager &node_manager) = 0;
const std::unique_ptr<ScheduleContext> &context) = 0;
};
class GcsPackStrategy : public GcsScheduleStrategy {
public:
ScheduleMap Schedule(std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const GcsNodeManager &node_manager) override;
const std::unique_ptr<ScheduleContext> &context) override;
};
class GcsSpreadStrategy : public GcsScheduleStrategy {
public:
ScheduleMap Schedule(std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const GcsNodeManager &node_manager) override;
const std::unique_ptr<ScheduleContext> &context) override;
};
/// GcsPlacementGroupScheduler is responsible for scheduling placement_groups registered
@@ -91,18 +100,21 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
const GcsNodeManager &gcs_node_manager,
ReserveResourceClientFactoryFn lease_client_factory = nullptr);
virtual ~GcsPlacementGroupScheduler() = default;
/// Schedule the specified placement_group.
/// If there is no available nodes then the `schedule_failed_handler` will be
/// triggered, otherwise the bundle in placement_group will be add into a queue and
/// schedule all bundle by calling ReserveResourceFromNode().
///
/// \param placement_group to be scheduled.
/// \param failure_callback This function is called if the schedule is failed.
/// \param success_callback This function is called if the schedule is successful.
void Schedule(
std::shared_ptr<GcsPlacementGroup> placement_group,
std::function<void(std::shared_ptr<GcsPlacementGroup>)> schedule_failure_handler,
std::function<void(std::shared_ptr<GcsPlacementGroup>)> schedule_success_handler)
override;
std::function<void(std::shared_ptr<GcsPlacementGroup>)> failure_handler,
std::function<void(std::shared_ptr<GcsPlacementGroup>)> success_handler) override;
protected:
/// Lease resource from the specified node for the specified bundle.
@@ -120,6 +132,10 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
/// Get an existing lease client or connect a new one.
std::shared_ptr<ResourceReserveInterface> GetOrConnectLeaseClient(
const rpc::Address &raylet_address);
/// Generate schedule conetext.
std::unique_ptr<ScheduleContext> GetScheduleContext();
/// A timer that ticks every cancel resource failure milliseconds.
boost::asio::deadline_timer return_timer_;
/// Used to update placement group information upon creation, deletion, etc.
@@ -137,6 +153,12 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
/// until we receive a reply or the node is removed.
absl::flat_hash_map<ClientID, absl::flat_hash_set<BundleID>>
node_to_bundles_when_leasing_;
/// Map from node ID to the set of bundles. This is needed so that we can reschedule
/// bundles when a node is dead.
absl::flat_hash_map<ClientID, std::vector<std::shared_ptr<BundleSpecification>>>
node_to_leased_bundles_;
/// A vector to store all the schedule strategy.
std::vector<std::shared_ptr<GcsScheduleStrategy>> scheduler_strategies_;
};
@@ -93,19 +93,21 @@ TEST_F(GcsNodeManagerTest, TestGetClusterRealtimeResources) {
auto node_id = ClientID::FromRandom();
rpc::HeartbeatTableData heartbeat;
(*heartbeat.mutable_resources_available())["CPU"] = 10;
const std::string cpu_resource = "CPU";
(*heartbeat.mutable_resources_available())[cpu_resource] = 10;
node_manager.UpdateNodeRealtimeResources(node_id, heartbeat);
auto node_resources = node_manager.GetClusterRealtimeResources();
std::unordered_map<std::string, double> request_resources;
request_resources["CPU"] = 9;
ASSERT_TRUE(node_resources[node_id]->IsSubset(request_resources));
request_resources["CPU"] = 10;
ASSERT_TRUE(node_resources[node_id]->IsSubset(request_resources));
request_resources["CPU"] = 10.1;
ASSERT_FALSE(node_resources[node_id]->IsSubset(request_resources));
request_resources.clear();
request_resources["GPU"] = 1;
ASSERT_FALSE(node_resources[node_id]->IsSubset(request_resources));
ResourceSet required_resources;
required_resources.AddOrUpdateResource(cpu_resource, 9);
ASSERT_TRUE(required_resources.IsSubset(*node_resources[node_id]));
required_resources.AddOrUpdateResource(cpu_resource, 10);
ASSERT_TRUE(required_resources.IsSubset(*node_resources[node_id]));
required_resources.AddOrUpdateResource(cpu_resource, 10.1);
ASSERT_FALSE(required_resources.IsSubset(*node_resources[node_id]));
required_resources.DeleteResource(cpu_resource);
required_resources.AddOrUpdateResource("GPU", 9);
ASSERT_FALSE(required_resources.IsSubset(*node_resources[node_id]));
}
} // namespace ray
@@ -30,6 +30,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
}));
raylet_client_ = std::make_shared<GcsServerMocker::MockRayletResourceClient>();
raylet_client1_ = std::make_shared<GcsServerMocker::MockRayletResourceClient>();
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
gcs_pub_sub_ = std::make_shared<GcsServerMocker::MockGcsPubSub>(redis_client_);
gcs_node_manager_ = std::make_shared<gcs::GcsNodeManager>(
@@ -40,7 +41,13 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
std::make_shared<GcsServerMocker::MockedGcsPlacementGroupScheduler>(
io_service_, gcs_table_storage_, *gcs_node_manager_,
/*lease_client_fplacement_groupy=*/
[this](const rpc::Address &address) { return raylet_client_; });
[this](const rpc::Address &address) {
if (0 == address.port()) {
return raylet_client_;
} else {
return raylet_client1_;
}
});
}
void TearDown() override {
@@ -57,6 +64,14 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count()));
}
void AddNode(const std::shared_ptr<rpc::GcsNodeInfo> &node, int cpu_num = 10) {
gcs_node_manager_->AddNode(node);
rpc::HeartbeatTableData heartbeat;
(*heartbeat.mutable_resources_available())["CPU"] = cpu_num;
gcs_node_manager_->UpdateNodeRealtimeResources(ClientID::FromBinary(node->node_id()),
heartbeat);
}
protected:
const std::chrono::milliseconds timeout_ms_{6000};
absl::Mutex vector_mutex_;
@@ -65,6 +80,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
std::shared_ptr<gcs::StoreClient> store_client_;
std::shared_ptr<GcsServerMocker::MockRayletResourceClient> raylet_client_;
std::shared_ptr<GcsServerMocker::MockRayletResourceClient> raylet_client1_;
std::shared_ptr<gcs::GcsNodeManager> gcs_node_manager_;
std::shared_ptr<GcsServerMocker::MockedGcsPlacementGroupScheduler>
gcs_placement_group_scheduler_;
@@ -101,7 +117,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestScheduleFailedWithZeroNode) {
TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupSuccess) {
auto node = Mocker::GenNodeInfo();
gcs_node_manager_->AddNode(node);
AddNode(node);
ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size());
auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest();
@@ -132,7 +148,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupSuccess) {
TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupFailed) {
auto node = Mocker::GenNodeInfo();
gcs_node_manager_->AddNode(node);
AddNode(node);
ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size());
auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest();
@@ -165,7 +181,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupFailed) {
TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReturnResource) {
auto node = Mocker::GenNodeInfo();
gcs_node_manager_->AddNode(node);
AddNode(node);
ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size());
auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest();
@@ -198,6 +214,106 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReturnResource)
ASSERT_EQ(placement_group, failure_placement_groups_.front());
}
TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyBalancedScheduling) {
AddNode(Mocker::GenNodeInfo(0));
AddNode(Mocker::GenNodeInfo(1));
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
absl::MutexLock lock(&vector_mutex_);
failure_placement_groups_.emplace_back(std::move(placement_group));
};
auto success_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
absl::MutexLock lock(&vector_mutex_);
success_placement_groups_.emplace_back(std::move(placement_group));
};
// Schedule placement group, it will be evenly distributed over the two nodes.
int select_node0_count = 0;
int select_node1_count = 0;
for (int index = 0; index < 10; ++index) {
auto create_placement_group_request =
Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::PACK);
auto placement_group =
std::make_shared<gcs::GcsPlacementGroup>(create_placement_group_request);
gcs_placement_group_scheduler_->Schedule(placement_group, failure_handler,
success_handler);
if (!raylet_client_->lease_callbacks.empty()) {
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
++select_node0_count;
} else {
ASSERT_TRUE(raylet_client1_->GrantResourceReserve());
ASSERT_TRUE(raylet_client1_->GrantResourceReserve());
++select_node1_count;
}
}
WaitPendingDone(success_placement_groups_, 10);
ASSERT_EQ(select_node0_count, 5);
ASSERT_EQ(select_node1_count, 5);
}
TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyReschedulingWhenNodeAdd) {
AddNode(Mocker::GenNodeInfo(0), 1);
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
absl::MutexLock lock(&vector_mutex_);
failure_placement_groups_.emplace_back(std::move(placement_group));
};
auto success_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
absl::MutexLock lock(&vector_mutex_);
success_placement_groups_.emplace_back(std::move(placement_group));
};
// Failed to schedule the placement group, because the node resources is not enough.
auto create_placement_group_request =
Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::PACK);
auto placement_group =
std::make_shared<gcs::GcsPlacementGroup>(create_placement_group_request);
gcs_placement_group_scheduler_->Schedule(placement_group, failure_handler,
success_handler);
WaitPendingDone(failure_placement_groups_, 1);
ASSERT_EQ(0, success_placement_groups_.size());
// A new node is added, and the rescheduling is successful.
AddNode(Mocker::GenNodeInfo(0), 2);
gcs_placement_group_scheduler_->Schedule(placement_group, failure_handler,
success_handler);
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
WaitPendingDone(success_placement_groups_, 1);
}
TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyResourceCheck) {
auto node0 = Mocker::GenNodeInfo(0);
AddNode(node0);
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
absl::MutexLock lock(&vector_mutex_);
failure_placement_groups_.emplace_back(std::move(placement_group));
};
auto success_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
absl::MutexLock lock(&vector_mutex_);
success_placement_groups_.emplace_back(std::move(placement_group));
};
auto create_placement_group_request =
Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::PACK);
auto placement_group =
std::make_shared<gcs::GcsPlacementGroup>(create_placement_group_request);
gcs_placement_group_scheduler_->Schedule(placement_group, failure_handler,
success_handler);
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
WaitPendingDone(success_placement_groups_, 1);
// Node1 has less number of bundles, but it doesn't satisfy the resource
// requirement. In this case, the bundles should be scheduled on Node0.
auto node1 = Mocker::GenNodeInfo(1);
AddNode(node1, 1);
gcs_placement_group_scheduler_->Schedule(placement_group, failure_handler,
success_handler);
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
WaitPendingDone(success_placement_groups_, 2);
}
} // namespace ray
int main(int argc, char **argv) {
+2 -2
View File
@@ -89,10 +89,10 @@ struct Mocker {
}
static rpc::CreatePlacementGroupRequest GenCreatePlacementGroupRequest(
const std::string name = "") {
const std::string name = "",
rpc::PlacementStrategy strategy = rpc::PlacementStrategy::SPREAD) {
rpc::CreatePlacementGroupRequest request;
std::vector<std::unordered_map<std::string, double>> bundles;
rpc::PlacementStrategy strategy = rpc::PlacementStrategy::SPREAD;
std::unordered_map<std::string, double> bundle;
bundle["CPU"] = 1.0;
bundles.push_back(bundle);