From 153b24746ca6d35349ef546743d3934ff0f8f295 Mon Sep 17 00:00:00 2001 From: "DK.Pino" Date: Sun, 13 Dec 2020 15:32:15 +0800 Subject: [PATCH] [Placement Group] Refactor pg resource constrain in node manager (#12538) * first version by pointer * second version reference * clean up * add cpp ut * lint * extract LocalPlacementGroupManagerInterface * lint * fix commemt * add idempotency test * lint * fix pg ut * fix pg ut * python lint * fix pg ut timeout * python lint * fix comment * lint * lint --- BUILD.bazel | 6 +- python/ray/tests/test_placement_group.py | 21 +- src/ray/common/bundle_spec.cc | 21 ++ src/ray/common/bundle_spec.h | 16 +- src/ray/common/task/scheduling_resources.cc | 113 +------- src/ray/common/task/scheduling_resources.h | 94 +----- .../common/task/scheduling_resources_test.cc | 169 ----------- src/ray/gcs/test/gcs_test_util.h | 15 + src/ray/raylet/node_manager.cc | 125 +------- src/ray/raylet/node_manager.h | 35 +-- .../placement_group_resource_manager.cc | 152 ++++++++++ .../raylet/placement_group_resource_manager.h | 131 +++++++++ .../placement_group_resource_manager_test.cc | 270 ++++++++++++++++++ src/ray/raylet/scheduling_policy.cc | 32 --- src/ray/raylet/scheduling_policy.h | 11 - 15 files changed, 648 insertions(+), 563 deletions(-) delete mode 100644 src/ray/common/task/scheduling_resources_test.cc create mode 100644 src/ray/raylet/placement_group_resource_manager.cc create mode 100644 src/ray/raylet/placement_group_resource_manager.h create mode 100644 src/ray/raylet/placement_group_resource_manager_test.cc diff --git a/BUILD.bazel b/BUILD.bazel index c23326d3d..6ffa8df54 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -880,11 +880,13 @@ cc_test( ) cc_test( - name = "scheduling_resources_test", - srcs = ["src/ray/common/task/scheduling_resources_test.cc"], + name = "local_placement_group_manager_test", + srcs = ["src/ray/raylet/placement_group_resource_manager_test.cc"], copts = COPTS, deps = [ + "gcs_test_util_lib", "ray_common", + "raylet_lib", "@com_google_googletest//:gtest_main", ], ) diff --git a/python/ray/tests/test_placement_group.py b/python/ray/tests/test_placement_group.py index 123a27b49..628e1ed85 100644 --- a/python/ray/tests/test_placement_group.py +++ b/python/ray/tests/test_placement_group.py @@ -1,6 +1,7 @@ import pytest import os import sys +import time try: import pytest_timeout @@ -674,12 +675,18 @@ def test_atomic_creation(ray_start_cluster): @ray.remote(num_cpus=3) def bothering_task(): - import time - time.sleep(1) + time.sleep(6) return True # Schedule tasks to fail initial placement group creation. tasks = [bothering_task.remote() for _ in range(2)] + + # Make sure the two common task has scheduled. + def tasks_scheduled(): + return ray.available_resources()["CPU"] == 2.0 + + wait_for_condition(tasks_scheduled) + # Create an actor that will fail bundle scheduling. # It is important to use pack strategy to make test less flaky. pg = ray.util.placement_group( @@ -699,7 +706,7 @@ def test_atomic_creation(ray_start_cluster): # Wait on the placement group now. It should be unready # because normal actor takes resources that are required # for one of bundle creation. - ready, unready = ray.wait([pg.ready()], timeout=0) + ready, unready = ray.wait([pg.ready()], timeout=0.5) assert len(ready) == 0 assert len(unready) == 1 # Wait until all tasks are done. @@ -1233,17 +1240,13 @@ def test_create_actor_with_placement_group_after_gcs_server_restart( def test_create_placement_group_during_gcs_server_restart( ray_start_cluster_head): cluster = ray_start_cluster_head - cluster.add_node(num_cpus=20) + cluster.add_node(num_cpus=200) cluster.wait_for_nodes() # Create placement groups during gcs server restart. placement_groups = [] for i in range(0, 100): - placement_group = ray.util.placement_group([{ - "CPU": 0.1 - }, { - "CPU": 0.1 - }]) + placement_group = ray.util.placement_group([{"CPU": 1}, {"CPU": 1}]) placement_groups.append(placement_group) cluster.head_node.kill_gcs_server() diff --git a/src/ray/common/bundle_spec.cc b/src/ray/common/bundle_spec.cc index d9dd610ec..701fa30c2 100644 --- a/src/ray/common/bundle_spec.cc +++ b/src/ray/common/bundle_spec.cc @@ -25,6 +25,27 @@ void BundleSpecification::ComputeResources() { } else { unit_resource_.reset(new ResourceSet(unit_resource)); } + + // Generate placement group bundle labels. + ComputeBundleResourceLabels(); +} + +void BundleSpecification::ComputeBundleResourceLabels() { + RAY_CHECK(unit_resource_); + + for (const auto &resource_pair : unit_resource_->GetResourceMap()) { + double resource_value = resource_pair.second; + + /// With bundle index (e.g., CPU_group_i_zzz). + const std::string &resource_label = + FormatPlacementGroupResource(resource_pair.first, PlacementGroupId(), Index()); + bundle_resource_labels_.insert(std::make_pair(resource_label, resource_value)); + + /// Without bundle index (e.g., CPU_group_zzz). + const std::string &wildcard_label = + FormatPlacementGroupResource(resource_pair.first, PlacementGroupId(), -1); + bundle_resource_labels_.insert(std::make_pair(wildcard_label, resource_value)); + } } const ResourceSet &BundleSpecification::GetRequiredResources() const { diff --git a/src/ray/common/bundle_spec.h b/src/ray/common/bundle_spec.h index cbdfa3049..843770450 100644 --- a/src/ray/common/bundle_spec.h +++ b/src/ray/common/bundle_spec.h @@ -72,6 +72,11 @@ class BundleSpecification : public MessageWrapper { on_spillback_ = callback; } + /// Get all placement group bundle resource labels. + const std::unordered_map &GetFormattedResources() const { + return bundle_resource_labels_; + } + /// Returns the schedule bundle callback, or nullptr. const ScheduleBundleCallback &OnSchedule() const { return on_schedule_; } @@ -82,18 +87,27 @@ class BundleSpecification : public MessageWrapper { private: void ComputeResources(); + void ComputeBundleResourceLabels(); /// Field storing unit resources. Initialized in constructor. /// TODO(ekl) consider optimizing the representation of ResourceSet for fast copies /// instead of keeping shared pointers here. std::shared_ptr unit_resource_; + /// When a bundle is assigned on a node, we'll add the following special resources on + /// that node: + /// 1) `CPU_group_${group_id}`: this is the requested resource when the actor + /// or task specifies placement group without bundle id. + /// 2) `CPU_group_${bundle_index}_${group_id}`: this is the requested resource + /// when the actor or task specifies placement group with bundle id. + std::unordered_map bundle_resource_labels_; + mutable ScheduleBundleCallback on_schedule_ = nullptr; mutable SpillbackBundleCallback on_spillback_ = nullptr; }; -/// Format a placement group resource, e.g., CPU -> CPU_group_YYY_i +/// Format a placement group resource, e.g., CPU -> CPU_group_i std::string FormatPlacementGroupResource(const std::string &original_resource_name, const PlacementGroupID &group_id, int64_t bundle_index = -1); diff --git a/src/ray/common/task/scheduling_resources.cc b/src/ray/common/task/scheduling_resources.cc index e5080cab5..db7be28b6 100644 --- a/src/ray/common/task/scheduling_resources.cc +++ b/src/ray/common/task/scheduling_resources.cc @@ -4,7 +4,6 @@ #include #include "absl/container/flat_hash_map.h" -#include "ray/common/bundle_spec.h" #include "ray/util/logging.h" namespace ray { @@ -228,51 +227,6 @@ void ResourceSet::AddResources(const ResourceSet &other) { } } -void ResourceSet::CommitBundleResources(const PlacementGroupID &group_id, - const int bundle_index, - const ResourceSet &other) { - for (const auto &resource_pair : other.GetResourceAmountMap()) { - // With bundle index (e.g., CPU_group_i_zzz). - const std::string &resource_label = - FormatPlacementGroupResource(resource_pair.first, group_id, bundle_index); - const FractionalResourceQuantity &resource_capacity = resource_pair.second; - resource_capacity_[resource_label] += resource_capacity; - - // Without bundle index (e.g., CPU_group_zzz). - const std::string &wildcard_label = - FormatPlacementGroupResource(resource_pair.first, group_id, -1); - resource_capacity_[wildcard_label] += resource_capacity; - } -} - -void ResourceSet::ReturnBundleResources(const PlacementGroupID &group_id, - const int bundle_index) { - absl::flat_hash_map to_restore; - for (auto iter = resource_capacity_.begin(); iter != resource_capacity_.end();) { - const std::string &bundle_resource_label = iter->first; - // We only consider the indexed resources, ignoring the wildcard resource. - // This is because when multiple bundles are created on one node, the quantity - // of the wildcard resources contains resources from multiple bundles. - if (IsBundleIndex(bundle_resource_label, group_id, bundle_index)) { - const std::string &resource_label = GetOriginalResourceName(bundle_resource_label); - const FractionalResourceQuantity &resource_capacity = iter->second; - to_restore[resource_label] = resource_capacity; - iter = resource_capacity_.erase(iter); - } else { - iter++; - } - } - // For each matching resource to restore (e.g., key like CPU, GPU). - for (const auto &pair : to_restore) { - resource_capacity_[pair.first] += pair.second; - auto wildcard_resource = FormatPlacementGroupResource(pair.first, group_id, -1); - resource_capacity_[wildcard_resource] -= pair.second; - if (resource_capacity_[wildcard_resource] <= 0) { - resource_capacity_.erase(wildcard_resource); - } - } -} - FractionalResourceQuantity ResourceSet::GetResource( const std::string &resource_name) const { if (resource_capacity_.count(resource_name) == 0) { @@ -686,43 +640,10 @@ void ResourceIdSet::AddOrUpdateResource(const std::string &resource_name, } } -void ResourceIdSet::CommitBundleResourceIds(const PlacementGroupID &group_id, - const int bundle_index, - const std::string &resource_name, - ResourceIds &resource_ids) { - auto index_name = FormatPlacementGroupResource(resource_name, group_id, bundle_index); - auto wildcard_name = FormatPlacementGroupResource(resource_name, group_id, -1); - available_resources_[index_name] = available_resources_[index_name].Plus(resource_ids); - available_resources_[wildcard_name] = - available_resources_[wildcard_name].Plus(resource_ids); -} - -void ResourceIdSet::ReturnBundleResources(const PlacementGroupID &group_id, - const int bundle_index, - const std::string &original_resource_name) { - auto index_resource_name = - FormatPlacementGroupResource(original_resource_name, group_id, bundle_index); - auto iter_index = available_resources_.find(index_resource_name); - if (iter_index == available_resources_.end()) { - return; - } - - // Erase and transfer the index bundle resource back to the original. - auto bundle_ids = iter_index->second; - available_resources_.erase(iter_index); - available_resources_[original_resource_name] = - (available_resources_[original_resource_name].Plus(bundle_ids)); - - // Also erase the the equivalent number of units from the wildcard resource. - auto wildcard_name = FormatPlacementGroupResource(original_resource_name, group_id, -1); - available_resources_[wildcard_name].Acquire(bundle_ids.TotalQuantity()); - if (available_resources_[wildcard_name].TotalQuantityIsZero()) { - available_resources_.erase(wildcard_name); - } -} - void ResourceIdSet::DeleteResource(const std::string &resource_name) { - available_resources_.erase(resource_name); + if (available_resources_.count(resource_name) != 0) { + available_resources_.erase(resource_name); + } } const std::unordered_map &ResourceIdSet::AvailableResources() @@ -848,6 +769,14 @@ void SchedulingResources::Acquire(const ResourceSet &resources) { resources_available_.SubtractResourcesStrict(resources); } +// The reason we need this method is sometimes we may want add some converted +// resource which is not exist in total resource to the available resource. +// (e.g., placement group) +void SchedulingResources::AddResource(const ResourceSet &resources) { + resources_total_.AddResources(resources); + resources_available_.AddResources(resources); +} + void SchedulingResources::UpdateResourceCapacity(const std::string &resource_name, int64_t capacity) { const FractionalResourceQuantity new_capacity = FractionalResourceQuantity(capacity); @@ -873,26 +802,6 @@ void SchedulingResources::UpdateResourceCapacity(const std::string &resource_nam } } -void SchedulingResources::PrepareBundleResources(const PlacementGroupID &group, - const int bundle_index, - const ResourceSet &resource_set) { - resources_available_.SubtractResourcesStrict(resource_set); - resources_total_.SubtractResourcesStrict(resource_set); -} - -void SchedulingResources::CommitBundleResources(const PlacementGroupID &group, - const int bundle_index, - const ResourceSet &resource_set) { - resources_available_.CommitBundleResources(group, bundle_index, resource_set); - resources_total_.CommitBundleResources(group, bundle_index, resource_set); -} - -void SchedulingResources::ReturnBundleResources(const PlacementGroupID &group_id, - const int bundle_index) { - resources_available_.ReturnBundleResources(group_id, bundle_index); - resources_total_.ReturnBundleResources(group_id, bundle_index); -} - void SchedulingResources::DeleteResource(const std::string &resource_name) { resources_total_.DeleteResource(resource_name); resources_available_.DeleteResource(resource_name); diff --git a/src/ray/common/task/scheduling_resources.h b/src/ray/common/task/scheduling_resources.h index 44fecfe40..41ed07a1c 100644 --- a/src/ray/common/task/scheduling_resources.h +++ b/src/ray/common/task/scheduling_resources.h @@ -148,33 +148,6 @@ class ResourceSet { /// \return Void. void AddResources(const ResourceSet &other); - /// \brief Aggregate resources from the other set into this set, adding any missing - /// resource labels to this set. - /// - /// This adds both the the indexed and wildcard resources (e.g., both - /// CPU_group_i_zzz and CPU_group_zzz). - /// - /// NOTE: This method should be used AFTER resources are COMMITTED. - /// It can have unexpected behavior if you call this method on PREPARED resources. - /// - /// \param group_id: The placement group id. - /// \param bundle_index: The index of the bundle. - /// \param other: The other resource set to add. - /// \return Void. - void CommitBundleResources(const PlacementGroupID &group_id, const int bundle_index, - const ResourceSet &other); - - /// \brief Return back all the bundle resource. Changing the resource name and adding - /// any missing resource labels to this set. - /// - /// Note that this method assumes bundle resources are COMMITTED. - /// Please make sure to commit bundle resources before calling this method. - /// - /// \param group_id: The placement group id. - /// \param bundle_index: The bundle index to return resources for. - /// \return Void. - void ReturnBundleResources(const PlacementGroupID &group_id, const int bundle_index); - /// \brief Subtract a set of resources from the current set of resources and /// check that the post-subtraction result nonnegative. Assumes other /// is a subset of the ResourceSet. Deletes any resource if the capacity after @@ -431,31 +404,6 @@ class ResourceIdSet { /// \param capacity capacity of the resource being added void AddOrUpdateResource(const std::string &resource_name, int64_t capacity); - /// \brief Commit a Bundle resource in the ResourceIdSet. - /// - /// This adds both the the indexed and wildcard resources (e.g., both - /// CPU_group_i_zzz and CPU_group_zzz). - /// - /// \param group_id: The placement group id. - /// \param bundle_index: The index of the bundle. - /// \param resource_name the name of the resource to create/update (e.g., "CPU"). - /// \param resource_ids resource_ids of the resource being added - void CommitBundleResourceIds(const PlacementGroupID &group_id, const int bundle_index, - const std::string &resource_name, - ResourceIds &resource_ids); - - /// \brief remove a Bundle resource in the ResourceIdSet. - /// - /// The bundle resources will be returned to their original resource names. - /// Note that the bundle resources should've been COMMITTED before this method is - /// called. - /// - /// \param group_id: The placement group id. - /// \param bundle_index: The index of the bundle. - /// \param resource_name the name of the resource to remove (e.g., "CPU"). - void ReturnBundleResources(const PlacementGroupID &group_id, const int bundle_index, - const std::string &resource_name); - /// \brief Deletes a resource in the ResourceIdSet. This does not raise an exception, /// just deletes the resource. Tasks with acquired resources keep running. /// @@ -564,6 +512,12 @@ class SchedulingResources { /// \return Void. void Acquire(const ResourceSet &resources); + /// \brief Add a new resource to available resource. + /// + /// \param resources: the amount of resources to be added. + /// \return Void. + void AddResource(const ResourceSet &resources); + /// Returns debug string for class. /// /// \return string. @@ -577,42 +531,6 @@ class SchedulingResources { /// \return Void. void UpdateResourceCapacity(const std::string &resource_name, int64_t capacity); - /// \brief Update total, available and load resources with the ResourceIds. - /// Create if not exists. This will only update resources, but it won't - /// create placement group resources. That'll be done when resources are - /// COMMITTED. Commit should be done by CommitBundleResources. - /// - /// We need this step for running 2PC protocol for atomic placement group creation. - /// - /// \param resource_name: Name of the resource to be modified. - /// \param resource_set: New resource_set of the resource. - void PrepareBundleResources(const PlacementGroupID &group, const int bundle_index, - const ResourceSet &resource_set); - - /// \brief Commit placement group resources. It means this method'll create - /// placement group resources. The original resources should've been updated - /// by PrepareBundleResources. - /// - /// We need this step for running 2PC protocol for atomic placement group creation. - /// - /// The resources will be transfered from their original resource names. - /// This includes both the the indexed and wildcard resources (e.g., both - /// CPU_group_i_zzz and CPU_group_zzz). - /// - /// \param resource_name: Name of the resource to be modified - /// \param resource_set: New resource_set of the resource. - void CommitBundleResources(const PlacementGroupID &group, const int bundle_index, - const ResourceSet &resource_set); - - /// \brief delete total, available and load resources with the ResourceIds. - /// - /// The bundle resources will be returned to their original resource names. - /// This is the inverse of TransferToBundleResources(). - /// - /// \param group_id: Placement group id to delete resources for. - /// \param bundle_index: The bundle index to return resources for. - void ReturnBundleResources(const PlacementGroupID &group_id, const int bundle_index); - /// \brief Delete resource from total, available and load resources. /// /// \param resource_name: Name of the resource to be deleted. diff --git a/src/ray/common/task/scheduling_resources_test.cc b/src/ray/common/task/scheduling_resources_test.cc deleted file mode 100644 index 120f9f124..000000000 --- a/src/ray/common/task/scheduling_resources_test.cc +++ /dev/null @@ -1,169 +0,0 @@ -// Copyright 2017 The Ray Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "ray/common/task/scheduling_resources.h" - -#include - -#include "gtest/gtest.h" -#include "ray/common/id.h" - -namespace ray { -class SchedulingResourcesTest : public ::testing::Test { - public: - void SetUp() override { - resource_set = std::make_shared(); - resource_id_set = std::make_shared(); - } - - protected: - std::shared_ptr resource_set; - std::shared_ptr resource_id_set; -}; - -TEST_F(SchedulingResourcesTest, CommitBundleResources) { - PlacementGroupID group_id = PlacementGroupID::FromRandom(); - std::vector resource_labels = {"CPU"}; - std::vector resource_capacity = {1.0}; - ResourceSet resource(resource_labels, resource_capacity); - resource_set->CommitBundleResources(group_id, 1, resource); - resource_labels.pop_back(); - resource_labels.push_back("CPU_group_1_" + group_id.Hex()); - resource_labels.push_back("CPU_group_" + group_id.Hex()); - resource_capacity.push_back(1.0); - ResourceSet result_resource(resource_labels, resource_capacity); - ASSERT_EQ(1, resource_set->IsEqual(result_resource)); -} - -TEST_F(SchedulingResourcesTest, AddBundleResource) { - PlacementGroupID group_id = PlacementGroupID::FromRandom(); - std::string wild_name = "CPU_group_" + group_id.Hex(); - std::string index_name = "CPU_group_1_" + group_id.Hex(); - std::vector whole_ids = {1, 2, 3}; - ResourceIds resource_ids(whole_ids); - resource_id_set->CommitBundleResourceIds(group_id, 1, "CPU", resource_ids); - ASSERT_EQ(2, resource_id_set->AvailableResources().size()); - for (auto res : resource_id_set->AvailableResources()) { - ASSERT_TRUE(res.first == wild_name || res.first == index_name) << res.first; - } -} - -TEST_F(SchedulingResourcesTest, ReturnBundleResources) { - PlacementGroupID group_id = PlacementGroupID::FromRandom(); - std::vector resource_labels = {"CPU"}; - std::vector resource_capacity = {1.0}; - ResourceSet resource(resource_labels, resource_capacity); - resource_set->CommitBundleResources(group_id, 1, resource); - resource_labels.pop_back(); - resource_labels.push_back("CPU_group_" + group_id.Hex()); - resource_labels.push_back("CPU_group_1_" + group_id.Hex()); - resource_capacity.push_back(1.0); - ResourceSet result_resource(resource_labels, resource_capacity); - ASSERT_EQ(1, resource_set->IsEqual(result_resource)); - resource_set->ReturnBundleResources(group_id, 1); - ASSERT_EQ(1, resource_set->IsEqual(resource)) - << resource_set->ToString() << " vs " << resource.ToString(); -} - -TEST_F(SchedulingResourcesTest, MultipleBundlesAddRemove) { - PlacementGroupID group_id = PlacementGroupID::FromRandom(); - std::vector resource_labels = {"CPU"}; - std::vector resource_capacity = {1.0}; - ResourceSet resource(resource_labels, resource_capacity); - - // Construct resource set containing two bundles. - resource_set->CommitBundleResources(group_id, 1, resource); - resource_set->CommitBundleResources(group_id, 2, resource); - resource_labels = { - "CPU_group_" + group_id.Hex(), - "CPU_group_1_" + group_id.Hex(), - "CPU_group_2_" + group_id.Hex(), - }; - resource_capacity = {2.0, 1.0, 1.0}; - ResourceSet result_resource(resource_labels, resource_capacity); - ASSERT_EQ(1, resource_set->IsEqual(result_resource)) - << resource_set->ToString() << " vs " << result_resource.ToString(); - - // Return group 2. - resource_set->ReturnBundleResources(group_id, 2); - resource_labels = { - "CPU", - "CPU_group_" + group_id.Hex(), - "CPU_group_1_" + group_id.Hex(), - }; - resource_capacity = {1.0, 1.0, 1.0}; - ResourceSet result_resource2(resource_labels, resource_capacity); - ASSERT_EQ(1, resource_set->IsEqual(result_resource2)) - << resource_set->ToString() << " vs " << result_resource2.ToString(); - - // Return group 1. - resource_set->ReturnBundleResources(group_id, 1); - ASSERT_EQ(1, resource_set->IsEqual(ResourceSet({"CPU"}, {2.0}))) - << resource_set->ToString() << " vs " << resource.ToString(); -} - -TEST_F(SchedulingResourcesTest, MultipleBundlesAddRemoveIdSet) { - PlacementGroupID group_id = PlacementGroupID::FromRandom(); - ResourceIdSet resource_ids; - - // Construct resource set containing two bundles. - auto rid1 = ResourceIds({1, 2}); - auto rid2 = ResourceIds({3, 4}); - resource_ids.CommitBundleResourceIds(group_id, 1, "CPU", rid1); - resource_ids.CommitBundleResourceIds(group_id, 2, "CPU", rid2); - resource_ids.CommitBundleResourceIds(group_id, 1, "GPU", rid1); - resource_ids.CommitBundleResourceIds(group_id, 2, "GPU", rid2); - auto result = ResourceSet( - { - "CPU_group_" + group_id.Hex(), - "CPU_group_1_" + group_id.Hex(), - "CPU_group_2_" + group_id.Hex(), - "GPU_group_" + group_id.Hex(), - "GPU_group_1_" + group_id.Hex(), - "GPU_group_2_" + group_id.Hex(), - }, - {4.0, 2.0, 2.0, 4.0, 2.0, 2.0}); - ASSERT_EQ(1, resource_ids.ToResourceSet().IsEqual(result)) - << resource_ids.ToString() << " vs " << result.ToString(); - - // Remove the first bundle. - resource_ids.ReturnBundleResources(group_id, 1, "CPU"); - resource_ids.ReturnBundleResources(group_id, 1, "GPU"); - result = ResourceSet( - { - "CPU_group_" + group_id.Hex(), - "CPU", - "CPU_group_2_" + group_id.Hex(), - "GPU_group_" + group_id.Hex(), - "GPU", - "GPU_group_2_" + group_id.Hex(), - }, - {2.0, 2.0, 2.0, 2.0, 2.0, 2.0}); - ASSERT_EQ(1, resource_ids.ToResourceSet().IsEqual(result)) - << resource_ids.ToString() << " vs " << result.ToString(); - - // Remove the second bundle. - resource_ids.ReturnBundleResources(group_id, 2, "CPU"); - resource_ids.ReturnBundleResources(group_id, 2, "GPU"); - result = ResourceSet( - { - "CPU", - "GPU", - }, - {4.0, 4.0}); - ASSERT_EQ(1, resource_ids.ToResourceSet().IsEqual(result)) - << resource_ids.ToString() << " vs " << result.ToString(); -} - -} // namespace ray diff --git a/src/ray/gcs/test/gcs_test_util.h b/src/ray/gcs/test/gcs_test_util.h index 68988bf19..bf908c3a2 100644 --- a/src/ray/gcs/test/gcs_test_util.h +++ b/src/ray/gcs/test/gcs_test_util.h @@ -18,6 +18,7 @@ #include #include "gmock/gmock.h" +#include "ray/common/bundle_spec.h" #include "ray/common/placement_group.h" #include "ray/common/task/task.h" #include "ray/common/task/task_util.h" @@ -79,6 +80,20 @@ struct Mocker { return request; } + static BundleSpecification GenBundleCreation( + const PlacementGroupID &placement_group_id, const int bundle_index, + std::unordered_map &unit_resource) { + rpc::Bundle bundle; + auto mutable_bundle_id = bundle.mutable_bundle_id(); + mutable_bundle_id->set_bundle_index(bundle_index); + mutable_bundle_id->set_placement_group_id(placement_group_id.Binary()); + auto mutable_unit_resources = bundle.mutable_unit_resources(); + for (auto &resource : unit_resource) { + mutable_unit_resources->insert({resource.first, resource.second}); + } + return BundleSpecification(bundle); + } + static PlacementGroupSpecification GenPlacementGroupCreation( const std::string &name, std::vector> &bundles, diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index e2e39fe30..1af9bf8d2 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -175,6 +175,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self last_local_gc_ns_(absl::GetCurrentTimeNanos()), local_gc_interval_ns_(RayConfig::instance().local_gc_interval_s() * 1e9), record_metrics_period_(config.record_metrics_period_ms) { + placement_group_resource_manager_ = std::make_shared( + local_available_resources_, cluster_resource_map_, self_node_id_); RAY_LOG(INFO) << "Initializing NodeManager with ID " << self_node_id_; RAY_CHECK(heartbeat_period_.count() > 0); // Initialize the resource map with own cluster resource configuration. @@ -622,14 +624,7 @@ void NodeManager::HandleReleaseUnusedBundles( } // Return unused bundle resources. - for (auto iter = bundle_spec_map_.begin(); iter != bundle_spec_map_.end();) { - if (0 == in_use_bundles.count(iter->first)) { - ReturnBundleResources(*iter->second); - bundle_spec_map_.erase(iter++); - } else { - iter++; - } - } + placement_group_resource_manager_->ReturnUnusedBundle(in_use_bundles); send_reply_callback(Status::OK(), nullptr, nullptr); } @@ -1742,7 +1737,7 @@ void NodeManager::HandlePrepareBundleResources( auto bundle_spec = BundleSpecification(request.bundle_spec()); RAY_LOG(DEBUG) << "Request to prepare bundle resources is received, " << bundle_spec.DebugString(); - auto prepared = PrepareBundle(cluster_resource_map_, bundle_spec); + auto prepared = placement_group_resource_manager_->PrepareBundle(bundle_spec); reply->set_success(prepared); send_reply_callback(Status::OK(), nullptr, nullptr); // Call task dispatch to assign work to the new group. @@ -1758,7 +1753,7 @@ void NodeManager::HandleCommitBundleResources( auto bundle_spec = BundleSpecification(request.bundle_spec()); RAY_LOG(DEBUG) << "Request to commit bundle resources is received, " << bundle_spec.DebugString(); - CommitBundle(cluster_resource_map_, bundle_spec); + placement_group_resource_manager_->CommitBundle(bundle_spec); send_reply_callback(Status::OK(), nullptr, nullptr); // Call task dispatch to assign work to the new group. @@ -1797,7 +1792,7 @@ void NodeManager::HandleCancelResourceReserve( } // Return bundle resources. - ReturnBundleResources(bundle_spec); + placement_group_resource_manager_->ReturnBundle(bundle_spec); TryLocalInfeasibleTaskScheduling(); DispatchTasks(local_queues_.GetReadyTasksByClass()); @@ -1946,89 +1941,6 @@ void NodeManager::ProcessSetResourceRequest( } } -bool NodeManager::PrepareBundle( - std::unordered_map &resource_map, - const BundleSpecification &bundle_spec) { - // We will first delete the existing bundle to ensure idempotent. - // The reason why we do this is: after GCS restarts, placement group can be rescheduled - // directly without rolling back the operations performed before the restart. - const auto &bundle_id = bundle_spec.BundleId(); - auto iter = bundle_state_map_.find(bundle_id); - if (iter != bundle_state_map_.end()) { - if (iter->second->state == CommitState::COMMITTED) { - // If the bundle state is already committed, it means that prepare request is just - // stale. - RAY_LOG(INFO) << "Duplicate prepare bundle request, skip it directly."; - return true; - } else { - // If there was a bundle in prepare state, it already locked resources, we will - // return bundle resources. - ReturnBundleResources(bundle_spec); - } - } - - if (resource_map.count(self_node_id_) > 0) { - resource_map[self_node_id_].SetLoadResources(local_queues_.GetTotalResourceLoad()); - } - // Invoke the scheduling policy. - auto reserve_resource_success = - scheduling_policy_.ScheduleBundle(resource_map, self_node_id_, bundle_spec); - - auto bundle_state = std::make_shared(); - if (reserve_resource_success) { - // Register states. - auto it = bundle_state_map_.find(bundle_id); - // Same bundle cannot be rescheduled. - RAY_CHECK(it == bundle_state_map_.end()); - - // Prepare resources. This shouldn't create formatted placement group resources - // because that'll be done at the commit phase. - bundle_state->acquired_resources = - local_available_resources_.Acquire(bundle_spec.GetRequiredResources()); - resource_map[self_node_id_].PrepareBundleResources( - bundle_spec.PlacementGroupId(), bundle_spec.Index(), - bundle_spec.GetRequiredResources()); - - // Register bundle state. - bundle_state->state = CommitState::PREPARED; - bundle_state_map_.emplace(bundle_id, bundle_state); - bundle_spec_map_.emplace( - bundle_id, std::make_shared(bundle_spec.GetMessage())); - } - return bundle_state->acquired_resources.AvailableResources().size() > 0; -} - -void NodeManager::CommitBundle( - std::unordered_map &resource_map, - const BundleSpecification &bundle_spec) { - // TODO(sang): It is currently not idempotent because we don't retry. Make it idempotent - // once retry is implemented. - const auto &bundle_id = bundle_spec.BundleId(); - auto it = bundle_state_map_.find(bundle_id); - // When bundle is committed, it should've been prepared already. - // If GCS call `CommitBundleResources` after `CancelResourceReserve`, we will skip it - // directly. - if (it == bundle_state_map_.end()) { - RAY_LOG(INFO) << "The bundle has been cancelled. Skip it directly. Bundle info is " - << bundle_spec.DebugString(); - return; - } - const auto &bundle_state = it->second; - bundle_state->state = CommitState::COMMITTED; - const auto &acquired_resources = bundle_state->acquired_resources; - for (auto resource : acquired_resources.AvailableResources()) { - local_available_resources_.CommitBundleResourceIds(bundle_spec.PlacementGroupId(), - bundle_spec.Index(), - resource.first, resource.second); - } - - resource_map[self_node_id_].CommitBundleResources(bundle_spec.PlacementGroupId(), - bundle_spec.Index(), - bundle_spec.GetRequiredResources()); - RAY_CHECK(bundle_state->acquired_resources.AvailableResources().size() > 0) - << "Prepare should've been failed if there were no acquireable resources."; -} - void NodeManager::ScheduleTasks( std::unordered_map &resource_map) { // If the resource map contains the local raylet, update load before calling policy. @@ -3275,31 +3187,6 @@ void NodeManager::RecordMetrics() { local_queues_.RecordMetrics(); } -bool NodeManager::ReturnBundleResources(const BundleSpecification &bundle_spec) { - // We should commit resources if it weren't because - // ReturnBundleResources requires resources to be committed when it is called. - auto it = bundle_state_map_.find(bundle_spec.BundleId()); - if (it == bundle_state_map_.end()) { - RAY_LOG(INFO) << "Duplicate cancel request, skip it directly."; - return false; - } - const auto &bundle_state = it->second; - if (bundle_state->state == CommitState::PREPARED) { - CommitBundle(cluster_resource_map_, bundle_spec); - } - bundle_state_map_.erase(it); - - // Return resources. - const auto &resource_set = bundle_spec.GetRequiredResources(); - for (const auto &resource : resource_set.GetResourceMap()) { - local_available_resources_.ReturnBundleResources(bundle_spec.PlacementGroupId(), - bundle_spec.Index(), resource.first); - } - cluster_resource_map_[self_node_id_].ReturnBundleResources( - bundle_spec.PlacementGroupId(), bundle_spec.Index()); - return true; -} - } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 9d8696a88..f1e9ffad0 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -40,6 +40,7 @@ #include "ray/rpc/worker/core_worker_client_pool.h" #include "ray/util/ordered_set.h" #include "ray/common/bundle_spec.h" +#include "ray/raylet/placement_group_resource_manager.h" // clang-format on namespace ray { @@ -105,27 +106,6 @@ struct NodeManagerConfig { uint64_t record_metrics_period_ms; }; -struct pair_hash { - template - std::size_t operator()(const std::pair &pair) const { - return std::hash()(pair.first) ^ std::hash()(pair.second); - } -}; - -enum CommitState { - /// Resources are prepared. - PREPARED, - /// Resources are COMMITTED. - COMMITTED -}; - -struct BundleState { - /// Leasing state for 2PC protocol. - CommitState state; - /// Resources that are acquired at preparation stage. - ResourceIdSet acquired_resources; -}; - class NodeManager : public rpc::NodeManagerServiceHandler { public: /// Create a node manager. @@ -718,6 +698,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// Initial node manager configuration. const NodeManagerConfig initial_config_; /// The resources (and specific resource IDs) that are currently available. + /// These two resource container is shared with `PlacementGroupResourceManager`. ResourceIdSet local_available_resources_; std::unordered_map cluster_resource_map_; @@ -808,15 +789,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler { absl::flat_hash_map>> async_plasma_objects_notification_ GUARDED_BY(plasma_object_notification_lock_); - /// This map represents the commit state of 2PC protocol for atomic placement group - /// creation. - absl::flat_hash_map, pair_hash> - bundle_state_map_; - - /// Save `BundleSpecification` for cleaning leaked bundles after GCS restart. - absl::flat_hash_map, pair_hash> - bundle_spec_map_; - /// Fields that are used to report metrics. /// The period between debug state dumps. int64_t record_metrics_period_; @@ -832,6 +804,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// Number of tasks that are spilled back to other nodes. uint64_t metrics_num_task_spilled_back_; + + /// Managers all bundle-related operations. + std::shared_ptr placement_group_resource_manager_; }; } // namespace raylet diff --git a/src/ray/raylet/placement_group_resource_manager.cc b/src/ray/raylet/placement_group_resource_manager.cc new file mode 100644 index 000000000..06a82663e --- /dev/null +++ b/src/ray/raylet/placement_group_resource_manager.cc @@ -0,0 +1,152 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License + +#include "ray/raylet/placement_group_resource_manager.h" + +#include +#include +#include + +namespace ray { + +namespace raylet { + +OldPlacementGroupResourceManager::OldPlacementGroupResourceManager( + ResourceIdSet &local_available_resources_, + std::unordered_map &cluster_resource_map_, + const NodeID &self_node_id_) + : local_available_resources_(local_available_resources_), + cluster_resource_map_(cluster_resource_map_), + self_node_id_(self_node_id_) {} + +bool OldPlacementGroupResourceManager::PrepareBundle( + const BundleSpecification &bundle_spec) { + // We will first delete the existing bundle to ensure idempotence. + // The reason why we do this is: after GCS restarts, placement group can be rescheduled + // directly without rolling back the operations performed before the restart. + const auto &bundle_id = bundle_spec.BundleId(); + auto iter = bundle_state_map_.find(bundle_id); + if (iter != bundle_state_map_.end()) { + if (iter->second->state == CommitState::COMMITTED) { + // If the bundle state is already committed, it means that prepare request is just + // stale. + RAY_LOG(INFO) << "Duplicate prepare bundle request, skip it directly."; + return true; + } else { + // If there was a bundle in prepare state, it already locked resources, we will + // return bundle resources. + ReturnBundle(bundle_spec); + } + } + + auto &local_resource_set = cluster_resource_map_[self_node_id_]; + auto bundle_state = std::make_shared(); + bool local_resource_enough = bundle_spec.GetRequiredResources().IsSubset( + local_resource_set.GetAvailableResources()); + + if (local_resource_enough) { + // Register states. + auto it = bundle_state_map_.find(bundle_id); + // Same bundle cannot be rescheduled. + RAY_CHECK(it == bundle_state_map_.end()); + + // Prepare resources. This shouldn't create formatted placement group resources + // because that'll be done at the commit phase. + bundle_state->acquired_resources = + local_available_resources_.Acquire(bundle_spec.GetRequiredResources()); + local_resource_set.Acquire(bundle_spec.GetRequiredResources()); + + // Register bundle state. + bundle_state->state = CommitState::PREPARED; + bundle_state_map_.emplace(bundle_id, bundle_state); + bundle_spec_map_.emplace( + bundle_id, std::make_shared(bundle_spec.GetMessage())); + } + return bundle_state->acquired_resources.AvailableResources().size() > 0; +} + +void OldPlacementGroupResourceManager::CommitBundle( + const BundleSpecification &bundle_spec) { + const auto &bundle_id = bundle_spec.BundleId(); + auto it = bundle_state_map_.find(bundle_id); + // When bundle is committed, it should've been prepared already. + // If GCS call `CommitBundleResources` after `CancelResourceReserve`, we will skip it + // directly. + if (it == bundle_state_map_.end()) { + RAY_LOG(INFO) << "The bundle has been cancelled. Skip it directly. Bundle info is " + << bundle_spec.DebugString(); + return; + } else { + // Ignore request If the bundle state is already committed. + if (it->second->state == CommitState::COMMITTED) { + RAY_LOG(INFO) << "Duplicate committ bundle request, skip it directly."; + return; + } + } + const auto &bundle_state = it->second; + bundle_state->state = CommitState::COMMITTED; + const auto &acquired_resources = bundle_state->acquired_resources; + + const auto &bundle_resource_labels = bundle_spec.GetFormattedResources(); + const auto &formatted_resource_set = ResourceSet(bundle_resource_labels); + local_available_resources_.Release(ResourceIdSet(formatted_resource_set)); + + cluster_resource_map_[self_node_id_].AddResource(ResourceSet(bundle_resource_labels)); + + RAY_CHECK(acquired_resources.AvailableResources().size() > 0) + << "Prepare should've been failed if there were no acquireable resources."; +} + +void OldPlacementGroupResourceManager::ReturnBundle( + const BundleSpecification &bundle_spec) { + // We should commit resources if it weren't because + // ReturnBundleResources requires resources to be committed when it is called. + auto it = bundle_state_map_.find(bundle_spec.BundleId()); + if (it == bundle_state_map_.end()) { + RAY_LOG(INFO) << "Duplicate cancel request, skip it directly."; + return; + } + const auto &bundle_state = it->second; + if (bundle_state->state == CommitState::PREPARED) { + CommitBundle(bundle_spec); + } + bundle_state_map_.erase(it); + + const auto &resource_set = bundle_spec.GetRequiredResources(); + const auto &placement_group_resource_labels = bundle_spec.GetFormattedResources(); + + // Return resources to ResourceIdSet. + local_available_resources_.Release(ResourceIdSet(resource_set)); + local_available_resources_.Acquire(ResourceSet(placement_group_resource_labels)); + + // Return resources to SchedulingResources. + cluster_resource_map_[self_node_id_].Release(resource_set); + cluster_resource_map_[self_node_id_].Acquire( + ResourceSet(placement_group_resource_labels)); +} + +void OldPlacementGroupResourceManager::ReturnUnusedBundle( + const std::unordered_set &in_use_bundles) { + for (auto iter = bundle_spec_map_.begin(); iter != bundle_spec_map_.end();) { + if (0 == in_use_bundles.count(iter->first)) { + ReturnBundle(*iter->second); + bundle_spec_map_.erase(iter++); + } else { + iter++; + } + } +} + +} // namespace raylet +} // namespace ray diff --git a/src/ray/raylet/placement_group_resource_manager.h b/src/ray/raylet/placement_group_resource_manager.h new file mode 100644 index 000000000..3b6e3a928 --- /dev/null +++ b/src/ray/raylet/placement_group_resource_manager.h @@ -0,0 +1,131 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "absl/container/flat_hash_map.h" +#include "ray/common/bundle_spec.h" +#include "ray/common/id.h" +#include "ray/common/task/scheduling_resources.h" + +namespace ray { + +namespace raylet { + +enum CommitState { + /// Resources are prepared. + PREPARED, + /// Resources are COMMITTED. + COMMITTED +}; + +struct BundleState { + /// Leasing state for 2PC protocol. + CommitState state; + /// Resources that are acquired at preparation stage. + ResourceIdSet acquired_resources; +}; + +struct pair_hash { + template + std::size_t operator()(const std::pair &pair) const { + return std::hash()(pair.first) ^ std::hash()(pair.second); + } +}; + +/// `PlacementGroupResourceManager` responsible for managing the resources that +/// about allocated for placement group bundles. +class PlacementGroupResourceManager { + public: + /// Lock the required resources from local available resources. Note that this is phase + /// one of 2PC, it will not convert placement group resource(like CPU -> CPU_group_i). + /// + /// \param bundle_spec: Specification of bundle whose resources will be prepared. + virtual bool PrepareBundle(const BundleSpecification &bundle_spec) = 0; + + /// Convert the required resources to placement group resources(like CPU -> + /// CPU_group_i). This is phase two of 2PC. + /// + /// \param bundle_spec: Specification of bundle whose resources will be commited. + virtual void CommitBundle(const BundleSpecification &bundle_spec) = 0; + + /// Return back all the bundle resource. + /// + /// \param bundle_spec: Specification of bundle whose resources will be returned. + virtual void ReturnBundle(const BundleSpecification &bundle_spec) = 0; + + /// Return back all the bundle(which is unused) resource. + /// + /// \param bundle_spec: A set of bundles which in use. + virtual void ReturnUnusedBundle( + const std::unordered_set &in_use_bundles) = 0; + + virtual ~PlacementGroupResourceManager() {} +}; + +/// Associated with old scheduler. +class OldPlacementGroupResourceManager : public PlacementGroupResourceManager { + public: + /// Create a local placement group manager. + /// + /// \param local_available_resources_: The resources (IDs specificed) that are currently + /// available. + /// \param cluster_resource_map_: The resources (without IDs specificed) that + /// are currently available. + /// \param self_node_id_: The related raylet with current + /// placement group manager. + OldPlacementGroupResourceManager( + ResourceIdSet &local_available_resources_, + std::unordered_map &cluster_resource_map_, + const NodeID &self_node_id_); + + virtual ~OldPlacementGroupResourceManager() = default; + + bool PrepareBundle(const BundleSpecification &bundle_spec); + + void CommitBundle(const BundleSpecification &bundle_spec); + + void ReturnBundle(const BundleSpecification &bundle_spec); + + void ReturnUnusedBundle(const std::unordered_set &in_use_bundles); + + /// Get all local available resource(IDs specificed). + const ResourceIdSet &GetAllResourceIdSet() const { return local_available_resources_; }; + + /// Get all local available resource(without IDs specificed). + const SchedulingResources &GetAllResourceSetWithoutId() const { + return cluster_resource_map_[self_node_id_]; + } + + private: + /// The resources (and specific resource IDs) that are currently available. + /// These two resource container is shared with `NodeManager`. + ResourceIdSet &local_available_resources_; + std::unordered_map &cluster_resource_map_; + + /// Related raylet with current placement group manager. + NodeID self_node_id_; + + /// This map represents the commit state of 2PC protocol for atomic placement group + /// creation. + absl::flat_hash_map, pair_hash> + bundle_state_map_; + + /// Save `BundleSpecification` for cleaning leaked bundles after GCS restart. + absl::flat_hash_map, pair_hash> + bundle_spec_map_; +}; + +} // namespace raylet +} // end namespace ray diff --git a/src/ray/raylet/placement_group_resource_manager_test.cc b/src/ray/raylet/placement_group_resource_manager_test.cc new file mode 100644 index 000000000..10011aece --- /dev/null +++ b/src/ray/raylet/placement_group_resource_manager_test.cc @@ -0,0 +1,270 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/raylet/placement_group_resource_manager.h" +#include "ray/common/bundle_spec.h" +#include "ray/common/id.h" +#include "ray/common/task/scheduling_resources.h" +#include "ray/gcs/test/gcs_test_util.h" + +#include + +#include "gtest/gtest.h" + +namespace ray { + +class OldPlacementGroupResourceManagerTest : public ::testing::Test { + public: + OldPlacementGroupResourceManagerTest() { + old_placement_group_resource_manager_.reset( + new raylet::OldPlacementGroupResourceManager( + local_available_resources_, cluster_resource_map_, self_node_id_)); + } + + std::unique_ptr + old_placement_group_resource_manager_; + + void InitLocalAvailableResource( + std::unordered_map &unit_resource) { + ResourceSet init_resourece(unit_resource); + cluster_resource_map_[self_node_id_] = SchedulingResources(init_resourece); + local_available_resources_ = ResourceIdSet(init_resourece); + } + + void CheckRemainingResourceCorrect(ResourceSet &result_resource) { + auto &remaining_resource = + old_placement_group_resource_manager_->GetAllResourceSetWithoutId(); + ASSERT_EQ(1, remaining_resource.GetAvailableResources().IsEqual(result_resource)) + << remaining_resource.GetAvailableResources().ToString() << " vs " + << result_resource.ToString(); + ASSERT_EQ(1, local_available_resources_.ToResourceSet().IsEqual(result_resource)) + << local_available_resources_.ToResourceSet().ToString() << " vs " + << result_resource.ToString(); + } + + protected: + ResourceIdSet local_available_resources_; + std::unordered_map cluster_resource_map_; + NodeID self_node_id_ = NodeID::FromRandom(); +}; + +TEST_F(OldPlacementGroupResourceManagerTest, TestPrepareBundleResource) { + // 1. create bundle spec. + auto group_id = PlacementGroupID::FromRandom(); + std::unordered_map unit_resource; + unit_resource.insert({"CPU", 1.0}); + auto bundle_spec = Mocker::GenBundleCreation(group_id, 1, unit_resource); + /// 2. init local available resource. + InitLocalAvailableResource(unit_resource); + /// 3. prepare bundle resource. + old_placement_group_resource_manager_->PrepareBundle(bundle_spec); + /// 4. check remaining resources is correct. + auto &remaining_resource = + old_placement_group_resource_manager_->GetAllResourceSetWithoutId(); + ResourceSet result_resource; + ASSERT_EQ(0, local_available_resources_.AvailableResources().size()); + ASSERT_EQ(1, remaining_resource.GetAvailableResources().IsEqual(result_resource)) + << remaining_resource.GetAvailableResources().ToString() << " vs " + << result_resource.ToString(); + ASSERT_EQ(1, local_available_resources_.ToResourceSet().IsEqual(result_resource)) + << local_available_resources_.ToResourceSet().ToString() << " vs " + << result_resource.ToString(); +} + +TEST_F(OldPlacementGroupResourceManagerTest, TestPrepareBundleWithInsufficientResource) { + // 1. create bundle spec. + auto group_id = PlacementGroupID::FromRandom(); + std::unordered_map unit_resource; + unit_resource.insert({"CPU", 2.0}); + auto bundle_spec = Mocker::GenBundleCreation(group_id, 1, unit_resource); + /// 2. init local available resource. + std::unordered_map init_unit_resource; + init_unit_resource.insert({"CPU", 1.0}); + InitLocalAvailableResource(init_unit_resource); + /// 3. prepare bundle resource. + ASSERT_FALSE(old_placement_group_resource_manager_->PrepareBundle(bundle_spec)); +} + +TEST_F(OldPlacementGroupResourceManagerTest, TestCommitBundleResource) { + // 1. create bundle spec. + auto group_id = PlacementGroupID::FromRandom(); + std::unordered_map unit_resource; + unit_resource.insert({"CPU", 1.0}); + auto bundle_spec = Mocker::GenBundleCreation(group_id, 1, unit_resource); + /// 2. init local available resource. + InitLocalAvailableResource(unit_resource); + /// 3. prepare and commit bundle resource. + old_placement_group_resource_manager_->PrepareBundle(bundle_spec); + old_placement_group_resource_manager_->CommitBundle(bundle_spec); + /// 4. check remaining resources is correct. + auto &remaining_resource = + old_placement_group_resource_manager_->GetAllResourceSetWithoutId(); + std::vector resource_labels = {"CPU_group_" + group_id.Hex(), + "CPU_group_1_" + group_id.Hex()}; + std::vector resource_capacity = {1.0, 1.0}; + ResourceSet result_resource(resource_labels, resource_capacity); + ASSERT_EQ(2, local_available_resources_.AvailableResources().size()); + ASSERT_EQ(1, remaining_resource.GetAvailableResources().IsEqual(result_resource)) + << remaining_resource.GetAvailableResources().ToString() << " vs " + << result_resource.ToString(); + ASSERT_EQ(1, local_available_resources_.ToResourceSet().IsEqual(result_resource)) + << local_available_resources_.ToResourceSet().ToString() << " vs " + << result_resource.ToString(); +} + +TEST_F(OldPlacementGroupResourceManagerTest, TestReturnBundleResource) { + // 1. create bundle spec. + auto group_id = PlacementGroupID::FromRandom(); + std::unordered_map unit_resource; + unit_resource.insert({"CPU", 1.0}); + auto bundle_spec = Mocker::GenBundleCreation(group_id, 1, unit_resource); + /// 2. init local available resource. + InitLocalAvailableResource(unit_resource); + /// 3. prepare and commit bundle resource. + old_placement_group_resource_manager_->PrepareBundle(bundle_spec); + old_placement_group_resource_manager_->CommitBundle(bundle_spec); + /// 4. return bundle resource. + old_placement_group_resource_manager_->ReturnBundle(bundle_spec); + /// 5. check remaining resources is correct. + auto &remaining_resource = + old_placement_group_resource_manager_->GetAllResourceSetWithoutId(); + ResourceSet result_resource(unit_resource); + ASSERT_EQ(1, local_available_resources_.AvailableResources().size()); + ASSERT_EQ(1, remaining_resource.GetAvailableResources().IsEqual(result_resource)) + << remaining_resource.GetAvailableResources().ToString() << " vs " + << result_resource.ToString(); + ASSERT_EQ(1, local_available_resources_.ToResourceSet().IsEqual(result_resource)) + << local_available_resources_.ToResourceSet().ToString() << " vs " + << result_resource.ToString(); +} + +TEST_F(OldPlacementGroupResourceManagerTest, TestMultipleBundlesCommitAndReturn) { + // 1. create two bundles spec. + auto group_id = PlacementGroupID::FromRandom(); + std::unordered_map unit_resource; + unit_resource.insert({"CPU", 1.0}); + auto first_bundle_spec = Mocker::GenBundleCreation(group_id, 1, unit_resource); + auto second_bundle_spec = Mocker::GenBundleCreation(group_id, 2, unit_resource); + /// 2. init local available resource. + std::unordered_map init_unit_resource; + init_unit_resource.insert({"CPU", 2.0}); + InitLocalAvailableResource(init_unit_resource); + /// 3. prepare and commit two bundle resource. + old_placement_group_resource_manager_->PrepareBundle(first_bundle_spec); + old_placement_group_resource_manager_->PrepareBundle(second_bundle_spec); + old_placement_group_resource_manager_->CommitBundle(first_bundle_spec); + old_placement_group_resource_manager_->CommitBundle(second_bundle_spec); + /// 4. check remaining resources is correct after commit phase. + auto &remaining_resource = + old_placement_group_resource_manager_->GetAllResourceSetWithoutId(); + std::vector resource_labels = {"CPU_group_" + group_id.Hex(), + "CPU_group_1_" + group_id.Hex(), + "CPU_group_2_" + group_id.Hex()}; + std::vector resource_capacity = {2.0, 1.0, 1.0}; + ResourceSet result_resource(resource_labels, resource_capacity); + ASSERT_EQ(3, local_available_resources_.AvailableResources().size()); + ASSERT_EQ(1, remaining_resource.GetAvailableResources().IsEqual(result_resource)) + << remaining_resource.GetAvailableResources().ToString() << " vs " + << result_resource.ToString(); + ASSERT_EQ(1, local_available_resources_.ToResourceSet().IsEqual(result_resource)) + << local_available_resources_.ToResourceSet().ToString() << " vs " + << result_resource.ToString(); + /// 5. return second bundle. + old_placement_group_resource_manager_->ReturnBundle(second_bundle_spec); + /// 6. check remaining resources is correct after return second bundle. + resource_labels = {"CPU", "CPU_group_" + group_id.Hex(), + "CPU_group_1_" + group_id.Hex()}; + resource_capacity = {1.0, 1.0, 1.0}; + result_resource = ResourceSet(resource_labels, resource_capacity); + ASSERT_EQ(1, remaining_resource.GetAvailableResources().IsEqual(result_resource)) + << remaining_resource.GetAvailableResources().ToString() << " vs " + << result_resource.ToString(); + ASSERT_EQ(1, local_available_resources_.ToResourceSet().IsEqual(result_resource)) + << local_available_resources_.ToResourceSet().ToString() << " vs " + << result_resource.ToString(); + /// 7. return first bundel. + old_placement_group_resource_manager_->ReturnBundle(first_bundle_spec); + /// 8. check remaining resources is correct after all bundle returned. + result_resource = ResourceSet(init_unit_resource); + ASSERT_EQ(1, remaining_resource.GetAvailableResources().IsEqual(result_resource)) + << remaining_resource.GetAvailableResources().ToString() << " vs " + << result_resource.ToString(); + ASSERT_EQ(1, local_available_resources_.ToResourceSet().IsEqual(result_resource)) + << local_available_resources_.ToResourceSet().ToString() << " vs " + << result_resource.ToString(); +} + +TEST_F(OldPlacementGroupResourceManagerTest, TestIdempotencyWithMultiPrepare) { + // 1. create one bundle spec. + auto group_id = PlacementGroupID::FromRandom(); + std::unordered_map unit_resource; + unit_resource.insert({"CPU", 1.0}); + auto bundle_spec = Mocker::GenBundleCreation(group_id, 1, unit_resource); + /// 2. init local available resource. + std::unordered_map available_resource = { + std::make_pair("CPU", 3.0)}; + InitLocalAvailableResource(available_resource); + /// 3. prepare bundle resource 10 times. + for (int i = 0; i < 10; i++) { + old_placement_group_resource_manager_->PrepareBundle(bundle_spec); + } + /// 4. check remaining resources is correct. + std::unordered_map result_resource_map = { + std::make_pair("CPU", 2.0)}; + ResourceSet result_resource(result_resource_map); + CheckRemainingResourceCorrect(result_resource); +} + +TEST_F(OldPlacementGroupResourceManagerTest, TestIdempotencyWithRandomOrder) { + // 1. create one bundle spec. + auto group_id = PlacementGroupID::FromRandom(); + std::unordered_map unit_resource; + unit_resource.insert({"CPU", 1.0}); + auto bundle_spec = Mocker::GenBundleCreation(group_id, 1, unit_resource); + /// 2. init local available resource. + std::unordered_map available_resource = { + std::make_pair("CPU", 3.0)}; + InitLocalAvailableResource(available_resource); + /// 3. prepare bundle -> commit bundle -> prepare bundle. + old_placement_group_resource_manager_->PrepareBundle(bundle_spec); + old_placement_group_resource_manager_->CommitBundle(bundle_spec); + old_placement_group_resource_manager_->PrepareBundle(bundle_spec); + /// 4. check remaining resources is correct. + std::vector resource_labels = {"CPU_group_" + group_id.Hex(), + "CPU_group_1_" + group_id.Hex(), "CPU"}; + std::vector resource_capacity = {1.0, 1.0, 2.0}; + ResourceSet result_resource(resource_labels, resource_capacity); + CheckRemainingResourceCorrect(result_resource); + old_placement_group_resource_manager_->ReturnBundle(bundle_spec); + // 5. prepare bundle -> commit bundle -> commit bundle. + old_placement_group_resource_manager_->PrepareBundle(bundle_spec); + old_placement_group_resource_manager_->CommitBundle(bundle_spec); + old_placement_group_resource_manager_->CommitBundle(bundle_spec); + // 6. check remaining resources is correct. + CheckRemainingResourceCorrect(result_resource); + old_placement_group_resource_manager_->ReturnBundle(bundle_spec); + // 7. prepare bundle -> return bundle -> commit bundle. + old_placement_group_resource_manager_->PrepareBundle(bundle_spec); + old_placement_group_resource_manager_->ReturnBundle(bundle_spec); + old_placement_group_resource_manager_->CommitBundle(bundle_spec); + result_resource = ResourceSet(available_resource); + CheckRemainingResourceCorrect(result_resource); +} + +} // namespace ray + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/ray/raylet/scheduling_policy.cc b/src/ray/raylet/scheduling_policy.cc index f0599e11e..d4d2ffc11 100644 --- a/src/ray/raylet/scheduling_policy.cc +++ b/src/ray/raylet/scheduling_policy.cc @@ -151,38 +151,6 @@ std::unordered_map SchedulingPolicy::Schedule( return decision; } -bool SchedulingPolicy::ScheduleBundle( - std::unordered_map &cluster_resources, - const NodeID &local_node_id, const ray::BundleSpecification &bundle_spec) { -#ifndef NDEBUG - RAY_LOG(DEBUG) << "Cluster resource map: "; - for (const auto &node_resource_pair : cluster_resources) { - const NodeID &node_id = node_resource_pair.first; - const SchedulingResources &resources = node_resource_pair.second; - RAY_LOG(DEBUG) << "node_id: " << node_id << " " - << resources.GetAvailableResources().ToString(); - } -#endif - const auto &node_resource_pair = cluster_resources.find(local_node_id); - if (node_resource_pair == cluster_resources.end()) { - return false; - } - const auto &resource_demand = bundle_spec.GetRequiredResources(); - NodeID node_id = node_resource_pair->first; - const auto &node_resources = node_resource_pair->second; - ResourceSet available_node_resources = - ResourceSet(node_resources.GetAvailableResources()); - available_node_resources.SubtractResources(node_resources.GetLoadResources()); - RAY_LOG(DEBUG) << "Scheduling bundle, node id = " << node_id - << ", available resources = " - << node_resources.GetAvailableResources().ToString() - << ", resources load = " << node_resources.GetLoadResources().ToString() - << ", the resource needed = " << resource_demand.ToString(); - /// If the resource_demand is subset of the whole available_node_resources, this bundle - /// can be set in this node, return true. - return resource_demand.IsSubset(available_node_resources); -} - std::vector SchedulingPolicy::SpillOverInfeasibleTasks( SchedulingResources &node_resources) const { // The policy decision to be returned. diff --git a/src/ray/raylet/scheduling_policy.h b/src/ray/raylet/scheduling_policy.h index 24bfdb0cb..f719a1f6e 100644 --- a/src/ray/raylet/scheduling_policy.h +++ b/src/ray/raylet/scheduling_policy.h @@ -50,17 +50,6 @@ class SchedulingPolicy { std::unordered_map &cluster_resources, const NodeID &local_node_id); - /// \param cluster_resources: a set of cluster resources containing resource and load - /// information for some subset of the cluster. - /// \param local_node_id The ID of the node manager that owns this - /// SchedulingPolicy object. - /// \param bundle_spec the description of a bundle which include the resource the bundle - /// need. \return If this bundle can be scheduled in this node, return true; else return - /// false. - bool ScheduleBundle(std::unordered_map &cluster_resources, - const NodeID &local_node_id, - const ray::BundleSpecification &bundle_spec); - /// \brief Given a set of cluster resources, try to spillover infeasible tasks. /// /// \param node_resources The resource information for a node. This may be