diff --git a/java/api/src/main/java/io/ray/api/placementgroup/PlacementStrategy.java b/java/api/src/main/java/io/ray/api/placementgroup/PlacementStrategy.java index c97c24d84..473c2eb12 100644 --- a/java/api/src/main/java/io/ray/api/placementgroup/PlacementStrategy.java +++ b/java/api/src/main/java/io/ray/api/placementgroup/PlacementStrategy.java @@ -11,7 +11,11 @@ public enum PlacementStrategy { /** * Places Bundles across distinct nodes as even as possible. */ - SPREAD(1); + SPREAD(1), + /** + * Packs Bundles into one node. The group is not allowed to span multiple nodes. + */ + STRICT_PACK(2); private int value = 0; diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index b0bb90957..ab7f83ade 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -69,6 +69,7 @@ from ray.includes.common cimport ( WORKER_TYPE_IO_WORKER, PLACEMENT_STRATEGY_PACK, PLACEMENT_STRATEGY_SPREAD, + PLACEMENT_STRATEGY_STRICT_PACK, ) from ray.includes.unique_ids cimport ( CActorID, @@ -1061,9 +1062,11 @@ cdef class CoreWorker: if strategy == b"PACK": c_strategy = PLACEMENT_STRATEGY_PACK + elif strategy == b"SPREAD": + c_strategy = PLACEMENT_STRATEGY_SPREAD else: - if strategy == b"SPREAD": - c_strategy = PLACEMENT_STRATEGY_SPREAD + if strategy == b"STRICT_PACK": + c_strategy = PLACEMENT_STRATEGY_STRICT_PACK else: raise TypeError(strategy) diff --git a/python/ray/experimental/placement_group.py b/python/ray/experimental/placement_group.py index a99be4690..f1fec5eff 100644 --- a/python/ray/experimental/placement_group.py +++ b/python/ray/experimental/placement_group.py @@ -13,11 +13,10 @@ def placement_group(bundles: List[Dict[str, float]], Args: bundles: A list of bundles which represent the resources needed. strategy: The strategy to create the placement group. - There are two build-in strategies for the time begin. - PACK: Packs Bundles close together inside processes or nodes as - tight as possible. - SPREAD: Places Bundles across distinct nodes or processes as even - as possible. + PACK: Packs Bundles into as few nodes as possible. + SPREAD: Places Bundles across distinct nodes as even as possible. + STRICT_PACK: Packs Bundles into one node. + The group is not allowed to span multiple nodes. name: The name of the placement group. """ worker = ray.worker.global_worker diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 5c8917fc9..8ae73bdc4 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -174,6 +174,8 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil: "ray::PlacementStrategy::PACK" cdef CPlacementStrategy PLACEMENT_STRATEGY_SPREAD \ "ray::PlacementStrategy::SPREAD" + cdef CPlacementStrategy PLACEMENT_STRATEGY_STRICT_PACK \ + "ray::PlacementStrategy::STRICT_PACK" cdef extern from "ray/common/task/scheduling_resources.h" nogil: cdef cppclass ResourceSet "ray::ResourceSet": diff --git a/python/ray/tests/test_placement_group.py b/python/ray/tests/test_placement_group.py index 5a4651cbc..656523c51 100644 --- a/python/ray/tests/test_placement_group.py +++ b/python/ray/tests/test_placement_group.py @@ -57,6 +57,51 @@ def test_placement_group_pack(ray_start_cluster): assert node_of_actor_1 == node_of_actor_2 +def test_placement_group_strict_pack(ray_start_cluster): + @ray.remote(num_cpus=2) + class Actor(object): + def __init__(self): + self.n = 0 + + def value(self): + return self.n + + cluster = ray_start_cluster + num_nodes = 2 + for _ in range(num_nodes): + cluster.add_node(num_cpus=4) + ray.init(address=cluster.address) + + placement_group_id = ray.experimental.placement_group( + name="name", strategy="STRICT_PACK", bundles=[{ + "CPU": 2 + }, { + "CPU": 2 + }]) + actor_1 = Actor.options( + placement_group_id=placement_group_id, + placement_group_bundle_index=0).remote() + actor_2 = Actor.options( + placement_group_id=placement_group_id, + placement_group_bundle_index=1).remote() + + print(ray.get(actor_1.value.remote())) + print(ray.get(actor_2.value.remote())) + + # Get all actors. + actor_infos = ray.actors() + + # Make sure all actors in counter_list are collocated in one node. + actor_info_1 = actor_infos.get(actor_1._actor_id.hex()) + actor_info_2 = actor_infos.get(actor_2._actor_id.hex()) + + assert actor_info_1 and actor_info_2 + + node_of_actor_1 = actor_info_1["Address"]["NodeID"] + node_of_actor_2 = actor_info_2["Address"]["NodeID"] + assert node_of_actor_1 == node_of_actor_2 + + def test_placement_group_spread(ray_start_cluster): @ray.remote(num_cpus=2) class Actor(object): diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc index c57598859..db323ee4c 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc @@ -161,7 +161,14 @@ inline ray::ActorCreationOptions ToActorCreationOptions(JNIEnv *env, } inline ray::PlacementStrategy ConvertStrategy(jint java_strategy) { - return 0 == java_strategy ? ray::rpc::PACK : ray::rpc::SPREAD; + switch (java_strategy) { + case 0: + return ray::rpc::PACK; + case 1: + return ray::rpc::SPREAD; + default: + return ray::rpc::STRICT_PACK; + } } inline ray::PlacementGroupCreationOptions ToPlacementGroupCreationOptions( diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc index efe4f089f..8a6246792 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc @@ -31,14 +31,10 @@ GcsPlacementGroupScheduler::GcsPlacementGroupScheduler( lease_client_factory_(std::move(lease_client_factory)) { scheduler_strategies_.push_back(std::make_shared()); scheduler_strategies_.push_back(std::make_shared()); + scheduler_strategies_.push_back(std::make_shared()); } -/// In this algorithm, we try to pack all the bundles in the node which satisfies the -/// resource requirements and has the least number of bundles. -/// TODO(ffbin): At present, only one node will be scheduled. If one node does not have -/// enough resources, we need to divide bundles to multiple nodes. We will implement -/// it in the next pr. -ScheduleMap GcsPackStrategy::Schedule( +ScheduleMap GcsStrictPackStrategy::Schedule( std::vector> &bundles, const std::unique_ptr &context) { // Aggregate required resources. @@ -73,9 +69,31 @@ ScheduleMap GcsPackStrategy::Schedule( return schedule_map; } -/// This is an initial algorithm to respect spread algorithm. -/// In this algorithm, we try to spread all the bundle in different node -/// and don't care the real resource. +ScheduleMap GcsPackStrategy::Schedule( + std::vector> &bundles, + const std::unique_ptr &context) { + // The current algorithm is to select a node and deploy as many bundles as possible. + // First fill up a node. If the node resource is insufficient, select a new node. + // TODO(ffbin): We will speed this up in next PR. Currently it is a double for loop. + ScheduleMap schedule_map; + const auto &alive_nodes = context->node_manager_.GetClusterRealtimeResources(); + for (const auto &bundle : bundles) { + const auto &required_resources = bundle->GetRequiredResources(); + for (auto &node : alive_nodes) { + if (required_resources.IsSubset(*node.second)) { + node.second->SubtractResourcesStrict(required_resources); + schedule_map[bundle->BundleId()] = node.first; + break; + } + } + } + + if (schedule_map.size() != bundles.size()) { + schedule_map.clear(); + } + return schedule_map; +} + ScheduleMap GcsSpreadStrategy::Schedule( std::vector> &bundles, const std::unique_ptr &context) { diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h index 7b440cfab..7d44874fe 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h @@ -74,18 +74,30 @@ class GcsScheduleStrategy { const std::unique_ptr &context) = 0; }; +/// The `GcsPackStrategy` is that pack all bundles in one node as much as possible. +/// If one node does not have enough resources, we need to divide bundles to multiple +/// nodes. class GcsPackStrategy : public GcsScheduleStrategy { public: ScheduleMap Schedule(std::vector> &bundles, const std::unique_ptr &context) override; }; +/// The `GcsSpreadStrategy` is that spread all bundles in different nodes. class GcsSpreadStrategy : public GcsScheduleStrategy { public: ScheduleMap Schedule(std::vector> &bundles, const std::unique_ptr &context) override; }; +/// The `GcsStrictPackStrategy` is that all bundles must be scheduled to one node. If one +/// node does not have enough resources, it will fail to schedule. +class GcsStrictPackStrategy : public GcsScheduleStrategy { + public: + ScheduleMap Schedule(std::vector> &bundles, + const std::unique_ptr &context) override; +}; + /// GcsPlacementGroupScheduler is responsible for scheduling placement_groups registered /// to GcsPlacementGroupManager. This class is not thread-safe. class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc index 2b0356a00..33448b376 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc @@ -72,6 +72,36 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { heartbeat); } + void ReschedulingWhenNodeAddTest(rpc::PlacementStrategy strategy) { + AddNode(Mocker::GenNodeInfo(0), 1); + auto failure_handler = + [this](std::shared_ptr placement_group) { + absl::MutexLock lock(&vector_mutex_); + failure_placement_groups_.emplace_back(std::move(placement_group)); + }; + auto success_handler = + [this](std::shared_ptr placement_group) { + absl::MutexLock lock(&vector_mutex_); + success_placement_groups_.emplace_back(std::move(placement_group)); + }; + + // Failed to schedule the placement group, because the node resources is not enough. + auto request = Mocker::GenCreatePlacementGroupRequest("", strategy); + auto placement_group = std::make_shared(request); + gcs_placement_group_scheduler_->Schedule(placement_group, failure_handler, + success_handler); + WaitPendingDone(failure_placement_groups_, 1); + ASSERT_EQ(0, success_placement_groups_.size()); + + // A new node is added, and the rescheduling is successful. + AddNode(Mocker::GenNodeInfo(0), 2); + gcs_placement_group_scheduler_->Schedule(placement_group, failure_handler, + success_handler); + ASSERT_TRUE(raylet_client_->GrantResourceReserve()); + ASSERT_TRUE(raylet_client_->GrantResourceReserve()); + WaitPendingDone(success_placement_groups_, 1); + } + protected: const std::chrono::milliseconds timeout_ms_{6000}; absl::Mutex vector_mutex_; @@ -93,9 +123,8 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { TEST_F(GcsPlacementGroupSchedulerTest, TestScheduleFailedWithZeroNode) { ASSERT_EQ(0, gcs_node_manager_->GetAllAliveNodes().size()); - auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest(); - auto placement_group = - std::make_shared(create_placement_group_request); + auto request = Mocker::GenCreatePlacementGroupRequest(); + auto placement_group = std::make_shared(request); // Schedule the placement_group with zero node. gcs_placement_group_scheduler_->Schedule( @@ -120,9 +149,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupSuccess) { AddNode(node); ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size()); - auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest(); - auto placement_group = - std::make_shared(create_placement_group_request); + auto request = Mocker::GenCreatePlacementGroupRequest(); + auto placement_group = std::make_shared(request); // Schedule the placement_group with 1 available node, and the lease request should be // send to the node. @@ -151,9 +179,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupFailed) { AddNode(node); ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size()); - auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest(); - auto placement_group = - std::make_shared(create_placement_group_request); + auto request = Mocker::GenCreatePlacementGroupRequest(); + auto placement_group = std::make_shared(request); // Schedule the placement_group with 1 available node, and the lease request should be // send to the node. @@ -184,9 +211,8 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReturnResource) AddNode(node); ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size()); - auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest(); - auto placement_group = - std::make_shared(create_placement_group_request); + auto request = Mocker::GenCreatePlacementGroupRequest(); + auto placement_group = std::make_shared(request); // Schedule the placement_group with 1 available node, and the lease request should be // send to the node. @@ -230,10 +256,9 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyBalancedScheduling) int select_node0_count = 0; int select_node1_count = 0; for (int index = 0; index < 10; ++index) { - auto create_placement_group_request = - Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::PACK); - auto placement_group = - std::make_shared(create_placement_group_request); + auto request = + Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::STRICT_PACK); + auto placement_group = std::make_shared(request); gcs_placement_group_scheduler_->Schedule(placement_group, failure_handler, success_handler); @@ -253,33 +278,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyBalancedScheduling) } TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyReschedulingWhenNodeAdd) { - AddNode(Mocker::GenNodeInfo(0), 1); - auto failure_handler = [this](std::shared_ptr placement_group) { - absl::MutexLock lock(&vector_mutex_); - failure_placement_groups_.emplace_back(std::move(placement_group)); - }; - auto success_handler = [this](std::shared_ptr placement_group) { - absl::MutexLock lock(&vector_mutex_); - success_placement_groups_.emplace_back(std::move(placement_group)); - }; - - // Failed to schedule the placement group, because the node resources is not enough. - auto create_placement_group_request = - Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::PACK); - auto placement_group = - std::make_shared(create_placement_group_request); - gcs_placement_group_scheduler_->Schedule(placement_group, failure_handler, - success_handler); - WaitPendingDone(failure_placement_groups_, 1); - ASSERT_EQ(0, success_placement_groups_.size()); - - // A new node is added, and the rescheduling is successful. - AddNode(Mocker::GenNodeInfo(0), 2); - gcs_placement_group_scheduler_->Schedule(placement_group, failure_handler, - success_handler); - ASSERT_TRUE(raylet_client_->GrantResourceReserve()); - ASSERT_TRUE(raylet_client_->GrantResourceReserve()); - WaitPendingDone(success_placement_groups_, 1); + ReschedulingWhenNodeAddTest(rpc::PlacementStrategy::STRICT_PACK); } TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyResourceCheck) { @@ -293,10 +292,9 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyResourceCheck) { absl::MutexLock lock(&vector_mutex_); success_placement_groups_.emplace_back(std::move(placement_group)); }; - auto create_placement_group_request = - Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::PACK); - auto placement_group = - std::make_shared(create_placement_group_request); + auto request = + Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::STRICT_PACK); + auto placement_group = std::make_shared(request); gcs_placement_group_scheduler_->Schedule(placement_group, failure_handler, success_handler); ASSERT_TRUE(raylet_client_->GrantResourceReserve()); @@ -314,6 +312,40 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyResourceCheck) { WaitPendingDone(success_placement_groups_, 2); } +TEST_F(GcsPlacementGroupSchedulerTest, TestPackStrategyReschedulingWhenNodeAdd) { + ReschedulingWhenNodeAddTest(rpc::PlacementStrategy::PACK); +} + +TEST_F(GcsPlacementGroupSchedulerTest, TestPackStrategyLargeBundlesScheduling) { + AddNode(Mocker::GenNodeInfo(0)); + AddNode(Mocker::GenNodeInfo(1)); + auto failure_handler = [this](std::shared_ptr placement_group) { + absl::MutexLock lock(&vector_mutex_); + failure_placement_groups_.emplace_back(std::move(placement_group)); + }; + auto success_handler = [this](std::shared_ptr placement_group) { + absl::MutexLock lock(&vector_mutex_); + success_placement_groups_.emplace_back(std::move(placement_group)); + }; + + // Schedule placement group which has large bundles. + // One node does not have enough resources, so we will divide bundles to two nodes. + auto request = + Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::PACK, 15); + auto placement_group = std::make_shared(request); + gcs_placement_group_scheduler_->Schedule(placement_group, failure_handler, + success_handler); + RAY_CHECK(raylet_client_->num_lease_requested > 0); + RAY_CHECK(raylet_client1_->num_lease_requested > 0); + for (int index = 0; index < raylet_client_->num_lease_requested; ++index) { + ASSERT_TRUE(raylet_client_->GrantResourceReserve()); + } + for (int index = 0; index < raylet_client1_->num_lease_requested; ++index) { + ASSERT_TRUE(raylet_client1_->GrantResourceReserve()); + } + WaitPendingDone(success_placement_groups_, 1); +} + } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/gcs/test/gcs_test_util.h b/src/ray/gcs/test/gcs_test_util.h index 79d4116c8..467321b0e 100644 --- a/src/ray/gcs/test/gcs_test_util.h +++ b/src/ray/gcs/test/gcs_test_util.h @@ -90,13 +90,15 @@ struct Mocker { static rpc::CreatePlacementGroupRequest GenCreatePlacementGroupRequest( const std::string name = "", - rpc::PlacementStrategy strategy = rpc::PlacementStrategy::SPREAD) { + rpc::PlacementStrategy strategy = rpc::PlacementStrategy::SPREAD, + int bundles_count = 2) { rpc::CreatePlacementGroupRequest request; std::vector> bundles; std::unordered_map bundle; bundle["CPU"] = 1.0; - bundles.push_back(bundle); - bundles.push_back(bundle); + for (int index = 0; index < bundles_count; ++index) { + bundles.push_back(bundle); + } auto placement_group_creation_spec = GenPlacementGroupCreation(name, bundles, strategy); request.mutable_placement_group_spec()->CopyFrom( diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 261fd3f44..178e8dfc7 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -46,10 +46,12 @@ enum TaskType { // Type of placement group strategy. enum PlacementStrategy { - // Packs Bundles close together inside processes or nodes as tight as possible + // Packs Bundles into as few nodes as possible. PACK = 0; // Places Bundles across distinct nodes or processes as even as possible. SPREAD = 1; + // Packs Bundles within one node. The group is not allowed to span multiple nodes. + STRICT_PACK = 2; } // Address of a worker or node manager.