From 295782d4116e3aa127c13bb1276f0cdba510ed86 Mon Sep 17 00:00:00 2001 From: Alex Wu Date: Wed, 23 Sep 2020 15:46:31 -0700 Subject: [PATCH] [New Scheduler] Refactor cluster resource scheduler (#10938) --- python/ray/tests/test_basic.py | 2 +- .../scheduling/cluster_resource_data.cc | 384 +++++++++++++++++ .../raylet/scheduling/cluster_resource_data.h | 186 ++++++++ .../scheduling/cluster_resource_scheduler.cc | 407 ------------------ .../scheduling/cluster_resource_scheduler.h | 149 +------ src/ray/raylet/scheduling/fixed_point.cc | 14 + src/ray/raylet/scheduling/fixed_point.h | 15 +- 7 files changed, 594 insertions(+), 563 deletions(-) create mode 100644 src/ray/raylet/scheduling/cluster_resource_data.cc create mode 100644 src/ray/raylet/scheduling/cluster_resource_data.h diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index 375f2938a..d59be1b33 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -361,7 +361,7 @@ def test_ray_options(shutdown_only): to_check = ["CPU", "GPU", "memory", "custom1"] for key in to_check: - assert without_options[key] != with_options[key] + assert without_options[key] != with_options[key], key assert without_options != with_options diff --git a/src/ray/raylet/scheduling/cluster_resource_data.cc b/src/ray/raylet/scheduling/cluster_resource_data.cc new file mode 100644 index 000000000..d082f0c2e --- /dev/null +++ b/src/ray/raylet/scheduling/cluster_resource_data.cc @@ -0,0 +1,384 @@ +#include "ray/raylet/scheduling/cluster_resource_data.h" + +std::string VectorToString(const std::vector &vector) { + std::stringstream buffer; + + buffer << "["; + for (size_t i = 0; i < vector.size(); i++) { + buffer << vector[i]; + if (i < vector.size() - 1) { + buffer << ", "; + } + } + buffer << "]"; + return buffer.str(); +} + +std::string UnorderedMapToString(const std::unordered_map &map) { + std::stringstream buffer; + + buffer << "["; + for (auto it = map.begin(); it != map.end(); ++it) { + buffer << "(" << it->first << ":" << it->second << ")"; + } + buffer << "]"; + return buffer.str(); +} + +/// Convert a vector of doubles to a vector of resource units. +std::vector VectorDoubleToVectorFixedPoint( + const std::vector &vector) { + std::vector vector_fp(vector.size()); + for (size_t i = 0; i < vector.size(); i++) { + vector_fp[i] = vector[i]; + } + return vector_fp; +} + +/// Convert a vector of resource units to a vector of doubles. +std::vector VectorFixedPointToVectorDouble( + const std::vector &vector_fp) { + std::vector vector(vector_fp.size()); + for (size_t i = 0; i < vector_fp.size(); i++) { + vector[i] = FixedPoint(vector_fp[i]).Double(); + } + return vector; +} + +/// Convert a map of resources to a TaskRequest data structure. +TaskRequest ResourceMapToTaskRequest( + StringIdMap &string_to_int_map, + const std::unordered_map &resource_map) { + size_t i = 0; + + TaskRequest task_request; + + task_request.predefined_resources.resize(PredefinedResources_MAX); + task_request.custom_resources.resize(resource_map.size()); + for (size_t i = 0; i < PredefinedResources_MAX; i++) { + task_request.predefined_resources[0].demand = 0; + task_request.predefined_resources[0].soft = false; + } + + for (auto const &resource : resource_map) { + if (resource.first == ray::kCPU_ResourceLabel) { + task_request.predefined_resources[CPU].demand = resource.second; + } else if (resource.first == ray::kGPU_ResourceLabel) { + task_request.predefined_resources[GPU].demand = resource.second; + } else if (resource.first == ray::kTPU_ResourceLabel) { + task_request.predefined_resources[TPU].demand = resource.second; + } else if (resource.first == ray::kMemory_ResourceLabel) { + task_request.predefined_resources[MEM].demand = resource.second; + } else { + task_request.custom_resources[i].id = string_to_int_map.Insert(resource.first); + task_request.custom_resources[i].demand = resource.second; + task_request.custom_resources[i].soft = false; + i++; + } + } + task_request.custom_resources.resize(i); + + return task_request; +} + +TaskRequest TaskResourceInstances::ToTaskRequest() const { + TaskRequest task_req; + task_req.predefined_resources.resize(PredefinedResources_MAX); + + for (size_t i = 0; i < PredefinedResources_MAX; i++) { + task_req.predefined_resources[i].demand = 0; + for (auto predefined_resource_instance : this->predefined_resources[i]) { + task_req.predefined_resources[i].demand += predefined_resource_instance; + } + } + + task_req.custom_resources.resize(this->custom_resources.size()); + size_t i = 0; + for (auto it = this->custom_resources.begin(); it != this->custom_resources.end(); + ++it) { + task_req.custom_resources[i].id = it->first; + task_req.custom_resources[i].soft = false; + task_req.custom_resources[i].demand = 0; + for (size_t j = 0; j < it->second.size(); j++) { + task_req.custom_resources[i].demand += it->second[j]; + } + i++; + } + return task_req; +} + +/// Convert a map of resources to a TaskRequest data structure. +/// +/// \param string_to_int_map: Map between names and ids maintained by the +/// \param resource_map_total: Total capacities of resources we want to convert. +/// \param resource_map_available: Available capacities of resources we want to convert. +/// +/// \request Conversion result to a TaskRequest data structure. +NodeResources ResourceMapToNodeResources( + StringIdMap &string_to_int_map, + const std::unordered_map &resource_map_total, + const std::unordered_map &resource_map_available) { + NodeResources node_resources; + node_resources.predefined_resources.resize(PredefinedResources_MAX); + for (size_t i = 0; i < PredefinedResources_MAX; i++) { + node_resources.predefined_resources[i].total = + node_resources.predefined_resources[i].available = 0; + } + + for (auto const &resource : resource_map_total) { + ResourceCapacity resource_capacity; + resource_capacity.total = resource.second; + auto it = resource_map_available.find(resource.first); + if (it == resource_map_available.end()) { + resource_capacity.available = 0; + } else { + resource_capacity.available = it->second; + } + if (resource.first == ray::kCPU_ResourceLabel) { + node_resources.predefined_resources[CPU] = resource_capacity; + } else if (resource.first == ray::kGPU_ResourceLabel) { + node_resources.predefined_resources[GPU] = resource_capacity; + } else if (resource.first == ray::kTPU_ResourceLabel) { + node_resources.predefined_resources[TPU] = resource_capacity; + } else if (resource.first == ray::kMemory_ResourceLabel) { + node_resources.predefined_resources[MEM] = resource_capacity; + } else { + // This is a custom resource. + node_resources.custom_resources.emplace(string_to_int_map.Insert(resource.first), + resource_capacity); + } + } + return node_resources; +} + +bool NodeResources::operator==(const NodeResources &other) { + for (size_t i = 0; i < PredefinedResources_MAX; i++) { + if (this->predefined_resources[i].total != other.predefined_resources[i].total) { + return false; + } + if (this->predefined_resources[i].available != + other.predefined_resources[i].available) { + return false; + } + } + + if (this->custom_resources.size() != other.custom_resources.size()) { + return false; + } + + for (auto it1 = this->custom_resources.begin(); it1 != this->custom_resources.end(); + ++it1) { + auto it2 = other.custom_resources.find(it1->first); + if (it2 == other.custom_resources.end()) { + return false; + } + if (it1->second.total != it2->second.total) { + return false; + } + if (it1->second.available != it2->second.available) { + return false; + } + } + return true; +} + +std::string NodeResources::DebugString(StringIdMap string_to_in_map) const { + std::stringstream buffer; + buffer << " {\n"; + for (size_t i = 0; i < this->predefined_resources.size(); i++) { + buffer << "\t"; + switch (i) { + case CPU: + buffer << "CPU: "; + break; + case MEM: + buffer << "MEM: "; + break; + case GPU: + buffer << "GPU: "; + break; + case TPU: + buffer << "TPU: "; + break; + default: + RAY_CHECK(false) << "This should never happen."; + break; + } + buffer << "(" << this->predefined_resources[i].total << ":" + << this->predefined_resources[i].available << ")\n"; + } + for (auto it = this->custom_resources.begin(); it != this->custom_resources.end(); + ++it) { + buffer << "\t" << string_to_in_map.Get(it->first) << ":(" << it->second.total << ":" + << it->second.available << ")\n"; + } + buffer << "}" << std::endl; + return buffer.str(); +} + +bool NodeResourceInstances::operator==(const NodeResourceInstances &other) { + for (size_t i = 0; i < PredefinedResources_MAX; i++) { + if (!EqualVectors(this->predefined_resources[i].total, + other.predefined_resources[i].total)) { + return false; + } + if (!EqualVectors(this->predefined_resources[i].available, + other.predefined_resources[i].available)) { + return false; + } + } + + if (this->custom_resources.size() != other.custom_resources.size()) { + return false; + } + + for (auto it1 = this->custom_resources.begin(); it1 != this->custom_resources.end(); + ++it1) { + auto it2 = other.custom_resources.find(it1->first); + if (it2 == other.custom_resources.end()) { + return false; + } + if (!EqualVectors(it1->second.total, it2->second.total)) { + return false; + } + if (!EqualVectors(it1->second.available, it2->second.available)) { + return false; + } + } + return true; +} + +std::string NodeResourceInstances::DebugString(StringIdMap string_to_int_map) const { + std::stringstream buffer; + buffer << "{\n"; + for (size_t i = 0; i < this->predefined_resources.size(); i++) { + buffer << "\t"; + switch (i) { + case CPU: + buffer << "CPU: "; + break; + case MEM: + buffer << "MEM: "; + break; + case GPU: + buffer << "GPU: "; + break; + case TPU: + buffer << "TPU: "; + break; + default: + RAY_CHECK(false) << "This should never happen."; + break; + } + buffer << "(" << VectorToString(predefined_resources[i].total) << ":" + << VectorToString(this->predefined_resources[i].available) << ")\n"; + } + for (auto it = this->custom_resources.begin(); it != this->custom_resources.end(); + ++it) { + buffer << "\t" << string_to_int_map.Get(it->first) << ":(" + << VectorToString(it->second.total) << ":" + << VectorToString(it->second.available) << ")\n"; + } + buffer << "}" << std::endl; + return buffer.str(); +}; + +TaskResourceInstances NodeResourceInstances::GetAvailableResourceInstances() { + TaskResourceInstances task_resources; + task_resources.predefined_resources.resize(PredefinedResources_MAX); + + for (size_t i = 0; i < this->predefined_resources.size(); i++) { + task_resources.predefined_resources[i] = this->predefined_resources[i].available; + } + + for (const auto &it : this->custom_resources) { + task_resources.custom_resources.emplace(it.first, it.second.available); + } + + return task_resources; +}; + +std::string TaskRequest::DebugString() const { + std::stringstream buffer; + buffer << " {"; + for (size_t i = 0; i < this->predefined_resources.size(); i++) { + buffer << "(" << this->predefined_resources[i].demand << ":" + << this->predefined_resources[i].soft << ") "; + } + buffer << "}"; + + buffer << " ["; + for (size_t i = 0; i < this->custom_resources.size(); i++) { + buffer << this->custom_resources[i].id << ":" + << "(" << this->custom_resources[i].demand << ":" + << this->custom_resources[i].soft << ") "; + } + buffer << "]" << std::endl; + return buffer.str(); +} + +bool TaskResourceInstances::IsEmpty() const { + // Check whether all resource instances of a task are zero. + for (const auto &predefined_resource : predefined_resources) { + for (const auto &predefined_resource_instance : predefined_resource) { + if (predefined_resource_instance != 0) { + return false; + } + } + } + + for (const auto &custom_resource : custom_resources) { + for (const auto &custom_resource_instances : custom_resource.second) { + if (custom_resource_instances != 0) { + return false; + } + } + } + return true; +} + +std::string TaskResourceInstances::DebugString() const { + std::stringstream buffer; + buffer << std::endl << " Allocation: {"; + for (size_t i = 0; i < this->predefined_resources.size(); i++) { + buffer << VectorToString(this->predefined_resources[i]); + } + buffer << "}"; + + buffer << " ["; + for (auto it = this->custom_resources.begin(); it != this->custom_resources.end(); + ++it) { + buffer << it->first << ":" << VectorToString(it->second) << ", "; + } + + buffer << "]" << std::endl; + return buffer.str(); +} + +bool EqualVectors(const std::vector &v1, const std::vector &v2) { + return (v1.size() == v2.size() && std::equal(v1.begin(), v1.end(), v2.begin())); +} + +bool TaskResourceInstances::operator==(const TaskResourceInstances &other) { + for (size_t i = 0; i < PredefinedResources_MAX; i++) { + if (!EqualVectors(this->predefined_resources[i], other.predefined_resources[i])) { + return false; + } + } + + if (this->custom_resources.size() != other.custom_resources.size()) { + return false; + } + + for (auto it1 = this->custom_resources.begin(); it1 != this->custom_resources.end(); + ++it1) { + auto it2 = other.custom_resources.find(it1->first); + if (it2 == other.custom_resources.end()) { + return false; + } + if (!EqualVectors(it1->second, it2->second)) { + return false; + } + } + return true; +} diff --git a/src/ray/raylet/scheduling/cluster_resource_data.h b/src/ray/raylet/scheduling/cluster_resource_data.h new file mode 100644 index 000000000..27a45fc4a --- /dev/null +++ b/src/ray/raylet/scheduling/cluster_resource_data.h @@ -0,0 +1,186 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +#include "absl/container/flat_hash_map.h" +#include "absl/container/flat_hash_set.h" +#include "ray/common/task/scheduling_resources.h" +#include "ray/raylet/scheduling/fixed_point.h" +#include "ray/raylet/scheduling/scheduling_ids.h" +#include "ray/util/logging.h" + +/// List of predefined resources. +enum PredefinedResources { CPU, MEM, GPU, TPU, PredefinedResources_MAX }; + +/// Helper function to compare two vectors with FixedPoint values. +bool EqualVectors(const std::vector &v1, const std::vector &v2); + +/// Convert a vector of doubles to a vector of resource units. +std::vector VectorDoubleToVectorFixedPoint(const std::vector &vector); + +/// Convert a vector of resource units to a vector of doubles. +std::vector VectorFixedPointToVectorDouble( + const std::vector &vector_fp); + +struct ResourceCapacity { + FixedPoint total; + FixedPoint available; +}; + +/// Capacities of each instance of a resource. +struct ResourceInstanceCapacities { + std::vector total; + std::vector available; +}; + +struct ResourceRequest { + /// Amount of resource being requested. + FixedPoint demand; + /// Specify whether the request is soft or hard. + /// If hard, the entire request is denied if the demand exceeds the resource + /// availability. Otherwise, the request can be still be granted. + /// Prefernces are given to the nodes with the lowest number of violations. + bool soft; +}; + +/// Resource request, including resource ID. This is used for custom resources. +struct ResourceRequestWithId : ResourceRequest { + /// Resource ID. + int64_t id; +}; + +// Data structure specifying the capacity of each resource requested by a task. +class TaskRequest { + public: + /// List of predefined resources required by the task. + std::vector predefined_resources; + /// List of custom resources required by the task. + std::vector custom_resources; + /// List of placement hints. A placement hint is a node on which + /// we desire to run this task. This is a soft constraint in that + /// the task will run on a different node in the cluster, if none of the + /// nodes in this list can schedule this task. + absl::flat_hash_set placement_hints; + /// Returns human-readable string for this task request. + std::string DebugString() const; +}; + +// Data structure specifying the capacity of each instance of each resource +// allocated to a task. +class TaskResourceInstances { + public: + /// The list of instances of each predifined resource allocated to a task. + std::vector> predefined_resources; + /// The list of instances of each custom resource allocated to a task. + absl::flat_hash_map> custom_resources; + bool operator==(const TaskResourceInstances &other); + /// For each resource of this request aggregate its instances. + TaskRequest ToTaskRequest() const; + /// Get CPU instances only. + std::vector GetCPUInstances() const { + if (!this->predefined_resources.empty()) { + return this->predefined_resources[CPU]; + } else { + return {}; + } + }; + std::vector GetCPUInstancesDouble() const { + if (!this->predefined_resources.empty()) { + return VectorFixedPointToVectorDouble(this->predefined_resources[CPU]); + } else { + return {}; + } + }; + /// Get GPU instances only. + std::vector GetGPUInstances() const { + if (!this->predefined_resources.empty()) { + return this->predefined_resources[GPU]; + } else { + return {}; + } + }; + std::vector GetGPUInstancesDouble() const { + if (!this->predefined_resources.empty()) { + return VectorFixedPointToVectorDouble(this->predefined_resources[GPU]); + } else { + return {}; + } + }; + /// Get mem instances only. + std::vector GetMemInstances() const { + if (!this->predefined_resources.empty()) { + return this->predefined_resources[MEM]; + } else { + return {}; + } + }; + std::vector GetMemInstancesDouble() const { + if (!this->predefined_resources.empty()) { + return VectorFixedPointToVectorDouble(this->predefined_resources[MEM]); + } else { + return {}; + } + }; + /// Check whether there are no resource instances. + bool IsEmpty() const; + /// Returns human-readable string for these resources. + std::string DebugString() const; +}; + +/// Total and available capacities of each resource of a node. +class NodeResources { + public: + /// Available and total capacities for predefined resources. + std::vector predefined_resources; + /// Map containing custom resources. The key of each entry represents the + /// custom resource ID. + absl::flat_hash_map custom_resources; + /// Returns if this equals another node resources. + bool operator==(const NodeResources &other); + /// Returns human-readable string for these resources. + std::string DebugString(StringIdMap string_to_int_map) const; +}; + +/// Total and available capacities of each resource instance. +/// This is used to describe the resources of the local node. +class NodeResourceInstances { + public: + /// Available and total capacities for each instance of a predefined resource. + std::vector predefined_resources; + /// Map containing custom resources. The key of each entry represents the + /// custom resource ID. + absl::flat_hash_map custom_resources; + /// Extract available resource instances. + TaskResourceInstances GetAvailableResourceInstances(); + /// Returns if this equals another node resources. + bool operator==(const NodeResourceInstances &other); + /// Returns human-readable string for these resources. + std::string DebugString(StringIdMap string_to_int_map) const; +}; + +/// \request Conversion result to a TaskRequest data structure. +NodeResources ResourceMapToNodeResources( + StringIdMap &string_to_int_map, + const std::unordered_map &resource_map_total, + const std::unordered_map &resource_map_available); + +/// Convert a map of resources to a TaskRequest data structure. +TaskRequest ResourceMapToTaskRequest( + StringIdMap &string_to_int_map, + const std::unordered_map &resource_map); diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc index dc7e6df79..f8cb4ae26 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc @@ -14,413 +14,6 @@ #include "ray/raylet/scheduling/cluster_resource_scheduler.h" -std::string VectorToString(const std::vector &vector) { - std::stringstream buffer; - - buffer << "["; - for (size_t i = 0; i < vector.size(); i++) { - buffer << vector[i]; - if (i < vector.size() - 1) { - buffer << ", "; - } - } - buffer << "]"; - return buffer.str(); -} - -std::string UnorderedMapToString(const std::unordered_map &map) { - std::stringstream buffer; - - buffer << "["; - for (auto it = map.begin(); it != map.end(); ++it) { - buffer << "(" << it->first << ":" << it->second << ")"; - } - buffer << "]"; - return buffer.str(); -} - -/// Convert a vector of doubles to a vector of resource units. -std::vector VectorDoubleToVectorFixedPoint( - const std::vector &vector) { - std::vector vector_fp(vector.size()); - for (size_t i = 0; i < vector.size(); i++) { - vector_fp[i] = vector[i]; - } - return vector_fp; -} - -/// Convert a vector of resource units to a vector of doubles. -std::vector VectorFixedPointToVectorDouble( - const std::vector &vector_fp) { - std::vector vector(vector_fp.size()); - for (size_t i = 0; i < vector_fp.size(); i++) { - vector[i] = FixedPoint(vector_fp[i]).Double(); - } - return vector; -} - -/// Convert a map of resources to a TaskRequest data structure. -TaskRequest ResourceMapToTaskRequest( - StringIdMap &string_to_int_map, - const std::unordered_map &resource_map) { - size_t i = 0; - - TaskRequest task_request; - - task_request.predefined_resources.resize(PredefinedResources_MAX); - task_request.custom_resources.resize(resource_map.size()); - for (size_t i = 0; i < PredefinedResources_MAX; i++) { - task_request.predefined_resources[0].demand = 0; - task_request.predefined_resources[0].soft = false; - } - - for (auto const &resource : resource_map) { - if (resource.first == ray::kCPU_ResourceLabel) { - task_request.predefined_resources[CPU].demand = resource.second; - } else if (resource.first == ray::kGPU_ResourceLabel) { - task_request.predefined_resources[GPU].demand = resource.second; - } else if (resource.first == ray::kTPU_ResourceLabel) { - task_request.predefined_resources[TPU].demand = resource.second; - } else if (resource.first == ray::kMemory_ResourceLabel) { - task_request.predefined_resources[MEM].demand = resource.second; - } else { - task_request.custom_resources[i].id = string_to_int_map.Insert(resource.first); - task_request.custom_resources[i].demand = resource.second; - task_request.custom_resources[i].soft = false; - i++; - } - } - task_request.custom_resources.resize(i); - - return task_request; -} - -TaskRequest TaskResourceInstances::ToTaskRequest() const { - TaskRequest task_req; - task_req.predefined_resources.resize(PredefinedResources_MAX); - - for (size_t i = 0; i < PredefinedResources_MAX; i++) { - task_req.predefined_resources[i].demand = 0; - for (auto predefined_resource_instance : this->predefined_resources[i]) { - task_req.predefined_resources[i].demand += predefined_resource_instance; - } - } - - task_req.custom_resources.resize(this->custom_resources.size()); - size_t i = 0; - for (auto it = this->custom_resources.begin(); it != this->custom_resources.end(); - ++it) { - task_req.custom_resources[i].id = it->first; - task_req.custom_resources[i].soft = false; - task_req.custom_resources[i].demand = 0; - for (size_t j = 0; j < it->second.size(); j++) { - task_req.custom_resources[i].demand += it->second[j]; - } - i++; - } - return task_req; -} - -/// Convert a map of resources to a TaskRequest data structure. -/// -/// \param string_to_int_map: Map between names and ids maintained by the -/// \param resource_map_total: Total capacities of resources we want to convert. -/// \param resource_map_available: Available capacities of resources we want to convert. -/// -/// \request Conversion result to a TaskRequest data structure. -NodeResources ResourceMapToNodeResources( - StringIdMap &string_to_int_map, - const std::unordered_map &resource_map_total, - const std::unordered_map &resource_map_available) { - NodeResources node_resources; - node_resources.predefined_resources.resize(PredefinedResources_MAX); - for (size_t i = 0; i < PredefinedResources_MAX; i++) { - node_resources.predefined_resources[i].total = - node_resources.predefined_resources[i].available = 0; - } - - for (auto const &resource : resource_map_total) { - ResourceCapacity resource_capacity; - resource_capacity.total = resource.second; - auto it = resource_map_available.find(resource.first); - if (it == resource_map_available.end()) { - resource_capacity.available = 0; - } else { - resource_capacity.available = it->second; - } - if (resource.first == ray::kCPU_ResourceLabel) { - node_resources.predefined_resources[CPU] = resource_capacity; - } else if (resource.first == ray::kGPU_ResourceLabel) { - node_resources.predefined_resources[GPU] = resource_capacity; - } else if (resource.first == ray::kTPU_ResourceLabel) { - node_resources.predefined_resources[TPU] = resource_capacity; - } else if (resource.first == ray::kMemory_ResourceLabel) { - node_resources.predefined_resources[MEM] = resource_capacity; - } else { - // This is a custom resource. - node_resources.custom_resources.emplace(string_to_int_map.Insert(resource.first), - resource_capacity); - } - } - return node_resources; -} - -bool operator<(FixedPoint const &ru1, FixedPoint const &ru2) { - return (ru1.i_ < ru2.i_); -}; -bool operator>(FixedPoint const &ru1, FixedPoint const &ru2) { - return (ru1.i_ > ru2.i_); -}; -bool operator<=(FixedPoint const &ru1, FixedPoint const &ru2) { - return (ru1.i_ <= ru2.i_); -}; -bool operator>=(FixedPoint const &ru1, FixedPoint const &ru2) { - return (ru1.i_ >= ru2.i_); -}; -bool operator==(FixedPoint const &ru1, FixedPoint const &ru2) { - return (ru1.i_ == ru2.i_); -}; -bool operator!=(FixedPoint const &ru1, FixedPoint const &ru2) { - return (ru1.i_ != ru2.i_); -}; - -std::ostream &operator<<(std::ostream &out, const FixedPoint &ru) { - out << ru.i_; - return out; -} - -bool NodeResources::operator==(const NodeResources &other) { - for (size_t i = 0; i < PredefinedResources_MAX; i++) { - if (this->predefined_resources[i].total != other.predefined_resources[i].total) { - return false; - } - if (this->predefined_resources[i].available != - other.predefined_resources[i].available) { - return false; - } - } - - if (this->custom_resources.size() != other.custom_resources.size()) { - return false; - } - - for (auto it1 = this->custom_resources.begin(); it1 != this->custom_resources.end(); - ++it1) { - auto it2 = other.custom_resources.find(it1->first); - if (it2 == other.custom_resources.end()) { - return false; - } - if (it1->second.total != it2->second.total) { - return false; - } - if (it1->second.available != it2->second.available) { - return false; - } - } - return true; -} - -std::string NodeResources::DebugString(StringIdMap string_to_in_map) const { - std::stringstream buffer; - buffer << " {\n"; - for (size_t i = 0; i < this->predefined_resources.size(); i++) { - buffer << "\t"; - switch (i) { - case CPU: - buffer << "CPU: "; - break; - case MEM: - buffer << "MEM: "; - break; - case GPU: - buffer << "GPU: "; - break; - case TPU: - buffer << "TPU: "; - break; - default: - RAY_CHECK(false) << "This should never happen."; - break; - } - buffer << "(" << this->predefined_resources[i].total << ":" - << this->predefined_resources[i].available << ")\n"; - } - for (auto it = this->custom_resources.begin(); it != this->custom_resources.end(); - ++it) { - buffer << "\t" << string_to_in_map.Get(it->first) << ":(" << it->second.total << ":" - << it->second.available << ")\n"; - } - buffer << "}" << std::endl; - return buffer.str(); -} - -bool NodeResourceInstances::operator==(const NodeResourceInstances &other) { - for (size_t i = 0; i < PredefinedResources_MAX; i++) { - if (!EqualVectors(this->predefined_resources[i].total, - other.predefined_resources[i].total)) { - return false; - } - if (!EqualVectors(this->predefined_resources[i].available, - other.predefined_resources[i].available)) { - return false; - } - } - - if (this->custom_resources.size() != other.custom_resources.size()) { - return false; - } - - for (auto it1 = this->custom_resources.begin(); it1 != this->custom_resources.end(); - ++it1) { - auto it2 = other.custom_resources.find(it1->first); - if (it2 == other.custom_resources.end()) { - return false; - } - if (!EqualVectors(it1->second.total, it2->second.total)) { - return false; - } - if (!EqualVectors(it1->second.available, it2->second.available)) { - return false; - } - } - return true; -} - -std::string NodeResourceInstances::DebugString(StringIdMap string_to_int_map) const { - std::stringstream buffer; - buffer << "{\n"; - for (size_t i = 0; i < this->predefined_resources.size(); i++) { - buffer << "\t"; - switch (i) { - case CPU: - buffer << "CPU: "; - break; - case MEM: - buffer << "MEM: "; - break; - case GPU: - buffer << "GPU: "; - break; - case TPU: - buffer << "TPU: "; - break; - default: - RAY_CHECK(false) << "This should never happen."; - break; - } - buffer << "(" << VectorToString(predefined_resources[i].total) << ":" - << VectorToString(this->predefined_resources[i].available) << ")\n"; - } - for (auto it = this->custom_resources.begin(); it != this->custom_resources.end(); - ++it) { - buffer << "\t" << string_to_int_map.Get(it->first) << ":(" - << VectorToString(it->second.total) << ":" - << VectorToString(it->second.available) << ")\n"; - } - buffer << "}" << std::endl; - return buffer.str(); -}; - -TaskResourceInstances NodeResourceInstances::GetAvailableResourceInstances() { - TaskResourceInstances task_resources; - task_resources.predefined_resources.resize(PredefinedResources_MAX); - - for (size_t i = 0; i < this->predefined_resources.size(); i++) { - task_resources.predefined_resources[i] = this->predefined_resources[i].available; - } - - for (const auto &it : this->custom_resources) { - task_resources.custom_resources.emplace(it.first, it.second.available); - } - - return task_resources; -}; - -std::string TaskRequest::DebugString() const { - std::stringstream buffer; - buffer << " {"; - for (size_t i = 0; i < this->predefined_resources.size(); i++) { - buffer << "(" << this->predefined_resources[i].demand << ":" - << this->predefined_resources[i].soft << ") "; - } - buffer << "}"; - - buffer << " ["; - for (size_t i = 0; i < this->custom_resources.size(); i++) { - buffer << this->custom_resources[i].id << ":" - << "(" << this->custom_resources[i].demand << ":" - << this->custom_resources[i].soft << ") "; - } - buffer << "]" << std::endl; - return buffer.str(); -} - -bool TaskResourceInstances::IsEmpty() const { - // Check whether all resource instances of a task are zero. - for (const auto &predefined_resource : predefined_resources) { - for (const auto &predefined_resource_instance : predefined_resource) { - if (predefined_resource_instance != 0) { - return false; - } - } - } - - for (const auto &custom_resource : custom_resources) { - for (const auto &custom_resource_instances : custom_resource.second) { - if (custom_resource_instances != 0) { - return false; - } - } - } - return true; -} - -std::string TaskResourceInstances::DebugString() const { - std::stringstream buffer; - buffer << std::endl << " Allocation: {"; - for (size_t i = 0; i < this->predefined_resources.size(); i++) { - buffer << VectorToString(this->predefined_resources[i]); - } - buffer << "}"; - - buffer << " ["; - for (auto it = this->custom_resources.begin(); it != this->custom_resources.end(); - ++it) { - buffer << it->first << ":" << VectorToString(it->second) << ", "; - } - - buffer << "]" << std::endl; - return buffer.str(); -} - -bool EqualVectors(const std::vector &v1, const std::vector &v2) { - return (v1.size() == v2.size() && std::equal(v1.begin(), v1.end(), v2.begin())); -} - -bool TaskResourceInstances::operator==(const TaskResourceInstances &other) { - for (size_t i = 0; i < PredefinedResources_MAX; i++) { - if (!EqualVectors(this->predefined_resources[i], other.predefined_resources[i])) { - return false; - } - } - - if (this->custom_resources.size() != other.custom_resources.size()) { - return false; - } - - for (auto it1 = this->custom_resources.begin(); it1 != this->custom_resources.end(); - ++it1) { - auto it2 = other.custom_resources.find(it1->first); - if (it2 == other.custom_resources.end()) { - return false; - } - if (!EqualVectors(it1->second, it2->second)) { - return false; - } - } - return true; -} - ClusterResourceScheduler::ClusterResourceScheduler( int64_t local_node_id, const NodeResources &local_node_resources) : local_node_id_(local_node_id) { diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h index e6fe3a75e..bfb9f984b 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.h +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h @@ -21,161 +21,14 @@ #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" #include "ray/common/task/scheduling_resources.h" +#include "ray/raylet/scheduling/cluster_resource_data.h" #include "ray/raylet/scheduling/fixed_point.h" #include "ray/raylet/scheduling/scheduling_ids.h" #include "ray/util/logging.h" -/// List of predefined resources. -enum PredefinedResources { CPU, MEM, GPU, TPU, PredefinedResources_MAX }; // Specify resources that consists of unit-size instances. static std::unordered_set UnitInstanceResources{CPU, GPU, TPU}; -/// Helper function to compare two vectors with FixedPoint values. -bool EqualVectors(const std::vector &v1, const std::vector &v2); - -/// Convert a vector of doubles to a vector of resource units. -std::vector VectorDoubleToVectorFixedPoint(const std::vector &vector); - -/// Convert a vector of resource units to a vector of doubles. -std::vector VectorFixedPointToVectorDouble( - const std::vector &vector_fp); - -struct ResourceCapacity { - FixedPoint total; - FixedPoint available; -}; - -/// Capacities of each instance of a resource. -struct ResourceInstanceCapacities { - std::vector total; - std::vector available; -}; - -struct ResourceRequest { - /// Amount of resource being requested. - FixedPoint demand; - /// Specify whether the request is soft or hard. - /// If hard, the entire request is denied if the demand exceeds the resource - /// availability. Otherwise, the request can be still be granted. - /// Prefernces are given to the nodes with the lowest number of violations. - bool soft; -}; - -/// Resource request, including resource ID. This is used for custom resources. -struct ResourceRequestWithId : ResourceRequest { - /// Resource ID. - int64_t id; -}; - -// Data structure specifying the capacity of each resource requested by a task. -class TaskRequest { - public: - /// List of predefined resources required by the task. - std::vector predefined_resources; - /// List of custom resources required by the task. - std::vector custom_resources; - /// List of placement hints. A placement hint is a node on which - /// we desire to run this task. This is a soft constraint in that - /// the task will run on a different node in the cluster, if none of the - /// nodes in this list can schedule this task. - absl::flat_hash_set placement_hints; - /// Returns human-readable string for this task request. - std::string DebugString() const; -}; - -// Data structure specifying the capacity of each instance of each resource -// allocated to a task. -class TaskResourceInstances { - public: - /// The list of instances of each predifined resource allocated to a task. - std::vector> predefined_resources; - /// The list of instances of each custom resource allocated to a task. - absl::flat_hash_map> custom_resources; - bool operator==(const TaskResourceInstances &other); - /// For each resource of this request aggregate its instances. - TaskRequest ToTaskRequest() const; - /// Get CPU instances only. - std::vector GetCPUInstances() const { - if (!this->predefined_resources.empty()) { - return this->predefined_resources[CPU]; - } else { - return {}; - } - }; - std::vector GetCPUInstancesDouble() const { - if (!this->predefined_resources.empty()) { - return VectorFixedPointToVectorDouble(this->predefined_resources[CPU]); - } else { - return {}; - } - }; - /// Get GPU instances only. - std::vector GetGPUInstances() const { - if (!this->predefined_resources.empty()) { - return this->predefined_resources[GPU]; - } else { - return {}; - } - }; - std::vector GetGPUInstancesDouble() const { - if (!this->predefined_resources.empty()) { - return VectorFixedPointToVectorDouble(this->predefined_resources[GPU]); - } else { - return {}; - } - }; - /// Get mem instances only. - std::vector GetMemInstances() const { - if (!this->predefined_resources.empty()) { - return this->predefined_resources[MEM]; - } else { - return {}; - } - }; - std::vector GetMemInstancesDouble() const { - if (!this->predefined_resources.empty()) { - return VectorFixedPointToVectorDouble(this->predefined_resources[MEM]); - } else { - return {}; - } - }; - /// Check whether there are no resource instances. - bool IsEmpty() const; - /// Returns human-readable string for these resources. - std::string DebugString() const; -}; - -/// Total and available capacities of each resource of a node. -class NodeResources { - public: - /// Available and total capacities for predefined resources. - std::vector predefined_resources; - /// Map containing custom resources. The key of each entry represents the - /// custom resource ID. - absl::flat_hash_map custom_resources; - /// Returns if this equals another node resources. - bool operator==(const NodeResources &other); - /// Returns human-readable string for these resources. - std::string DebugString(StringIdMap string_to_int_map) const; -}; - -/// Total and available capacities of each resource instance. -/// This is used to describe the resources of the local node. -class NodeResourceInstances { - public: - /// Available and total capacities for each instance of a predefined resource. - std::vector predefined_resources; - /// Map containing custom resources. The key of each entry represents the - /// custom resource ID. - absl::flat_hash_map custom_resources; - /// Extract available resource instances. - TaskResourceInstances GetAvailableResourceInstances(); - /// Returns if this equals another node resources. - bool operator==(const NodeResourceInstances &other); - /// Returns human-readable string for these resources. - std::string DebugString(StringIdMap string_to_int_map) const; -}; - /// Class encapsulating the cluster resources and the logic to assign /// tasks to nodes based on the task's constraints and the available /// resources at those nodes. diff --git a/src/ray/raylet/scheduling/fixed_point.cc b/src/ray/raylet/scheduling/fixed_point.cc index d6f109b3b..e7e7c8741 100644 --- a/src/ray/raylet/scheduling/fixed_point.cc +++ b/src/ray/raylet/scheduling/fixed_point.cc @@ -8,6 +8,8 @@ FixedPoint::FixedPoint(double d) { i_ = (uint64_t)((d * RESOURCE_UNIT_SCALING) + 0.5); } +FixedPoint::FixedPoint(int i) { i_ = (i * RESOURCE_UNIT_SCALING); } + FixedPoint FixedPoint::operator+(FixedPoint const &ru) { FixedPoint res; res.i_ = i_ + ru.i_; @@ -53,4 +55,16 @@ FixedPoint FixedPoint::operator=(double const d) { return *this; } +bool FixedPoint::operator<(FixedPoint const &ru1) const { return (i_ < ru1.i_); }; +bool FixedPoint::operator>(FixedPoint const &ru1) const { return (i_ > ru1.i_); }; +bool FixedPoint::operator<=(FixedPoint const &ru1) const { return (i_ <= ru1.i_); }; +bool FixedPoint::operator>=(FixedPoint const &ru1) const { return (i_ >= ru1.i_); }; +bool FixedPoint::operator==(FixedPoint const &ru1) const { return (i_ == ru1.i_); }; +bool FixedPoint::operator!=(FixedPoint const &ru1) const { return (i_ != ru1.i_); }; + +std::ostream &operator<<(std::ostream &out, FixedPoint const &ru1) { + out << ru1.i_; + return out; +} + double FixedPoint::Double() { return round(i_) / RESOURCE_UNIT_SCALING; }; diff --git a/src/ray/raylet/scheduling/fixed_point.h b/src/ray/raylet/scheduling/fixed_point.h index dbdad32ef..a975a0158 100644 --- a/src/ray/raylet/scheduling/fixed_point.h +++ b/src/ray/raylet/scheduling/fixed_point.h @@ -12,6 +12,7 @@ class FixedPoint { public: FixedPoint(double d = 0); + FixedPoint(int i); FixedPoint operator+(FixedPoint const &ru); @@ -29,14 +30,14 @@ class FixedPoint { FixedPoint operator=(double const d); - friend bool operator<(FixedPoint const &ru1, FixedPoint const &ru2); - friend bool operator>(FixedPoint const &ru1, FixedPoint const &ru2); - friend bool operator<=(FixedPoint const &ru1, FixedPoint const &ru2); - friend bool operator>=(FixedPoint const &ru1, FixedPoint const &ru2); - friend bool operator==(FixedPoint const &ru1, FixedPoint const &ru2); - friend bool operator!=(FixedPoint const &ru1, FixedPoint const &ru2); + bool operator<(FixedPoint const &ru1) const; + bool operator>(FixedPoint const &ru1) const; + bool operator<=(FixedPoint const &ru1) const; + bool operator>=(FixedPoint const &ru1) const; + bool operator==(FixedPoint const &ru1) const; + bool operator!=(FixedPoint const &ru1) const; double Double(); - friend std::ostream &operator<<(std::ostream &out, const FixedPoint &ru); + friend std::ostream &operator<<(std::ostream &out, FixedPoint const &ru1); };