Updates to scheduling objects to support dynamic custom resources (#4465)

This commit is contained in:
Romil Bhardwaj
2019-04-27 18:45:23 -07:00
committed by Robert Nishihara
parent 9ce3039390
commit 686d4caefe
3 changed files with 338 additions and 24 deletions
+1 -2
View File
@@ -119,9 +119,8 @@ cdef extern from "ray/raylet/scheduling_resources.h" \
c_bool IsEqual(const ResourceSet &other) const
c_bool IsSubset(const ResourceSet &other) const
c_bool IsSuperset(const ResourceSet &other) const
c_bool AddResource(const c_string &resource_name, double capacity)
c_bool AddOrUpdateResource(const c_string &resource_name, double capacity)
c_bool RemoveResource(const c_string &resource_name)
c_bool AddResourcesStrict(const ResourceSet &other)
void AddResources(const ResourceSet &other)
c_bool SubtractResourcesStrict(const ResourceSet &other)
c_bool GetResource(const c_string &resource_name, double *value) const
+236 -16
View File
@@ -131,8 +131,20 @@ bool ResourceSet::IsEqual(const ResourceSet &rhs) const {
return (this->IsSubset(rhs) && rhs.IsSubset(*this));
}
bool ResourceSet::RemoveResource(const std::string &resource_name) {
throw std::runtime_error("Method not implemented");
// Insert the resource with the given capacity, or overwrite the capacity of an
// existing entry.
//
// \param resource_name Name/label of the resource to add or update.
// \param capacity New capacity for the resource. A capacity that is not strictly
// positive is ignored: the set is left untouched and any existing entry keeps its
// previous value.
void ResourceSet::AddOrUpdateResource(const std::string &resource_name,
                                      const FractionalResourceQuantity &capacity) {
  const bool is_positive = capacity > 0;
  if (!is_positive) {
    return;
  }
  resource_capacity_[resource_name] = capacity;
}
// Remove a resource from the set.
//
// \param resource_name Name/label of the resource to delete.
// \return True if an entry with that name existed and was erased, false if the
// resource was not present.
bool ResourceSet::DeleteResource(const std::string &resource_name) {
  // Keys are unique, so erase-by-key reports either 0 or 1 removed entries.
  const auto erased_entries = resource_capacity_.erase(resource_name);
  return erased_entries == 1;
}
void ResourceSet::SubtractResources(const ResourceSet &other) {
@@ -172,6 +184,35 @@ void ResourceSet::SubtractResourcesStrict(const ResourceSet &other) {
}
}
// Add a set of resources to the current set of resources subject to upper limits on
// capacity from the total_resource set
void ResourceSet::AddResourcesCapacityConstrained(const ResourceSet &other,
const ResourceSet &total_resources) {
const std::unordered_map<std::string, FractionalResourceQuantity> &total_resource_map =
total_resources.GetResourceAmountMap();
for (const auto &resource_pair : other.GetResourceAmountMap()) {
const std::string &to_add_resource_label = resource_pair.first;
const FractionalResourceQuantity &to_add_resource_capacity = resource_pair.second;
if (total_resource_map.count(to_add_resource_label) != 0) {
// If resource exists in total map, add to the local capacity map.
// If the new capacity will be greater the total capacity, set the new capacity to
// total capacity (capping to the total)
const FractionalResourceQuantity &total_capacity =
total_resource_map.at(to_add_resource_label);
resource_capacity_[to_add_resource_label] =
std::min(resource_capacity_[to_add_resource_label] + to_add_resource_capacity,
total_capacity);
} else {
// Resource does not exist in the total map, it probably got deleted from the total.
// Don't panic, do nothing and simply continue.
RAY_LOG(DEBUG) << "[AddResourcesCapacityConstrained] Resource "
<< to_add_resource_label
<< " not found in the total resource map. It probably got deleted, "
"not adding back to resource_capacity_.";
}
}
}
// Perform an outer join.
void ResourceSet::AddResources(const ResourceSet &other) {
for (const auto &resource_pair : other.GetResourceAmountMap()) {
@@ -186,7 +227,7 @@ FractionalResourceQuantity ResourceSet::GetResource(
if (resource_capacity_.count(resource_name) == 0) {
return 0;
}
const FractionalResourceQuantity capacity = resource_capacity_.at(resource_name);
const FractionalResourceQuantity &capacity = resource_capacity_.at(resource_name);
return capacity;
}
@@ -235,6 +276,44 @@ const std::unordered_map<std::string, FractionalResourceQuantity>
return resource_capacity_;
};
// Collect the resources of `new_resource_set` that are either absent from this set or
// present with a different capacity.
//
// \param new_resource_set The new resource set to compare against this (old) set.
// \return A ResourceSet built by passing each such label and its new capacity to
// AddOrUpdateResource.
ResourceSet ResourceSet::FindUpdatedResources(
    const ray::raylet::ResourceSet &new_resource_set) const {
  ResourceSet changed_resources;
  for (const auto &entry : new_resource_set.GetResourceAmountMap()) {
    const std::string &label = entry.first;
    const FractionalResourceQuantity &new_capacity = entry.second;

    const auto old_it = resource_capacity_.find(label);
    // A label counts as updated when it is brand new or its capacity changed.
    if (old_it == resource_capacity_.end() || old_it->second != new_capacity) {
      changed_resources.AddOrUpdateResource(label, new_capacity);
    }
  }
  return changed_resources;
}
// Collect the resources that exist in this (old) set but are missing from
// `new_resource_set`.
//
// \param new_resource_set The new resource set to compare against this set.
// \return A ResourceSet built by passing each missing label and its old capacity to
// AddOrUpdateResource.
ResourceSet ResourceSet::FindDeletedResources(
    const ray::raylet::ResourceSet &new_resource_set) const {
  ResourceSet removed_resources;
  auto &surviving = new_resource_set.GetResourceAmountMap();
  for (const auto &entry : resource_capacity_) {
    const bool still_present = surviving.find(entry.first) != surviving.end();
    if (still_present) {
      continue;
    }
    // The label vanished from the new set; report it with the capacity it used to have.
    removed_resources.AddOrUpdateResource(entry.first, entry.second);
  }
  return removed_resources;
}
/// ResourceIds class implementation
ResourceIds::ResourceIds() {}
@@ -245,20 +324,28 @@ ResourceIds::ResourceIds(double resource_quantity) {
for (int64_t i = 0; i < whole_quantity; ++i) {
whole_ids_.push_back(i);
}
total_capacity_ = TotalQuantity();
decrement_backlog_ = 0;
}
ResourceIds::ResourceIds(const std::vector<int64_t> &whole_ids) : whole_ids_(whole_ids) {}
ResourceIds::ResourceIds(const std::vector<int64_t> &whole_ids)
: whole_ids_(whole_ids), total_capacity_(whole_ids.size()), decrement_backlog_(0) {}
ResourceIds::ResourceIds(
const std::vector<std::pair<int64_t, FractionalResourceQuantity>> &fractional_ids)
: fractional_ids_(fractional_ids) {}
: fractional_ids_(fractional_ids),
total_capacity_(TotalQuantity()),
decrement_backlog_(0) {}
ResourceIds::ResourceIds(
const std::vector<int64_t> &whole_ids,
const std::vector<std::pair<int64_t, FractionalResourceQuantity>> &fractional_ids)
: whole_ids_(whole_ids), fractional_ids_(fractional_ids) {}
: whole_ids_(whole_ids),
fractional_ids_(fractional_ids),
total_capacity_(TotalQuantity()),
decrement_backlog_(0) {}
bool ResourceIds::Contains(FractionalResourceQuantity resource_quantity) const {
bool ResourceIds::Contains(const FractionalResourceQuantity &resource_quantity) const {
if (resource_quantity >= 1) {
double whole_quantity = resource_quantity.ToDouble();
RAY_CHECK(IsWhole(whole_quantity));
@@ -277,7 +364,7 @@ bool ResourceIds::Contains(FractionalResourceQuantity resource_quantity) const {
}
}
ResourceIds ResourceIds::Acquire(FractionalResourceQuantity resource_quantity) {
ResourceIds ResourceIds::Acquire(const FractionalResourceQuantity &resource_quantity) {
if (resource_quantity >= 1) {
// Handle the whole case.
double whole_quantity = resource_quantity.ToDouble();
@@ -318,7 +405,7 @@ ResourceIds ResourceIds::Acquire(FractionalResourceQuantity resource_quantity) {
auto return_pair = std::make_pair(whole_id, resource_quantity);
// We cannot make use of the implicit conversion because ints have no
// operator-(const FractionalResourceQuantity&) function.
FractionalResourceQuantity remaining_amount =
const FractionalResourceQuantity remaining_amount =
FractionalResourceQuantity(1) - resource_quantity;
fractional_ids_.push_back(std::make_pair(whole_id, remaining_amount));
return ResourceIds({return_pair});
@@ -328,9 +415,18 @@ ResourceIds ResourceIds::Acquire(FractionalResourceQuantity resource_quantity) {
void ResourceIds::Release(const ResourceIds &resource_ids) {
auto const &whole_ids_to_return = resource_ids.WholeIds();
// Return the whole IDs.
whole_ids_.insert(whole_ids_.end(), whole_ids_to_return.begin(),
whole_ids_to_return.end());
int64_t return_resource_count = whole_ids_to_return.size();
if (return_resource_count > decrement_backlog_) {
// We are returning more resources than in the decrement backlog, thus set the backlog
// to zero and insert (count - decrement_backlog resources).
whole_ids_.insert(whole_ids_.end(), whole_ids_to_return.begin() + decrement_backlog_,
whole_ids_to_return.end());
decrement_backlog_ = 0;
} else {
// Do not insert back to whole_ids_. Instead just decrement backlog by the return
// count
decrement_backlog_ -= return_resource_count;
}
// Return the fractional IDs.
auto const &fractional_ids_to_return = resource_ids.FractionalIds();
@@ -350,7 +446,12 @@ void ResourceIds::Release(const ResourceIds &resource_ids) {
<< fractional_pair_it->second.ToDouble() << ". Should have been less than one.";
// If this makes the ID whole, then return it to the list of whole IDs.
if (fractional_pair_it->second == 1) {
whole_ids_.push_back(resource_id);
if (decrement_backlog_ > 0) {
// There's a decrement backlog, do not add to whole_ids_
decrement_backlog_--;
} else {
whole_ids_.push_back(resource_id);
}
fractional_ids_.erase(fractional_pair_it);
}
}
@@ -398,6 +499,57 @@ std::string ResourceIds::ToString() const {
return return_string;
}
// Move the tracked capacity of this resource to `new_capacity` by delegating the
// difference to IncreaseCapacity or DecreaseCapacity.
//
// \param new_capacity Target capacity; must be non-negative (checked).
void ResourceIds::UpdateCapacity(int64_t new_capacity) {
  // Sanity check: a capacity can never be negative.
  RAY_CHECK(new_capacity >= 0);
  const int64_t delta = new_capacity - total_capacity_.ToDouble();
  if (delta >= 0) {
    // A zero delta also takes this path.
    IncreaseCapacity(delta);
    return;
  }
  DecreaseCapacity(-delta);
}
// Increase the capacity of this resource by `increment_quantity` whole units.
//
// Pending decrements (decrement_backlog_) are cancelled first; only the part of the
// increment that exceeds the backlog is materialised as new whole IDs.
//
// \param increment_quantity The quantity of resources to add.
void ResourceIds::IncreaseCapacity(int64_t increment_quantity) {
  // Part of the increment that remains after paying off the decrement backlog.
  const int64_t actual_increment_quantity =
      std::max<int64_t>(0, increment_quantity - decrement_backlog_);
  decrement_backlog_ = std::max<int64_t>(0, decrement_backlog_ - increment_quantity);

  for (int64_t i = 0; i < actual_increment_quantity; ++i) {
    whole_ids_.push_back(-1);  // Dynamic resources are assigned resource id -1.
  }

  // DecreaseCapacity lowers total_capacity_ by the full decrement, including the part
  // that was only recorded in the backlog. The increase must therefore be applied in
  // full as well, even when it merely cancels backlog; otherwise total_capacity_ ends
  // up too low and the next UpdateCapacity computes a wrong delta.
  total_capacity_ += increment_quantity;
}
// Decrease the capacity of this resource by `decrement_quantity` whole units.
//
// Capacity is removed by acquiring resources and discarding the result. If fewer
// resources are available than requested, the shortfall is recorded in
// decrement_backlog_, which Release() consumes as resources are returned.
//
// \param decrement_quantity The quantity of resources to remove.
void ResourceIds::DecreaseCapacity(int64_t decrement_quantity) {
  // Get total quantity, but casting to int to truncate any fractional resources. Updates
  // are supported only on whole resources.
  // NOTE(review): this presumably also counts quantity held in fractional IDs, which
  // the whole-ID Acquire path may not be able to hand out - confirm against
  // TotalQuantity().
  const int64_t available_quantity = TotalQuantity().ToDouble();
  RAY_LOG(DEBUG) << "[DecreaseCapacity] Available quantity: " << available_quantity;

  int64_t quantity_to_acquire = decrement_quantity;
  if (available_quantity < decrement_quantity) {
    RAY_LOG(DEBUG) << "[DecreaseCapacity] Available quantity < decrement quantity "
                   << decrement_quantity;
    // We're trying to remove more resources than are available. Add the difference to
    // the decrement backlog; it is cleared when resources are released.
    decrement_backlog_ += (decrement_quantity - available_quantity);
    quantity_to_acquire = available_quantity;
  } else {
    RAY_LOG(DEBUG) << "[DecreaseCapacity] Available quantity > decrement quantity "
                   << decrement_quantity;
  }

  // To decrease capacity, just acquire resources and forget about them. Skip the call
  // when there is nothing to take: a zero request is below 1 and would be routed to
  // Acquire's fractional path, which can carve up a whole ID instead of being a no-op.
  if (quantity_to_acquire > 0) {
    Acquire(quantity_to_acquire);
  }
  total_capacity_ -= decrement_quantity;
}
bool ResourceIds::IsWhole(double resource_quantity) const {
int64_t whole_quantity = resource_quantity;
return whole_quantity == resource_quantity;
@@ -422,7 +574,7 @@ ResourceIdSet::ResourceIdSet(
bool ResourceIdSet::Contains(const ResourceSet &resource_set) const {
for (auto const &resource_pair : resource_set.GetResourceAmountMap()) {
auto const &resource_name = resource_pair.first;
FractionalResourceQuantity resource_quantity = resource_pair.second;
const FractionalResourceQuantity &resource_quantity = resource_pair.second;
auto it = available_resources_.find(resource_name);
if (it == available_resources_.end()) {
@@ -441,7 +593,7 @@ ResourceIdSet ResourceIdSet::Acquire(const ResourceSet &resource_set) {
for (auto const &resource_pair : resource_set.GetResourceAmountMap()) {
auto const &resource_name = resource_pair.first;
FractionalResourceQuantity resource_quantity = resource_pair.second;
const FractionalResourceQuantity &resource_quantity = resource_pair.second;
auto it = available_resources_.find(resource_name);
RAY_CHECK(it != available_resources_.end());
@@ -468,6 +620,25 @@ void ResourceIdSet::Release(const ResourceIdSet &resource_id_set) {
}
}
// Return resource IDs to this set, but only for resources that `resources_total`
// still reports with a non-zero quantity.
//
// \param resource_id_set The resource IDs to return.
// \param resources_total Constraint set; resources for which GetResource() yields 0
// are skipped and not added back to this ResourceIdSet.
void ResourceIdSet::ReleaseConstrained(const ResourceIdSet &resource_id_set,
                                       const ResourceSet &resources_total) {
  for (auto const &returned : resource_id_set.AvailableResources()) {
    auto const &name = returned.first;
    const bool still_tracked = resources_total.GetResource(name) != 0;
    if (!still_tracked) {
      continue;
    }

    auto const &returned_ids = returned.second;
    RAY_CHECK(!returned_ids.TotalQuantityIsZero());

    auto existing = available_resources_.find(name);
    if (existing != available_resources_.end()) {
      // Merge the returned IDs into the entry we already hold.
      existing->second.Release(returned_ids);
    } else {
      // Nothing held for this resource yet; adopt the returned IDs as-is.
      available_resources_[name] = returned_ids;
    }
  }
}
void ResourceIdSet::Clear() { available_resources_.clear(); }
ResourceIdSet ResourceIdSet::Plus(const ResourceIdSet &resource_id_set) const {
@@ -476,6 +647,23 @@ ResourceIdSet ResourceIdSet::Plus(const ResourceIdSet &resource_id_set) const {
return resource_id_set_to_return;
}
// Create the resource with the given capacity, or change the capacity of an existing
// resource via ResourceIds::UpdateCapacity.
//
// \param resource_name The name of the resource to create/update.
// \param capacity Capacity of the resource being added, or the new capacity of an
// existing resource.
void ResourceIdSet::AddOrUpdateResource(const std::string &resource_name,
                                        int64_t capacity) {
  const auto existing = available_resources_.find(resource_name);
  if (existing == available_resources_.end()) {
    // Unknown resource: start tracking it with a fresh set of IDs.
    available_resources_[resource_name] = ResourceIds(capacity);
    return;
  }
  // Known resource: adjust its capacity in place.
  existing->second.UpdateCapacity(capacity);
}
// Stop tracking a resource. Deleting a resource that is not present is a no-op.
//
// \param resource_name The name of the resource to delete.
void ResourceIdSet::DeleteResource(const std::string &resource_name) {
  const auto entry = available_resources_.find(resource_name);
  if (entry != available_resources_.end()) {
    available_resources_.erase(entry);
  }
}
const std::unordered_map<std::string, ResourceIds> &ResourceIdSet::AvailableResources()
const {
return available_resources_;
@@ -580,7 +768,8 @@ const ResourceSet &SchedulingResources::GetLoadResources() const {
// Return specified resources back to SchedulingResources.
void SchedulingResources::Release(const ResourceSet &resources) {
resources_available_.AddResources(resources);
return resources_available_.AddResourcesCapacityConstrained(resources,
resources_total_);
}
// Take specified resources from SchedulingResources.
@@ -588,6 +777,37 @@ void SchedulingResources::Acquire(const ResourceSet &resources) {
resources_available_.SubtractResourcesStrict(resources);
}
// Set the total capacity of a resource and shift the available capacity by the same
// difference. Creates the resource if it does not exist. Load resources are not
// modified.
//
// \param resource_name Name of the resource to be modified.
// \param capacity New total capacity of the resource. Non-positive values leave the
// scheduling resources unchanged.
void SchedulingResources::UpdateResource(const std::string &resource_name,
                                         int64_t capacity) {
  const FractionalResourceQuantity new_capacity = FractionalResourceQuantity(capacity);
  if (!(new_capacity > 0)) {
    // ResourceSet::AddOrUpdateResource ignores non-positive capacities, so such an
    // update has never changed the total. Keep it a complete no-op rather than
    // adjusting the available set against an unchanged total.
    // NOTE(review): callers that want to drop a resource should presumably use
    // DeleteResource - confirm.
    return;
  }

  const FractionalResourceQuantity current_capacity =
      resources_total_.GetResource(resource_name);
  if (current_capacity > 0) {
    // The resource exists: apply the change in total capacity to the available
    // capacity as well.
    const FractionalResourceQuantity capacity_difference =
        new_capacity - current_capacity;
    const FractionalResourceQuantity new_available_capacity =
        resources_available_.GetResource(resource_name) + capacity_difference;

    resources_total_.AddOrUpdateResource(resource_name, new_capacity);
    if (new_available_capacity > 0) {
      resources_available_.AddOrUpdateResource(resource_name, new_available_capacity);
    } else {
      // Nothing is left available after the decrease. AddOrUpdateResource would
      // silently ignore a zero capacity and keep the stale, larger value, so remove
      // the entry instead; GetResource reports 0 for a missing resource.
      resources_available_.DeleteResource(resource_name);
    }
  } else {
    // Resource does not exist, just add it to total and available. Do not add to load.
    resources_total_.AddOrUpdateResource(resource_name, new_capacity);
    resources_available_.AddOrUpdateResource(resource_name, new_capacity);
  }
}
void SchedulingResources::DeleteResource(const std::string &resource_name) {
resources_total_.DeleteResource(resource_name);
resources_available_.DeleteResource(resource_name);
resources_load_.DeleteResource(resource_name);
}
std::string SchedulingResources::DebugString() const {
std::stringstream result;
result << "\n- total: " << resources_total_.ToString();
+101 -6
View File
@@ -110,11 +110,30 @@ class ResourceSet {
/// False otherwise.
bool IsSuperset(const ResourceSet &other) const;
/// \brief Remove the specified resource from the resource set.
/// \brief Add or update a new resource to the resource set.
///
/// \param resource_name: name/label of the resource to remove.
/// \return True, if the resource was successfully removed. False otherwise.
bool RemoveResource(const std::string &resource_name);
/// \param resource_name: name/label of the resource to add.
/// \param capacity: numeric capacity value for the resource to add.
/// \return Void. A capacity that is not strictly positive is ignored.
void AddOrUpdateResource(const std::string &resource_name,
const FractionalResourceQuantity &capacity);
/// \brief Delete a resource from the resource set.
///
/// \param resource_name: name/label of the resource to delete.
/// \return True if the resource was found while deleting, false if the resource did not
/// exist in the set.
bool DeleteResource(const std::string &resource_name);
/// \brief Add a set of resources to the current set of resources subject to upper
/// limits on capacity from the total_resource set.
///
/// \param other: The other resource set to add.
/// \param total_resources: Total resource set which sets upper limits on capacity for
/// each label.
/// \return Void.
void AddResourcesCapacityConstrained(const ResourceSet &other,
const ResourceSet &total_resources);
/// \brief Aggregate resources from the other set into this set, adding any missing
/// resource labels to this set.
@@ -139,6 +158,18 @@ class ResourceSet {
/// \return Void.
void SubtractResourcesStrict(const ResourceSet &other);
/// \brief Finds new resources created or updated in a new set.
///
/// \param new_resource_set: The new resource set to compare with.
/// \return The ResourceSet of updated values
ResourceSet FindUpdatedResources(const ResourceSet &new_resource_set) const;
/// \brief Finds resources deleted in a set.
///
/// \param new_resource_set: The new resource set to compare with.
/// \return The ResourceSet of deleted resources with old capacities
ResourceSet FindDeletedResources(const ResourceSet &new_resource_set) const;
/// Return the capacity value associated with the specified resource.
///
/// \param resource_name: Resource name for which capacity is requested.
@@ -225,14 +256,14 @@ class ResourceIds {
///
/// \param resource_quantity Either a whole number or a fraction less than 1.
/// \return True if there we have enough of the resource.
bool Contains(FractionalResourceQuantity resource_quantity) const;
bool Contains(const FractionalResourceQuantity &resource_quantity) const;
/// \brief Acquire the requested amount of the resource.
///
/// \param resource_quantity The amount to acquire. Either a whole number or a
/// fraction less than 1.
/// \return A ResourceIds representing the specific acquired IDs.
ResourceIds Acquire(FractionalResourceQuantity resource_quantity);
ResourceIds Acquire(const FractionalResourceQuantity &resource_quantity);
/// \brief Return some resource IDs.
///
@@ -272,6 +303,13 @@ class ResourceIds {
/// \return A human-readable string representing the object.
std::string ToString() const;
/// \brief Set the resource capacity to the given value. If the capacity shrinks by more
/// than the currently available quantity, the shortfall is recorded in the decrement
/// backlog and applied as resources are released.
///
/// \param new_capacity int of new capacity
/// \return Void.
void UpdateCapacity(int64_t new_capacity);
private:
/// Check that a double is in fact a whole number.
///
@@ -279,11 +317,30 @@ class ResourceIds {
/// \return True if the double is an integer and false otherwise.
bool IsWhole(double resource_quantity) const;
/// \brief Increase resource capacity by the given amount.
///
/// \param increment_quantity The quantity of resources to add.
/// \return Void.
void IncreaseCapacity(int64_t increment_quantity);
/// \brief Decrease resource capacity by the given amount. Adds to the decrement backlog
/// if more than available resources are decremented.
///
/// \param decrement_quantity The quantity of resources to remove.
/// \return Void.
void DecreaseCapacity(int64_t decrement_quantity);
/// A vector of distinct whole resource IDs.
std::vector<int64_t> whole_ids_;
/// A vector of pairs of resource ID and a fraction of that ID (the fraction
/// is at least zero and strictly less than 1).
std::vector<std::pair<int64_t, FractionalResourceQuantity>> fractional_ids_;
/// Quantity to track the total capacity of the resource, since the whole_ids_ vector
/// keeps changing
FractionalResourceQuantity total_capacity_;
/// Quantity to track any pending decrements in capacity that weren't executed because
/// of insufficient available resources. This backlog is cleared in the Release method.
int64_t decrement_backlog_;
};
/// \class ResourceIdSet
@@ -324,6 +381,16 @@ class ResourceIdSet {
/// \return Void.
void Release(const ResourceIdSet &resource_id_set);
/// \brief Return a set of resource IDs subject to their existence in the
/// resources_total set.
///
/// \param resource_id_set The resource IDs to return.
/// \param resources_total Constraint set to restrict the release to. If a resource
/// exists in resource_id_set but not in resources_total, it is not added to this
/// ResourceIdSet.
/// \return Void.
void ReleaseConstrained(const ResourceIdSet &resource_id_set,
const ResourceSet &resources_total);
/// \brief Clear out all of the resource IDs.
///
/// \return Void.
@@ -335,6 +402,20 @@ class ResourceIdSet {
/// \return The combination of the two sets of resource IDs.
ResourceIdSet Plus(const ResourceIdSet &resource_id_set) const;
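/// \brief Creates a resource in the ResourceIdSet, or updates its capacity if it already
/// exists. When the new capacity is lower than the old one and not enough resources are
/// currently available, the shortfall is tracked in the resource's decrement backlog and
/// applied as busy resources are released.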
///
/// \param resource_name the name of the resource to create/update
/// \param capacity capacity of the resource being added
void AddOrUpdateResource(const std::string &resource_name, int64_t capacity);
/// \brief Deletes a resource in the ResourceIdSet. This does not raise an exception,
/// just deletes the resource. Tasks with acquired resources keep running.
///
/// \param resource_name the name of the resource to delete
void DeleteResource(const std::string &resource_name);
/// \brief Get the underlying mapping from resource name to resource IDs.
///
/// \return The resource name to resource IDs mapping.
@@ -427,6 +508,20 @@ class SchedulingResources {
/// \return string.
std::string DebugString() const;
/// \brief Update total and available resources with the specified capacity. Load
/// resources are not modified. Create if not exists.
///
/// \param resource_name: Name of the resource to be modified
/// \param capacity: New capacity of the resource.
/// \return Void.
void UpdateResource(const std::string &resource_name, int64_t capacity);
/// \brief Delete resource from total, available and load resources.
///
/// \param resource_name: Name of the resource to be deleted.
/// \return Void.
void DeleteResource(const std::string &resource_name);
private:
/// Static resource configuration (e.g., static_resources).
ResourceSet resources_total_;