diff --git a/src/ray/common/scheduling/cluster_resource_scheduler.cc b/src/ray/common/scheduling/cluster_resource_scheduler.cc index 14472fce3..1e76a4a8e 100644 --- a/src/ray/common/scheduling/cluster_resource_scheduler.cc +++ b/src/ray/common/scheduling/cluster_resource_scheduler.cc @@ -14,137 +14,6 @@ #include "cluster_resource_scheduler.h" -std::string VectorToString(const std::vector &vector) { - std::stringstream buffer; - - buffer << "["; - for (size_t i = 0; i < vector.size(); i++) { - buffer << vector[i]; - if (i < vector.size() - 1) { - buffer << ", "; - } - } - buffer << "]"; - return buffer.str(); -} - -std::string UnorderedMapToString(const std::unordered_map &map) { - std::stringstream buffer; - - buffer << "["; - for (auto it = map.begin(); it != map.end(); ++it) { - buffer << "(" << it->first << ":" << it->second << ")"; - } - buffer << "]"; - return buffer.str(); -} - -/// Convert a map of resources to a TaskRequest data structure. -TaskRequest ResourceMapToTaskRequest( - StringIdMap &string_to_int_map, - const std::unordered_map &resource_map) { - size_t i = 0; - - TaskRequest task_request; - - task_request.predefined_resources.resize(PredefinedResources_MAX); - task_request.custom_resources.resize(resource_map.size()); - for (size_t i = 0; i < PredefinedResources_MAX; i++) { - task_request.predefined_resources[0].demand = 0; - task_request.predefined_resources[0].soft = false; - } - - for (auto const &resource : resource_map) { - if (resource.first == ray::kCPU_ResourceLabel) { - task_request.predefined_resources[CPU].demand = resource.second; - } else if (resource.first == ray::kGPU_ResourceLabel) { - task_request.predefined_resources[GPU].demand = resource.second; - } else if (resource.first == ray::kTPU_ResourceLabel) { - task_request.predefined_resources[TPU].demand = resource.second; - } else if (resource.first == ray::kMemory_ResourceLabel) { - task_request.predefined_resources[MEM].demand = resource.second; - } else { - 
task_request.custom_resources[i].id = string_to_int_map.Insert(resource.first); - task_request.custom_resources[i].demand = resource.second; - task_request.custom_resources[i].soft = false; - i++; - } - } - task_request.custom_resources.resize(i); - - return task_request; -} - -TaskRequest TaskResourceInstances::ToTaskRequest() const { - TaskRequest task_req; - task_req.predefined_resources.resize(PredefinedResources_MAX); - - for (size_t i = 0; i < PredefinedResources_MAX; i++) { - task_req.predefined_resources[i].demand = 0; - for (auto predefined_resource_instance : this->predefined_resources[i]) { - task_req.predefined_resources[i].demand += predefined_resource_instance; - } - } - - task_req.custom_resources.resize(this->custom_resources.size()); - size_t i = 0; - for (auto it = this->custom_resources.begin(); it != this->custom_resources.end(); - ++it) { - task_req.custom_resources[i].id = it->first; - task_req.custom_resources[i].soft = false; - task_req.custom_resources[i].demand = 0; - for (size_t j = 0; j < it->second.size(); j++) { - task_req.custom_resources[i].demand += it->second[j]; - } - i++; - } - return task_req; -} - -/// Convert a map of resources to a TaskRequest data structure. -/// -/// \param string_to_int_map: Map between names and ids maintained by the -/// \param resource_map_total: Total capacities of resources we want to convert. -/// \param resource_map_available: Available capacities of resources we want to convert. -/// -/// \request Conversion result to a TaskRequest data structure. 
-NodeResources ResourceMapToNodeResources( - StringIdMap &string_to_int_map, - const std::unordered_map &resource_map_total, - const std::unordered_map &resource_map_available) { - NodeResources node_resources; - node_resources.predefined_resources.resize(PredefinedResources_MAX); - for (size_t i = 0; i < PredefinedResources_MAX; i++) { - node_resources.predefined_resources[i].total = - node_resources.predefined_resources[i].available = 0; - } - - for (auto const &resource : resource_map_total) { - ResourceCapacity resource_capacity; - resource_capacity.total = (int64_t)resource.second; - auto it = resource_map_available.find(resource.first); - if (it == resource_map_available.end()) { - resource_capacity.available = 0; - } else { - resource_capacity.available = (int64_t)it->second; - } - if (resource.first == ray::kCPU_ResourceLabel) { - node_resources.predefined_resources[CPU] = resource_capacity; - } else if (resource.first == ray::kGPU_ResourceLabel) { - node_resources.predefined_resources[GPU] = resource_capacity; - } else if (resource.first == ray::kTPU_ResourceLabel) { - node_resources.predefined_resources[TPU] = resource_capacity; - } else if (resource.first == ray::kMemory_ResourceLabel) { - node_resources.predefined_resources[MEM] = resource_capacity; - } else { - // This is a custom resource. 
- node_resources.custom_resources.emplace(string_to_int_map.Insert(resource.first), - resource_capacity); - } - } - return node_resources; -} - bool NodeResources::operator==(const NodeResources &other) { for (size_t i = 0; i < PredefinedResources_MAX; i++) { if (this->predefined_resources[i].total != other.predefined_resources[i].total) { @@ -176,25 +45,39 @@ bool NodeResources::operator==(const NodeResources &other) { return true; } -std::string NodeResources::DebugString(StringIdMap string_to_in_map) const { +std::string NodeResources::DebugString() { std::stringstream buffer; - buffer << " {"; + buffer << " node predefined resources {"; for (size_t i = 0; i < static_cast(this->predefined_resources.size()); i++) { buffer << "(" << this->predefined_resources[i].total << ":" << this->predefined_resources[i].available << ") "; } - buffer << "}"; + buffer << "}" << std::endl; - buffer << " {"; + buffer << " node custom resources {"; for (auto it = this->custom_resources.begin(); it != this->custom_resources.end(); ++it) { - buffer << string_to_in_map.Get(it->first) << ":(" << it->second.total << ":" - << it->second.available << ") "; + buffer << it->first << ":(" << it->second.total << ":" << it->second.available + << ") "; } buffer << "}" << std::endl; return buffer.str(); } +std::string VectorToString(std::vector &vector) { + std::stringstream buffer; + + buffer << "["; + for (size_t i = 0; i < vector.size(); i++) { + buffer << vector[i]; + if (i < vector.size() - 1) { + buffer << ", "; + } + } + buffer << "]"; + return buffer.str(); +} + bool NodeResourceInstances::operator==(const NodeResourceInstances &other) { for (size_t i = 0; i < PredefinedResources_MAX; i++) { if (!EqualVectors(this->predefined_resources[i].total, @@ -227,20 +110,20 @@ bool NodeResourceInstances::operator==(const NodeResourceInstances &other) { return true; } -std::string NodeResourceInstances::DebugString(StringIdMap string_to_int_map) const { +std::string 
NodeResourceInstances::DebugString() { std::stringstream buffer; - buffer << " {"; + buffer << " node predefined resources {"; for (size_t i = 0; i < this->predefined_resources.size(); i++) { buffer << "(" << VectorToString(predefined_resources[i].total) << ":" << VectorToString(this->predefined_resources[i].available) << ") "; } - buffer << "}"; + buffer << "}" << std::endl; - buffer << " {"; + buffer << " node custom resources {"; for (auto it = this->custom_resources.begin(); it != this->custom_resources.end(); ++it) { - buffer << string_to_int_map.Get(it->first) << ":(" << VectorToString(it->second.total) - << ":" << VectorToString(it->second.available) << ") "; + buffer << it->first << ":(" << VectorToString(it->second.total) << ":" + << VectorToString(it->second.available) << ") "; } buffer << "}" << std::endl; return buffer.str(); @@ -261,60 +144,40 @@ TaskResourceInstances NodeResourceInstances::GetAvailableResourceInstances() { return task_resources; }; -std::string TaskRequest::DebugString() const { +std::string TaskRequest::DebugString() { std::stringstream buffer; - buffer << " {"; + buffer << std::endl << " request predefined resources {"; for (size_t i = 0; i < this->predefined_resources.size(); i++) { buffer << "(" << this->predefined_resources[i].demand << ":" << this->predefined_resources[i].soft << ") "; } - buffer << "}"; + buffer << "}" << std::endl; - buffer << " ["; + buffer << " request custom resources {"; for (size_t i = 0; i < this->custom_resources.size(); i++) { buffer << this->custom_resources[i].id << ":" << "(" << this->custom_resources[i].demand << ":" << this->custom_resources[i].soft << ") "; } - buffer << "]" << std::endl; + buffer << "}" << std::endl; return buffer.str(); } -bool TaskResourceInstances::IsEmpty() const { - // Check whether all resource instances of a task are zero. 
- for (const auto &predefined_resource : predefined_resources) { - for (const auto &predefined_resource_instance : predefined_resource) { - if (predefined_resource_instance != 0) { - return false; - } - } - } - - for (const auto custom_resource : custom_resources) { - for (const auto custom_resource_instances : custom_resource.second) { - if (custom_resource_instances != 0) { - return false; - } - } - } - return true; -} - -std::string TaskResourceInstances::DebugString() const { +std::string TaskResourceInstances::DebugString() { std::stringstream buffer; - buffer << std::endl << " Allocation: {"; + buffer << std::endl << " task allocation: P {"; for (size_t i = 0; i < this->predefined_resources.size(); i++) { buffer << VectorToString(this->predefined_resources[i]); } buffer << "}"; - buffer << " ["; + buffer << " C {"; for (auto it = this->custom_resources.begin(); it != this->custom_resources.end(); ++it) { buffer << it->first << ":" << VectorToString(it->second) << ", "; } - buffer << "]" << std::endl; + buffer << "}" << std::endl; return buffer.str(); } @@ -357,19 +220,15 @@ ClusterResourceScheduler::ClusterResourceScheduler( const std::string &local_node_id, const std::unordered_map &local_node_resources) { local_node_id_ = string_to_int_map_.Insert(local_node_id); - NodeResources node_resources = ResourceMapToNodeResources( - string_to_int_map_, local_node_resources, local_node_resources); - - AddOrUpdateNode(local_node_id_, node_resources); - InitLocalResources(node_resources); + AddOrUpdateNode(local_node_id, local_node_resources, local_node_resources); } void ClusterResourceScheduler::AddOrUpdateNode( const std::string &node_id, const std::unordered_map &resources_total, const std::unordered_map &resources_available) { - NodeResources node_resources = ResourceMapToNodeResources( - string_to_int_map_, resources_total, resources_available); + NodeResources node_resources; + ResourceMapToNodeResources(resources_total, resources_available, &node_resources); 
AddOrUpdateNode(string_to_int_map_.Insert(node_id), node_resources); } @@ -430,35 +289,28 @@ int64_t ClusterResourceScheduler::IsSchedulable(const TaskRequest &task_req, resources.predefined_resources[i].available) { if (task_req.predefined_resources[i].soft) { // A soft constraint has been violated. - // Just remember this as soft violations do not preclude a task - // from being scheduled. violations++; } else { - // A hard constraint has been violated, so we cannot schedule - // this task request. + // A hard constraint has been violated. return -1; } } } - // No check custom resources. - for (const auto task_req_custom_resource : task_req.custom_resources) { - auto it = resources.custom_resources.find(task_req_custom_resource.id); + for (size_t i = 0; i < task_req.custom_resources.size(); i++) { + auto it = resources.custom_resources.find(task_req.custom_resources[i].id); if (it == resources.custom_resources.end()) { - // Requested resource doesn't exist at this node. However, this - // is a soft constraint, so just increment "violations" and continue. - if (task_req_custom_resource.soft) { + // Requested resource doesn't exist at this node. + if (task_req.custom_resources[i].soft) { violations++; } else { - // This is a hard constraint so cannot schedule this task request. return -1; } } else { - if (task_req_custom_resource.demand > it->second.available) { - // Resource constraint is violated, but since it is soft - // just increase the "violations" and continue. - if (task_req_custom_resource.soft) { + if (task_req.custom_resources[i].demand > it->second.available) { + // Resource constraint is violated. 
+ if (task_req.custom_resources[i].soft) { violations++; } else { return -1; @@ -471,7 +323,7 @@ int64_t ClusterResourceScheduler::IsSchedulable(const TaskRequest &task_req, auto it_p = task_req.placement_hints.find(node_id); if (it_p == task_req.placement_hints.end()) { // Node not found in the placement_hints list, so - // record this as a soft constraint violation. + // record this a soft constraint violation. violations++; } } @@ -481,8 +333,7 @@ int64_t ClusterResourceScheduler::IsSchedulable(const TaskRequest &task_req, int64_t ClusterResourceScheduler::GetBestSchedulableNode(const TaskRequest &task_req, int64_t *total_violations) { - // Minimum number of soft violations across all nodes that can schedule the request. - // We will pick the node with the smallest number of soft violations. + // Min number of violations across all nodes that can schedule the request. int64_t min_violations = INT_MAX; // Node associated to min_violations. int64_t best_node = -1; @@ -499,8 +350,9 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode(const TaskRequest &task // Check whether any node in the request placement_hints, satisfes // all resource constraints of the request. - for (const auto &task_req_placement_hint : task_req.placement_hints) { - auto it = nodes_.find(task_req_placement_hint); + for (auto it_p = task_req.placement_hints.begin(); + it_p != task_req.placement_hints.end(); ++it_p) { + auto it = nodes_.find(*it_p); if (it != nodes_.end()) { if (IsSchedulable(task_req, it->first, it->second) == 0) { return it->first; @@ -508,19 +360,19 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode(const TaskRequest &task } } - for (const auto &node : nodes_) { + for (auto it = nodes_.begin(); it != nodes_.end(); ++it) { // Return -1 if node not schedulable. otherwise return the number // of soft constraint violations. 
int64_t violations; - if ((violations = IsSchedulable(task_req, node.first, node.second)) == -1) { + if ((violations = IsSchedulable(task_req, it->first, it->second)) == -1) { continue; } // Update the node with the smallest number of soft constraints violated. if (min_violations > violations) { min_violations = violations; - best_node = node.first; + best_node = it->first; } if (violations == 0) { *total_violations = 0; @@ -534,15 +386,14 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode(const TaskRequest &task std::string ClusterResourceScheduler::GetBestSchedulableNode( const std::unordered_map &task_resources, int64_t *total_violations) { - TaskRequest task_request = ResourceMapToTaskRequest(string_to_int_map_, task_resources); + TaskRequest task_request; + ResourceMapToTaskRequest(task_resources, &task_request); int64_t node_id = GetBestSchedulableNode(task_request, total_violations); std::string id_string; if (node_id == -1) { - // This is not a schedulable node, so return empty string. return ""; } - // Return the string name of the node. 
return string_to_int_map_.Get(node_id); } @@ -565,11 +416,11 @@ bool ClusterResourceScheduler::SubtractNodeAvailableResources( task_req.predefined_resources[i].demand); } - for (const auto &task_req_custom_resource : task_req.custom_resources) { - auto it = resources.custom_resources.find(task_req_custom_resource.id); + for (size_t i = 0; i < task_req.custom_resources.size(); i++) { + auto it = resources.custom_resources.find(task_req.custom_resources[i].id); if (it != resources.custom_resources.end()) { it->second.available = - std::max(0., it->second.available - task_req_custom_resource.demand); + std::max(0., it->second.available - task_req.custom_resources[i].demand); } } return true; @@ -578,7 +429,8 @@ bool ClusterResourceScheduler::SubtractNodeAvailableResources( bool ClusterResourceScheduler::SubtractNodeAvailableResources( const std::string &node_id, const std::unordered_map &resource_map) { - TaskRequest task_request = ResourceMapToTaskRequest(string_to_int_map_, resource_map); + TaskRequest task_request; + ResourceMapToTaskRequest(resource_map, &task_request); return SubtractNodeAvailableResources(string_to_int_map_.Get(node_id), task_request); } @@ -597,11 +449,11 @@ bool ClusterResourceScheduler::AddNodeAvailableResources(int64_t node_id, resources.predefined_resources[i].total); } - for (const auto &task_req_custom_resource : task_req.custom_resources) { - auto it = resources.custom_resources.find(task_req_custom_resource.id); + for (size_t i = 0; i < task_req.custom_resources.size(); i++) { + auto it = resources.custom_resources.find(task_req.custom_resources[i].id); if (it != resources.custom_resources.end()) { it->second.available = std::min( - it->second.available + task_req_custom_resource.demand, it->second.total); + it->second.available + task_req.custom_resources[i].demand, it->second.total); } } return true; @@ -610,7 +462,8 @@ bool ClusterResourceScheduler::AddNodeAvailableResources(int64_t node_id, bool 
ClusterResourceScheduler::AddNodeAvailableResources( const std::string &node_id, const std::unordered_map &resource_map) { - TaskRequest task_request = ResourceMapToTaskRequest(string_to_int_map_, resource_map); + TaskRequest task_request; + ResourceMapToTaskRequest(resource_map, &task_request); return AddNodeAvailableResources(string_to_int_map_.Get(node_id), task_request); } @@ -627,19 +480,79 @@ bool ClusterResourceScheduler::GetNodeResources(int64_t node_id, int64_t ClusterResourceScheduler::NumNodes() { return nodes_.size(); } +void ClusterResourceScheduler::ResourceMapToNodeResources( + const std::unordered_map &resource_map_total, + const std::unordered_map &resource_map_available, + NodeResources *node_resources) { + node_resources->predefined_resources.resize(PredefinedResources_MAX); + for (size_t i = 0; i < PredefinedResources_MAX; i++) { + node_resources->predefined_resources[i].total = + node_resources->predefined_resources[i].available = 0; + } + + for (auto it = resource_map_total.begin(); it != resource_map_total.end(); ++it) { + ResourceCapacity resource_capacity; + resource_capacity.total = (int64_t)it->second; + auto it2 = resource_map_available.find(it->first); + if (it2 == resource_map_available.end()) { + resource_capacity.available = 0; + } else { + resource_capacity.available = (int64_t)it2->second; + } + if (it->first == ray::kCPU_ResourceLabel) { + node_resources->predefined_resources[CPU] = resource_capacity; + } else if (it->first == ray::kGPU_ResourceLabel) { + node_resources->predefined_resources[GPU] = resource_capacity; + } else if (it->first == ray::kTPU_ResourceLabel) { + node_resources->predefined_resources[TPU] = resource_capacity; + } else if (it->first == ray::kMemory_ResourceLabel) { + node_resources->predefined_resources[MEM] = resource_capacity; + } else { + // This is a custom resource. 
+ node_resources->custom_resources.emplace(string_to_int_map_.Insert(it->first), + resource_capacity); + } + } +} + +void ClusterResourceScheduler::ResourceMapToTaskRequest( + const std::unordered_map &resource_map, + TaskRequest *task_request) { + size_t i = 0; + + task_request->predefined_resources.resize(PredefinedResources_MAX); + task_request->custom_resources.resize(resource_map.size()); + for (size_t i = 0; i < PredefinedResources_MAX; i++) { + task_request->predefined_resources[0].demand = 0; + task_request->predefined_resources[0].soft = false; + } + + for (auto it = resource_map.begin(); it != resource_map.end(); ++it) { + if (it->first == ray::kCPU_ResourceLabel) { + task_request->predefined_resources[CPU].demand = it->second; + } else if (it->first == ray::kGPU_ResourceLabel) { + task_request->predefined_resources[GPU].demand = it->second; + } else if (it->first == ray::kTPU_ResourceLabel) { + task_request->predefined_resources[TPU].demand = it->second; + } else if (it->first == ray::kMemory_ResourceLabel) { + task_request->predefined_resources[MEM].demand = it->second; + } else { + task_request->custom_resources[i].id = string_to_int_map_.Insert(it->first); + task_request->custom_resources[i].demand = it->second; + task_request->custom_resources[i].soft = false; + i++; + } + } + task_request->custom_resources.resize(i); +} + void ClusterResourceScheduler::UpdateResourceCapacity(const std::string &client_id_string, const std::string &resource_name, int64_t resource_total) { int64_t client_id = string_to_int_map_.Get(client_id_string); - auto it = nodes_.find(client_id); if (it == nodes_.end()) { - NodeResources node_resources; - node_resources.predefined_resources.resize(PredefinedResources_MAX); - client_id = string_to_int_map_.Insert(client_id_string); - RAY_CHECK(nodes_.emplace(client_id, node_resources).second); - it = nodes_.find(client_id); - RAY_CHECK(it != nodes_.end()); + return; } int idx = -1; @@ -675,11 +588,10 @@ void 
ClusterResourceScheduler::UpdateResourceCapacity(const std::string &client_ if (itr->second.total < 0) { itr->second.total = 0; } - } else { - ResourceCapacity resource_capacity; - resource_capacity.total = resource_capacity.available = resource_total; - it->second.custom_resources.emplace(resource_id, resource_capacity); } + ResourceCapacity resource_capacity; + resource_capacity.total = resource_capacity.available = resource_total; + it->second.custom_resources.emplace(resource_id, resource_capacity); } } @@ -713,19 +625,19 @@ void ClusterResourceScheduler::DeleteResource(const std::string &client_id_strin } } -std::string ClusterResourceScheduler::DebugString(void) const { +std::string ClusterResourceScheduler::DebugString(void) { std::stringstream buffer; - buffer << "\n Local id: " << local_node_id_; - buffer << " Local resources: " << local_resources_.DebugString(string_to_int_map_); - for (auto &node : nodes_) { - buffer << " node id: " << node.first; - buffer << node.second.DebugString(string_to_int_map_); + buffer << std::endl << "local node id: " << local_node_id_ << std::endl; + for (auto it = nodes_.begin(); it != nodes_.end(); ++it) { + buffer << "node id: " << it->first << std::endl; + buffer << it->second.DebugString(); } return buffer.str(); } void ClusterResourceScheduler::InitResourceInstances( - double total, bool unit_instances, ResourceInstanceCapacities *instance_list) { + double total, bool unit_instances, + ResourceInstanceCapacities *instance_list /* return */) { if (unit_instances) { size_t num_instances = static_cast(total); instance_list->total.resize(num_instances); @@ -766,38 +678,29 @@ void ClusterResourceScheduler::InitLocalResources(const NodeResources &node_reso } } -std::vector ClusterResourceScheduler::AddAvailableResourceInstances( - std::vector available, ResourceInstanceCapacities *resource_instances) { - std::vector overflow(available.size(), 0.); +void ClusterResourceScheduler::AddAvailableResourceInstances( + std::vector 
available, + ResourceInstanceCapacities *resource_instances /* return */) { for (size_t i = 0; i < available.size(); i++) { - resource_instances->available[i] = resource_instances->available[i] + available[i]; - if (resource_instances->available[i] > resource_instances->total[i]) { - overflow[i] = resource_instances->available[i] - resource_instances->total[i]; - resource_instances->available[i] = resource_instances->total[i]; - } + resource_instances->available[i] = std::min( + resource_instances->available[i] + available[i], resource_instances->total[i]); } - - return overflow; } -std::vector ClusterResourceScheduler::SubtractAvailableResourceInstances( - std::vector available, ResourceInstanceCapacities *resource_instances) { +void ClusterResourceScheduler::SubtractAvailableResourceInstances( + std::vector available, + ResourceInstanceCapacities *resource_instances /* return */) { RAY_CHECK(available.size() == resource_instances->available.size()); - std::vector underflow(available.size(), 0.); for (size_t i = 0; i < available.size(); i++) { - resource_instances->available[i] = resource_instances->available[i] - available[i]; - if (resource_instances->available[i] < 0) { - underflow[i] = -resource_instances->available[i]; - resource_instances->available[i] = 0; - } + resource_instances->available[i] = + std::max(resource_instances->available[i] - available[i], 0.); } - return underflow; } bool ClusterResourceScheduler::AllocateResourceInstances( double demand, bool soft, std::vector &available, - std::vector *allocation) { + std::vector *allocation /* return */) { allocation->resize(available.size()); double remaining_demand = demand; @@ -887,9 +790,14 @@ bool ClusterResourceScheduler::AllocateResourceInstances( } bool ClusterResourceScheduler::AllocateTaskResourceInstances( - const TaskRequest &task_req, std::shared_ptr task_allocation) { - RAY_CHECK(task_allocation != nullptr); - if (nodes_.find(local_node_id_) == nodes_.end()) { + const TaskRequest &task_req, 
TaskResourceInstances *task_allocation /* return */) { + auto it = nodes_.find(local_node_id_); + if (it == nodes_.end()) { + return false; + } + + // Just double check this node can still schedule the task request. + if (IsSchedulable(task_req, local_node_id_, it->second) == -1) { return false; } @@ -902,19 +810,19 @@ bool ClusterResourceScheduler::AllocateTaskResourceInstances( &task_allocation->predefined_resources[i])) { // Allocation failed. Restore node's local resources by freeing the resources // of the failed allocation. - FreeTaskResourceInstances(task_allocation); + FreeTaskResourceInstances(*task_allocation); return false; } } } - for (const auto &task_req_custom_resource : task_req.custom_resources) { - auto it = local_resources_.custom_resources.find(task_req_custom_resource.id); + for (size_t i = 0; i < task_req.custom_resources.size(); i++) { + auto it = local_resources_.custom_resources.find(task_req.custom_resources[i].id); if (it != local_resources_.custom_resources.end()) { - if (task_req_custom_resource.demand > 0) { + if (task_req.custom_resources[i].demand > 0) { std::vector allocation; - bool success = AllocateResourceInstances(task_req_custom_resource.demand, - task_req_custom_resource.soft, + bool success = AllocateResourceInstances(task_req.custom_resources[i].demand, + task_req.custom_resources[i].soft, it->second.available, &allocation); // Even if allocation failed we need to remember partial allocations to correctly // free resources. @@ -922,7 +830,7 @@ bool ClusterResourceScheduler::AllocateTaskResourceInstances( if (!success) { // Allocation failed. Restore node's local resources by freeing the resources // of the failed allocation. 
- FreeTaskResourceInstances(task_allocation); + FreeTaskResourceInstances(*task_allocation); return false; } } @@ -933,128 +841,30 @@ bool ClusterResourceScheduler::AllocateTaskResourceInstances( return true; } -void ClusterResourceScheduler::UpdateLocalAvailableResourcesFromResourceInstances() { - auto it_local_node = nodes_.find(local_node_id_); - RAY_CHECK(it_local_node != nodes_.end()); - - for (size_t i = 0; i < PredefinedResources_MAX; i++) { - it_local_node->second.predefined_resources[i].available = 0; - for (size_t j = 0; j < local_resources_.predefined_resources[i].available.size(); - j++) { - it_local_node->second.predefined_resources[i].available += - local_resources_.predefined_resources[i].available[j]; - } - } - - for (auto &custom_resource : it_local_node->second.custom_resources) { - auto it = local_resources_.custom_resources.find(custom_resource.first); - if (it != local_resources_.custom_resources.end()) { - custom_resource.second.available = 0; - for (const auto available : it->second.available) { - custom_resource.second.available += available; - } - } - } -} - void ClusterResourceScheduler::FreeTaskResourceInstances( - std::shared_ptr task_allocation) { - RAY_CHECK(task_allocation != nullptr); + TaskResourceInstances &task_allocation) { for (size_t i = 0; i < PredefinedResources_MAX; i++) { - AddAvailableResourceInstances(task_allocation->predefined_resources[i], + AddAvailableResourceInstances(task_allocation.predefined_resources[i], &local_resources_.predefined_resources[i]); } - for (const auto task_allocation_custom_resource : task_allocation->custom_resources) { - auto it = - local_resources_.custom_resources.find(task_allocation_custom_resource.first); - if (it != local_resources_.custom_resources.end()) { - AddAvailableResourceInstances(task_allocation_custom_resource.second, &it->second); + for (auto it = task_allocation.custom_resources.begin(); + it != task_allocation.custom_resources.end(); it++) { + auto it_local = 
local_resources_.custom_resources.find(it->first); + if (it_local != local_resources_.custom_resources.end()) { + AddAvailableResourceInstances(it->second, &it_local->second); } } } -std::vector ClusterResourceScheduler::AddCPUResourceInstances( +void ClusterResourceScheduler::AddCPUResourceInstances( std::vector &cpu_instances) { - if (cpu_instances.size() == 0) { - return cpu_instances; // No oveerflow. - } - RAY_CHECK(nodes_.find(local_node_id_) != nodes_.end()); - - auto overflow = AddAvailableResourceInstances( - cpu_instances, &local_resources_.predefined_resources[CPU]); - UpdateLocalAvailableResourcesFromResourceInstances(); - - return overflow; + AddAvailableResourceInstances(cpu_instances, + &local_resources_.predefined_resources[CPU]); } -std::vector ClusterResourceScheduler::SubtractCPUResourceInstances( +void ClusterResourceScheduler::SubtractCPUResourceInstances( std::vector &cpu_instances) { - if (cpu_instances.size() == 0) { - return cpu_instances; // No underflow. - } - RAY_CHECK(nodes_.find(local_node_id_) != nodes_.end()); - - auto underflow = SubtractAvailableResourceInstances( - cpu_instances, &local_resources_.predefined_resources[CPU]); - UpdateLocalAvailableResourcesFromResourceInstances(); - - return underflow; -} - -bool ClusterResourceScheduler::AllocateTaskResources( - int64_t node_id, const TaskRequest &task_req, - std::shared_ptr task_allocation) { - if (node_id == local_node_id_) { - RAY_CHECK(task_allocation != nullptr); - if (AllocateTaskResourceInstances(task_req, task_allocation)) { - UpdateLocalAvailableResourcesFromResourceInstances(); - return true; - } - } else { - if (SubtractNodeAvailableResources(node_id, task_req)) { - return true; - } - } - return false; -} - -bool ClusterResourceScheduler::AllocateLocalTaskResources( - const std::unordered_map &task_resources, - std::shared_ptr task_allocation) { - RAY_CHECK(task_allocation != nullptr); - TaskRequest task_request = ResourceMapToTaskRequest(string_to_int_map_, 
task_resources); - return AllocateTaskResources(local_node_id_, task_request, task_allocation); -} - -std::string ClusterResourceScheduler::GetResourceNameFromIndex(int64_t res_idx) { - if (res_idx == CPU) { - return ray::kCPU_ResourceLabel; - } else if (res_idx == GPU) { - return ray::kGPU_ResourceLabel; - } else if (res_idx == TPU) { - return ray::kTPU_ResourceLabel; - } else if (res_idx == MEM) { - return ray::kMemory_ResourceLabel; - } else { - return string_to_int_map_.Get((uint64_t)res_idx); - } -} - -void ClusterResourceScheduler::AllocateRemoteTaskResources( - std::string &node_string, - const std::unordered_map &task_resources) { - TaskRequest task_request = ResourceMapToTaskRequest(string_to_int_map_, task_resources); - auto node_id = string_to_int_map_.Insert(node_string); - RAY_CHECK(node_id != local_node_id_); - AllocateTaskResources(node_id, task_request, nullptr); -} - -void ClusterResourceScheduler::FreeLocalTaskResources( - std::shared_ptr task_allocation) { - if (task_allocation == nullptr || task_allocation->IsEmpty()) { - return; - } - FreeTaskResourceInstances(task_allocation); - UpdateLocalAvailableResourcesFromResourceInstances(); + SubtractAvailableResourceInstances(cpu_instances, + &local_resources_.predefined_resources[CPU]); } diff --git a/src/ray/common/scheduling/cluster_resource_scheduler.h b/src/ray/common/scheduling/cluster_resource_scheduler.h index fb114b146..8ad84064c 100644 --- a/src/ray/common/scheduling/cluster_resource_scheduler.h +++ b/src/ray/common/scheduling/cluster_resource_scheduler.h @@ -60,7 +60,6 @@ struct ResourceRequestWithId : ResourceRequest { int64_t id; }; -// Data structure specifying the capacity of each resource requested by a task. class TaskRequest { public: /// List of predefined resources required by the task. @@ -73,11 +72,10 @@ class TaskRequest { /// nodes in this list can schedule this task. absl::flat_hash_set placement_hints; /// Returns human-readable string for this task request. 
- std::string DebugString() const; + std::string DebugString(); }; -// Data structure specifying the capacity of each instance of each resource -// allocated to a task. +// Task request specifying instances for each resource. class TaskResourceInstances { public: /// The list of instances of each predifined resource allocated to a task. @@ -85,20 +83,10 @@ class TaskResourceInstances { /// The list of instances of each custom resource allocated to a task. absl::flat_hash_map> custom_resources; bool operator==(const TaskResourceInstances &other); - /// For each resource of this request aggregate its instances. - TaskRequest ToTaskRequest() const; /// Get CPU instances only. - std::vector GetCPUInstances() const { - if (!this->predefined_resources.empty()) { - return this->predefined_resources[CPU]; - } else { - return {}; - } - }; - /// Check whether there are no resource instances. - bool IsEmpty() const; + std::vector GetCPUInstances() { return this->predefined_resources[CPU]; }; /// Returns human-readable string for these resources. - std::string DebugString() const; + std::string DebugString(); }; /// Total and available capacities of each resource of a node. @@ -112,7 +100,7 @@ class NodeResources { /// Returns if this equals another node resources. bool operator==(const NodeResources &other); /// Returns human-readable string for these resources. - std::string DebugString(StringIdMap string_to_int_map) const; + std::string DebugString(); }; /// Total and available capacities of each resource instance. @@ -129,7 +117,7 @@ class NodeResourceInstances { /// Returns if this equals another node resources. bool operator==(const NodeResourceInstances &other); /// Returns human-readable string for these resources. 
- std::string DebugString(StringIdMap string_to_int_map) const; + std::string DebugString(); }; /// Class encapsulating the cluster resources and the logic to assign @@ -161,19 +149,6 @@ class ClusterResourceScheduler { const absl::flat_hash_map &new_custom_resources, absl::flat_hash_map *old_custom_resources); - /// Subtract the resources required by a given task request (task_req) from - /// a given node (node_id). - /// - /// \param node_id Node whose resources we allocate. Can be the local or a remote node. - /// \param task_req Task for which we allocate resources. - /// \param task_allocation Resources allocated to the task at instance granularity. - /// This is a return parameter. - /// - /// \return True if the node has enough resources to satisfy the task request. - /// False otherwise. - bool AllocateTaskResources(int64_t node_id, const TaskRequest &task_req, - std::shared_ptr task_allocation); - public: ClusterResourceScheduler(void){}; @@ -188,9 +163,6 @@ class ClusterResourceScheduler { const std::string &local_node_id, const std::unordered_map &local_node_resources); - // Mapping from predefined resource indexes to resource strings - std::string GetResourceNameFromIndex(int64_t res_idx); - /// Add a new node or overwrite the resources of an existing node. /// /// \param node_id: Node ID. @@ -292,19 +264,24 @@ class ClusterResourceScheduler { /// Get number of nodes in the cluster. int64_t NumNodes(); - /// Update total capacity of a given resource of a given node. - /// - /// \param node_name: Node whose resource we want to update. - /// \param resource_name: Resource which we want to update. - /// \param resource_total: New capacity of the resource. - void UpdateResourceCapacity(const std::string &node_name, + /// Convert a map of resources to a TaskRequest data structure. + void ResourceMapToTaskRequest( + const std::unordered_map &resource_map, + TaskRequest *task_request); + + /// Convert maps of total and available resources to a NodeResources data structure.
+ void ResourceMapToNodeResources( + const std::unordered_map &resource_map_total, + const std::unordered_map &resource_map_available, + NodeResources *node_resources); + + /// Update total capacity of resource resource_name at node client_id. + void UpdateResourceCapacity(const std::string &client_id, const std::string &resource_name, int64_t resource_total); - /// Delete a given resource from a given node. - /// - /// \param node_name: Node whose resource we want to delete. - /// \param resource_name: Resource we want to delete - void DeleteResource(const std::string &node_name, const std::string &resource_name); + /// Delete resource resource_name from node client_id_string. + void DeleteResource(const std::string &client_id_string, + const std::string &resource_name); /// Return local resources. NodeResourceInstances GetLocalResources() { return local_resources_; }; @@ -328,27 +305,25 @@ class ClusterResourceScheduler { ResourceInstanceCapacities *instance_list); /// Allocate enough capacity across the instances of a resource to satisfy "demand". - /// If resource has multiple unit-capacity instances, we consider two cases. + /// If resource has multiple unit-capacity instances, we consider two cases. /// /// 1) If the constraint is hard, allocate full unit-capacity instances until - /// demand becomes fractional, and then satisfy the fractional demand using the + /// demand becomes fractional, and then satisfy the fractional demand using the /// instance with the smallest available capacity that can satisfy the fractional /// demand. For example, assume a resource conisting of 4 instances, with available /// capacities: (1., 1., .7, 0.5) and deman of 1.2. Then we allocate one full /// instance and then allocate 0.2 of the 0.5 instance (as this is the instance /// with the smalest available capacity that can satisfy the remaining demand of 0.2). - /// As a result remaining available capacities will be (0., 1., .7, .3).
- /// Thus, if the constraint is hard, we will allocate a bunch of full instances and - /// at most a fractional instance. + /// As a result remaining available capacities will be (0., 1., .7, .3). + /// Thus, if the constraint is hard, we will allocate at most a fractional resource. /// /// 2) If the constraint is soft, we can allocate multiple fractional resources, /// and even overallocate the resource. For example, in the previous case, if we /// have a demand of 1.8, we can allocate one full instance, the 0.5 instance, and - /// 0.3 from the 0.7 instance. Furthermore, if the demand is 3.5, then we allocate + /// 0.3 from the 0.7 instance. Furthermore, if the demand is 3.5, then we allocate /// all instances, and return success (true), despite the fact that the total /// available capacity of the rwsource is 3.2 (= 1. + 1. + .7 + .5), which is less - /// than the demand, 3.5. In this case, the remaining available resource is - /// (0., 0., 0., 0.) + /// than the demand, 3.5. /// /// \param demand: The resource amount to be allocated. /// \param soft: Specifies whether this demand has soft or hard constraints. @@ -365,94 +340,45 @@ class ClusterResourceScheduler { /// /// \param task_req: Resources requested by a task. /// \param task_allocation: Local resources allocated to satsify task_req demand. + /// This is an output argument. /// /// \return true, if allocation successful. If false, the caller needs to free the /// allocated resources, i.e., task_allocation. - bool AllocateTaskResourceInstances( - const TaskRequest &task_req, - std::shared_ptr task_allocation); + bool AllocateTaskResourceInstances(const TaskRequest &task_req, + TaskResourceInstances *task_allocation); /// Free resources which were allocated with a task. The freed resources are /// added back to the node's local available resources. /// /// \param task_allocation: Task's resources to be freed.
- void FreeTaskResourceInstances(std::shared_ptr task_allocation); + void FreeTaskResourceInstances(TaskResourceInstances &task_allocation); /// Increase the available capacities of the instances of a given resource. /// /// \param available A list of available capacities for resource's instances. /// \param resource_instances List of the resource instances being updated. - /// - /// \return Overflow capacities of "resource_instances" after adding instance - /// capacities in "available", i.e., - /// min(available + resource_instances.available, resource_instances.total) - std::vector AddAvailableResourceInstances( - std::vector available, ResourceInstanceCapacities *resource_instances); + void AddAvailableResourceInstances(std::vector available, + ResourceInstanceCapacities *resource_instances); /// Decrease the available capacities of the instances of a given resource. /// /// \param free A list of capacities for resource's instances to be freed. /// \param resource_instances List of the resource instances being updated. - /// \return Underflow of "resource_instances" after subtracting instance - /// capacities in "available", i.e.,. - /// max(available - reasource_instances.available, 0) - std::vector SubtractAvailableResourceInstances( - std::vector available, ResourceInstanceCapacities *resource_instances); + void SubtractAvailableResourceInstances(std::vector free, + ResourceInstanceCapacities *resource_instances); /// Increase the available CPU instances of this node. /// /// \param cpu_instances CPU instances to be added to available cpus. - /// - /// \return Overflow capacities of CPU instances after adding CPU - /// capacities in cpu_instances. - std::vector AddCPUResourceInstances(std::vector &cpu_instances); + void AddCPUResourceInstances(std::vector &cpu_instances); - /// Decrease the available CPU instances of this node. + /// Decrease the available cpu instances of this node. 
/// - /// \param cpu_instances CPU instances to be removed from available cpus. - /// - /// \return Underflow capacities of CPU instances after subtracting CPU - /// capacities in cpu_instances. - std::vector SubtractCPUResourceInstances(std::vector &cpu_instances); - - /// Subtract the resources required by a given task request (task_req) from the - /// local node. This function also updates the local node resources - /// at the instance granularity. - /// - /// \param task_req Task for which we allocate resources. - /// \param task_allocation Resources allocated to the task at instance granularity. - /// This is a return parameter. - /// - /// \return True if local node has enough resources to satisfy the task request. - /// False otherwise. - bool AllocateLocalTaskResources( - const std::unordered_map &task_resources, - std::shared_ptr task_allocation); - - /// Subtract the resources required by a given task request (task_req) from a given - /// remote node. - /// - /// \param node_id Remote node whose resources we allocate. - /// \param task_req Task for which we allocate resources. - void AllocateRemoteTaskResources( - std::string &node_id, - const std::unordered_map &task_resources); - - void FreeLocalTaskResources(std::shared_ptr task_allocation); - - /// Update the available resources of the local node given - /// the available instances of each resource of the local node. - /// Basically, this means computing the available resources - /// by adding up the available quantities of each instance of that - /// resources. - /// - /// Example: Assume the local node has four GPU instances with the - /// following availabilities: 0.2, 0.3, 0.1, 1. Then the total GPU - // resources availabile at that node is 0.2 + 0.3 + 0.1 + 1. = 1.6 - void UpdateLocalAvailableResourcesFromResourceInstances(); + /// \param cpu_instances Cpu instances to be removed from available cpus. 
+ void SubtractCPUResourceInstances(std::vector &cpu_instances); /// Return human-readable string for this scheduler state. - std::string DebugString() const; + std::string DebugString(); }; #endif // RAY_COMMON_SCHEDULING_SCHEDULING_H diff --git a/src/ray/common/scheduling/scheduling_test.cc b/src/ray/common/scheduling/scheduling_test.cc index 087af7d1e..28a97ea88 100644 --- a/src/ray/common/scheduling/scheduling_test.cc +++ b/src/ray/common/scheduling/scheduling_test.cc @@ -596,10 +596,9 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest) { EmptyBoolVector, EmptyIntVector); NodeResourceInstances old_local_resources = cluster_resources.GetLocalResources(); - std::shared_ptr task_allocation = - std::make_shared(); + TaskResourceInstances task_allocation; bool success = - cluster_resources.AllocateTaskResourceInstances(task_req, task_allocation); + cluster_resources.AllocateTaskResourceInstances(task_req, &task_allocation); ASSERT_EQ(success, true); @@ -622,10 +621,9 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest) { EmptyBoolVector, EmptyIntVector); NodeResourceInstances old_local_resources = cluster_resources.GetLocalResources(); - std::shared_ptr task_allocation = - std::make_shared(); + TaskResourceInstances task_allocation; bool success = - cluster_resources.AllocateTaskResourceInstances(task_req, task_allocation); + cluster_resources.AllocateTaskResourceInstances(task_req, &task_allocation); ASSERT_EQ(success, false); ASSERT_EQ((cluster_resources.GetLocalResources() == old_local_resources), true); @@ -644,10 +642,9 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest) { EmptyBoolVector, EmptyIntVector); NodeResourceInstances old_local_resources = cluster_resources.GetLocalResources(); - std::shared_ptr task_allocation = - std::make_shared(); + TaskResourceInstances task_allocation; bool success = - cluster_resources.AllocateTaskResourceInstances(task_req, task_allocation); + cluster_resources.AllocateTaskResourceInstances(task_req, &task_allocation); 
ASSERT_EQ(success, true); @@ -680,10 +677,9 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest) { EmptyIntVector); NodeResourceInstances old_local_resources = cluster_resources.GetLocalResources(); - std::shared_ptr task_allocation = - std::make_shared(); + TaskResourceInstances task_allocation; bool success = - cluster_resources.AllocateTaskResourceInstances(task_req, task_allocation); + cluster_resources.AllocateTaskResourceInstances(task_req, &task_allocation); ASSERT_EQ(success, true); @@ -710,10 +706,9 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest) { EmptyIntVector); NodeResourceInstances old_local_resources = cluster_resources.GetLocalResources(); - std::shared_ptr task_allocation = - std::make_shared(); + TaskResourceInstances task_allocation; bool success = - cluster_resources.AllocateTaskResourceInstances(task_req, task_allocation); + cluster_resources.AllocateTaskResourceInstances(task_req, &task_allocation); ASSERT_EQ(success, false); ASSERT_EQ((cluster_resources.GetLocalResources() == old_local_resources), true); @@ -737,10 +732,9 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest) { EmptyIntVector); NodeResourceInstances old_local_resources = cluster_resources.GetLocalResources(); - std::shared_ptr task_allocation = - std::make_shared(); + TaskResourceInstances task_allocation; bool success = - cluster_resources.AllocateTaskResourceInstances(task_req, task_allocation); + cluster_resources.AllocateTaskResourceInstances(task_req, &task_allocation); ASSERT_EQ(success, true); @@ -758,138 +752,6 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest) { } } -TEST_F(SchedulingTest, TaskResourceInstancesTest2) { - { - NodeResources node_resources; - vector pred_capacities{4 /* CPU */, 4 /* MEM */, 5 /* GPU */}; - vector cust_ids{1, 2}; - vector cust_capacities{4, 4}; - initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - ClusterResourceScheduler cluster_resources(0, node_resources); - - TaskRequest task_req; - vector 
pred_demands = {2. /* CPU */, 2. /* MEM */, 1.5 /* GPU */}; - vector pred_soft = {false}; - vector cust_demands{3, 2}; - vector cust_soft{false, false}; - initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft, - EmptyIntVector); - - std::shared_ptr task_allocation = - std::make_shared(); - bool success = - cluster_resources.AllocateTaskResourceInstances(task_req, task_allocation); - - NodeResourceInstances old_local_resources = cluster_resources.GetLocalResources(); - ASSERT_EQ(success, true); - std::vector cpu_instances = task_allocation->GetCPUInstances(); - cluster_resources.AddCPUResourceInstances(cpu_instances); - cluster_resources.SubtractCPUResourceInstances(cpu_instances); - - ASSERT_EQ((cluster_resources.GetLocalResources() == old_local_resources), true); - } -} - -TEST_F(SchedulingTest, TaskCPUResourceInstancesTest) { - { - NodeResources node_resources; - vector pred_capacities{4 /* CPU */, 1 /* MEM */, 1 /* GPU */}; - vector cust_ids{1}; - vector cust_capacities{8}; - initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - ClusterResourceScheduler cluster_resources(0, node_resources); - - std::vector allocate_cpu_instances{0.5, 0.5, 0.5, 0.5}; - cluster_resources.SubtractCPUResourceInstances(allocate_cpu_instances); - std::vector available_cpu_instances = cluster_resources.GetLocalResources() - .GetAvailableResourceInstances() - .GetCPUInstances(); - std::vector expected_available_cpu_instances{0.5, 0.5, 0.5, 0.5}; - ASSERT_TRUE(std::equal(available_cpu_instances.begin(), available_cpu_instances.end(), - expected_available_cpu_instances.begin())); - - cluster_resources.AddCPUResourceInstances(allocate_cpu_instances); - available_cpu_instances = cluster_resources.GetLocalResources() - .GetAvailableResourceInstances() - .GetCPUInstances(); - expected_available_cpu_instances = {1., 1., 1., 1.}; - ASSERT_TRUE(std::equal(available_cpu_instances.begin(), available_cpu_instances.end(), - 
expected_available_cpu_instances.begin())); - - allocate_cpu_instances = {1.5, 1.5, .5, 1.5}; - std::vector underflow = - cluster_resources.SubtractCPUResourceInstances(allocate_cpu_instances); - std::vector expected_underflow{.5, .5, 0., .5}; - ASSERT_TRUE( - std::equal(underflow.begin(), underflow.end(), expected_underflow.begin())); - available_cpu_instances = cluster_resources.GetLocalResources() - .GetAvailableResourceInstances() - .GetCPUInstances(); - expected_available_cpu_instances = {0., 0., 0.5, 0.}; - ASSERT_TRUE(std::equal(available_cpu_instances.begin(), available_cpu_instances.end(), - expected_available_cpu_instances.begin())); - - allocate_cpu_instances = {1.0, .5, 1., .5}; - std::vector overflow = - cluster_resources.AddCPUResourceInstances(allocate_cpu_instances); - std::vector expected_overflow{.0, .0, .5, 0.}; - ASSERT_TRUE(std::equal(overflow.begin(), overflow.end(), expected_overflow.begin())); - available_cpu_instances = cluster_resources.GetLocalResources() - .GetAvailableResourceInstances() - .GetCPUInstances(); - expected_available_cpu_instances = {1., .5, 1., .5}; - ASSERT_TRUE(std::equal(available_cpu_instances.begin(), available_cpu_instances.end(), - expected_available_cpu_instances.begin())); - } -} - -TEST_F(SchedulingTest, UpdateLocalAvailableResourcesFromResourceInstancesTest) { - { - NodeResources node_resources; - vector pred_capacities{4 /* CPU */, 1 /* MEM */, 1 /* GPU */}; - vector cust_ids{1}; - vector cust_capacities{8}; - initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); - ClusterResourceScheduler cluster_resources(0, node_resources); - - { - std::vector allocate_cpu_instances{0.5, 0.5, 2, 0.5}; - // SubtractCPUResourceInstances() calls - // UpdateLocalAvailableResourcesFromResourceInstances() under the hood. 
- cluster_resources.SubtractCPUResourceInstances(allocate_cpu_instances); - std::vector available_cpu_instances = cluster_resources.GetLocalResources() - .GetAvailableResourceInstances() - .GetCPUInstances(); - std::vector expected_available_cpu_instances{0.5, 0.5, 0., 0.5}; - ASSERT_TRUE(std::equal(available_cpu_instances.begin(), - available_cpu_instances.end(), - expected_available_cpu_instances.begin())); - - NodeResources nr; - cluster_resources.GetNodeResources(0, &nr); - ASSERT_TRUE(nr.predefined_resources[0].available == 1.5); - } - - { - std::vector allocate_cpu_instances{1.5, 0.5, 2, 0.3}; - // SubtractCPUResourceInstances() calls - // UpdateLocalAvailableResourcesFromResourceInstances() under the hood. - cluster_resources.AddCPUResourceInstances(allocate_cpu_instances); - std::vector available_cpu_instances = cluster_resources.GetLocalResources() - .GetAvailableResourceInstances() - .GetCPUInstances(); - std::vector expected_available_cpu_instances{1., 1., 1., 0.8}; - ASSERT_TRUE(std::equal(available_cpu_instances.begin(), - available_cpu_instances.end(), - expected_available_cpu_instances.begin())); - - NodeResources nr; - cluster_resources.GetNodeResources(0, &nr); - ASSERT_TRUE(nr.predefined_resources[0].available == 3.8); - } - } -} - #ifdef UNORDERED_VS_ABSL_MAPS_EVALUATION TEST_F(SchedulingTest, SchedulingMapPerformanceTest) { size_t map_len = 1000000; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 516cc2f6d..35b901b2d 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -84,41 +84,6 @@ namespace ray { namespace raylet { -// A helper function to print the leased workers. 
-std::string LeasedWorkersSring( - const std::unordered_map> &leased_workers) { - std::stringstream buffer; - buffer << " @leased_workers: ("; - for (const auto &pair : leased_workers) { - auto &worker = pair.second; - buffer << worker->WorkerId() << ", "; - } - buffer << ")"; - return buffer.str(); -} - -// A helper function to print the workers in worker_pool_. -std::string WorkerPoolString(const std::vector> &worker_pool) { - std::stringstream buffer; - buffer << " @worker_pool: ("; - for (const auto &worker : worker_pool) { - buffer << worker->WorkerId() << ", "; - } - buffer << ")"; - return buffer.str(); -} - -// Helper function to print the worker's owner worker and and node owner. -std::string WorkerOwnerString(std::shared_ptr &worker) { - std::stringstream buffer; - const auto owner_worker_id = - WorkerID::FromBinary(worker->GetOwnerAddress().worker_id()); - const auto owner_node_id = WorkerID::FromBinary(worker->GetOwnerAddress().raylet_id()); - buffer << "leased_worker Lease " << worker->WorkerId() << " owned by " - << owner_worker_id << " / " << owner_node_id; - return buffer.str(); -} - NodeManager::NodeManager(boost::asio::io_service &io_service, const ClientID &self_node_id, const NodeManagerConfig &config, ObjectManager &object_manager, @@ -702,6 +667,7 @@ void NodeManager::ResourceDeleted(const ClientID &client_id, new_resource_scheduler_->DeleteResource(client_id.Binary(), resource_label); } } + RAY_LOG(DEBUG) << "[ResourceDeleted] Updated cluster_resource_map."; return; } @@ -735,6 +701,7 @@ void NodeManager::HeartbeatAdded(const ClientID &client_id, << client_id; return; } + // Trigger local GC at the next heartbeat interval. if (heartbeat_data.should_global_gc()) { should_local_gc_ = true; @@ -909,7 +876,6 @@ void NodeManager::DispatchTasks( // one class of tasks become stuck behind others in the queue, causing Ray to start // many workers. See #3644 for a more detailed description of this issue. 
std::vector> *> fair_order; - RAY_CHECK(new_scheduler_enabled_ == false); for (auto &it : tasks_by_class) { fair_order.emplace_back(&it); } @@ -966,7 +932,6 @@ void NodeManager::ProcessClientMessage( << (registered_worker ? std::to_string(registered_worker->GetProcess().GetId()) : "nil"); - if (registered_worker && registered_worker->IsDead()) { // For a worker that is marked as dead (because the job has died already), // all the messages are ignored except DisconnectClient. @@ -1081,6 +1046,8 @@ void NodeManager::ProcessRegisterClientRequestMessage( static_cast(protocol::MessageType::RegisterClientReply), fbb.GetSize(), fbb.GetBufferPointer(), [this, client](const ray::Status &status) { if (!status.ok()) { + RAY_LOG(WARNING) + << "Failed to send RegisterClientReply to client, so disconnecting"; ProcessDisconnectClientMessage(client); } }); @@ -1179,7 +1146,6 @@ void NodeManager::HandleWorkerAvailable( void NodeManager::HandleWorkerAvailable(const std::shared_ptr &worker) { RAY_CHECK(worker); bool worker_idle = true; - // If the worker was assigned a task, mark it as finished. if (!worker->GetAssignedTaskId().IsNil()) { worker_idle = FinishAssignedTask(*worker); @@ -1190,10 +1156,10 @@ void NodeManager::HandleWorkerAvailable(const std::shared_ptr &worker) { worker_pool_.PushWorker(worker); } - // Local resource availability changed: invoke scheduling policy for local node. if (new_scheduler_enabled_) { - NewSchedulerSchedulePendingTasks(); + DispatchScheduledTasksToWorkers(); } else { + // Local resource availability changed: invoke scheduling policy for local node. cluster_resource_map_[self_node_id_].SetLoadResources( local_queues_.GetResourceLoad()); // Call task dispatch to assign work to the new worker. 
@@ -1216,10 +1182,10 @@ void NodeManager::ProcessDisconnectClientMessage( } else { RAY_LOG(INFO) << "Ignoring client disconnect because the client has already " << "been disconnected."; - return; } } RAY_CHECK(!(is_worker && is_driver)); + // If the client has any blocked tasks, mark them as unblocked. In // particular, we are no longer waiting for their dependencies. if (worker) { @@ -1238,7 +1204,6 @@ void NodeManager::ProcessDisconnectClientMessage( // Clean up any open ray.wait calls that the worker made. task_dependency_manager_.UnsubscribeWaitDependencies(worker->WorkerId()); } - // Erase any lease metadata. leased_workers_.erase(worker->WorkerId()); @@ -1296,34 +1261,24 @@ void NodeManager::ProcessDisconnectClientMessage( worker_pool_.DisconnectWorker(worker); // Return the resources that were being used by this worker. - if (new_scheduler_enabled_) { - new_resource_scheduler_->SubtractCPUResourceInstances( - worker->GetBorrowedCPUInstances()); - new_resource_scheduler_->FreeLocalTaskResources(worker->GetAllocatedInstances()); - worker->ClearAllocatedInstances(); - new_resource_scheduler_->FreeLocalTaskResources( - worker->GetLifetimeAllocatedInstances()); - worker->ClearLifetimeAllocatedInstances(); - } else { - auto const &task_resources = worker->GetTaskResourceIds(); - local_available_resources_.ReleaseConstrained( - task_resources, cluster_resource_map_[self_node_id_].GetTotalResources()); - cluster_resource_map_[self_node_id_].Release(task_resources.ToResourceSet()); - worker->ResetTaskResourceIds(); + auto const &task_resources = worker->GetTaskResourceIds(); + local_available_resources_.ReleaseConstrained( + task_resources, cluster_resource_map_[self_node_id_].GetTotalResources()); + cluster_resource_map_[self_node_id_].Release(task_resources.ToResourceSet()); + worker->ResetTaskResourceIds(); - auto const &lifetime_resources = worker->GetLifetimeResourceIds(); - local_available_resources_.ReleaseConstrained( - lifetime_resources, 
cluster_resource_map_[self_node_id_].GetTotalResources()); - cluster_resource_map_[self_node_id_].Release(lifetime_resources.ToResourceSet()); - worker->ResetLifetimeResourceIds(); - } + auto const &lifetime_resources = worker->GetLifetimeResourceIds(); + local_available_resources_.ReleaseConstrained( + lifetime_resources, cluster_resource_map_[self_node_id_].GetTotalResources()); + cluster_resource_map_[self_node_id_].Release(lifetime_resources.ToResourceSet()); + worker->ResetLifetimeResourceIds(); - // Since some resources may have been released, we can try to dispatch more tasks. YYY - if (new_scheduler_enabled_) { - NewSchedulerSchedulePendingTasks(); - } else { - DispatchTasks(local_queues_.GetReadyTasksByClass()); - } + RAY_LOG(DEBUG) << "Worker (pid=" << worker->GetProcess().GetId() + << ") is disconnected. " + << "job_id: " << worker->GetAssignedJobId(); + + // Since some resources may have been released, we can try to dispatch more tasks. + DispatchTasks(local_queues_.GetReadyTasksByClass()); } else if (is_driver) { // The client is a driver. const auto job_id = worker->GetAssignedJobId(); @@ -1335,7 +1290,7 @@ void NodeManager::ProcessDisconnectClientMessage( RAY_LOG(DEBUG) << "Driver (pid=" << worker->GetProcess().GetId() << ") is disconnected. " - << "job_id: " << worker->GetAssignedJobId(); + << "job_id: " << job_id; } client->Close(); @@ -1421,6 +1376,9 @@ void NodeManager::ProcessWaitRequestMessage( } } else { // We failed to write to the client, so disconnect the client. + RAY_LOG(WARNING) + << "Failed to send WaitReply to client, so disconnecting client"; + // We failed to send the reply to the client, so disconnect the worker. 
ProcessDisconnectClientMessage(client); } }); @@ -1449,7 +1407,7 @@ void NodeManager::ProcessWaitForDirectActorCallArgsRequestMessage( [this, client, tag](std::vector found, std::vector remaining) { RAY_CHECK(remaining.empty()); std::shared_ptr worker = worker_pool_.GetRegisteredWorker(client); - if (!worker) { + if (worker == nullptr) { RAY_LOG(ERROR) << "Lost worker for wait request " << client; } else { worker->DirectActorCallArgWaitComplete(tag); @@ -1535,67 +1493,38 @@ void NodeManager::ProcessSubmitTaskMessage(const uint8_t *message_data) { void NodeManager::DispatchScheduledTasksToWorkers() { RAY_CHECK(new_scheduler_enabled_); - - // Check every task in task_to_dispatch queue to see - // whether it can be dispatched and ran. This avoids head-of-line - // blocking where a task which cannot be dispatched because - // there are not enough available resources blocks other - // tasks from being dispatched. - for (size_t queue_size = tasks_to_dispatch_.size(); queue_size > 0; queue_size--) { + while (!tasks_to_dispatch_.empty()) { auto task = tasks_to_dispatch_.front(); auto reply = task.first; auto spec = task.second.GetTaskSpecification(); - tasks_to_dispatch_.pop_front(); - std::shared_ptr worker = worker_pool_.PopWorker(spec); - if (!worker) { - // No worker available to schedule this task. - // Put the task back in the dispatch queue. - tasks_to_dispatch_.push_front(task); + if (worker == nullptr) { return; } - std::shared_ptr allocated_instances( - new TaskResourceInstances()); - bool schedulable = new_resource_scheduler_->AllocateLocalTaskResources( - spec.GetRequiredResources().GetResourceMap(), allocated_instances); + bool schedulable = new_resource_scheduler_->SubtractNodeAvailableResources( + self_node_id_.Binary(), spec.GetRequiredResources().GetResourceMap()); if (!schedulable) { - // Not enough resources to schedule this task. - // Put it back at the end of the dispatch queue. 
- tasks_to_dispatch_.push_back(task); - worker_pool_.PushWorker(worker); - // Try next task in the dispatch queue. - continue; + return; } - worker->SetOwnerAddress(spec.CallerAddress()); + // Handle the allocation to specific resource IDs. + auto acquired_resources = + local_available_resources_.Acquire(spec.GetRequiredResources()); + cluster_resource_map_[self_node_id_].Acquire(spec.GetRequiredResources()); if (spec.IsActorCreationTask()) { - worker->SetLifetimeAllocatedInstances(allocated_instances); + worker->SetLifetimeResourceIds(acquired_resources); } else { - worker->SetAllocatedInstances(allocated_instances); + worker->SetTaskResourceIds(acquired_resources); } - worker->AssignTaskId(spec.TaskId()); - worker->AssignJobId(spec.JobId()); - worker->SetAssignedTask(task.second); reply(worker, ClientID::Nil(), "", -1); + tasks_to_dispatch_.pop_front(); } } void NodeManager::NewSchedulerSchedulePendingTasks() { RAY_CHECK(new_scheduler_enabled_); - size_t queue_size = tasks_to_schedule_.size(); - - // Check every task in task_to_schedule queue to see - // whether it can be scheduled. This avoids head-of-line - // blocking where a task which cannot be scheduled because - // there are not enough available resources blocks other - // tasks from being scheduled. - while (queue_size > 0) { - if (queue_size == 0) { - return; - } else { - queue_size--; - } + while (!tasks_to_schedule_.empty()) { auto work = tasks_to_schedule_.front(); auto task = work.second; auto request_resources = @@ -1605,16 +1534,13 @@ void NodeManager::NewSchedulerSchedulePendingTasks() { new_resource_scheduler_->GetBestSchedulableNode(request_resources, &violations); if (node_id_string.empty()) { /// There is no node that has available resources to run the request. 
- tasks_to_schedule_.pop_front(); - tasks_to_schedule_.push_back(work); - continue; + break; } else { if (node_id_string == self_node_id_.Binary()) { WaitForTaskArgsRequests(work); } else { - new_resource_scheduler_->AllocateRemoteTaskResources(node_id_string, - request_resources); - + new_resource_scheduler_->SubtractNodeAvailableResources(node_id_string, + request_resources); ClientID node_id = ClientID::FromBinary(node_id_string); auto node_info_opt = gcs_client_->Nodes().Get(node_id); RAY_CHECK(node_info_opt) @@ -1631,19 +1557,17 @@ void NodeManager::NewSchedulerSchedulePendingTasks() { void NodeManager::WaitForTaskArgsRequests(std::pair &work) { RAY_CHECK(new_scheduler_enabled_); - const Task &task = work.second; - std::vector object_ids = task.GetTaskSpecification().GetDependencies(); + std::vector object_ids = work.second.GetTaskSpecification().GetDependencies(); if (object_ids.size() > 0) { - bool args_ready = task_dependency_manager_.SubscribeGetDependencies( - task.GetTaskSpecification().TaskId(), task.GetDependencies()); - if (args_ready) { - task_dependency_manager_.UnsubscribeGetDependencies( - task.GetTaskSpecification().TaskId()); - tasks_to_dispatch_.push_back(work); - } else { - waiting_tasks_[task.GetTaskSpecification().TaskId()] = work; - } + ray::Status status = object_manager_.Wait( + object_ids, -1, object_ids.size(), false, + [this, work](std::vector found, std::vector remaining) { + RAY_CHECK(remaining.empty()); + tasks_to_dispatch_.push_back(work); + DispatchScheduledTasksToWorkers(); + }); + RAY_CHECK_OK(status); } else { tasks_to_dispatch_.push_back(work); } @@ -1657,7 +1581,6 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest Task task(task_message); bool is_actor_creation_task = task.GetTaskSpecification().IsActorCreationTask(); ActorID actor_id = ActorID::Nil(); - if (is_actor_creation_task) { actor_id = task.GetTaskSpecification().ActorCreationId(); @@ -1670,11 +1593,11 @@ void 
NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest } if (new_scheduler_enabled_) { - auto task_spec = task.GetTaskSpecification(); + auto request_resources = task.GetTaskSpecification().GetRequiredResources(); auto work = std::make_pair( - [this, task_spec, reply, send_reply_callback](std::shared_ptr worker, - ClientID spillback_to, - std::string address, int port) { + [this, request_resources, reply, send_reply_callback]( + std::shared_ptr worker, ClientID spillback_to, std::string address, + int port) { if (worker != nullptr) { reply->mutable_worker_address()->set_ip_address( initial_config_.node_manager_address); @@ -1683,52 +1606,7 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest reply->mutable_worker_address()->set_raylet_id(self_node_id_.Binary()); RAY_CHECK(leased_workers_.find(worker->WorkerId()) == leased_workers_.end()); leased_workers_[worker->WorkerId()] = worker; -// TODO (Ion): Fix handling floating point errors, maybe by moving to integers. -#define ZERO_CAPACITY 1.0e-5 - std::shared_ptr allocated_resources; - if (task_spec.IsActorCreationTask()) { - allocated_resources = worker->GetLifetimeAllocatedInstances(); - } else { - allocated_resources = worker->GetAllocatedInstances(); - } - auto predefined_resources = allocated_resources->predefined_resources; - ::ray::rpc::ResourceMapEntry *resource; - for (size_t res_idx = 0; res_idx < predefined_resources.size(); res_idx++) { - bool first = true; // Set resource name only if at least one of its - // instances has available capacity. 
- for (size_t inst_idx = 0; inst_idx < predefined_resources[res_idx].size(); - inst_idx++) { - if (std::abs(predefined_resources[res_idx][inst_idx]) > ZERO_CAPACITY) { - if (first) { - resource = reply->add_resource_mapping(); - resource->set_name( - new_resource_scheduler_->GetResourceNameFromIndex(res_idx)); - first = false; - } - auto rid = resource->add_resource_ids(); - rid->set_index(inst_idx); - rid->set_quantity(predefined_resources[res_idx][inst_idx]); - } - } - } - auto custom_resources = allocated_resources->custom_resources; - for (auto it = custom_resources.begin(); it != custom_resources.end(); ++it) { - bool first = true; // Set resource name only if at least one of its - // instances has available capacity. - for (size_t inst_idx = 0; inst_idx < it->second.size(); inst_idx++) { - if (std::abs(it->second[inst_idx]) > ZERO_CAPACITY) { - if (first) { - resource = reply->add_resource_mapping(); - resource->set_name( - new_resource_scheduler_->GetResourceNameFromIndex(it->first)); - first = false; - } - auto rid = resource->add_resource_ids(); - rid->set_index(inst_idx); - rid->set_quantity(it->second[inst_idx]); - } - } - } + leased_worker_resources_[worker->WorkerId()] = request_resources; } else { reply->mutable_retry_at_raylet_address()->set_ip_address(address); reply->mutable_retry_at_raylet_address()->set_port(port); @@ -1771,6 +1649,7 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest } } send_reply_callback(Status::OK(), nullptr, nullptr); + RAY_CHECK(leased_workers_.find(worker_id) == leased_workers_.end()) << "Worker is already leased out " << worker_id; @@ -1794,12 +1673,47 @@ void NodeManager::HandleReturnWorker(const rpc::ReturnWorkerRequest &request, rpc::SendReplyCallback send_reply_callback) { // Read the resource spec submitted by the client. 
auto worker_id = WorkerID::FromBinary(request.worker_id()); + RAY_LOG(DEBUG) << "Return worker " << worker_id; std::shared_ptr worker = leased_workers_[worker_id]; + if (new_scheduler_enabled_) { + if (worker->IsBlocked()) { + // If worker blocked, unblock it to return the cpu resources back to the worker. + HandleDirectCallTaskUnblocked(worker); + } + auto it = leased_worker_resources_.find(worker_id); + RAY_CHECK(it != leased_worker_resources_.end()); + + new_resource_scheduler_->AddNodeAvailableResources(self_node_id_.Binary(), + it->second.GetResourceMap()); + + if (worker->borrowed_cpu_resources_.GetResourceMap().size()) { + // This machine is oversubscribed, so the worker didn't get back cpus when + // unblocked. Thus we need to subtract these cpus, as the previous + // "AddNodeAvailableResources" call assumed they were allocated to this worker. + new_resource_scheduler_->SubtractNodeAvailableResources( + self_node_id_.Binary(), worker->borrowed_cpu_resources_.GetResourceMap()); + worker->borrowed_cpu_resources_ = ResourceSet(); + } + leased_worker_resources_.erase(it); + + // Update resource ids. 
+ auto const &task_resources = worker->GetTaskResourceIds(); + local_available_resources_.ReleaseConstrained( + task_resources, cluster_resource_map_[self_node_id_].GetTotalResources()); + cluster_resource_map_[self_node_id_].Release(task_resources.ToResourceSet()); + worker->ResetTaskResourceIds(); + + // TODO (ion): Handle ProcessDisconnectClientMessage() + HandleWorkerAvailable(worker); + leased_workers_.erase(worker_id); + send_reply_callback(Status::OK(), nullptr, nullptr); + return; + } + + leased_workers_.erase(worker_id); Status status; if (worker) { - leased_workers_.erase(worker_id); - if (request.disconnect_worker()) { ProcessDisconnectClientMessage(worker->Connection()); } else { @@ -1808,12 +1722,6 @@ void NodeManager::HandleReturnWorker(const rpc::ReturnWorkerRequest &request, if (worker->IsBlocked()) { HandleDirectCallTaskUnblocked(worker); } - if (new_scheduler_enabled_) { - new_resource_scheduler_->SubtractCPUResourceInstances( - worker->GetBorrowedCPUInstances()); - new_resource_scheduler_->FreeLocalTaskResources(worker->GetAllocatedInstances()); - worker->ClearAllocatedInstances(); - } HandleWorkerAvailable(worker); } } else { @@ -2192,16 +2100,14 @@ void NodeManager::HandleDirectCallTaskBlocked(const std::shared_ptr &wor if (!worker) { return; } - std::vector cpu_instances; - if (worker->GetAllocatedInstances() != nullptr) { - cpu_instances = worker->GetAllocatedInstances()->GetCPUInstances(); - } - if (cpu_instances.size() > 0) { - std::vector borrowed_cpu_instances = - new_resource_scheduler_->AddCPUResourceInstances(cpu_instances); - worker->SetBorrowedCPUInstances(borrowed_cpu_instances); - worker->MarkBlocked(); - } + auto const cpu_resource_ids = worker->ReleaseTaskCpuResources(); + local_available_resources_.Release(cpu_resource_ids); + cluster_resource_map_[self_node_id_].Release(cpu_resource_ids.ToResourceSet()); + new_resource_scheduler_->AddNodeAvailableResources( + self_node_id_.Binary(), // A + 
cpu_resource_ids.ToResourceSet().GetResourceMap()); + + worker->MarkBlocked(); NewSchedulerSchedulePendingTasks(); return; } @@ -2221,23 +2127,43 @@ void NodeManager::HandleDirectCallTaskUnblocked(const std::shared_ptr &w if (!worker) { return; } - std::vector cpu_instances; - if (worker->GetAllocatedInstances() != nullptr) { - cpu_instances = worker->GetAllocatedInstances()->GetCPUInstances(); - } - if (cpu_instances.size() > 0) { - new_resource_scheduler_->SubtractCPUResourceInstances(cpu_instances); - new_resource_scheduler_->AddCPUResourceInstances(worker->GetBorrowedCPUInstances()); - worker->MarkUnblocked(); + auto it = leased_worker_resources_.find(worker->WorkerId()); + RAY_CHECK(it != leased_worker_resources_.end()); + const auto cpu_resources = it->second.GetNumCpus(); + bool oversubscribed = !local_available_resources_.Contains(cpu_resources); + if (!oversubscribed) { + // Reacquire the CPU resources for the worker. Note that care needs to be + // taken if the user is using the specific CPU IDs since the IDs that we + // reacquire here may be different from the ones that the task started with. + auto const resource_ids = local_available_resources_.Acquire(cpu_resources); + worker->AcquireTaskCpuResources(resource_ids); + cluster_resource_map_[self_node_id_].Acquire(cpu_resources); + new_resource_scheduler_->SubtractNodeAvailableResources( + self_node_id_.Binary(), cpu_resources.GetResourceMap()); + worker->borrowed_cpu_resources_ = ResourceSet(); + } else { + // Remember these are borrowed cpu resources, i.e., we did not return them to the + // worker. + worker->borrowed_cpu_resources_ = cpu_resources; } + worker->MarkUnblocked(); NewSchedulerSchedulePendingTasks(); return; } - if (!worker || worker->GetAssignedTaskId().IsNil() || !worker->IsBlocked()) { + if (!worker || worker->GetAssignedTaskId().IsNil()) { return; // The worker may have died or is no longer processing the task. 
} TaskID task_id = worker->GetAssignedTaskId(); + + // First, always release task dependencies. This ensures we don't leak resources even + // if we don't need to unblock the worker below. + task_dependency_manager_.UnsubscribeGetDependencies(task_id); + + if (!worker->IsBlocked()) { + return; // Don't need to unblock the worker. + } + Task task = local_queues_.GetTaskOfState(task_id, TaskState::RUNNING); const auto required_resources = task.GetTaskSpecification().GetRequiredResources(); const ResourceSet cpu_resources = required_resources.GetNumCpus(); @@ -2258,7 +2184,6 @@ void NodeManager::HandleDirectCallTaskUnblocked(const std::shared_ptr &w << cluster_resource_map_[self_node_id_].GetAvailableResources().ToString(); } worker->MarkUnblocked(); - task_dependency_manager_.UnsubscribeGetDependencies(task_id); } void NodeManager::AsyncResolveObjects( @@ -2488,29 +2413,18 @@ bool NodeManager::FinishAssignedTask(Worker &worker) { TaskID task_id = worker.GetAssignedTaskId(); RAY_LOG(DEBUG) << "Finished task " << task_id; + // (See design_docs/task_states.rst for the state transition diagram.) Task task; - if (new_scheduler_enabled_) { - task = worker.GetAssignedTask(); - // leased_workers_.erase(worker.WorkerId()); // Maybe RAY_CHECK ??? - if (worker.GetAllocatedInstances() != nullptr) { - new_resource_scheduler_->SubtractCPUResourceInstances( - worker.GetBorrowedCPUInstances()); - new_resource_scheduler_->FreeLocalTaskResources(worker.GetAllocatedInstances()); - worker.ClearAllocatedInstances(); - } - } else { - // (See design_docs/task_states.rst for the state transition diagram.) - RAY_CHECK(local_queues_.RemoveTask(task_id, &task)); + RAY_CHECK(local_queues_.RemoveTask(task_id, &task)); - // Release task's resources. The worker's lifetime resources are still held. 
- auto const &task_resources = worker.GetTaskResourceIds(); - local_available_resources_.ReleaseConstrained( - task_resources, cluster_resource_map_[self_node_id_].GetTotalResources()); - cluster_resource_map_[self_node_id_].Release(task_resources.ToResourceSet()); - worker.ResetTaskResourceIds(); - } + // Release task's resources. The worker's lifetime resources are still held. + auto const &task_resources = worker.GetTaskResourceIds(); + local_available_resources_.ReleaseConstrained( + task_resources, cluster_resource_map_[self_node_id_].GetTotalResources()); + cluster_resource_map_[self_node_id_].Release(task_resources.ToResourceSet()); + worker.ResetTaskResourceIds(); - const auto &spec = task.GetTaskSpecification(); // + const auto &spec = task.GetTaskSpecification(); if ((spec.IsActorCreationTask() || spec.IsActorTask())) { // If this was an actor or actor creation task, handle the actor's new // state. @@ -2843,43 +2757,31 @@ void NodeManager::HandleObjectLocal(const ObjectID &object_id) { << " on " << self_node_id_ << ", " << ready_task_ids.size() << " tasks ready"; // Transition the tasks whose dependencies are now fulfilled to the ready state. - if (new_scheduler_enabled_) { - for (auto task_id : ready_task_ids) { - auto it = waiting_tasks_.find(task_id); - if (it != waiting_tasks_.end()) { - task_dependency_manager_.UnsubscribeGetDependencies(task_id); - tasks_to_dispatch_.push_back(it->second); - waiting_tasks_.erase(it); - } + if (ready_task_ids.size() > 0) { + std::unordered_set ready_task_id_set(ready_task_ids.begin(), + ready_task_ids.end()); + + // First filter out the tasks that should not be moved to READY. 
+ local_queues_.FilterState(ready_task_id_set, TaskState::BLOCKED); + local_queues_.FilterState(ready_task_id_set, TaskState::RUNNING); + local_queues_.FilterState(ready_task_id_set, TaskState::DRIVER); + local_queues_.FilterState(ready_task_id_set, TaskState::WAITING_FOR_ACTOR_CREATION); + + // Make sure that the remaining tasks are all WAITING or direct call + // actors. + auto ready_task_id_set_copy = ready_task_id_set; + local_queues_.FilterState(ready_task_id_set_copy, TaskState::WAITING); + // Filter out direct call actors. These are not tracked by the raylet and + // their assigned task ID is the actor ID. + for (const auto &id : ready_task_id_set_copy) { + RAY_CHECK(actor_registry_.count(id.ActorId()) > 0); + ready_task_id_set.erase(id); } - NewSchedulerSchedulePendingTasks(); - } else { - if (ready_task_ids.size() > 0) { - std::unordered_set ready_task_id_set(ready_task_ids.begin(), - ready_task_ids.end()); - // First filter out the tasks that should not be moved to READY. - local_queues_.FilterState(ready_task_id_set, TaskState::BLOCKED); - local_queues_.FilterState(ready_task_id_set, TaskState::RUNNING); - local_queues_.FilterState(ready_task_id_set, TaskState::DRIVER); - local_queues_.FilterState(ready_task_id_set, TaskState::WAITING_FOR_ACTOR_CREATION); - - // Make sure that the remaining tasks are all WAITING or direct call - // actors. - auto ready_task_id_set_copy = ready_task_id_set; - local_queues_.FilterState(ready_task_id_set_copy, TaskState::WAITING); - // Filter out direct call actors. These are not tracked by the raylet and - // their assigned task ID is the actor ID. - for (const auto &id : ready_task_id_set_copy) { - RAY_CHECK(actor_registry_.count(id.ActorId()) > 0); - ready_task_id_set.erase(id); - } - - // Queue and dispatch the tasks that are ready to run (i.e., WAITING). 
- auto ready_tasks = local_queues_.RemoveTasks(ready_task_id_set); - local_queues_.QueueTasks(ready_tasks, TaskState::READY); - DispatchTasks(MakeTasksByClass(ready_tasks)); - } + // Queue and dispatch the tasks that are ready to run (i.e., WAITING). + auto ready_tasks = local_queues_.RemoveTasks(ready_task_id_set); + local_queues_.QueueTasks(ready_tasks, TaskState::READY); + DispatchTasks(MakeTasksByClass(ready_tasks)); } } diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 6c2eb0c9e..74a1fecd5 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -720,6 +720,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// The new resource scheduler for direct task calls. std::shared_ptr new_resource_scheduler_; + /// Map of leased workers to their current resource usage. + /// TODO(ion): Check whether we can track these resources in the worker. + std::unordered_map leased_worker_resources_; typedef std::function, ClientID spillback_to, std::string address, int port)> @@ -730,8 +733,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler { std::deque> tasks_to_schedule_; /// Queue of lease requests that should be scheduled onto workers. std::deque> tasks_to_dispatch_; - /// Queue tasks waiting for arguments to be transferred locally. - absl::flat_hash_map> waiting_tasks_; /// Cache of gRPC clients to workers (not necessarily running on this node). 
/// Also includes the number of inflight requests to each worker - when this diff --git a/src/ray/raylet/worker.h b/src/ray/raylet/worker.h index bf774f4e8..ee0a8a1ec 100644 --- a/src/ray/raylet/worker.h +++ b/src/ray/raylet/worker.h @@ -19,8 +19,6 @@ #include "ray/common/client_connection.h" #include "ray/common/id.h" -#include "ray/common/scheduling/cluster_resource_scheduler.h" -#include "ray/common/scheduling/scheduling_ids.h" #include "ray/common/task/scheduling_resources.h" #include "ray/common/task/task.h" #include "ray/common/task/task_common.h" @@ -87,40 +85,11 @@ class Worker { void DirectActorCallArgWaitComplete(int64_t tag); void WorkerLeaseGranted(const std::string &address, int port); - // Setter, geter, and clear methods for allocated_instances_. - void SetAllocatedInstances( - std::shared_ptr &allocated_instances) { - allocated_instances_ = allocated_instances; - }; - - std::shared_ptr GetAllocatedInstances() { - return allocated_instances_; - }; - - void ClearAllocatedInstances() { allocated_instances_ = nullptr; }; - - void SetLifetimeAllocatedInstances( - std::shared_ptr &allocated_instances) { - lifetime_allocated_instances_ = allocated_instances; - }; - - std::shared_ptr GetLifetimeAllocatedInstances() { - return lifetime_allocated_instances_; - }; - - void ClearLifetimeAllocatedInstances() { lifetime_allocated_instances_ = nullptr; }; - - void SetBorrowedCPUInstances(std::vector &cpu_instances) { - borrowed_cpu_instances_ = cpu_instances; - }; - - std::vector &GetBorrowedCPUInstances() { return borrowed_cpu_instances_; }; - - void ClearBorrowedCPUInstances() { return borrowed_cpu_instances_.clear(); }; - - Task &GetAssignedTask() { return assigned_task_; }; - - void SetAssignedTask(Task &assigned_task) { assigned_task_ = assigned_task; }; + /// Cpus borrowed by the worker. This happens when the machine is oversubscribed + /// and the worker does not get back the cpu resources when unblocked. 
+ /// TODO (ion): Add methods to access this variable. + /// TODO (ion): Investigate a more intuitive alternative to track these Cpus. + ResourceSet borrowed_cpu_resources_; rpc::CoreWorkerClient *rpc_client() { return rpc_client_.get(); } @@ -165,22 +134,6 @@ class Worker { /// The address of this worker's owner. The owner is the worker that /// currently holds the lease on this worker, if any. rpc::Address owner_address_; - /// The capacity of each resource instance allocated to this worker in order - /// to satisfy the resource requests of the task is currently running. - std::shared_ptr allocated_instances_; - /// The capacity of each resource instance allocated to this worker - /// when running as an actor. - std::shared_ptr lifetime_allocated_instances_; - /// CPUs borrowed by the worker. This happens in the following scenario: - /// 1) Worker A is blocked, so it donates its CPUs back to the node. - /// 2) Other workers are scheduled and are allocated some of the CPUs donated by A. - /// 3) Task A is unblocked, but it cannot get all CPUs back. At this point, - /// the node is oversubscribed. borrowed_cpu_instances_ represents the number - /// of CPUs this node is oversubscribed by. - /// TODO (Ion): Investigate a more intuitive alternative to track these Cpus. - std::vector borrowed_cpu_instances_; - /// Task being assigned to this worker. - Task assigned_task_; }; } // namespace raylet