diff --git a/java/runtime/src/main/java/io/ray/runtime/task/LocalModeTaskSubmitter.java b/java/runtime/src/main/java/io/ray/runtime/task/LocalModeTaskSubmitter.java index c356169b5..c9cab4dce 100644 --- a/java/runtime/src/main/java/io/ray/runtime/task/LocalModeTaskSubmitter.java +++ b/java/runtime/src/main/java/io/ray/runtime/task/LocalModeTaskSubmitter.java @@ -29,6 +29,7 @@ import io.ray.runtime.generated.Common.TaskSpec; import io.ray.runtime.generated.Common.TaskType; import io.ray.runtime.object.LocalModeObjectStore; import io.ray.runtime.object.NativeRayObject; +import io.ray.runtime.placementgroup.PlacementGroupId; import io.ray.runtime.placementgroup.PlacementGroupImpl; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -165,6 +166,15 @@ public class LocalModeTaskSubmitter implements TaskSubmitter { public BaseActorHandle createActor( FunctionDescriptor functionDescriptor, List args, ActorCreationOptions options) throws IllegalArgumentException { + if (options != null) { + if (options.group != null) { + PlacementGroupImpl group = (PlacementGroupImpl)options.group; + Preconditions.checkArgument(options.bundleIndex >= 0 + && options.bundleIndex < group.getBundleCount(), + String.format("Bundle index %s is invalid", options.bundleIndex)); + } + } + ActorId actorId = ActorId.fromRandom(); TaskSpec taskSpec = getTaskSpecBuilder(TaskType.ACTOR_CREATION_TASK, functionDescriptor, args) .setNumReturns(1) @@ -215,7 +225,7 @@ public class LocalModeTaskSubmitter implements TaskSubmitter { @Override public PlacementGroup createPlacementGroup(List> bundles, PlacementStrategy strategy) { - return new PlacementGroupImpl(); + return new PlacementGroupImpl(PlacementGroupId.fromRandom(), bundles.size()); } @Override diff --git a/python/ray/tests/test_placement_group.py b/python/ray/tests/test_placement_group.py index 13fb0637d..4efef0611 100644 --- a/python/ray/tests/test_placement_group.py +++ b/python/ray/tests/test_placement_group.py @@ -546,5 +546,30 @@ def test_check_bundle_index(ray_start_cluster): assert error_count == 3 +def test_schedule_placement_group_when_node_add(ray_start_cluster): + cluster = ray_start_cluster + cluster.add_node(num_cpus=4) + ray.init(address=cluster.address) + + # Creating a placement group that cannot be satisfied yet. + placement_group = ray.experimental.placement_group([{ + "GPU": 2 + }, { + "CPU": 2 + }]) + + def is_placement_group_created(): + table = ray.experimental.placement_group_table(placement_group) + if "state" not in table: + return False + return table["state"] == "CREATED" + + # Add a node that has GPU. + cluster.add_node(num_cpus=4, num_gpus=4) + + # Make sure the placement group is created. + wait_for_condition(is_placement_group_created) + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc index a4710a77e..1414559f4 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc @@ -337,8 +337,10 @@ void GcsPlacementGroupManager::OnNodeDead(const ClientID &node_id) { // TODO(ffbin): If we have a placement group bundle that requires a unique resource // (for example gpu resource when there’s only one gpu node), this can postpone // creating until a node with the resources is added. we will solve it in next pr. - iter->second->UpdateState(rpc::PlacementGroupTableData::RESCHEDULING); - pending_placement_groups_.emplace_front(iter->second); + if (iter->second->GetState() != rpc::PlacementGroupTableData::RESCHEDULING) { + iter->second->UpdateState(rpc::PlacementGroupTableData::RESCHEDULING); + pending_placement_groups_.emplace_front(iter->second); + } } } diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc index 638ec7079..3d764460f 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc @@ -102,36 +102,52 @@ ScheduleMap GcsSpreadStrategy::Schedule( // bundles will be deployed to the previous nodes. So we start with the next node of the // last selected node. ScheduleMap schedule_map; - auto node_resources = context->node_manager_.GetClusterRealtimeResources(); - if (node_resources.empty()) { + const auto &candidate_nodes = context->node_manager_.GetClusterRealtimeResources(); + if (candidate_nodes.empty()) { return schedule_map; } - auto candidate_nodes = node_resources; auto iter = candidate_nodes.begin(); auto iter_begin = iter; for (const auto &bundle : bundles) { const auto &required_resources = bundle->GetRequiredResources(); + // Traverse all nodes from `iter_begin` to `candidate_nodes.end()` to find a node that + // meets the resource requirements. `iter_begin` is the next node of the last selected + // node. for (; iter != candidate_nodes.end(); ++iter) { if (required_resources.IsSubset(*iter->second)) { - node_resources[iter->first]->SubtractResourcesStrict(required_resources); + iter->second->SubtractResourcesStrict(required_resources); schedule_map[bundle->BundleId()] = iter->first; break; } } - if (iter == candidate_nodes.end() && iter_begin != candidate_nodes.begin()) { - for (iter = candidate_nodes.begin(); iter != iter_begin; ++iter) { - if (required_resources.IsSubset(*iter->second)) { - node_resources[iter->first]->SubtractResourcesStrict(required_resources); - schedule_map[bundle->BundleId()] = iter->first; + // We've traversed all the nodes from `iter_begin` to `candidate_nodes.end()`, but we + // haven't found one that meets the requirements. + // If `iter_begin` is `candidate_nodes.begin()`, it means that all nodes are not + // satisfied, we will return directly. Otherwise, we will traverse the nodes from + // `candidate_nodes.begin()` to `iter_begin` to find the nodes that meet the + // requirements. + if (iter == candidate_nodes.end()) { + if (iter_begin != candidate_nodes.begin()) { + // Traverse all the nodes from `candidate_nodes.begin()` to `iter_begin`. + for (iter = candidate_nodes.begin(); iter != iter_begin; ++iter) { + if (required_resources.IsSubset(*iter->second)) { + iter->second->SubtractResourcesStrict(required_resources); + schedule_map[bundle->BundleId()] = iter->first; + break; + } + } + if (iter == iter_begin) { + // We have traversed all the nodes, so return directly. break; } - } - if (iter == iter_begin) { + } else { + // We have traversed all the nodes, so return directly. break; } } + // NOTE: If `iter == candidate_nodes.end()`, ++iter causes crash. iter_begin = ++iter; } @@ -258,7 +274,7 @@ void GcsPlacementGroupScheduler::DestroyPlacementGroupBundleResourcesIfExists( } placement_group_to_bundle_locations_.erase(it); - // Remove bundles from node_to_leased_bundles_ because bundels are removed now. + // Remove bundles from node_to_leased_bundles_ because bundles are removed now. for (const auto &bundle_location : *bundle_locations) { const auto &bundle_id = bundle_location.first; const auto &node_id = bundle_location.second.first; @@ -394,7 +410,6 @@ void GcsPlacementGroupScheduler::OnAllBundleSchedulingRequestReturned( std::unique_ptr GcsPlacementGroupScheduler::GetScheduleContext( const PlacementGroupID &placement_group_id) { - // TODO(ffbin): We will add listener to the GCS node manager to handle node deletion. auto &alive_nodes = gcs_node_manager_.GetAllAliveNodes(); for (const auto &iter : alive_nodes) { if (!node_to_leased_bundles_.contains(iter.first)) { diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc index f324e49ab..e66d3d3fd 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc @@ -204,7 +204,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackSchedulePlacementGroupSucce SchedulePlacementGroupSuccessTest(rpc::PlacementStrategy::STRICT_PACK); } -TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupFailed) { +TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReplyFailure) { auto node = Mocker::GenNodeInfo(); AddNode(node); ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size()); @@ -227,15 +227,40 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupFailed) { ASSERT_EQ(2, raylet_client_->num_lease_requested); ASSERT_EQ(2, raylet_client_->lease_callbacks.size()); + + // Reply failure, so the placement group scheduling failed. ASSERT_TRUE(raylet_client_->GrantResourceReserve(false)); ASSERT_TRUE(raylet_client_->GrantResourceReserve(false)); - // Reply the placement_group creation request, then the placement_group should be - // scheduled successfully. WaitPendingDone(failure_placement_groups_, 1); WaitPendingDone(success_placement_groups_, 0); ASSERT_EQ(placement_group, failure_placement_groups_.front()); } +TEST_F(GcsPlacementGroupSchedulerTest, TestSpreadStrategyResourceCheck) { + auto node = Mocker::GenNodeInfo(0); + AddNode(node, 2); + auto failure_handler = [this](std::shared_ptr placement_group) { + absl::MutexLock lock(&vector_mutex_); + failure_placement_groups_.emplace_back(std::move(placement_group)); + }; + auto success_handler = [this](std::shared_ptr placement_group) { + absl::MutexLock lock(&vector_mutex_); + success_placement_groups_.emplace_back(std::move(placement_group)); + }; + auto request = + Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::SPREAD, 3, 2); + auto placement_group = std::make_shared(request); + scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler); + + // The node resource is not enough, scheduling failed. + WaitPendingDone(failure_placement_groups_, 1); + + scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler); + + // The node resource is not enough, scheduling failed. + WaitPendingDone(failure_placement_groups_, 2); +} + TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReturnResource) { auto node = Mocker::GenNodeInfo(); AddNode(node); @@ -478,15 +503,15 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestRescheduleWhenNodeDead) { scheduler_->GetBundlesOnNode(ClientID::FromBinary(node1->node_id())); ASSERT_EQ(1, bundles_on_node1.size()); - // Node1 is dead, reschedule the placement group. + // One node is dead, reschedule the placement group. auto bundle_on_dead_node = placement_group->GetMutableBundle(0); bundle_on_dead_node->clear_node_id(); scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler); - if (0 == bundles_on_node0[placement_group->GetPlacementGroupID()][0]) { - ASSERT_TRUE(raylet_client_->GrantResourceReserve()); - } else { - ASSERT_TRUE(raylet_client1_->GrantResourceReserve()); - } + // TODO(ffbin): We need to see which node the other bundles that have been placed are + // deployed on, and spread them as far as possible. It will be implemented in the next + // pr. + raylet_client_->GrantResourceReserve(); + raylet_client1_->GrantResourceReserve(); WaitPendingDone(success_placement_groups_, 2); }