diff --git a/BUILD.bazel b/BUILD.bazel index 6ffa8df54..16b9a315f 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -880,7 +880,7 @@ cc_test( ) cc_test( - name = "local_placement_group_manager_test", + name = "placement_group_resource_manager_test", srcs = ["src/ray/raylet/placement_group_resource_manager_test.cc"], copts = COPTS, deps = [ diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 573131fec..55fac64e5 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -129,11 +129,10 @@ py_test_module_list( py_test_module_list( files = [ - "test_placement_group.py", # placement groups not implemented + "test_placement_group.py", ], size = "large", extra_srcs = SRCS, - tags = ["exclusive", "new_scheduler_broken"], deps = ["//:ray_lib"], ) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index a289900d4..49bf6a6af 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -180,8 +180,6 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self last_local_gc_ns_(absl::GetCurrentTimeNanos()), local_gc_interval_ns_(RayConfig::instance().local_gc_interval_s() * 1e9), record_metrics_period_(config.record_metrics_period_ms) { - placement_group_resource_manager_ = std::make_shared( - local_available_resources_, cluster_resource_map_, self_node_id_); RAY_LOG(INFO) << "Initializing NodeManager with ID " << self_node_id_; RAY_CHECK(heartbeat_period_.count() > 0); // Initialize the resource map with own cluster resource configuration. 
@@ -227,6 +225,12 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self cluster_task_manager_ = std::shared_ptr(new ClusterTaskManager( self_node_id_, new_resource_scheduler_, fulfills_dependencies_func, is_owner_alive, get_node_info_func, announce_infeasible_task)); + placement_group_resource_manager_ = + std::make_shared(new_resource_scheduler_); + } else { + placement_group_resource_manager_ = + std::make_shared( + local_available_resources_, cluster_resource_map_, self_node_id_); } RAY_CHECK_OK(store_client_.Connect(config.store_socket_name.c_str())); @@ -458,6 +462,10 @@ void NodeManager::ReportResourceUsage() { resources_data->set_node_id(self_node_id_.Binary()); if (new_scheduler_enabled_) { + // Update local cache from gcs remote cache, this is needed when gcs restarts. + // We should always keep the cache view consistent. + new_resource_scheduler_->UpdateLastReportResourcesFromGcs( + gcs_client_->Nodes().GetLastResourceUsage()); new_resource_scheduler_->FillResourceUsage(light_report_resource_usage_enabled_, resources_data); cluster_task_manager_->FillResourceUsage(light_report_resource_usage_enabled_, @@ -600,7 +608,6 @@ void NodeManager::HandleRequestObjectSpillage( void NodeManager::HandleReleaseUnusedBundles( const rpc::ReleaseUnusedBundlesRequest &request, rpc::ReleaseUnusedBundlesReply *reply, rpc::SendReplyCallback send_reply_callback) { - RAY_CHECK(!new_scheduler_enabled_) << "Not implemented"; RAY_LOG(DEBUG) << "Releasing unused bundles."; std::unordered_set in_use_bundles; for (int index = 0; index < request.bundles_in_use_size(); ++index) { @@ -1745,39 +1752,44 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest void NodeManager::HandlePrepareBundleResources( const rpc::PrepareBundleResourcesRequest &request, rpc::PrepareBundleResourcesReply *reply, rpc::SendReplyCallback send_reply_callback) { - // TODO(sang): Port this onto the new scheduler. 
- RAY_CHECK(!new_scheduler_enabled_) << "Not implemented yet."; auto bundle_spec = BundleSpecification(request.bundle_spec()); RAY_LOG(DEBUG) << "Request to prepare bundle resources is received, " << bundle_spec.DebugString(); + auto prepared = placement_group_resource_manager_->PrepareBundle(bundle_spec); reply->set_success(prepared); send_reply_callback(Status::OK(), nullptr, nullptr); - // Call task dispatch to assign work to the new group. - TryLocalInfeasibleTaskScheduling(); - DispatchTasks(local_queues_.GetReadyTasksByClass()); + + if (!new_scheduler_enabled_) { + // Call task dispatch to assign work to the new group. + TryLocalInfeasibleTaskScheduling(); + DispatchTasks(local_queues_.GetReadyTasksByClass()); + } } void NodeManager::HandleCommitBundleResources( const rpc::CommitBundleResourcesRequest &request, rpc::CommitBundleResourcesReply *reply, rpc::SendReplyCallback send_reply_callback) { - RAY_CHECK(!new_scheduler_enabled_) << "Not implemented yet."; - auto bundle_spec = BundleSpecification(request.bundle_spec()); RAY_LOG(DEBUG) << "Request to commit bundle resources is received, " << bundle_spec.DebugString(); placement_group_resource_manager_->CommitBundle(bundle_spec); send_reply_callback(Status::OK(), nullptr, nullptr); - // Call task dispatch to assign work to the new group. - TryLocalInfeasibleTaskScheduling(); - DispatchTasks(local_queues_.GetReadyTasksByClass()); + if (new_scheduler_enabled_) { + // Schedule in case a lease request for this placement group arrived before the commit + // message. + ScheduleAndDispatch(); + } else { + // Call task dispatch to assign work to the new group. 
+ TryLocalInfeasibleTaskScheduling(); + DispatchTasks(local_queues_.GetReadyTasksByClass()); + } } void NodeManager::HandleCancelResourceReserve( const rpc::CancelResourceReserveRequest &request, rpc::CancelResourceReserveReply *reply, rpc::SendReplyCallback send_reply_callback) { - RAY_CHECK(!new_scheduler_enabled_) << "Not implemented"; auto bundle_spec = BundleSpecification(request.bundle_spec()); RAY_LOG(INFO) << "Request to cancel reserved resource is received, " << bundle_spec.DebugString(); @@ -1806,8 +1818,16 @@ void NodeManager::HandleCancelResourceReserve( // Return bundle resources. placement_group_resource_manager_->ReturnBundle(bundle_spec); - TryLocalInfeasibleTaskScheduling(); - DispatchTasks(local_queues_.GetReadyTasksByClass()); + + if (new_scheduler_enabled_) { + // Schedule in case a lease request for this placement group arrived before the commit + // message. + ScheduleAndDispatch(); + } else { + // Call task dispatch to assign work to the new group. + TryLocalInfeasibleTaskScheduling(); + DispatchTasks(local_queues_.GetReadyTasksByClass()); + } send_reply_callback(Status::OK(), nullptr, nullptr); } diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index f2a43935a..1bc554b11 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -297,28 +297,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// \return Void. void ScheduleTasks(std::unordered_map &resource_map); - /// Make a placement decision for the resource_map and subtract original resources so - /// that the node is ready to commit (create) placement group resources. - /// - /// \param resource_map A mapping from node manager ID to an estimate of the - /// resources available to that node manager. Scheduling decisions will only - /// consider the local node manager and the node managers in the keys of the - /// resource_map argument. - /// \param bundle_spec Specification of bundle that will be prepared. 
- /// \return True is resources were successfully prepared. False otherwise. - bool PrepareBundle(std::unordered_map &resource_map, - const BundleSpecification &bundle_spec); - - /// Make a placement decision for the resource_map. - /// - /// \param resource_map A mapping from node manager ID to an estimate of the - /// resources available to that node manager. Scheduling decisions will only - /// consider the local node manager and the node managers in the keys of the - /// resource_map argument. - /// \param bundle_spec Specification of bundle that will be prepared. - void CommitBundle(std::unordered_map &resource_map, - const BundleSpecification &bundle_spec); - /// Handle a task whose return value(s) must be reconstructed. /// /// \param task_id The relevant task ID. diff --git a/src/ray/raylet/placement_group_resource_manager.cc b/src/ray/raylet/placement_group_resource_manager.cc index 06a82663e..ff5b32719 100644 --- a/src/ray/raylet/placement_group_resource_manager.cc +++ b/src/ray/raylet/placement_group_resource_manager.cc @@ -22,6 +22,18 @@ namespace ray { namespace raylet { +void PlacementGroupResourceManager::ReturnUnusedBundle( + const std::unordered_set &in_use_bundles) { + for (auto iter = bundle_spec_map_.begin(); iter != bundle_spec_map_.end();) { + if (0 == in_use_bundles.count(iter->first)) { + ReturnBundle(*iter->second); + bundle_spec_map_.erase(iter++); + } else { + iter++; + } + } +} + OldPlacementGroupResourceManager::OldPlacementGroupResourceManager( ResourceIdSet &local_available_resources_, std::unordered_map &cluster_resource_map_, @@ -111,7 +123,7 @@ void OldPlacementGroupResourceManager::CommitBundle( void OldPlacementGroupResourceManager::ReturnBundle( const BundleSpecification &bundle_spec) { // We should commit resources if it weren't because - // ReturnBundleResources requires resources to be committed when it is called. + // ReturnBundle requires resources to be committed when it is called. 
auto it = bundle_state_map_.find(bundle_spec.BundleId()); if (it == bundle_state_map_.end()) { RAY_LOG(INFO) << "Duplicate cancel request, skip it directly."; @@ -136,16 +148,106 @@ void OldPlacementGroupResourceManager::ReturnBundle( ResourceSet(placement_group_resource_labels)); } -void OldPlacementGroupResourceManager::ReturnUnusedBundle( - const std::unordered_set &in_use_bundles) { - for (auto iter = bundle_spec_map_.begin(); iter != bundle_spec_map_.end();) { - if (0 == in_use_bundles.count(iter->first)) { - ReturnBundle(*iter->second); - bundle_spec_map_.erase(iter++); +NewPlacementGroupResourceManager::NewPlacementGroupResourceManager( + std::shared_ptr cluster_resource_scheduler_) + : cluster_resource_scheduler_(cluster_resource_scheduler_) {} + +bool NewPlacementGroupResourceManager::PrepareBundle( + const BundleSpecification &bundle_spec) { + auto iter = pg_bundles_.find(bundle_spec.BundleId()); + if (iter != pg_bundles_.end()) { + if (iter->second->state_ == CommitState::COMMITTED) { + // If the bundle state is already committed, it means that prepare request is just + // stale. + RAY_LOG(INFO) << "Duplicate prepare bundle request, skip it directly. This should " + "only happen when GCS restarts."; + return true; } else { - iter++; + // If there was a bundle in prepare state, it already locked resources, we will + // return bundle resources so that we can start from the prepare phase again. 
+ ReturnBundle(bundle_spec); } } + + std::shared_ptr resource_instances = + std::make_shared(); + bool allocated = cluster_resource_scheduler_->AllocateLocalTaskResources( + bundle_spec.GetRequiredResources().GetResourceMap(), resource_instances); + + if (!allocated) { + return false; + } + + auto bundle_state = + std::make_shared(CommitState::PREPARED, resource_instances); + pg_bundles_[bundle_spec.BundleId()] = bundle_state; + bundle_spec_map_.emplace(bundle_spec.BundleId(), std::make_shared( + bundle_spec.GetMessage())); + + return true; +} + +void NewPlacementGroupResourceManager::CommitBundle( + const BundleSpecification &bundle_spec) { + auto it = pg_bundles_.find(bundle_spec.BundleId()); + if (it == pg_bundles_.end()) { + // We should only ever receive a commit for a non-existent placement group when a + // placement group is created and removed in quick succession. + RAY_LOG(DEBUG) + << "Received a commit message for an unknown bundle. The bundle info is " + << bundle_spec.DebugString(); + return; + } else { + // Ignore request If the bundle state is already committed. + if (it->second->state_ == CommitState::COMMITTED) { + RAY_LOG(INFO) << "Duplicate committ bundle request, skip it directly."; + return; + } + } + + const auto &bundle_state = it->second; + bundle_state->state_ = CommitState::COMMITTED; + + for (const auto &resource : bundle_spec.GetFormattedResources()) { + cluster_resource_scheduler_->AddLocalResource(resource.first, resource.second); + } +} + +void NewPlacementGroupResourceManager::ReturnBundle( + const BundleSpecification &bundle_spec) { + auto it = pg_bundles_.find(bundle_spec.BundleId()); + if (it == pg_bundles_.end()) { + RAY_LOG(INFO) << "Duplicate cancel request, skip it directly."; + return; + } + const auto &bundle_state = it->second; + if (bundle_state->state_ == CommitState::PREPARED) { + // Commit bundle first so that we can remove the bundle with consistent + // implementation. 
+ CommitBundle(bundle_spec); + } + + // Return original resources to resource allocator `ClusterResourceScheduler`. + auto original_resources = it->second->resources_; + cluster_resource_scheduler_->FreeLocalTaskResources(original_resources); + + // Substract placement group resources from resource allocator + // `ClusterResourceScheduler`. + const auto &placement_group_resources = bundle_spec.GetFormattedResources(); + std::shared_ptr resource_instances = + std::make_shared(); + cluster_resource_scheduler_->AllocateLocalTaskResources(placement_group_resources, + resource_instances); + for (const auto &resource : placement_group_resources) { + if (cluster_resource_scheduler_->IsAvailableResourceEmpty(resource.first)) { + RAY_LOG(DEBUG) << "Available bundle resource:[" << resource.first + << "] is empty, Will delete it from local resource"; + // Delete local resource if available resource is empty when return bundle, or there + // will be resource leak. + cluster_resource_scheduler_->DeleteLocalResource(resource.first); + } + } + pg_bundles_.erase(it); } } // namespace raylet diff --git a/src/ray/raylet/placement_group_resource_manager.h b/src/ray/raylet/placement_group_resource_manager.h index 3b6e3a928..20ef5325a 100644 --- a/src/ray/raylet/placement_group_resource_manager.h +++ b/src/ray/raylet/placement_group_resource_manager.h @@ -18,6 +18,7 @@ #include "ray/common/bundle_spec.h" #include "ray/common/id.h" #include "ray/common/task/scheduling_resources.h" +#include "ray/raylet/scheduling/cluster_resource_scheduler.h" namespace ray { @@ -44,6 +45,14 @@ struct pair_hash { } }; +struct BundleTransactionState { + BundleTransactionState(CommitState state, + std::shared_ptr &resources) + : state_(state), resources_(resources) {} + CommitState state_; + std::shared_ptr resources_; +}; + /// `PlacementGroupResourceManager` responsible for managing the resources that /// about allocated for placement group bundles. 
class PlacementGroupResourceManager { @@ -68,16 +77,20 @@ class PlacementGroupResourceManager { /// Return back all the bundle(which is unused) resource. /// /// \param bundle_spec: A set of bundles which in use. - virtual void ReturnUnusedBundle( - const std::unordered_set &in_use_bundles) = 0; + void ReturnUnusedBundle(const std::unordered_set &in_use_bundles); virtual ~PlacementGroupResourceManager() {} + + protected: + /// Save `BundleSpecification` for cleaning leaked bundles after GCS restart. + absl::flat_hash_map, pair_hash> + bundle_spec_map_; }; /// Associated with old scheduler. class OldPlacementGroupResourceManager : public PlacementGroupResourceManager { public: - /// Create a local placement group manager. + /// Create a old placement group resource manager. /// /// \param local_available_resources_: The resources (IDs specificed) that are currently /// available. @@ -98,8 +111,6 @@ class OldPlacementGroupResourceManager : public PlacementGroupResourceManager { void ReturnBundle(const BundleSpecification &bundle_spec); - void ReturnUnusedBundle(const std::unordered_set &in_use_bundles); - /// Get all local available resource(IDs specificed). const ResourceIdSet &GetAllResourceIdSet() const { return local_available_resources_; }; @@ -121,10 +132,36 @@ class OldPlacementGroupResourceManager : public PlacementGroupResourceManager { /// creation. absl::flat_hash_map, pair_hash> bundle_state_map_; +}; - /// Save `BundleSpecification` for cleaning leaked bundles after GCS restart. - absl::flat_hash_map, pair_hash> - bundle_spec_map_; +/// Associated with new scheduler. +class NewPlacementGroupResourceManager : public PlacementGroupResourceManager { + public: + /// Create a new placement group resource manager. + /// + /// \param cluster_resource_scheduler_: The resource allocator of new scheduler. 
+ NewPlacementGroupResourceManager( + std::shared_ptr cluster_resource_scheduler_); + + virtual ~NewPlacementGroupResourceManager() = default; + + bool PrepareBundle(const BundleSpecification &bundle_spec); + + void CommitBundle(const BundleSpecification &bundle_spec); + + void ReturnBundle(const BundleSpecification &bundle_spec); + + const std::shared_ptr GetResourceScheduler() const { + return cluster_resource_scheduler_; + } + + private: + std::shared_ptr cluster_resource_scheduler_; + + /// Tracking placement group bundles and their states. This mapping is the source of + /// truth for the new scheduler. + std::unordered_map, pair_hash> + pg_bundles_; }; } // namespace raylet diff --git a/src/ray/raylet/placement_group_resource_manager_test.cc b/src/ray/raylet/placement_group_resource_manager_test.cc index 10011aece..e78a21b40 100644 --- a/src/ray/raylet/placement_group_resource_manager_test.cc +++ b/src/ray/raylet/placement_group_resource_manager_test.cc @@ -262,6 +262,232 @@ TEST_F(OldPlacementGroupResourceManagerTest, TestIdempotencyWithRandomOrder) { CheckRemainingResourceCorrect(result_resource); } +class NewPlacementGroupResourceManagerTest : public ::testing::Test { + public: + std::unique_ptr + new_placement_group_resource_manager_; + + void InitLocalAvailableResource( + std::unordered_map &unit_resource) { + auto cluster_resource_scheduler_ = + std::make_shared("local", unit_resource); + new_placement_group_resource_manager_.reset( + new raylet::NewPlacementGroupResourceManager(cluster_resource_scheduler_)); + } + + void CheckAvailableResoueceEmpty(const std::string &resource) { + const auto cluster_resource_scheduler_ = + new_placement_group_resource_manager_->GetResourceScheduler(); + ASSERT_TRUE(cluster_resource_scheduler_->IsAvailableResourceEmpty(resource)); + } + + void CheckRemainingResourceCorrect(NodeResourceInstances &node_resource_instances) { + const auto cluster_resource_scheduler_ = + 
new_placement_group_resource_manager_->GetResourceScheduler(); + ASSERT_TRUE(cluster_resource_scheduler_->GetLocalResources() == + node_resource_instances); + } +}; + +TEST_F(NewPlacementGroupResourceManagerTest, TestNewPrepareBundleResource) { + // 1. create bundle spec. + auto group_id = PlacementGroupID::FromRandom(); + std::unordered_map unit_resource; + unit_resource.insert({"CPU", 1.0}); + auto bundle_spec = Mocker::GenBundleCreation(group_id, 1, unit_resource); + /// 2. init local available resource. + InitLocalAvailableResource(unit_resource); + /// 3. prepare bundle resource. + ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundle(bundle_spec)); + /// 4. check remaining resources is correct. + CheckAvailableResoueceEmpty("CPU"); +} + +TEST_F(NewPlacementGroupResourceManagerTest, + TestNewPrepareBundleWithInsufficientResource) { + // 1. create bundle spec. + auto group_id = PlacementGroupID::FromRandom(); + std::unordered_map unit_resource; + unit_resource.insert({"CPU", 2.0}); + auto bundle_spec = Mocker::GenBundleCreation(group_id, 1, unit_resource); + /// 2. init local available resource. + std::unordered_map init_unit_resource; + init_unit_resource.insert({"CPU", 1.0}); + InitLocalAvailableResource(init_unit_resource); + /// 3. prepare bundle resource. + ASSERT_FALSE(new_placement_group_resource_manager_->PrepareBundle(bundle_spec)); +} + +TEST_F(NewPlacementGroupResourceManagerTest, TestNewCommitBundleResource) { + // 1. create bundle spec. + auto group_id = PlacementGroupID::FromRandom(); + std::unordered_map unit_resource; + unit_resource.insert({"CPU", 1.0}); + auto bundle_spec = Mocker::GenBundleCreation(group_id, 1, unit_resource); + /// 2. init local available resource. + InitLocalAvailableResource(unit_resource); + /// 3. prepare and commit bundle resource. + ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundle(bundle_spec)); + new_placement_group_resource_manager_->CommitBundle(bundle_spec); + /// 4. 
check remaining resources is correct. + std::unordered_map remaining_resources = { + {"CPU_group_" + group_id.Hex(), 1.0}, + {"CPU_group_1_" + group_id.Hex(), 1.0}, + {"CPU", 1.0}}; + auto remaining_resource_scheduler = + std::make_shared("remaining", remaining_resources); + std::shared_ptr resource_instances = + std::make_shared(); + ASSERT_TRUE(remaining_resource_scheduler->AllocateLocalTaskResources( + unit_resource, resource_instances)); + auto remaining_resouece_instance = remaining_resource_scheduler->GetLocalResources(); + CheckRemainingResourceCorrect(remaining_resouece_instance); +} + +TEST_F(NewPlacementGroupResourceManagerTest, TestNewReturnBundleResource) { + // 1. create bundle spec. + auto group_id = PlacementGroupID::FromRandom(); + std::unordered_map unit_resource; + unit_resource.insert({"CPU", 1.0}); + auto bundle_spec = Mocker::GenBundleCreation(group_id, 1, unit_resource); + /// 2. init local available resource. + InitLocalAvailableResource(unit_resource); + /// 3. prepare and commit bundle resource. + ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundle(bundle_spec)); + new_placement_group_resource_manager_->CommitBundle(bundle_spec); + /// 4. return bundle resource. + new_placement_group_resource_manager_->ReturnBundle(bundle_spec); + /// 5. check remaining resources is correct. + auto remaining_resource_scheduler = + std::make_shared("remaining", unit_resource); + auto remaining_resouece_instance = remaining_resource_scheduler->GetLocalResources(); + CheckRemainingResourceCorrect(remaining_resouece_instance); +} + +TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndReturn) { + // 1. create two bundles spec. + auto group_id = PlacementGroupID::FromRandom(); + std::unordered_map unit_resource; + unit_resource.insert({"CPU", 1.0}); + auto first_bundle_spec = Mocker::GenBundleCreation(group_id, 1, unit_resource); + auto second_bundle_spec = Mocker::GenBundleCreation(group_id, 2, unit_resource); + /// 2. 
init local available resource. + std::unordered_map init_unit_resource; + init_unit_resource.insert({"CPU", 2.0}); + InitLocalAvailableResource(init_unit_resource); + /// 3. prepare and commit two bundle resource. + ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundle(first_bundle_spec)); + ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundle(second_bundle_spec)); + new_placement_group_resource_manager_->CommitBundle(first_bundle_spec); + new_placement_group_resource_manager_->CommitBundle(second_bundle_spec); + /// 4. check remaining resources is correct after commit phase. + std::unordered_map remaining_resources = { + {"CPU_group_" + group_id.Hex(), 2.0}, + {"CPU_group_1_" + group_id.Hex(), 1.0}, + {"CPU_group_2_" + group_id.Hex(), 1.0}, + {"CPU", 2.0}}; + auto remaining_resource_scheduler = + std::make_shared("remaining", remaining_resources); + std::shared_ptr resource_instances = + std::make_shared(); + ASSERT_TRUE(remaining_resource_scheduler->AllocateLocalTaskResources( + init_unit_resource, resource_instances)); + auto remaining_resouece_instance = remaining_resource_scheduler->GetLocalResources(); + CheckRemainingResourceCorrect(remaining_resouece_instance); + /// 5. return second bundle. + new_placement_group_resource_manager_->ReturnBundle(second_bundle_spec); + /// 6. check remaining resources is correct after return second bundle. + remaining_resources = {{"CPU_group_" + group_id.Hex(), 2.0}, + {"CPU_group_1_" + group_id.Hex(), 1.0}, + {"CPU", 2.0}}; + remaining_resource_scheduler = + std::make_shared("remaining", remaining_resources); + ASSERT_TRUE(remaining_resource_scheduler->AllocateLocalTaskResources( + {{"CPU_group_" + group_id.Hex(), 1.0}, {"CPU", 1.0}}, resource_instances)); + remaining_resouece_instance = remaining_resource_scheduler->GetLocalResources(); + CheckRemainingResourceCorrect(remaining_resouece_instance); + /// 7. return first bundle. 
+ new_placement_group_resource_manager_->ReturnBundle(first_bundle_spec); + /// 8. check remaining resources is correct after all bundle returned. + remaining_resources = {{"CPU", 2.0}}; + remaining_resource_scheduler = + std::make_shared("remaining", remaining_resources); + remaining_resouece_instance = remaining_resource_scheduler->GetLocalResources(); + CheckRemainingResourceCorrect(remaining_resouece_instance); +} + +TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithMultiPrepare) { + // 1. create one bundle spec. + auto group_id = PlacementGroupID::FromRandom(); + std::unordered_map unit_resource; + unit_resource.insert({"CPU", 1.0}); + auto bundle_spec = Mocker::GenBundleCreation(group_id, 1, unit_resource); + /// 2. init local available resource. + std::unordered_map available_resource = { + std::make_pair("CPU", 3.0)}; + InitLocalAvailableResource(available_resource); + /// 3. prepare bundle resource 10 times. + for (int i = 0; i < 10; i++) { + new_placement_group_resource_manager_->PrepareBundle(bundle_spec); + } + /// 4. check remaining resources is correct. + std::unordered_map remaining_resources = {{"CPU", 3.0}}; + auto remaining_resource_scheduler = + std::make_shared("remaining", remaining_resources); + std::shared_ptr resource_instances = + std::make_shared(); + ASSERT_TRUE(remaining_resource_scheduler->AllocateLocalTaskResources( + unit_resource, resource_instances)); + auto remaining_resouece_instance = remaining_resource_scheduler->GetLocalResources(); + CheckRemainingResourceCorrect(remaining_resouece_instance); +} + +TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder) { + // 1. create one bundle spec. + auto group_id = PlacementGroupID::FromRandom(); + std::unordered_map unit_resource; + unit_resource.insert({"CPU", 1.0}); + auto bundle_spec = Mocker::GenBundleCreation(group_id, 1, unit_resource); + /// 2. init local available resource. 
+ std::unordered_map available_resource = { + std::make_pair("CPU", 3.0)}; + InitLocalAvailableResource(available_resource); + /// 3. prepare bundle -> commit bundle -> prepare bundle. + ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundle(bundle_spec)); + new_placement_group_resource_manager_->CommitBundle(bundle_spec); + ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundle(bundle_spec)); + /// 4. check remaining resources is correct. + std::unordered_map remaining_resources = { + {"CPU_group_" + group_id.Hex(), 1.0}, + {"CPU_group_1_" + group_id.Hex(), 1.0}, + {"CPU", 3.0}}; + auto remaining_resource_scheduler = + std::make_shared("remaining", remaining_resources); + std::shared_ptr resource_instances = + std::make_shared(); + ASSERT_TRUE(remaining_resource_scheduler->AllocateLocalTaskResources( + unit_resource, resource_instances)); + auto remaining_resouece_instance = remaining_resource_scheduler->GetLocalResources(); + CheckRemainingResourceCorrect(remaining_resouece_instance); + new_placement_group_resource_manager_->ReturnBundle(bundle_spec); + // 5. prepare bundle -> commit bundle -> commit bundle. + ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundle(bundle_spec)); + new_placement_group_resource_manager_->CommitBundle(bundle_spec); + new_placement_group_resource_manager_->CommitBundle(bundle_spec); + // 6. check remaining resources is correct. + CheckRemainingResourceCorrect(remaining_resouece_instance); + new_placement_group_resource_manager_->ReturnBundle(bundle_spec); + // 7. prepare bundle -> return bundle -> commit bundle. + ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundle(bundle_spec)); + new_placement_group_resource_manager_->ReturnBundle(bundle_spec); + new_placement_group_resource_manager_->CommitBundle(bundle_spec); + // 8. check remaining resources is correct. 
+ remaining_resource_scheduler = + std::make_shared("remaining", available_resource); + remaining_resouece_instance = remaining_resource_scheduler->GetLocalResources(); + CheckRemainingResourceCorrect(remaining_resouece_instance); +} + } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/raylet/scheduling/cluster_resource_data.cc b/src/ray/raylet/scheduling/cluster_resource_data.cc index cb0214dab..551b5a980 100644 --- a/src/ray/raylet/scheduling/cluster_resource_data.cc +++ b/src/ray/raylet/scheduling/cluster_resource_data.cc @@ -291,8 +291,7 @@ std::string NodeResourceInstances::DebugString(StringIdMap string_to_int_map) co } for (auto it = this->custom_resources.begin(); it != this->custom_resources.end(); ++it) { - buffer << "\t" << string_to_int_map.Get(it->first) << ":(" - << VectorToString(it->second.total) << ":" + buffer << "\t" << it->first << ":(" << VectorToString(it->second.total) << ":" << VectorToString(it->second.available) << ")\n"; } buffer << "}" << std::endl; diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc index 2590ed98f..66047d258 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc @@ -352,6 +352,40 @@ void ClusterResourceScheduler::AddLocalResource(const std::string &resource_name } } +bool ClusterResourceScheduler::IsAvailableResourceEmpty( + const std::string &resource_name) { + auto it = nodes_.find(local_node_id_); + if (it == nodes_.end()) { + RAY_LOG(WARNING) << "Can't find local node:[" << local_node_id_ + << "] when check local available resource."; + return true; + } + + int idx = -1; + if (resource_name == ray::kCPU_ResourceLabel) { + idx = (int)CPU; + } else if (resource_name == ray::kGPU_ResourceLabel) { + idx = (int)GPU; + } else if (resource_name == ray::kTPU_ResourceLabel) { + idx = (int)TPU; + } else if (resource_name == ray::kMemory_ResourceLabel) { 
+ idx = (int)MEM; + }; + + auto local_view = it->second.GetMutableLocalView(); + if (idx != -1) { + return local_view->predefined_resources[idx].available <= 0; + } + string_to_int_map_.Insert(resource_name); + int64_t resource_id = string_to_int_map_.Get(resource_name); + auto itr = local_view->custom_resources.find(resource_id); + if (itr != local_view->custom_resources.end()) { + return itr->second.available <= 0; + } else { + return true; + } +} + void ClusterResourceScheduler::UpdateResourceCapacity(const std::string &node_id_string, const std::string &resource_name, double resource_total) { @@ -448,9 +482,11 @@ void ClusterResourceScheduler::DeleteResource(const std::string &node_id_string, local_view->custom_resources.erase(itr); } - if (node_id == local_node_id_) { + auto c_itr = local_resources_.custom_resources.find(resource_id); + if (node_id == local_node_id_ && c_itr != local_resources_.custom_resources.end()) { local_resources_.custom_resources[resource_id].total.clear(); local_resources_.custom_resources[resource_id].available.clear(); + local_resources_.custom_resources.erase(c_itr); } } } @@ -835,6 +871,14 @@ void ClusterResourceScheduler::FreeLocalTaskResources( UpdateLocalAvailableResourcesFromResourceInstances(); } +void ClusterResourceScheduler::UpdateLastReportResourcesFromGcs( + std::shared_ptr gcs_resources) { + NodeResources node_resources = ResourceMapToNodeResources( + string_to_int_map_, gcs_resources->GetTotalResources().GetResourceMap(), + gcs_resources->GetAvailableResources().GetResourceMap()); + last_report_resources_.reset(new NodeResources(node_resources)); +} + void ClusterResourceScheduler::FillResourceUsage( bool light_report_resource_usage_enabled, std::shared_ptr resources_data) { @@ -844,8 +888,56 @@ void ClusterResourceScheduler::FillResourceUsage( << "Error: Populating heartbeat failed. 
Please file a bug report: " "https://github.com/ray-project/ray/issues/new."; - if (!light_report_resource_usage_enabled || !last_report_resources_ || - resources != *last_report_resources_.get()) { + // Initialize if last report resources is empty. + if (!last_report_resources_) { + NodeResources node_resources = + ResourceMapToNodeResources(string_to_int_map_, {{}}, {{}}); + last_report_resources_.reset(new NodeResources(node_resources)); + } + + if (light_report_resource_usage_enabled) { + // Reset all local views for remote nodes. This is needed in case tasks that + // we spilled back to a remote node were not actually scheduled on the + // node. Then, the remote node's resource availability may not change and + // so it may not send us another update. + for (auto &node : nodes_) { + if (node.first != local_node_id_) { + node.second.ResetLocalView(); + } + } + + for (int i = 0; i < PredefinedResources_MAX; i++) { + const auto &label = ResourceEnumToString((PredefinedResources)i); + const auto &capacity = resources.predefined_resources[i]; + const auto &last_capacity = last_report_resources_->predefined_resources[i]; + if (capacity.available != last_capacity.available) { + resources_data->set_resources_available_changed(true); + (*resources_data->mutable_resources_available())[label] = + capacity.available.Double(); + } + if (capacity.total != last_capacity.total) { + (*resources_data->mutable_resources_total())[label] = capacity.total.Double(); + } + } + for (auto it = resources.custom_resources.begin(); + it != resources.custom_resources.end(); it++) { + uint64_t custom_id = it->first; + const auto &capacity = it->second; + const auto &last_capacity = last_report_resources_->custom_resources[custom_id]; + const auto &label = string_to_int_map_.Get(custom_id); + if (capacity.available != last_capacity.available) { + resources_data->set_resources_available_changed(true); + (*resources_data->mutable_resources_available())[label] = + capacity.available.Double(); + 
} + if (capacity.total != last_capacity.total) { + (*resources_data->mutable_resources_total())[label] = capacity.total.Double(); + } + } + if (resources != *last_report_resources_.get()) { + last_report_resources_.reset(new NodeResources(resources)); + } + } else { for (int i = 0; i < PredefinedResources_MAX; i++) { const auto &label = ResourceEnumToString((PredefinedResources)i); const auto &capacity = resources.predefined_resources[i]; @@ -870,22 +962,6 @@ void ClusterResourceScheduler::FillResourceUsage( (*resources_data->mutable_resources_total())[label] = capacity.total.Double(); } } - resources_data->set_resources_available_changed(true); - if (light_report_resource_usage_enabled) { - last_report_resources_.reset(new NodeResources(resources)); - } - } - - if (light_report_resource_usage_enabled) { - // Reset all local views for remote nodes. This is needed in case tasks that - // we spilled back to a remote node were not actually scheduled on the - // node. Then, the remote node's resource availability may not change and - // so it may not send us another update. - for (auto &node : nodes_) { - if (node.first != local_node_id_) { - node.second.ResetLocalView(); - } - } } } diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h index c4058e586..470c97c38 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.h +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h @@ -164,6 +164,11 @@ class ClusterResourceScheduler { /// \param resource_total: New capacity of the resource. void AddLocalResource(const std::string &resource_name, double resource_total); + /// Check whether the available resources are empty. + /// + /// \param resource_name: Resource which we want to check. + bool IsAvailableResourceEmpty(const std::string &resource_name); + /// Update total capacity of a given resource of a given node. /// /// \param node_name: Node whose resource we want to update. 
@@ -360,6 +365,13 @@ class ClusterResourceScheduler { void FillResourceUsage(bool light_report_resource_usage_enabled, std::shared_ptr resources_data); + /// Update last report resources local cache from gcs cache, + /// this is needed when gcs fo. + /// + /// \param gcs_resources: The remote cache from gcs. + void UpdateLastReportResourcesFromGcs( + std::shared_ptr gcs_resources); + /// Return human-readable string for this scheduler state. std::string DebugString() const; diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc index 37bf4b3da..db8fa44ed 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc @@ -1131,10 +1131,10 @@ TEST_F(ClusterResourceSchedulerTest, TestLightResourceUsageReport) { } // Report resource usage if resource availability has changed. - cluster_resources.AddOrUpdateNode("local", {{"CPU", 1.}}, {{"CPU", 0.}}); + cluster_resources.AddOrUpdateNode("local", {{"CPU", 2.}}, {{"CPU", 0.}}); data->Clear(); cluster_resources.FillResourceUsage(true, data); - ASSERT_RESOURCES_EQ(data, 0, 1); + ASSERT_RESOURCES_EQ(data, 0, 2); // Don't report resource usage if resource availability hasn't changed. 
for (int i = 0; i < 3; i++) { @@ -1224,6 +1224,17 @@ TEST_F(ClusterResourceSchedulerTest, DynamicResourceTest) { ASSERT_TRUE(result.empty()); } +TEST_F(ClusterResourceSchedulerTest, AvailableResourceEmptyTest) { + ClusterResourceScheduler cluster_resources("local", {{"custom123", 5}}); + std::shared_ptr resource_instances = + std::make_shared(); + std::unordered_map task_request = {{"custom123", 5}}; + bool allocated = + cluster_resources.AllocateLocalTaskResources(task_request, resource_instances); + ASSERT_TRUE(allocated); + ASSERT_TRUE(cluster_resources.IsAvailableResourceEmpty("custom123")); +} + } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc index bc86e280f..74437a4a1 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager.cc @@ -202,6 +202,7 @@ bool ClusterTaskManager::AttemptDispatchWork(const Work &work, dispatched = true; } } else { + worker->SetBundleId(spec.PlacementGroupBundleId()); worker->SetOwnerAddress(spec.CallerAddress()); if (spec.IsActorCreationTask()) { // The actor belongs to this worker now. diff --git a/src/ray/raylet/test/util.h b/src/ray/raylet/test/util.h index 4d64507b8..90fc9b158 100644 --- a/src/ray/raylet/test/util.h +++ b/src/ray/raylet/test/util.h @@ -170,7 +170,7 @@ class MockWorker : public WorkerInterface { return bundle_id_; } - void SetBundleId(const BundleID &bundle_id) { RAY_CHECK(false) << "Method unused"; } + void SetBundleId(const BundleID &bundle_id) { bundle_id_ = bundle_id; } std::vector &GetBorrowedCPUInstances() { return borrowed_cpu_instances_; }