From a462ae2747afbeb9047e443cd51e67e3fe0b49e6 Mon Sep 17 00:00:00 2001 From: fangfengbin <869218239a@zju.edu.cn> Date: Fri, 21 Aug 2020 01:18:58 +0800 Subject: [PATCH] [Placement Group]Add strict spread strategy (#10174) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * support STRICT_SPREAD strategy * fix review comments * rebase master * fix lint error * fix lint error Co-authored-by: 灵洵 --- .../api/placementgroup/PlacementStrategy.java | 10 +- python/ray/_raylet.pyx | 7 +- python/ray/experimental/placement_group.py | 1 + python/ray/includes/common.pxd | 2 + python/ray/tests/test_placement_group.py | 59 ++++++- ...io_ray_runtime_task_NativeTaskSubmitter.cc | 4 +- .../gcs_placement_group_scheduler.cc | 82 ++++++++- .../gcs_placement_group_scheduler.h | 9 + .../gcs_placement_group_scheduler_test.cc | 160 +++++++++++++----- src/ray/gcs/test/gcs_test_util.h | 4 +- src/ray/protobuf/common.proto | 3 + 11 files changed, 278 insertions(+), 63 deletions(-) diff --git a/java/api/src/main/java/io/ray/api/placementgroup/PlacementStrategy.java b/java/api/src/main/java/io/ray/api/placementgroup/PlacementStrategy.java index 473c2eb12..1cdfb2733 100644 --- a/java/api/src/main/java/io/ray/api/placementgroup/PlacementStrategy.java +++ b/java/api/src/main/java/io/ray/api/placementgroup/PlacementStrategy.java @@ -8,14 +8,22 @@ public enum PlacementStrategy { * Packs Bundles close together inside nodes as tight as possible. */ PACK(0), + /** * Places Bundles across distinct nodes as even as possible. */ SPREAD(1), + /** * Packs Bundles into one node. The group is not allowed to span multiple nodes. */ - STRICT_PACK(2); + STRICT_PACK(2), + + /** + * Places Bundles across distinct nodes. + * The group is not allowed to deploy more than one bundle on a node. 
+ */ + STRICT_SPREAD(3); private int value = 0; diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 2098df5c4..b3c6dae49 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -70,6 +70,7 @@ from ray.includes.common cimport ( PLACEMENT_STRATEGY_PACK, PLACEMENT_STRATEGY_SPREAD, PLACEMENT_STRATEGY_STRICT_PACK, + PLACEMENT_STRATEGY_STRICT_SPREAD, ) from ray.includes.unique_ids cimport ( CActorID, @@ -1064,9 +1065,11 @@ cdef class CoreWorker: c_strategy = PLACEMENT_STRATEGY_PACK elif strategy == b"SPREAD": c_strategy = PLACEMENT_STRATEGY_SPREAD + elif strategy == b"STRICT_PACK": + c_strategy = PLACEMENT_STRATEGY_STRICT_PACK else: - if strategy == b"STRICT_PACK": - c_strategy = PLACEMENT_STRATEGY_STRICT_PACK + if strategy == b"STRICT_SPREAD": + c_strategy = PLACEMENT_STRATEGY_STRICT_SPREAD else: raise TypeError(strategy) diff --git a/python/ray/experimental/placement_group.py b/python/ray/experimental/placement_group.py index 980f2147b..190b45849 100644 --- a/python/ray/experimental/placement_group.py +++ b/python/ray/experimental/placement_group.py @@ -19,6 +19,7 @@ def placement_group(bundles: List[Dict[str, float]], PACK: Packs Bundles into as few nodes as possible. SPREAD: Places Bundles across distinct nodes as even as possible. STRICT_PACK: Packs Bundles into one node. The group is not allowed to span multiple nodes. + STRICT_SPREAD: Places Bundles across distinct nodes. name: The name of the placement group. 
""" diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 8ae73bdc4..de489a9fe 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -176,6 +176,8 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil: "ray::PlacementStrategy::SPREAD" cdef CPlacementStrategy PLACEMENT_STRATEGY_STRICT_PACK \ "ray::PlacementStrategy::STRICT_PACK" + cdef CPlacementStrategy PLACEMENT_STRATEGY_STRICT_SPREAD \ + "ray::PlacementStrategy::STRICT_SPREAD" cdef extern from "ray/common/task/scheduling_resources.h" nogil: cdef cppclass ResourceSet "ray::ResourceSet": diff --git a/python/ray/tests/test_placement_group.py b/python/ray/tests/test_placement_group.py index 1bb963c44..cae741c1d 100644 --- a/python/ray/tests/test_placement_group.py +++ b/python/ray/tests/test_placement_group.py @@ -137,7 +137,7 @@ def test_placement_group_spread(ray_start_cluster): # Get all actors. actor_infos = ray.actors() - # Make sure all actors in counter_list are collocated in one node. + # Make sure all actors in counter_list are located in separate nodes. 
actor_info_1 = actor_infos.get(actor_1._actor_id.hex()) actor_info_2 = actor_infos.get(actor_2._actor_id.hex()) @@ -148,6 +148,63 @@ def test_placement_group_spread(ray_start_cluster): assert node_of_actor_1 != node_of_actor_2 +def test_placement_group_strict_spread(ray_start_cluster): + @ray.remote(num_cpus=2) + class Actor(object): + def __init__(self): + self.n = 0 + + def value(self): + return self.n + + cluster = ray_start_cluster + num_nodes = 3 + for _ in range(num_nodes): + cluster.add_node(num_cpus=4) + ray.init(address=cluster.address) + + placement_group_id = ray.experimental.placement_group( + name="name", + strategy="STRICT_SPREAD", + bundles=[{ + "CPU": 2 + }, { + "CPU": 2 + }, { + "CPU": 2 + }]) + actor_1 = Actor.options( + placement_group_id=placement_group_id, + placement_group_bundle_index=0).remote() + actor_2 = Actor.options( + placement_group_id=placement_group_id, + placement_group_bundle_index=1).remote() + actor_3 = Actor.options( + placement_group_id=placement_group_id, + placement_group_bundle_index=2).remote() + + print(ray.get(actor_1.value.remote())) + print(ray.get(actor_2.value.remote())) + print(ray.get(actor_3.value.remote())) + + # Get all actors. + actor_infos = ray.actors() + + # Make sure all actors in counter_list are located in separate nodes. 
+ actor_info_1 = actor_infos.get(actor_1._actor_id.hex()) + actor_info_2 = actor_infos.get(actor_2._actor_id.hex()) + actor_info_3 = actor_infos.get(actor_3._actor_id.hex()) + + assert actor_info_1 and actor_info_2 and actor_info_3 + + node_of_actor_1 = actor_info_1["Address"]["NodeID"] + node_of_actor_2 = actor_info_2["Address"]["NodeID"] + node_of_actor_3 = actor_info_3["Address"]["NodeID"] + assert node_of_actor_1 != node_of_actor_2 + assert node_of_actor_1 != node_of_actor_3 + assert node_of_actor_2 != node_of_actor_3 + + def test_placement_group_actor_resource_ids(ray_start_cluster): @ray.remote(num_cpus=1) class F: diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc index db323ee4c..0541b0a4b 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc @@ -166,8 +166,10 @@ inline ray::PlacementStrategy ConvertStrategy(jint java_strategy) { return ray::rpc::PACK; case 1: return ray::rpc::SPREAD; - default: + case 2: return ray::rpc::STRICT_PACK; + default: + return ray::rpc::STRICT_SPREAD; } } diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc index 0e628adee..638ec7079 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc @@ -32,6 +32,7 @@ GcsPlacementGroupScheduler::GcsPlacementGroupScheduler( scheduler_strategies_.push_back(std::make_shared()); scheduler_strategies_.push_back(std::make_shared()); scheduler_strategies_.push_back(std::make_shared()); + scheduler_strategies_.push_back(std::make_shared()); } ScheduleMap GcsStrictPackStrategy::Schedule( @@ -97,19 +98,82 @@ ScheduleMap GcsPackStrategy::Schedule( ScheduleMap GcsSpreadStrategy::Schedule( std::vector> &bundles, const std::unique_ptr &context) { 
+ // When selecting nodes, if you traverse from the beginning each time, a large number of + // bundles will be deployed to the previous nodes. So we start with the next node of the + // last selected node. ScheduleMap schedule_map; - auto &alive_nodes = context->node_manager_.GetClusterRealtimeResources(); - auto iter = alive_nodes.begin(); - size_t index = 0; - size_t alive_nodes_size = alive_nodes.size(); - for (; iter != alive_nodes.end(); iter++, index++) { - for (size_t base = 0;; base++) { - if (index + base * alive_nodes_size >= bundles.size()) { + auto node_resources = context->node_manager_.GetClusterRealtimeResources(); + if (node_resources.empty()) { + return schedule_map; + } + + auto candidate_nodes = node_resources; + auto iter = candidate_nodes.begin(); + auto iter_begin = iter; + for (const auto &bundle : bundles) { + const auto &required_resources = bundle->GetRequiredResources(); + for (; iter != candidate_nodes.end(); ++iter) { + if (required_resources.IsSubset(*iter->second)) { + node_resources[iter->first]->SubtractResourcesStrict(required_resources); + schedule_map[bundle->BundleId()] = iter->first; break; - } else { - schedule_map[bundles[index + base * alive_nodes_size]->BundleId()] = iter->first; } } + + if (iter == candidate_nodes.end() && iter_begin != candidate_nodes.begin()) { + for (iter = candidate_nodes.begin(); iter != iter_begin; ++iter) { + if (required_resources.IsSubset(*iter->second)) { + node_resources[iter->first]->SubtractResourcesStrict(required_resources); + schedule_map[bundle->BundleId()] = iter->first; + break; + } + } + if (iter == iter_begin) { + break; + } + } + iter_begin = ++iter; + } + + if (schedule_map.size() != bundles.size()) { + schedule_map.clear(); + } + return schedule_map; +} + +ScheduleMap GcsStrictSpreadStrategy::Schedule( + std::vector> &bundles, + const std::unique_ptr &context) { + // TODO(ffbin): A bundle may require special resources, such as GPU. 
We need to + // schedule bundles with special resource requirements first, which will be implemented + // in the next pr. + ScheduleMap schedule_map; + auto candidate_nodes = context->node_manager_.GetClusterRealtimeResources(); + + // If there are more bundles than nodes, scheduling fails. + if (bundles.size() > candidate_nodes.size()) { + return schedule_map; + } + + for (const auto &bundle : bundles) { + const auto &required_resources = bundle->GetRequiredResources(); + auto iter = candidate_nodes.begin(); + for (; iter != candidate_nodes.end(); ++iter) { + if (required_resources.IsSubset(*iter->second)) { + schedule_map[bundle->BundleId()] = iter->first; + candidate_nodes.erase(iter); + break; + } + } + + // No candidate node has enough resources, so scheduling fails. + if (iter == candidate_nodes.end()) { + break; + } + } + + if (schedule_map.size() != bundles.size()) { + schedule_map.clear(); } return schedule_map; } diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h index f56e337c4..db54eea73 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h @@ -121,6 +121,15 @@ class GcsStrictPackStrategy : public GcsScheduleStrategy { const std::unique_ptr &context) override; }; +/// The `GcsStrictSpreadStrategy` spreads all bundles across different nodes. +/// A node can only deploy one bundle. +/// If node resources are insufficient, scheduling fails. +class GcsStrictSpreadStrategy : public GcsScheduleStrategy { + public: + ScheduleMap Schedule(std::vector> &bundles, + const std::unique_ptr &context) override; +}; + /// GcsPlacementGroupScheduler is responsible for scheduling placement_groups registered /// to GcsPlacementGroupManager. This class is not thread-safe. 
class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc index c4455e389..f324e49ab 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc @@ -31,6 +31,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { raylet_client_ = std::make_shared(); raylet_client1_ = std::make_shared(); + raylet_client2_ = std::make_shared(); gcs_table_storage_ = std::make_shared(io_service_); gcs_pub_sub_ = std::make_shared(redis_client_); gcs_node_manager_ = std::make_shared( @@ -43,8 +44,10 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { [this](const rpc::Address &address) { if (0 == address.port()) { return raylet_client_; - } else { + } else if (1 == address.port()) { return raylet_client1_; + } else { + return raylet_client2_; } }); } @@ -71,6 +74,59 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { heartbeat); } + void ScheduleFailedWithZeroNodeTest(rpc::PlacementStrategy strategy) { + ASSERT_EQ(0, gcs_node_manager_->GetAllAliveNodes().size()); + auto request = Mocker::GenCreatePlacementGroupRequest("", strategy); + auto placement_group = std::make_shared(request); + + // Schedule the placement_group with zero node. + scheduler_->ScheduleUnplacedBundles( + placement_group, + [this](std::shared_ptr placement_group) { + failure_placement_groups_.emplace_back(std::move(placement_group)); + }, + [this](std::shared_ptr placement_group) { + success_placement_groups_.emplace_back(std::move(placement_group)); + }); + + // The lease request should not be send and the scheduling of placement_group should + // fail as there are no available nodes. 
+ ASSERT_EQ(raylet_client_->num_lease_requested, 0); + ASSERT_EQ(0, success_placement_groups_.size()); + ASSERT_EQ(1, failure_placement_groups_.size()); + ASSERT_EQ(placement_group, failure_placement_groups_.front()); + } + + void SchedulePlacementGroupSuccessTest(rpc::PlacementStrategy strategy) { + auto node = Mocker::GenNodeInfo(); + AddNode(node); + ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size()); + + auto request = Mocker::GenCreatePlacementGroupRequest("", strategy); + auto placement_group = std::make_shared(request); + + // Schedule the placement_group with 1 available node, and the lease request should be + // send to the node. + scheduler_->ScheduleUnplacedBundles( + placement_group, + [this](std::shared_ptr placement_group) { + absl::MutexLock lock(&vector_mutex_); + failure_placement_groups_.emplace_back(std::move(placement_group)); + }, + [this](std::shared_ptr placement_group) { + absl::MutexLock lock(&vector_mutex_); + success_placement_groups_.emplace_back(std::move(placement_group)); + }); + + ASSERT_EQ(2, raylet_client_->num_lease_requested); + ASSERT_EQ(2, raylet_client_->lease_callbacks.size()); + ASSERT_TRUE(raylet_client_->GrantResourceReserve()); + ASSERT_TRUE(raylet_client_->GrantResourceReserve()); + WaitPendingDone(failure_placement_groups_, 0); + WaitPendingDone(success_placement_groups_, 1); + ASSERT_EQ(placement_group, success_placement_groups_.front()); + } + void ReschedulingWhenNodeAddTest(rpc::PlacementStrategy strategy) { AddNode(Mocker::GenNodeInfo(0), 1); auto failure_handler = @@ -110,6 +166,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { std::shared_ptr raylet_client_; std::shared_ptr raylet_client1_; + std::shared_ptr raylet_client2_; std::shared_ptr gcs_node_manager_; std::shared_ptr scheduler_; std::vector> success_placement_groups_; @@ -119,57 +176,32 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { std::shared_ptr redis_client_; }; -TEST_F(GcsPlacementGroupSchedulerTest, 
TestScheduleFailedWithZeroNode) { - ASSERT_EQ(0, gcs_node_manager_->GetAllAliveNodes().size()); - auto request = Mocker::GenCreatePlacementGroupRequest(); - auto placement_group = std::make_shared(request); - - // Schedule the placement_group with zero node. - scheduler_->ScheduleUnplacedBundles( - placement_group, - [this](std::shared_ptr placement_group) { - failure_placement_groups_.emplace_back(std::move(placement_group)); - }, - [this](std::shared_ptr placement_group) { - success_placement_groups_.emplace_back(std::move(placement_group)); - }); - - // The lease request should not be send and the scheduling of placement_group should - // fail as there are no available nodes. - ASSERT_EQ(raylet_client_->num_lease_requested, 0); - ASSERT_EQ(0, success_placement_groups_.size()); - ASSERT_EQ(1, failure_placement_groups_.size()); - ASSERT_EQ(placement_group, failure_placement_groups_.front()); +TEST_F(GcsPlacementGroupSchedulerTest, TestSpreadScheduleFailedWithZeroNode) { + ScheduleFailedWithZeroNodeTest(rpc::PlacementStrategy::SPREAD); } -TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupSuccess) { - auto node = Mocker::GenNodeInfo(); - AddNode(node); - ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size()); +TEST_F(GcsPlacementGroupSchedulerTest, TestPackScheduleFailedWithZeroNode) { + ScheduleFailedWithZeroNodeTest(rpc::PlacementStrategy::PACK); +} - auto request = Mocker::GenCreatePlacementGroupRequest(); - auto placement_group = std::make_shared(request); +TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackScheduleFailedWithZeroNode) { + ScheduleFailedWithZeroNodeTest(rpc::PlacementStrategy::STRICT_PACK); +} - // Schedule the placement_group with 1 available node, and the lease request should be - // send to the node. 
- scheduler_->ScheduleUnplacedBundles( - placement_group, - [this](std::shared_ptr placement_group) { - absl::MutexLock lock(&vector_mutex_); - failure_placement_groups_.emplace_back(std::move(placement_group)); - }, - [this](std::shared_ptr placement_group) { - absl::MutexLock lock(&vector_mutex_); - success_placement_groups_.emplace_back(std::move(placement_group)); - }); +TEST_F(GcsPlacementGroupSchedulerTest, TestStrictSpreadScheduleFailedWithZeroNode) { + ScheduleFailedWithZeroNodeTest(rpc::PlacementStrategy::STRICT_SPREAD); +} - ASSERT_EQ(2, raylet_client_->num_lease_requested); - ASSERT_EQ(2, raylet_client_->lease_callbacks.size()); - ASSERT_TRUE(raylet_client_->GrantResourceReserve()); - ASSERT_TRUE(raylet_client_->GrantResourceReserve()); - WaitPendingDone(failure_placement_groups_, 0); - WaitPendingDone(success_placement_groups_, 1); - ASSERT_EQ(placement_group, success_placement_groups_.front()); +TEST_F(GcsPlacementGroupSchedulerTest, TestSpreadSchedulePlacementGroupSuccess) { + SchedulePlacementGroupSuccessTest(rpc::PlacementStrategy::SPREAD); +} + +TEST_F(GcsPlacementGroupSchedulerTest, TestPackSchedulePlacementGroupSuccess) { + SchedulePlacementGroupSuccessTest(rpc::PlacementStrategy::PACK); +} + +TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackSchedulePlacementGroupSuccess) { + SchedulePlacementGroupSuccessTest(rpc::PlacementStrategy::STRICT_PACK); } TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupFailed) { @@ -458,6 +490,40 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestRescheduleWhenNodeDead) { WaitPendingDone(success_placement_groups_, 2); } +TEST_F(GcsPlacementGroupSchedulerTest, TestStrictSpreadStrategyResourceCheck) { + auto node0 = Mocker::GenNodeInfo(0); + AddNode(node0); + auto failure_handler = [this](std::shared_ptr placement_group) { + absl::MutexLock lock(&vector_mutex_); + failure_placement_groups_.emplace_back(std::move(placement_group)); + }; + auto success_handler = [this](std::shared_ptr placement_group) { + 
absl::MutexLock lock(&vector_mutex_); + success_placement_groups_.emplace_back(std::move(placement_group)); + }; + auto request = Mocker::GenCreatePlacementGroupRequest( + "", rpc::PlacementStrategy::STRICT_SPREAD, 2, 2); + auto placement_group = std::make_shared(request); + scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler); + + // The number of nodes is less than the number of bundles, scheduling failed. + WaitPendingDone(failure_placement_groups_, 1); + + // Node1 resource is insufficient, scheduling failed. + auto node1 = Mocker::GenNodeInfo(1); + AddNode(node1, 1); + scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler); + WaitPendingDone(failure_placement_groups_, 2); + + // The node2 resource is enough and the scheduling is successful. + auto node2 = Mocker::GenNodeInfo(2); + AddNode(node2); + scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler); + ASSERT_TRUE(raylet_client_->GrantResourceReserve()); + ASSERT_TRUE(raylet_client2_->GrantResourceReserve()); + WaitPendingDone(success_placement_groups_, 1); +} + } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/gcs/test/gcs_test_util.h b/src/ray/gcs/test/gcs_test_util.h index eaa97a1ca..3c63e9fb6 100644 --- a/src/ray/gcs/test/gcs_test_util.h +++ b/src/ray/gcs/test/gcs_test_util.h @@ -91,11 +91,11 @@ struct Mocker { static rpc::CreatePlacementGroupRequest GenCreatePlacementGroupRequest( const std::string name = "", rpc::PlacementStrategy strategy = rpc::PlacementStrategy::SPREAD, - int bundles_count = 2) { + int bundles_count = 2, double cpu_num = 1.0) { rpc::CreatePlacementGroupRequest request; std::vector> bundles; std::unordered_map bundle; - bundle["CPU"] = 1.0; + bundle["CPU"] = cpu_num; for (int index = 0; index < bundles_count; ++index) { bundles.push_back(bundle); } diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 6c449275e..6e9952093 100644 --- 
a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -52,6 +52,9 @@ enum PlacementStrategy { SPREAD = 1; // Packs Bundles within one node. The group is not allowed to span multiple nodes. STRICT_PACK = 2; + // Places Bundles across distinct nodes. + // The group is not allowed to deploy more than one bundle on a node. + STRICT_SPREAD = 3; } // Address of a worker or node manager.