diff --git a/src/ray/common/bundle_spec.cc b/src/ray/common/bundle_spec.cc index d4a295584..9e83b6a26 100644 --- a/src/ray/common/bundle_spec.cc +++ b/src/ray/common/bundle_spec.cc @@ -43,12 +43,6 @@ std::pair BundleSpecification::BundleId() const { PlacementGroupID::FromBinary(message_->bundle_id().placement_group_id()), index); } -std::string BundleSpecification::BundleIdAsString() const { - int64_t index = message_->bundle_id().bundle_index(); - return PlacementGroupID::FromBinary(message_->bundle_id().placement_group_id()).Hex() + - std::to_string(index); -} - PlacementGroupID BundleSpecification::PlacementGroupId() const { return PlacementGroupID::FromBinary(message_->bundle_id().placement_group_id()); } @@ -57,4 +51,25 @@ int64_t BundleSpecification::Index() const { return message_->bundle_id().bundle_index(); } +std::string FormatPlacementGroupResource(const std::string &original_resource_name, + PlacementGroupID group_id, + int64_t bundle_index) { + auto str = original_resource_name + "_group_" + group_id.Hex() + "_" + + std::to_string(bundle_index); + RAY_CHECK(GetOriginalResourceName(str) == original_resource_name) << str; + return str; +} + +std::string FormatPlacementGroupResource(const std::string &original_resource_name, + const BundleSpecification &bundle_spec) { + return FormatPlacementGroupResource( + original_resource_name, bundle_spec.PlacementGroupId(), bundle_spec.Index()); +} + +std::string GetOriginalResourceName(const std::string &resource) { + auto idx = resource.find("_group_"); + RAY_CHECK(idx >= 0) << "This isn't a placement group resource " << resource; + return resource.substr(0, idx); +} + } // namespace ray diff --git a/src/ray/common/bundle_spec.h b/src/ray/common/bundle_spec.h index 1176ce02a..1001af0de 100644 --- a/src/ray/common/bundle_spec.h +++ b/src/ray/common/bundle_spec.h @@ -51,9 +51,6 @@ class BundleSpecification : public MessageWrapper { // Return the bundle_id std::pair BundleId() const; - // Return the bundle_id of string. eg: placement_group_id + index. - std::string BundleIdAsString() const; - // Return the Placement Group id which the Bundle belong to. PlacementGroupID PlacementGroupId() const; @@ -94,4 +91,15 @@ class BundleSpecification : public MessageWrapper { mutable SpillbackBundleCallback on_spillback_ = nullptr; }; +/// Format a placement group resource, e.g., CPU -> CPU_group_YYY_i +std::string FormatPlacementGroupResource(const std::string &original_resource_name, + PlacementGroupID group_id, int64_t bundle_index); + +/// Format a placement group resource, e.g., CPU -> CPU_group_YYY_i +std::string FormatPlacementGroupResource(const std::string &original_resource_name, + const BundleSpecification &bundle_spec); + +/// Return the original resource name of the placement group resource. +std::string GetOriginalResourceName(const std::string &resource); + } // namespace ray diff --git a/src/ray/common/task/scheduling_resources.cc b/src/ray/common/task/scheduling_resources.cc index 49f75176c..dc247fd30 100644 --- a/src/ray/common/task/scheduling_resources.cc +++ b/src/ray/common/task/scheduling_resources.cc @@ -3,6 +3,7 @@ #include #include +#include "ray/common/bundle_spec.h" #include "ray/util/logging.h" namespace ray { @@ -226,21 +227,22 @@ void ResourceSet::AddResources(const ResourceSet &other) { } } -void ResourceSet::AddBundleResources(const std::string &bundle_id, - const ResourceSet &other) { +void ResourceSet::AddBundleResources(const PlacementGroupID &bundle_id, + const int bundle_index, const ResourceSet &other) { for (const auto &resource_pair : other.GetResourceAmountMap()) { - const std::string &resource_label = bundle_id + "_" + resource_pair.first; + const std::string &resource_label = + FormatPlacementGroupResource(resource_pair.first, bundle_id, bundle_index); const FractionalResourceQuantity &resource_capacity = resource_pair.second; resource_capacity_[resource_label] += resource_capacity; } } -void ResourceSet::ReturnBundleResources(const std::string &bundle_id) { +void ResourceSet::ReturnBundleResources(const PlacementGroupID &bundle_id, + const int bundle_index) { for (auto iter = resource_capacity_.begin(); iter != resource_capacity_.end();) { const std::string &bundle_resource_label = iter->first; - if (bundle_resource_label.find(bundle_id) != std::string::npos) { - const std::string &resource_label = - bundle_resource_label.substr(bundle_resource_label.find_last_of("_") + 1); + if (bundle_resource_label.find(bundle_id.Hex()) != std::string::npos) { + const std::string &resource_label = GetOriginalResourceName(bundle_resource_label); const FractionalResourceQuantity &resource_capacity = iter->second; resource_capacity_[resource_label] += resource_capacity; iter = resource_capacity_.erase(iter); @@ -669,7 +671,7 @@ void ResourceIdSet::AddBundleResource(const std::string &resource_name, } void ResourceIdSet::CancelResourceReserve(const std::string &resource_name) { - std::string origin_resource_name = resource_name.substr(resource_name.find("_") + 1); + std::string origin_resource_name = GetOriginalResourceName(resource_name); auto iter_orig = available_resources_.find(origin_resource_name); auto iter_bundle = available_resources_.find(resource_name); if (iter_bundle == available_resources_.end()) { @@ -835,17 +837,19 @@ void SchedulingResources::UpdateResourceCapacity(const std::string &resource_nam } } -void SchedulingResources::UpdateBundleResource(const std::string &bundle_id, +void SchedulingResources::UpdateBundleResource(const PlacementGroupID &group, + const int bundle_index, const ResourceSet &resource_set) { resources_available_.SubtractResourcesStrict(resource_set); - resources_available_.AddBundleResources(bundle_id, resource_set); + resources_available_.AddBundleResources(group, bundle_index, resource_set); resources_total_.SubtractResourcesStrict(resource_set); - resources_total_.AddBundleResources(bundle_id, resource_set); + resources_total_.AddBundleResources(group, bundle_index, resource_set); } -void SchedulingResources::ReturnBundleResource(const std::string &bundle_id) { - resources_available_.ReturnBundleResources(bundle_id); - resources_total_.ReturnBundleResources(bundle_id); +void SchedulingResources::ReturnBundleResource(const PlacementGroupID &group, + const int bundle_index) { + resources_available_.ReturnBundleResources(group, bundle_index); + resources_total_.ReturnBundleResources(group, bundle_index); } void SchedulingResources::DeleteResource(const std::string &resource_name) { diff --git a/src/ray/common/task/scheduling_resources.h b/src/ray/common/task/scheduling_resources.h index 580ad0bbb..fa9afc558 100644 --- a/src/ray/common/task/scheduling_resources.h +++ b/src/ray/common/task/scheduling_resources.h @@ -5,6 +5,7 @@ #include #include +#include "ray/common/id.h" #include "ray/raylet/format/node_manager_generated.h" namespace ray { @@ -148,21 +149,22 @@ class ResourceSet { void AddResources(const ResourceSet &other); /// \brief Aggregate resources from the other set into this set, adding any missing - /// resource labels to this set. The resource id will change to bundle_id + "_" + - /// reource_id + /// resource labels to this set. /// + /// \param bundle_id: The placement group id. + /// \param bundle_index: The index of the bundle. /// \param other: The other resource set to add. - /// \param bundle_id: The bundle_id of the bundle. /// \return Void. - void AddBundleResources(const std::string &bundle_id, const ResourceSet &other); + void AddBundleResources(const PlacementGroupID &bundle_id, const int bundle_index, + const ResourceSet &other); /// \brief Return back all the bundle resource. Changing the resource name and adding - /// any missing resource labels to this set. The resource id will remove bundle_id + "_" - /// part. + /// any missing resource labels to this set. /// - /// \param bundle_id: The bundle_id of the bundle. + /// \param bundle_id: The placement group id. + /// \param bundle_index: The index of the bundle. /// \return Void. - void ReturnBundleResources(const std::string &bundle_id); + void ReturnBundleResources(const PlacementGroupID &bundle_id, const int bundle_index); /// \brief Subtract a set of resources from the current set of resources and /// check that the post-subtraction result nonnegative. Assumes other @@ -556,13 +558,13 @@ class SchedulingResources { /// Create if not exists. /// \param resource_name: Name of the resource to be modified /// \param resource_set: New resource_set of the resource. - void UpdateBundleResource(const std::string &bundle_id, + void UpdateBundleResource(const PlacementGroupID &group, const int bundle_index, const ResourceSet &resource_set); /// \brief delete total, available and load resources with the ResourceIds. /// Create if not exists. /// \param resource_name: Name of the resource to be deleted - void ReturnBundleResource(const std::string &bundle_id); + void ReturnBundleResource(const PlacementGroupID &group, const int bundle_index); /// \brief Delete resource from total, available and load resources. /// diff --git a/src/ray/common/task/scheduling_resources_test.cc b/src/ray/common/task/scheduling_resources_test.cc index 8611aca5f..73d3e50b3 100644 --- a/src/ray/common/task/scheduling_resources_test.cc +++ b/src/ray/common/task/scheduling_resources_test.cc @@ -33,20 +33,20 @@ class SchedulingResourcesTest : public ::testing::Test { }; TEST_F(SchedulingResourcesTest, AddBundleResources) { - UniqueID bundle_id = UniqueID::FromRandom(); + PlacementGroupID group_id = PlacementGroupID::FromRandom(); std::vector resource_labels = {"CPU"}; std::vector resource_capacity = {1.0}; ResourceSet resource(resource_labels, resource_capacity); - resource_set->AddBundleResources(bundle_id.Binary(), resource); + resource_set->AddBundleResources(group_id, 1, resource); resource_labels.pop_back(); - resource_labels.push_back(bundle_id.Binary() + "_" + "CPU"); + resource_labels.push_back("CPU_group_" + group_id.Hex() + "_1"); ResourceSet result_resource(resource_labels, resource_capacity); ASSERT_EQ(1, resource_set->IsEqual(result_resource)); } TEST_F(SchedulingResourcesTest, AddBundleResource) { - UniqueID bundle_id = UniqueID::FromRandom(); - std::string name = bundle_id.Binary() + "_" + "CPU"; + PlacementGroupID group_id = PlacementGroupID::FromRandom(); + std::string name = "CPU_group_" + group_id.Hex() + "_1"; std::vector whole_ids = {1, 2, 3}; ResourceIds resource_ids(whole_ids); resource_id_set->AddBundleResource(name, resource_ids); @@ -55,16 +55,16 @@ TEST_F(SchedulingResourcesTest, AddBundleResource) { } TEST_F(SchedulingResourcesTest, ReturnBundleResources) { - UniqueID bundle_id = UniqueID::FromRandom(); + PlacementGroupID group_id = PlacementGroupID::FromRandom(); std::vector resource_labels = {"CPU"}; std::vector resource_capacity = {1.0}; ResourceSet resource(resource_labels, resource_capacity); - resource_set->AddBundleResources(bundle_id.Binary(), resource); + resource_set->AddBundleResources(group_id, 1, resource); resource_labels.pop_back(); - resource_labels.push_back(bundle_id.Binary() + "_" + "CPU"); + resource_labels.push_back("CPU_group_" + group_id.Hex() + "_1"); ResourceSet result_resource(resource_labels, resource_capacity); ASSERT_EQ(1, resource_set->IsEqual(result_resource)); - resource_set->ReturnBundleResources(bundle_id.Binary()); + resource_set->ReturnBundleResources(group_id, 1); ASSERT_EQ(1, resource_set->IsEqual(resource)); } } // namespace ray diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 04c56e597..1a52bf306 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -15,6 +15,7 @@ #include "ray/core_worker/core_worker.h" #include "boost/fiber/all.hpp" +#include "ray/common/bundle_spec.h" #include "ray/common/ray_config.h" #include "ray/common/task/task_util.h" #include "ray/core_worker/context.h" @@ -1194,7 +1195,7 @@ std::unordered_map AddPlacementGroupConstraint( if (placement_group_id != PlacementGroupID::Nil()) { for (auto iter = resources.begin(); iter != resources.end(); iter++) { auto new_name = - placement_group_id.Hex() + std::to_string(bundle_index) + "_" + iter->first; + FormatPlacementGroupResource(iter->first, placement_group_id, bundle_index); new_resources[new_name] = iter->second; } return new_resources; diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc index a2a6f0735..9b9f0936e 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc @@ -127,9 +127,11 @@ void GcsPlacementGroupScheduler::Schedule( if (lease_success) { rpc::ScheduleData data; for (size_t i = 0; i < bundles.size(); i++) { + // TODO(ekl) this is a hack to get a string key for the proto + auto key = bundles[i]->PlacementGroupId().Hex() + "_" + + std::to_string(bundles[i]->Index()); data.mutable_schedule_plan()->insert( - {bundles[i]->BundleIdAsString(), - (*decision)[bundles[i]->BundleId()].Binary()}); + {key, (*decision)[bundles[i]->BundleId()].Binary()}); } RAY_CHECK_OK(gcs_table_storage_->PlacementGroupScheduleTable().Put( placement_group->GetPlacementGroupID(), data, [](Status status) {})); diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 8dc9f6426..98a19f042 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1819,13 +1819,13 @@ void NodeManager::HandleCancelResourceReserve( auto bundle_spec = BundleSpecification(request.bundle_spec()); RAY_LOG(DEBUG) << "bundle return resource request " << bundle_spec.BundleId().first << bundle_spec.BundleId().second; - auto bundle_id_str = bundle_spec.BundleIdAsString(); auto resource_set = bundle_spec.GetRequiredResources(); for (auto resource : resource_set.GetResourceMap()) { - std::string resource_name = bundle_id_str + "_" + resource.first; + std::string resource_name = FormatPlacementGroupResource(resource.first, bundle_spec); local_available_resources_.CancelResourceReserve(resource_name); } - cluster_resource_map_[self_node_id_].ReturnBundleResource(bundle_id_str); + cluster_resource_map_[self_node_id_].ReturnBundleResource( + bundle_spec.PlacementGroupId(), bundle_spec.Index()); send_reply_callback(Status::OK(), nullptr, nullptr); } @@ -1987,16 +1987,17 @@ ResourceIdSet NodeManager::ScheduleBundle( // Invoke the scheduling policy. auto reserve_resource_success = scheduling_policy_.ScheduleBundle(resource_map, self_node_id_, bundle_spec); - auto bundle_id_str = bundle_spec.BundleIdAsString(); ResourceIdSet acquired_resources; if (reserve_resource_success) { acquired_resources = local_available_resources_.Acquire(bundle_spec.GetRequiredResources()); for (auto resource : acquired_resources.AvailableResources()) { - std::string resource_name = bundle_id_str + "_" + resource.first; + std::string resource_name = + FormatPlacementGroupResource(resource.first, bundle_spec); local_available_resources_.AddBundleResource(resource_name, resource.second); } - resource_map[self_node_id_].UpdateBundleResource(bundle_id_str, + resource_map[self_node_id_].UpdateBundleResource(bundle_spec.PlacementGroupId(), + bundle_spec.Index(), bundle_spec.GetRequiredResources()); } return acquired_resources; diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 9fa038505..bb094f485 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -311,6 +311,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler { ResourceIdSet ScheduleBundle( std::unordered_map &resource_map, const BundleSpecification &bundle_spec); + /// Handle a task whose return value(s) must be reconstructed. /// /// \param task_id The relevant task ID.