Clean up formatting of placement group resources (#9740)

This commit is contained in:
Eric Liang
2020-07-30 15:52:32 -07:00
committed by GitHub
parent 940617d092
commit 73df3f7bd2
9 changed files with 85 additions and 51 deletions
+21 -6
View File
@@ -43,12 +43,6 @@ std::pair<PlacementGroupID, int64_t> BundleSpecification::BundleId() const {
PlacementGroupID::FromBinary(message_->bundle_id().placement_group_id()), index);
}
std::string BundleSpecification::BundleIdAsString() const {
int64_t index = message_->bundle_id().bundle_index();
return PlacementGroupID::FromBinary(message_->bundle_id().placement_group_id()).Hex() +
std::to_string(index);
}
PlacementGroupID BundleSpecification::PlacementGroupId() const {
return PlacementGroupID::FromBinary(message_->bundle_id().placement_group_id());
}
@@ -57,4 +51,25 @@ int64_t BundleSpecification::Index() const {
return message_->bundle_id().bundle_index();
}
std::string FormatPlacementGroupResource(const std::string &original_resource_name,
PlacementGroupID group_id,
int64_t bundle_index) {
auto str = original_resource_name + "_group_" + group_id.Hex() + "_" +
std::to_string(bundle_index);
RAY_CHECK(GetOriginalResourceName(str) == original_resource_name) << str;
return str;
}
std::string FormatPlacementGroupResource(const std::string &original_resource_name,
const BundleSpecification &bundle_spec) {
return FormatPlacementGroupResource(
original_resource_name, bundle_spec.PlacementGroupId(), bundle_spec.Index());
}
std::string GetOriginalResourceName(const std::string &resource) {
auto idx = resource.find("_group_");
RAY_CHECK(idx >= 0) << "This isn't a placement group resource " << resource;
return resource.substr(0, idx);
}
} // namespace ray
+11 -3
View File
@@ -51,9 +51,6 @@ class BundleSpecification : public MessageWrapper<rpc::Bundle> {
// Return the bundle_id
std::pair<PlacementGroupID, int64_t> BundleId() const;
// Return the bundle_id of string. eg: placement_group_id + index.
std::string BundleIdAsString() const;
// Return the Placement Group id which the Bundle belong to.
PlacementGroupID PlacementGroupId() const;
@@ -94,4 +91,15 @@ class BundleSpecification : public MessageWrapper<rpc::Bundle> {
mutable SpillbackBundleCallback on_spillback_ = nullptr;
};
/// Format a placement group resource, e.g., CPU -> CPU_group_YYY_i
std::string FormatPlacementGroupResource(const std::string &original_resource_name,
PlacementGroupID group_id, int64_t bundle_index);
/// Format a placement group resource, e.g., CPU -> CPU_group_YYY_i
std::string FormatPlacementGroupResource(const std::string &original_resource_name,
const BundleSpecification &bundle_spec);
/// Return the original resource name of the placement group resource.
std::string GetOriginalResourceName(const std::string &resource);
} // namespace ray
+18 -14
View File
@@ -3,6 +3,7 @@
#include <cmath>
#include <sstream>
#include "ray/common/bundle_spec.h"
#include "ray/util/logging.h"
namespace ray {
@@ -226,21 +227,22 @@ void ResourceSet::AddResources(const ResourceSet &other) {
}
}
void ResourceSet::AddBundleResources(const std::string &bundle_id,
const ResourceSet &other) {
void ResourceSet::AddBundleResources(const PlacementGroupID &bundle_id,
const int bundle_index, const ResourceSet &other) {
for (const auto &resource_pair : other.GetResourceAmountMap()) {
const std::string &resource_label = bundle_id + "_" + resource_pair.first;
const std::string &resource_label =
FormatPlacementGroupResource(resource_pair.first, bundle_id, bundle_index);
const FractionalResourceQuantity &resource_capacity = resource_pair.second;
resource_capacity_[resource_label] += resource_capacity;
}
}
void ResourceSet::ReturnBundleResources(const std::string &bundle_id) {
void ResourceSet::ReturnBundleResources(const PlacementGroupID &bundle_id,
const int bundle_index) {
for (auto iter = resource_capacity_.begin(); iter != resource_capacity_.end();) {
const std::string &bundle_resource_label = iter->first;
if (bundle_resource_label.find(bundle_id) != std::string::npos) {
const std::string &resource_label =
bundle_resource_label.substr(bundle_resource_label.find_last_of("_") + 1);
if (bundle_resource_label.find(bundle_id.Hex()) != std::string::npos) {
const std::string &resource_label = GetOriginalResourceName(bundle_resource_label);
const FractionalResourceQuantity &resource_capacity = iter->second;
resource_capacity_[resource_label] += resource_capacity;
iter = resource_capacity_.erase(iter);
@@ -669,7 +671,7 @@ void ResourceIdSet::AddBundleResource(const std::string &resource_name,
}
void ResourceIdSet::CancelResourceReserve(const std::string &resource_name) {
std::string origin_resource_name = resource_name.substr(resource_name.find("_") + 1);
std::string origin_resource_name = GetOriginalResourceName(resource_name);
auto iter_orig = available_resources_.find(origin_resource_name);
auto iter_bundle = available_resources_.find(resource_name);
if (iter_bundle == available_resources_.end()) {
@@ -835,17 +837,19 @@ void SchedulingResources::UpdateResourceCapacity(const std::string &resource_nam
}
}
void SchedulingResources::UpdateBundleResource(const std::string &bundle_id,
void SchedulingResources::UpdateBundleResource(const PlacementGroupID &group,
const int bundle_index,
const ResourceSet &resource_set) {
resources_available_.SubtractResourcesStrict(resource_set);
resources_available_.AddBundleResources(bundle_id, resource_set);
resources_available_.AddBundleResources(group, bundle_index, resource_set);
resources_total_.SubtractResourcesStrict(resource_set);
resources_total_.AddBundleResources(bundle_id, resource_set);
resources_total_.AddBundleResources(group, bundle_index, resource_set);
}
void SchedulingResources::ReturnBundleResource(const std::string &bundle_id) {
resources_available_.ReturnBundleResources(bundle_id);
resources_total_.ReturnBundleResources(bundle_id);
void SchedulingResources::ReturnBundleResource(const PlacementGroupID &group,
const int bundle_index) {
resources_available_.ReturnBundleResources(group, bundle_index);
resources_total_.ReturnBundleResources(group, bundle_index);
}
void SchedulingResources::DeleteResource(const std::string &resource_name) {
+12 -10
View File
@@ -5,6 +5,7 @@
#include <unordered_set>
#include <vector>
#include "ray/common/id.h"
#include "ray/raylet/format/node_manager_generated.h"
namespace ray {
@@ -148,21 +149,22 @@ class ResourceSet {
void AddResources(const ResourceSet &other);
/// \brief Aggregate resources from the other set into this set, adding any missing
/// resource labels to this set. The resource id will change to bundle_id + "_" +
/// reource_id
/// resource labels to this set.
///
/// \param bundle_id: The placement group id.
/// \param bundle_index: The index of the bundle.
/// \param other: The other resource set to add.
/// \param bundle_id: The bundle_id of the bundle.
/// \return Void.
void AddBundleResources(const std::string &bundle_id, const ResourceSet &other);
void AddBundleResources(const PlacementGroupID &bundle_id, const int bundle_index,
const ResourceSet &other);
/// \brief Return back all the bundle resource. Changing the resource name and adding
/// any missing resource labels to this set. The resource id will remove bundle_id + "_"
/// part.
/// any missing resource labels to this set.
///
/// \param bundle_id: The bundle_id of the bundle.
/// \param bundle_id: The placement group id.
/// \param bundle_index: The index of the bundle.
/// \return Void.
void ReturnBundleResources(const std::string &bundle_id);
void ReturnBundleResources(const PlacementGroupID &bundle_id, const int bundle_index);
/// \brief Subtract a set of resources from the current set of resources and
/// check that the post-subtraction result nonnegative. Assumes other
@@ -556,13 +558,13 @@ class SchedulingResources {
/// Create if not exists.
/// \param resource_name: Name of the resource to be modified
/// \param resource_set: New resource_set of the resource.
void UpdateBundleResource(const std::string &bundle_id,
void UpdateBundleResource(const PlacementGroupID &group, const int bundle_index,
const ResourceSet &resource_set);
/// \brief delete total, available and load resources with the ResourceIds.
/// Create if not exists.
/// \param resource_name: Name of the resource to be deleted
void ReturnBundleResource(const std::string &bundle_id);
void ReturnBundleResource(const PlacementGroupID &group, const int bundle_index);
/// \brief Delete resource from total, available and load resources.
///
@@ -33,20 +33,20 @@ class SchedulingResourcesTest : public ::testing::Test {
};
TEST_F(SchedulingResourcesTest, AddBundleResources) {
UniqueID bundle_id = UniqueID::FromRandom();
PlacementGroupID group_id = PlacementGroupID::FromRandom();
std::vector<std::string> resource_labels = {"CPU"};
std::vector<double> resource_capacity = {1.0};
ResourceSet resource(resource_labels, resource_capacity);
resource_set->AddBundleResources(bundle_id.Binary(), resource);
resource_set->AddBundleResources(group_id, 1, resource);
resource_labels.pop_back();
resource_labels.push_back(bundle_id.Binary() + "_" + "CPU");
resource_labels.push_back("CPU_group_" + group_id.Hex() + "_1");
ResourceSet result_resource(resource_labels, resource_capacity);
ASSERT_EQ(1, resource_set->IsEqual(result_resource));
}
TEST_F(SchedulingResourcesTest, AddBundleResource) {
UniqueID bundle_id = UniqueID::FromRandom();
std::string name = bundle_id.Binary() + "_" + "CPU";
PlacementGroupID group_id = PlacementGroupID::FromRandom();
std::string name = "CPU_group_" + group_id.Hex() + "_1";
std::vector<int64_t> whole_ids = {1, 2, 3};
ResourceIds resource_ids(whole_ids);
resource_id_set->AddBundleResource(name, resource_ids);
@@ -55,16 +55,16 @@ TEST_F(SchedulingResourcesTest, AddBundleResource) {
}
TEST_F(SchedulingResourcesTest, ReturnBundleResources) {
UniqueID bundle_id = UniqueID::FromRandom();
PlacementGroupID group_id = PlacementGroupID::FromRandom();
std::vector<std::string> resource_labels = {"CPU"};
std::vector<double> resource_capacity = {1.0};
ResourceSet resource(resource_labels, resource_capacity);
resource_set->AddBundleResources(bundle_id.Binary(), resource);
resource_set->AddBundleResources(group_id, 1, resource);
resource_labels.pop_back();
resource_labels.push_back(bundle_id.Binary() + "_" + "CPU");
resource_labels.push_back("CPU_group_" + group_id.Hex() + "_1");
ResourceSet result_resource(resource_labels, resource_capacity);
ASSERT_EQ(1, resource_set->IsEqual(result_resource));
resource_set->ReturnBundleResources(bundle_id.Binary());
resource_set->ReturnBundleResources(group_id, 1);
ASSERT_EQ(1, resource_set->IsEqual(resource));
}
} // namespace ray
+2 -1
View File
@@ -15,6 +15,7 @@
#include "ray/core_worker/core_worker.h"
#include "boost/fiber/all.hpp"
#include "ray/common/bundle_spec.h"
#include "ray/common/ray_config.h"
#include "ray/common/task/task_util.h"
#include "ray/core_worker/context.h"
@@ -1194,7 +1195,7 @@ std::unordered_map<std::string, double> AddPlacementGroupConstraint(
if (placement_group_id != PlacementGroupID::Nil()) {
for (auto iter = resources.begin(); iter != resources.end(); iter++) {
auto new_name =
placement_group_id.Hex() + std::to_string(bundle_index) + "_" + iter->first;
FormatPlacementGroupResource(iter->first, placement_group_id, bundle_index);
new_resources[new_name] = iter->second;
}
return new_resources;
@@ -127,9 +127,11 @@ void GcsPlacementGroupScheduler::Schedule(
if (lease_success) {
rpc::ScheduleData data;
for (size_t i = 0; i < bundles.size(); i++) {
// TODO(ekl) this is a hack to get a string key for the proto
auto key = bundles[i]->PlacementGroupId().Hex() + "_" +
std::to_string(bundles[i]->Index());
data.mutable_schedule_plan()->insert(
{bundles[i]->BundleIdAsString(),
(*decision)[bundles[i]->BundleId()].Binary()});
{key, (*decision)[bundles[i]->BundleId()].Binary()});
}
RAY_CHECK_OK(gcs_table_storage_->PlacementGroupScheduleTable().Put(
placement_group->GetPlacementGroupID(), data, [](Status status) {}));
+7 -6
View File
@@ -1819,13 +1819,13 @@ void NodeManager::HandleCancelResourceReserve(
auto bundle_spec = BundleSpecification(request.bundle_spec());
RAY_LOG(DEBUG) << "bundle return resource request " << bundle_spec.BundleId().first
<< bundle_spec.BundleId().second;
auto bundle_id_str = bundle_spec.BundleIdAsString();
auto resource_set = bundle_spec.GetRequiredResources();
for (auto resource : resource_set.GetResourceMap()) {
std::string resource_name = bundle_id_str + "_" + resource.first;
std::string resource_name = FormatPlacementGroupResource(resource.first, bundle_spec);
local_available_resources_.CancelResourceReserve(resource_name);
}
cluster_resource_map_[self_node_id_].ReturnBundleResource(bundle_id_str);
cluster_resource_map_[self_node_id_].ReturnBundleResource(
bundle_spec.PlacementGroupId(), bundle_spec.Index());
send_reply_callback(Status::OK(), nullptr, nullptr);
}
@@ -1987,16 +1987,17 @@ ResourceIdSet NodeManager::ScheduleBundle(
// Invoke the scheduling policy.
auto reserve_resource_success =
scheduling_policy_.ScheduleBundle(resource_map, self_node_id_, bundle_spec);
auto bundle_id_str = bundle_spec.BundleIdAsString();
ResourceIdSet acquired_resources;
if (reserve_resource_success) {
acquired_resources =
local_available_resources_.Acquire(bundle_spec.GetRequiredResources());
for (auto resource : acquired_resources.AvailableResources()) {
std::string resource_name = bundle_id_str + "_" + resource.first;
std::string resource_name =
FormatPlacementGroupResource(resource.first, bundle_spec);
local_available_resources_.AddBundleResource(resource_name, resource.second);
}
resource_map[self_node_id_].UpdateBundleResource(bundle_id_str,
resource_map[self_node_id_].UpdateBundleResource(bundle_spec.PlacementGroupId(),
bundle_spec.Index(),
bundle_spec.GetRequiredResources());
}
return acquired_resources;
+1
View File
@@ -311,6 +311,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
ResourceIdSet ScheduleBundle(
std::unordered_map<ClientID, SchedulingResources> &resource_map,
const BundleSpecification &bundle_spec);
/// Handle a task whose return value(s) must be reconstructed.
///
/// \param task_id The relevant task ID.