From 686d4caefe4cfefb9dbe8cdf9034d9ed0d8ee5fe Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Sat, 27 Apr 2019 18:45:23 -0700 Subject: [PATCH] Updates to scheduling objects to support dynamic custom resources (#4465) --- python/ray/includes/common.pxd | 3 +- src/ray/raylet/scheduling_resources.cc | 252 +++++++++++++++++++++++-- src/ray/raylet/scheduling_resources.h | 107 ++++++++++- 3 files changed, 338 insertions(+), 24 deletions(-) diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 2fd9aaa37..3b6463fc9 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -119,9 +119,8 @@ cdef extern from "ray/raylet/scheduling_resources.h" \ c_bool IsEqual(const ResourceSet &other) const c_bool IsSubset(const ResourceSet &other) const c_bool IsSuperset(const ResourceSet &other) const - c_bool AddResource(const c_string &resource_name, double capacity) + c_bool AddOrUpdateResource(const c_string &resource_name, double capacity) c_bool RemoveResource(const c_string &resource_name) - c_bool AddResourcesStrict(const ResourceSet &other) void AddResources(const ResourceSet &other) c_bool SubtractResourcesStrict(const ResourceSet &other) c_bool GetResource(const c_string &resource_name, double *value) const diff --git a/src/ray/raylet/scheduling_resources.cc b/src/ray/raylet/scheduling_resources.cc index f53bf7958..0d114f0c2 100644 --- a/src/ray/raylet/scheduling_resources.cc +++ b/src/ray/raylet/scheduling_resources.cc @@ -131,8 +131,20 @@ bool ResourceSet::IsEqual(const ResourceSet &rhs) const { return (this->IsSubset(rhs) && rhs.IsSubset(*this)); } -bool ResourceSet::RemoveResource(const std::string &resource_name) { - throw std::runtime_error("Method not implemented"); +void ResourceSet::AddOrUpdateResource(const std::string &resource_name, + const FractionalResourceQuantity &capacity) { + if (capacity > 0) { + resource_capacity_[resource_name] = capacity; + } +} + +bool ResourceSet::DeleteResource(const std::string 
&resource_name) { + if (resource_capacity_.count(resource_name) == 1) { + resource_capacity_.erase(resource_name); + return true; + } else { + return false; + } } void ResourceSet::SubtractResources(const ResourceSet &other) { @@ -172,6 +184,35 @@ void ResourceSet::SubtractResourcesStrict(const ResourceSet &other) { } } +// Add a set of resources to the current set of resources subject to upper limits on +// capacity from the total_resource set +void ResourceSet::AddResourcesCapacityConstrained(const ResourceSet &other, + const ResourceSet &total_resources) { + const std::unordered_map<std::string, FractionalResourceQuantity> &total_resource_map = + total_resources.GetResourceAmountMap(); + for (const auto &resource_pair : other.GetResourceAmountMap()) { + const std::string &to_add_resource_label = resource_pair.first; + const FractionalResourceQuantity &to_add_resource_capacity = resource_pair.second; + if (total_resource_map.count(to_add_resource_label) != 0) { + // If resource exists in total map, add to the local capacity map. + // If the new capacity will be greater the total capacity, set the new capacity to + // total capacity (capping to the total) + const FractionalResourceQuantity &total_capacity = + total_resource_map.at(to_add_resource_label); + resource_capacity_[to_add_resource_label] = + std::min(resource_capacity_[to_add_resource_label] + to_add_resource_capacity, + total_capacity); + } else { + // Resource does not exist in the total map, it probably got deleted from the total. + // Don't panic, do nothing and simply continue. + RAY_LOG(DEBUG) << "[AddResourcesCapacityConstrained] Resource " + << to_add_resource_label + << " not found in the total resource map. It probably got deleted, " + "not adding back to resource_capacity_."; + } + } +} + // Perform an outer join. 
void ResourceSet::AddResources(const ResourceSet &other) { for (const auto &resource_pair : other.GetResourceAmountMap()) { @@ -186,7 +227,7 @@ FractionalResourceQuantity ResourceSet::GetResource( if (resource_capacity_.count(resource_name) == 0) { return 0; } - const FractionalResourceQuantity capacity = resource_capacity_.at(resource_name); + const FractionalResourceQuantity &capacity = resource_capacity_.at(resource_name); return capacity; } @@ -235,6 +276,44 @@ const std::unordered_map<std::string, FractionalResourceQuantity> return resource_capacity_; }; +ResourceSet ResourceSet::FindUpdatedResources( + const ray::raylet::ResourceSet &new_resource_set) const { + // Find any new resources and return a ResourceSet with the resource and new capacities + ResourceSet updated_resource_set; + for (const auto &resource_pair : new_resource_set.GetResourceAmountMap()) { + const std::string &resource_label = resource_pair.first; + const FractionalResourceQuantity &new_resource_capacity = resource_pair.second; + if (resource_capacity_.count(resource_label) == 1) { + // Resource exists, check if updated + const FractionalResourceQuantity &old_resource_capacity = + resource_capacity_.at(resource_label); + if (old_resource_capacity != new_resource_capacity) { + updated_resource_set.AddOrUpdateResource(resource_label, new_resource_capacity); + } + } else { + // Resource does not exist in the old set, add to return set + updated_resource_set.AddOrUpdateResource(resource_label, new_resource_capacity); + } + } + return updated_resource_set; +} + +ResourceSet ResourceSet::FindDeletedResources( + const ray::raylet::ResourceSet &new_resource_set) const { + // Find any new resources and return a ResourceSet with the resource and new capacities + ResourceSet deleted_resource_set; + auto &new_resource_map = new_resource_set.GetResourceAmountMap(); + for (const auto &resource_pair : resource_capacity_) { + const std::string &resource_label = resource_pair.first; + const FractionalResourceQuantity &old_resource_capacity = 
resource_pair.second; + if (new_resource_map.count(resource_label) != 1) { + // Resource does not exist, add to return set + deleted_resource_set.AddOrUpdateResource(resource_label, old_resource_capacity); + } + } + return deleted_resource_set; +} + /// ResourceIds class implementation ResourceIds::ResourceIds() {} @@ -245,20 +324,28 @@ ResourceIds::ResourceIds(double resource_quantity) { for (int64_t i = 0; i < whole_quantity; ++i) { whole_ids_.push_back(i); } + total_capacity_ = TotalQuantity(); + decrement_backlog_ = 0; } -ResourceIds::ResourceIds(const std::vector<int64_t> &whole_ids) : whole_ids_(whole_ids) {} +ResourceIds::ResourceIds(const std::vector<int64_t> &whole_ids) + : whole_ids_(whole_ids), total_capacity_(whole_ids.size()), decrement_backlog_(0) {} ResourceIds::ResourceIds( const std::vector<std::pair<int64_t, FractionalResourceQuantity>> &fractional_ids) - : fractional_ids_(fractional_ids) {} + : fractional_ids_(fractional_ids), + total_capacity_(TotalQuantity()), + decrement_backlog_(0) {} ResourceIds::ResourceIds( const std::vector<int64_t> &whole_ids, const std::vector<std::pair<int64_t, FractionalResourceQuantity>> &fractional_ids) - : whole_ids_(whole_ids), fractional_ids_(fractional_ids) {} + : whole_ids_(whole_ids), + fractional_ids_(fractional_ids), + total_capacity_(TotalQuantity()), + decrement_backlog_(0) {} -bool ResourceIds::Contains(FractionalResourceQuantity resource_quantity) const { +bool ResourceIds::Contains(const FractionalResourceQuantity &resource_quantity) const { if (resource_quantity >= 1) { double whole_quantity = resource_quantity.ToDouble(); RAY_CHECK(IsWhole(whole_quantity)); @@ -277,7 +364,7 @@ bool ResourceIds::Contains(FractionalResourceQuantity resource_quantity) const { } } -ResourceIds ResourceIds::Acquire(FractionalResourceQuantity resource_quantity) { +ResourceIds ResourceIds::Acquire(const FractionalResourceQuantity &resource_quantity) { if (resource_quantity >= 1) { // Handle the whole case. 
double whole_quantity = resource_quantity.ToDouble(); @@ -318,7 +405,7 @@ ResourceIds ResourceIds::Acquire(FractionalResourceQuantity resource_quantity) { auto return_pair = std::make_pair(whole_id, resource_quantity); // We cannot make use of the implicit conversion because ints have no // operator-(const FractionalResourceQuantity&) function. - FractionalResourceQuantity remaining_amount = + const FractionalResourceQuantity remaining_amount = FractionalResourceQuantity(1) - resource_quantity; fractional_ids_.push_back(std::make_pair(whole_id, remaining_amount)); return ResourceIds({return_pair}); @@ -328,9 +415,18 @@ ResourceIds ResourceIds::Acquire(FractionalResourceQuantity resource_quantity) { void ResourceIds::Release(const ResourceIds &resource_ids) { auto const &whole_ids_to_return = resource_ids.WholeIds(); - // Return the whole IDs. - whole_ids_.insert(whole_ids_.end(), whole_ids_to_return.begin(), - whole_ids_to_return.end()); + int64_t return_resource_count = whole_ids_to_return.size(); + if (return_resource_count > decrement_backlog_) { + // We are returning more resources than in the decrement backlog, thus set the backlog + // to zero and insert (count - decrement_backlog resources). + whole_ids_.insert(whole_ids_.end(), whole_ids_to_return.begin() + decrement_backlog_, + whole_ids_to_return.end()); + decrement_backlog_ = 0; + } else { + // Do not insert back to whole_ids_. Instead just decrement backlog by the return + // count + decrement_backlog_ -= return_resource_count; + } // Return the fractional IDs. auto const &fractional_ids_to_return = resource_ids.FractionalIds(); @@ -350,7 +446,12 @@ void ResourceIds::Release(const ResourceIds &resource_ids) { << fractional_pair_it->second.ToDouble() << ". Should have been less than one."; // If this makes the ID whole, then return it to the list of whole IDs. 
if (fractional_pair_it->second == 1) { - whole_ids_.push_back(resource_id); + if (decrement_backlog_ > 0) { + // There's a decrement backlog, do not add to whole_ids_ + decrement_backlog_--; + } else { + whole_ids_.push_back(resource_id); + } fractional_ids_.erase(fractional_pair_it); } } @@ -398,6 +499,57 @@ std::string ResourceIds::ToString() const { return return_string; } +void ResourceIds::UpdateCapacity(int64_t new_capacity) { + // Assert the new capacity is positive for sanity + RAY_CHECK(new_capacity >= 0); + int64_t capacity_delta = new_capacity - total_capacity_.ToDouble(); + if (capacity_delta < 0) { + DecreaseCapacity(-1 * capacity_delta); + } else { + IncreaseCapacity(capacity_delta); + } +} + +void ResourceIds::IncreaseCapacity(int64_t increment_quantity) { + // Adjust with decrement_backlog_ + int64_t actual_increment_quantity = 0; + actual_increment_quantity = + std::max<int64_t>(0, increment_quantity - decrement_backlog_); + decrement_backlog_ = std::max<int64_t>(0, decrement_backlog_ - increment_quantity); + + if (actual_increment_quantity > 0) { + for (int i = 0; i < actual_increment_quantity; i++) { + whole_ids_.push_back(-1); // Dynamic resources are assigned resource id -1. + } + total_capacity_ += actual_increment_quantity; + } +} + +void ResourceIds::DecreaseCapacity(int64_t decrement_quantity) { + // Get total quantity, but casting to int to truncate any fractional resources. Updates + // are supported only on whole resources. 
+ int64_t available_quantity = TotalQuantity().ToDouble(); + RAY_LOG(DEBUG) << "[DecreaseCapacity] Available quantity: " << available_quantity; + + if (available_quantity < decrement_quantity) { + RAY_LOG(DEBUG) << "[DecreaseCapacity] Available quantity < decrement quantity " + << decrement_quantity; + // We're trying to remove more resources than are available + // In this case, add the difference to the decrement backlog, and when resources are + // released the backlog will be cleared + decrement_backlog_ += (decrement_quantity - available_quantity); + // To decrease capacity, just acquire resources and forget about them. They are popped + // from whole_ids when acquired. + Acquire(available_quantity); + } else { + RAY_LOG(DEBUG) << "[DecreaseCapacity] Available quantity > decrement quantity " + << decrement_quantity; + // Simply acquire resources if sufficient are available + Acquire(decrement_quantity); + } + total_capacity_ -= decrement_quantity; +} + bool ResourceIds::IsWhole(double resource_quantity) const { int64_t whole_quantity = resource_quantity; return whole_quantity == resource_quantity; @@ -422,7 +574,7 @@ ResourceIdSet::ResourceIdSet( bool ResourceIdSet::Contains(const ResourceSet &resource_set) const { for (auto const &resource_pair : resource_set.GetResourceAmountMap()) { auto const &resource_name = resource_pair.first; - FractionalResourceQuantity resource_quantity = resource_pair.second; + const FractionalResourceQuantity &resource_quantity = resource_pair.second; auto it = available_resources_.find(resource_name); if (it == available_resources_.end()) { @@ -441,7 +593,7 @@ ResourceIdSet ResourceIdSet::Acquire(const ResourceSet &resource_set) { for (auto const &resource_pair : resource_set.GetResourceAmountMap()) { auto const &resource_name = resource_pair.first; - FractionalResourceQuantity resource_quantity = resource_pair.second; + const FractionalResourceQuantity &resource_quantity = resource_pair.second; auto it = 
available_resources_.find(resource_name); RAY_CHECK(it != available_resources_.end()); @@ -468,6 +620,25 @@ void ResourceIdSet::Release(const ResourceIdSet &resource_id_set) { } } +void ResourceIdSet::ReleaseConstrained(const ResourceIdSet &resource_id_set, + const ResourceSet &resources_total) { + for (auto const &resource_pair : resource_id_set.AvailableResources()) { + auto const &resource_name = resource_pair.first; + // Release only if the resource exists in resources_total + if (resources_total.GetResource(resource_name) != 0) { + auto const &resource_ids = resource_pair.second; + RAY_CHECK(!resource_ids.TotalQuantityIsZero()); + + auto it = available_resources_.find(resource_name); + if (it == available_resources_.end()) { + available_resources_[resource_name] = resource_ids; + } else { + it->second.Release(resource_ids); + } + } + } +} + void ResourceIdSet::Clear() { available_resources_.clear(); } ResourceIdSet ResourceIdSet::Plus(const ResourceIdSet &resource_id_set) const { @@ -476,6 +647,23 @@ ResourceIdSet ResourceIdSet::Plus(const ResourceIdSet &resource_id_set) const { return resource_id_set_to_return; } +void ResourceIdSet::AddOrUpdateResource(const std::string &resource_name, + int64_t capacity) { + auto it = available_resources_.find(resource_name); + if (it != available_resources_.end()) { + // If resource exists, update capacity + ResourceIds &resid = (it->second); + resid.UpdateCapacity(capacity); + } else { + // If resource does not exist, create + available_resources_[resource_name] = ResourceIds(capacity); + } +} + +void ResourceIdSet::DeleteResource(const std::string &resource_name) { + available_resources_.erase(resource_name); +} + const std::unordered_map<std::string, ResourceIds> &ResourceIdSet::AvailableResources() const { return available_resources_; @@ -580,7 +768,8 @@ const ResourceSet &SchedulingResources::GetLoadResources() const { // Return specified resources back to SchedulingResources. 
void SchedulingResources::Release(const ResourceSet &resources) { - resources_available_.AddResources(resources); + return resources_available_.AddResourcesCapacityConstrained(resources, + resources_total_); } // Take specified resources from SchedulingResources. @@ -588,6 +777,37 @@ void SchedulingResources::Acquire(const ResourceSet &resources) { resources_available_.SubtractResourcesStrict(resources); } +void SchedulingResources::UpdateResource(const std::string &resource_name, + int64_t capacity) { + const FractionalResourceQuantity new_capacity = FractionalResourceQuantity(capacity); + const FractionalResourceQuantity &current_capacity = + resources_total_.GetResource(resource_name); + if (current_capacity > 0) { + // If the resource exists, add to total and available resources + const FractionalResourceQuantity capacity_difference = + new_capacity - current_capacity; + const FractionalResourceQuantity &current_available_capacity = + resources_available_.GetResource(resource_name); + FractionalResourceQuantity new_available_capacity = + current_available_capacity + capacity_difference; + if (new_available_capacity < 0) { + new_available_capacity = 0; + } + resources_total_.AddOrUpdateResource(resource_name, new_capacity); + resources_available_.AddOrUpdateResource(resource_name, new_available_capacity); + } else { + // Resource does not exist, just add it to total and available. Do not add to load. 
+ resources_total_.AddOrUpdateResource(resource_name, new_capacity); + resources_available_.AddOrUpdateResource(resource_name, new_capacity); + } +} + +void SchedulingResources::DeleteResource(const std::string &resource_name) { + resources_total_.DeleteResource(resource_name); + resources_available_.DeleteResource(resource_name); + resources_load_.DeleteResource(resource_name); +} + std::string SchedulingResources::DebugString() const { std::stringstream result; result << "\n- total: " << resources_total_.ToString(); diff --git a/src/ray/raylet/scheduling_resources.h b/src/ray/raylet/scheduling_resources.h index aed05a6eb..9f64ddae6 100644 --- a/src/ray/raylet/scheduling_resources.h +++ b/src/ray/raylet/scheduling_resources.h @@ -110,11 +110,30 @@ class ResourceSet { /// False otherwise. bool IsSuperset(const ResourceSet &other) const; - /// \brief Remove the specified resource from the resource set. + /// \brief Add or update a new resource to the resource set. /// - /// \param resource_name: name/label of the resource to remove. - /// \return True, if the resource was successfully removed. False otherwise. - bool RemoveResource(const std::string &resource_name); + /// \param resource_name: name/label of the resource to add. + /// \param capacity: numeric capacity value for the resource to add. + /// \return True, if the resource was successfully added. False otherwise. + void AddOrUpdateResource(const std::string &resource_name, + const FractionalResourceQuantity &capacity); + + /// \brief Delete a resource from the resource set. + /// + /// \param resource_name: name/label of the resource to delete. + /// \return True if the resource was found while deleting, false if the resource did not + /// exist in the set. + bool DeleteResource(const std::string &resource_name); + + /// \brief Add a set of resources to the current set of resources subject to upper + /// limits on capacity from the total_resource set. 
+ /// + /// \param other: The other resource set to add. + /// \param total_resources: Total resource set which sets upper limits on capacity for + /// each label. \return True if the resource set was added successfully. False + /// otherwise. + void AddResourcesCapacityConstrained(const ResourceSet &other, + const ResourceSet &total_resources); /// \brief Aggregate resources from the other set into this set, adding any missing /// resource labels to this set. @@ -139,6 +158,18 @@ class ResourceSet { /// \return Void. void SubtractResourcesStrict(const ResourceSet &other); + /// \brief Finds new resources created or updated in a new set. + /// + /// \param new_resource_set: The new resource set to compare with. + /// \return The ResourceSet of updated values + ResourceSet FindUpdatedResources(const ResourceSet &new_resource_set) const; + + /// \brief Finds resources deleted in a set. + /// + /// \param new_resource_set: The new resource set to compare with. + /// \return The ResourceSet of deleted resources with old capacities + ResourceSet FindDeletedResources(const ResourceSet &new_resource_set) const; + /// Return the capacity value associated with the specified resource. /// /// \param resource_name: Resource name for which capacity is requested. @@ -225,14 +256,14 @@ class ResourceIds { /// /// \param resource_quantity Either a whole number or a fraction less than 1. /// \return True if there we have enough of the resource. - bool Contains(FractionalResourceQuantity resource_quantity) const; + bool Contains(const FractionalResourceQuantity &resource_quantity) const; /// \brief Acquire the requested amount of the resource. /// /// \param resource_quantity The amount to acquire. Either a whole number or a /// fraction less than 1. /// \return A ResourceIds representing the specific acquired IDs. 
- ResourceIds Acquire(FractionalResourceQuantity resource_quantity); + ResourceIds Acquire(const FractionalResourceQuantity &resource_quantity); /// \brief Return some resource IDs. /// @@ -272,6 +303,13 @@ class ResourceIds { /// \return A human-readable string representing the object. std::string ToString() const; + /// \brief Increase resource capacity by the given amount. This may throw an error if + /// decrement is more than currently available resources. + /// + /// \param new_capacity int of new capacity + /// \return Void. + void UpdateCapacity(int64_t new_capacity); + private: /// Check that a double is in fact a whole number. /// @@ -279,11 +317,30 @@ class ResourceIds { /// \return True if the double is an integer and false otherwise. bool IsWhole(double resource_quantity) const; + /// \brief Increase resource capacity by the given amount. + /// + /// \param increment_quantity The quantity of resources to add. + /// \return Void. + void IncreaseCapacity(int64_t increment_quantity); + + /// \brief Decrease resource capacity by the given amount. Adds to the decrement backlog + /// if more than available resources are decremented. + /// + /// \param decrement_quantity The quantity of resources to remove. + /// \return Void. + void DecreaseCapacity(int64_t decrement_quantity); + /// A vector of distinct whole resource IDs. std::vector<int64_t> whole_ids_; /// A vector of pairs of resource ID and a fraction of that ID (the fraction /// is at least zero and strictly less than 1). std::vector<std::pair<int64_t, FractionalResourceQuantity>> fractional_ids_; + /// Quantity to track the total capacity of the resource, since the whole_ids_ vector + /// keeps changing + FractionalResourceQuantity total_capacity_; + /// Quantity to track any pending decrements in capacity that weren't executed because + /// of insufficient available resources. This backlog in cleared in the release method. + int64_t decrement_backlog_; }; /// \class ResourceIdSet @@ -324,6 +381,16 @@ class ResourceIdSet { /// \return Void. 
void Release(const ResourceIdSet &resource_id_set); + /// \brief Return a set of resource IDs subject to their existence in the + /// resources_total set. + /// + /// \param resource_id_set The resource IDs to return. + /// \param resources_total Constraint set to restrict the release to. If a resource + /// exists in resource_id_set but not in resources_total, it is not added to this + /// ResourceIdSet. \return Void. + void ReleaseConstrained(const ResourceIdSet &resource_id_set, + const ResourceSet &resources_total); + /// \brief Clear out all of the resource IDs. /// /// \return Void. @@ -335,6 +402,20 @@ class ResourceIdSet { /// \return The combination of the two sets of resource IDs. ResourceIdSet Plus(const ResourceIdSet &resource_id_set) const; + /// \brief Creates or updates a resource in the ResourceIdSet if it already exists. + /// Raises an exception if the new capacity (when less than old capacity) cannot be set + /// because of busy resources. + /// + /// \param resource_name the name of the resource to create/update + /// \param capacity capacity of the resource being added + void AddOrUpdateResource(const std::string &resource_name, int64_t capacity); + + /// \brief Deletes a resource in the ResourceIdSet. This does not raise an exception, + /// just deletes the resource. Tasks with acquired resources keep running. + /// + /// \param resource_name the name of the resource to delete + void DeleteResource(const std::string &resource_name); + /// \brief Get the underlying mapping from resource name to resource IDs. /// /// \return The resource name to resource IDs mapping. @@ -427,6 +508,20 @@ class SchedulingResources { /// \return string. std::string DebugString() const; + /// \brief Update total, available and load resources with the specified capacity. + /// Create if not exists. + /// + /// \param resource_name: Name of the resource to be modified + /// \param capacity: New capacity of the resource. + /// \return Void. 
+ void UpdateResource(const std::string &resource_name, int64_t capacity); + + /// \brief Delete resource from total, available and load resources. + /// + /// \param resource_name: Name of the resource to be deleted. + /// \return Void. + void DeleteResource(const std::string &resource_name); + private: /// Static resource configuration (e.g., static_resources). ResourceSet resources_total_;