Revert "New scheduler local node (#7441)" (#7732)

This reverts commit 6141fdab95.
This commit is contained in:
Stephanie Wang
2020-03-24 18:32:16 -07:00
committed by GitHub
parent cc0490b55b
commit a1cee6af7b
6 changed files with 408 additions and 954 deletions
@@ -14,137 +14,6 @@
#include "cluster_resource_scheduler.h"
/// Render a vector of doubles as a bracketed, comma-separated list.
///
/// \param vector: Values to print.
/// \return String of the form "[v0, v1, ...]"; "[]" when the vector is empty.
std::string VectorToString(const std::vector<double> &vector) {
  std::stringstream out;
  out << "[";
  // The separator is empty before the first element and ", " afterwards.
  const char *separator = "";
  for (const double value : vector) {
    out << separator << value;
    separator = ", ";
  }
  out << "]";
  return out.str();
}
/// Render a name -> value map as "[(name:value)(name:value)...]".
///
/// Entries appear in the map's iteration order, with no separator between them.
///
/// \param map: Map to print.
/// \return The formatted string; "[]" when the map is empty.
std::string UnorderedMapToString(const std::unordered_map<std::string, double> &map) {
  std::stringstream out;
  out << "[";
  for (const auto &entry : map) {
    out << "(" << entry.first << ":" << entry.second << ")";
  }
  out << "]";
  return out.str();
}
/// Convert a map of resources to a TaskRequest data structure.
///
/// The CPU, GPU, TPU and memory labels are written to the corresponding
/// predefined slots; every other entry becomes a custom resource whose id is
/// obtained from string_to_int_map.Insert(). All demands are hard (soft = false).
///
/// \param string_to_int_map: Map used to translate custom resource names to ids.
/// \param resource_map: Resource name -> demand.
///
/// \return Conversion result as a TaskRequest data structure.
TaskRequest ResourceMapToTaskRequest(
    StringIdMap &string_to_int_map,
    const std::unordered_map<std::string, double> &resource_map) {
  TaskRequest task_request;

  // Reset every predefined slot (the previous code only ever touched slot 0).
  task_request.predefined_resources.resize(PredefinedResources_MAX);
  for (size_t i = 0; i < PredefinedResources_MAX; i++) {
    task_request.predefined_resources[i].demand = 0;
    task_request.predefined_resources[i].soft = false;
  }

  // Size for the worst case (every entry is custom); trimmed after the loop.
  task_request.custom_resources.resize(resource_map.size());
  size_t num_custom = 0;
  for (auto const &resource : resource_map) {
    if (resource.first == ray::kCPU_ResourceLabel) {
      task_request.predefined_resources[CPU].demand = resource.second;
    } else if (resource.first == ray::kGPU_ResourceLabel) {
      task_request.predefined_resources[GPU].demand = resource.second;
    } else if (resource.first == ray::kTPU_ResourceLabel) {
      task_request.predefined_resources[TPU].demand = resource.second;
    } else if (resource.first == ray::kMemory_ResourceLabel) {
      task_request.predefined_resources[MEM].demand = resource.second;
    } else {
      task_request.custom_resources[num_custom].id =
          string_to_int_map.Insert(resource.first);
      task_request.custom_resources[num_custom].demand = resource.second;
      task_request.custom_resources[num_custom].soft = false;
      num_custom++;
    }
  }
  task_request.custom_resources.resize(num_custom);

  return task_request;
}
/// Build a TaskRequest whose demand for each resource is the sum of this
/// object's instances of that resource.
///
/// Custom resources keep their ids and are marked as hard (soft = false).
///
/// \return The aggregated TaskRequest.
TaskRequest TaskResourceInstances::ToTaskRequest() const {
  TaskRequest request;

  request.predefined_resources.resize(PredefinedResources_MAX);
  for (size_t res = 0; res < PredefinedResources_MAX; res++) {
    auto &slot = request.predefined_resources[res];
    slot.demand = 0;
    for (const auto instance : this->predefined_resources[res]) {
      slot.demand += instance;
    }
  }

  request.custom_resources.resize(this->custom_resources.size());
  size_t next = 0;
  for (const auto &entry : this->custom_resources) {
    auto &slot = request.custom_resources[next];
    slot.id = entry.first;
    slot.soft = false;
    slot.demand = 0;
    for (const auto instance : entry.second) {
      slot.demand += instance;
    }
    next++;
  }

  return request;
}
/// Convert maps of total and available resource capacities to a NodeResources
/// data structure.
///
/// Capacities are cast to int64_t before being stored, so fractional parts are
/// dropped. A resource that is missing from resource_map_available gets an
/// available capacity of 0.
///
/// \param string_to_int_map: Map between names and ids; custom resource names are
/// passed to its Insert() to obtain the id stored in the result.
/// \param resource_map_total: Total capacities of resources we want to convert.
/// \param resource_map_available: Available capacities of resources we want to convert.
///
/// \return Conversion result as a NodeResources data structure.
NodeResources ResourceMapToNodeResources(
    StringIdMap &string_to_int_map,
    const std::unordered_map<std::string, double> &resource_map_total,
    const std::unordered_map<std::string, double> &resource_map_available) {
  NodeResources result;
  result.predefined_resources.resize(PredefinedResources_MAX);
  for (size_t idx = 0; idx < PredefinedResources_MAX; idx++) {
    result.predefined_resources[idx].available = 0;
    result.predefined_resources[idx].total = 0;
  }

  for (auto const &entry : resource_map_total) {
    const std::string &name = entry.first;

    ResourceCapacity capacity;
    capacity.total = static_cast<int64_t>(entry.second);
    const auto available_it = resource_map_available.find(name);
    if (available_it != resource_map_available.end()) {
      capacity.available = static_cast<int64_t>(available_it->second);
    } else {
      capacity.available = 0;
    }

    if (name == ray::kCPU_ResourceLabel) {
      result.predefined_resources[CPU] = capacity;
    } else if (name == ray::kGPU_ResourceLabel) {
      result.predefined_resources[GPU] = capacity;
    } else if (name == ray::kTPU_ResourceLabel) {
      result.predefined_resources[TPU] = capacity;
    } else if (name == ray::kMemory_ResourceLabel) {
      result.predefined_resources[MEM] = capacity;
    } else {
      // Anything that is not a predefined label is a custom resource.
      result.custom_resources.emplace(string_to_int_map.Insert(name), capacity);
    }
  }
  return result;
}
bool NodeResources::operator==(const NodeResources &other) {
for (size_t i = 0; i < PredefinedResources_MAX; i++) {
if (this->predefined_resources[i].total != other.predefined_resources[i].total) {
@@ -176,25 +45,39 @@ bool NodeResources::operator==(const NodeResources &other) {
return true;
}
std::string NodeResources::DebugString(StringIdMap string_to_in_map) const {
std::string NodeResources::DebugString() {
std::stringstream buffer;
buffer << " {";
buffer << " node predefined resources {";
for (size_t i = 0; i < static_cast<size_t>(this->predefined_resources.size()); i++) {
buffer << "(" << this->predefined_resources[i].total << ":"
<< this->predefined_resources[i].available << ") ";
}
buffer << "}";
buffer << "}" << std::endl;
buffer << " {";
buffer << " node custom resources {";
for (auto it = this->custom_resources.begin(); it != this->custom_resources.end();
++it) {
buffer << string_to_in_map.Get(it->first) << ":(" << it->second.total << ":"
<< it->second.available << ") ";
buffer << it->first << ":(" << it->second.total << ":" << it->second.available
<< ") ";
}
buffer << "}" << std::endl;
return buffer.str();
}
/// Render a vector of doubles as a bracketed, comma-separated list.
///
/// \param vector: Values to print (not modified).
/// \return String of the form "[v0, v1, ...]"; "[]" when the vector is empty.
std::string VectorToString(std::vector<double> &vector) {
  std::stringstream out;
  out << "[";
  for (auto it = vector.begin(); it != vector.end(); ++it) {
    // Every element except the first is preceded by a separator.
    if (it != vector.begin()) {
      out << ", ";
    }
    out << *it;
  }
  out << "]";
  return out.str();
}
bool NodeResourceInstances::operator==(const NodeResourceInstances &other) {
for (size_t i = 0; i < PredefinedResources_MAX; i++) {
if (!EqualVectors(this->predefined_resources[i].total,
@@ -227,20 +110,20 @@ bool NodeResourceInstances::operator==(const NodeResourceInstances &other) {
return true;
}
std::string NodeResourceInstances::DebugString(StringIdMap string_to_int_map) const {
std::string NodeResourceInstances::DebugString() {
std::stringstream buffer;
buffer << " {";
buffer << " node predefined resources {";
for (size_t i = 0; i < this->predefined_resources.size(); i++) {
buffer << "(" << VectorToString(predefined_resources[i].total) << ":"
<< VectorToString(this->predefined_resources[i].available) << ") ";
}
buffer << "}";
buffer << "}" << std::endl;
buffer << " {";
buffer << " node custom resources {";
for (auto it = this->custom_resources.begin(); it != this->custom_resources.end();
++it) {
buffer << string_to_int_map.Get(it->first) << ":(" << VectorToString(it->second.total)
<< ":" << VectorToString(it->second.available) << ") ";
buffer << it->first << ":(" << VectorToString(it->second.total) << ":"
<< VectorToString(it->second.available) << ") ";
}
buffer << "}" << std::endl;
return buffer.str();
@@ -261,60 +144,40 @@ TaskResourceInstances NodeResourceInstances::GetAvailableResourceInstances() {
return task_resources;
};
std::string TaskRequest::DebugString() const {
std::string TaskRequest::DebugString() {
std::stringstream buffer;
buffer << " {";
buffer << std::endl << " request predefined resources {";
for (size_t i = 0; i < this->predefined_resources.size(); i++) {
buffer << "(" << this->predefined_resources[i].demand << ":"
<< this->predefined_resources[i].soft << ") ";
}
buffer << "}";
buffer << "}" << std::endl;
buffer << " [";
buffer << " request custom resources {";
for (size_t i = 0; i < this->custom_resources.size(); i++) {
buffer << this->custom_resources[i].id << ":"
<< "(" << this->custom_resources[i].demand << ":"
<< this->custom_resources[i].soft << ") ";
}
buffer << "]" << std::endl;
buffer << "}" << std::endl;
return buffer.str();
}
/// Check whether all resource instances of a task are zero.
///
/// \return true if every predefined and custom resource instance equals 0
/// (including when there are no instances at all), false otherwise.
bool TaskResourceInstances::IsEmpty() const {
  for (const auto &predefined_resource : predefined_resources) {
    for (const auto &predefined_resource_instance : predefined_resource) {
      if (predefined_resource_instance != 0) {
        return false;
      }
    }
  }
  // Iterate by const reference: each map entry carries a vector of instances,
  // so iterating by value would copy that vector on every step.
  for (const auto &custom_resource : custom_resources) {
    for (const auto &custom_resource_instance : custom_resource.second) {
      if (custom_resource_instance != 0) {
        return false;
      }
    }
  }
  return true;
}
std::string TaskResourceInstances::DebugString() const {
std::string TaskResourceInstances::DebugString() {
std::stringstream buffer;
buffer << std::endl << " Allocation: {";
buffer << std::endl << " task allocation: P {";
for (size_t i = 0; i < this->predefined_resources.size(); i++) {
buffer << VectorToString(this->predefined_resources[i]);
}
buffer << "}";
buffer << " [";
buffer << " C {";
for (auto it = this->custom_resources.begin(); it != this->custom_resources.end();
++it) {
buffer << it->first << ":" << VectorToString(it->second) << ", ";
}
buffer << "]" << std::endl;
buffer << "}" << std::endl;
return buffer.str();
}
@@ -357,19 +220,15 @@ ClusterResourceScheduler::ClusterResourceScheduler(
const std::string &local_node_id,
const std::unordered_map<std::string, double> &local_node_resources) {
local_node_id_ = string_to_int_map_.Insert(local_node_id);
NodeResources node_resources = ResourceMapToNodeResources(
string_to_int_map_, local_node_resources, local_node_resources);
AddOrUpdateNode(local_node_id_, node_resources);
InitLocalResources(node_resources);
AddOrUpdateNode(local_node_id, local_node_resources, local_node_resources);
}
void ClusterResourceScheduler::AddOrUpdateNode(
const std::string &node_id,
const std::unordered_map<std::string, double> &resources_total,
const std::unordered_map<std::string, double> &resources_available) {
NodeResources node_resources = ResourceMapToNodeResources(
string_to_int_map_, resources_total, resources_available);
NodeResources node_resources;
ResourceMapToNodeResources(resources_total, resources_available, &node_resources);
AddOrUpdateNode(string_to_int_map_.Insert(node_id), node_resources);
}
@@ -430,35 +289,28 @@ int64_t ClusterResourceScheduler::IsSchedulable(const TaskRequest &task_req,
resources.predefined_resources[i].available) {
if (task_req.predefined_resources[i].soft) {
// A soft constraint has been violated.
// Just remember this as soft violations do not preclude a task
// from being scheduled.
violations++;
} else {
// A hard constraint has been violated, so we cannot schedule
// this task request.
// A hard constraint has been violated.
return -1;
}
}
}
// Now check custom resources.
for (const auto task_req_custom_resource : task_req.custom_resources) {
auto it = resources.custom_resources.find(task_req_custom_resource.id);
for (size_t i = 0; i < task_req.custom_resources.size(); i++) {
auto it = resources.custom_resources.find(task_req.custom_resources[i].id);
if (it == resources.custom_resources.end()) {
// Requested resource doesn't exist at this node. However, this
// is a soft constraint, so just increment "violations" and continue.
if (task_req_custom_resource.soft) {
// Requested resource doesn't exist at this node.
if (task_req.custom_resources[i].soft) {
violations++;
} else {
// This is a hard constraint so cannot schedule this task request.
return -1;
}
} else {
if (task_req_custom_resource.demand > it->second.available) {
// Resource constraint is violated, but since it is soft
// just increase the "violations" and continue.
if (task_req_custom_resource.soft) {
if (task_req.custom_resources[i].demand > it->second.available) {
// Resource constraint is violated.
if (task_req.custom_resources[i].soft) {
violations++;
} else {
return -1;
@@ -471,7 +323,7 @@ int64_t ClusterResourceScheduler::IsSchedulable(const TaskRequest &task_req,
auto it_p = task_req.placement_hints.find(node_id);
if (it_p == task_req.placement_hints.end()) {
// Node not found in the placement_hints list, so
// record this as a soft constraint violation.
// record this a soft constraint violation.
violations++;
}
}
@@ -481,8 +333,7 @@ int64_t ClusterResourceScheduler::IsSchedulable(const TaskRequest &task_req,
int64_t ClusterResourceScheduler::GetBestSchedulableNode(const TaskRequest &task_req,
int64_t *total_violations) {
// Minimum number of soft violations across all nodes that can schedule the request.
// We will pick the node with the smallest number of soft violations.
// Min number of violations across all nodes that can schedule the request.
int64_t min_violations = INT_MAX;
// Node associated to min_violations.
int64_t best_node = -1;
@@ -499,8 +350,9 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode(const TaskRequest &task
// Check whether any node in the request placement_hints, satisfies
// all resource constraints of the request.
for (const auto &task_req_placement_hint : task_req.placement_hints) {
auto it = nodes_.find(task_req_placement_hint);
for (auto it_p = task_req.placement_hints.begin();
it_p != task_req.placement_hints.end(); ++it_p) {
auto it = nodes_.find(*it_p);
if (it != nodes_.end()) {
if (IsSchedulable(task_req, it->first, it->second) == 0) {
return it->first;
@@ -508,19 +360,19 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode(const TaskRequest &task
}
}
for (const auto &node : nodes_) {
for (auto it = nodes_.begin(); it != nodes_.end(); ++it) {
// Return -1 if node not schedulable. otherwise return the number
// of soft constraint violations.
int64_t violations;
if ((violations = IsSchedulable(task_req, node.first, node.second)) == -1) {
if ((violations = IsSchedulable(task_req, it->first, it->second)) == -1) {
continue;
}
// Update the node with the smallest number of soft constraints violated.
if (min_violations > violations) {
min_violations = violations;
best_node = node.first;
best_node = it->first;
}
if (violations == 0) {
*total_violations = 0;
@@ -534,15 +386,14 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode(const TaskRequest &task
std::string ClusterResourceScheduler::GetBestSchedulableNode(
const std::unordered_map<std::string, double> &task_resources,
int64_t *total_violations) {
TaskRequest task_request = ResourceMapToTaskRequest(string_to_int_map_, task_resources);
TaskRequest task_request;
ResourceMapToTaskRequest(task_resources, &task_request);
int64_t node_id = GetBestSchedulableNode(task_request, total_violations);
std::string id_string;
if (node_id == -1) {
// This is not a schedulable node, so return empty string.
return "";
}
// Return the string name of the node.
return string_to_int_map_.Get(node_id);
}
@@ -565,11 +416,11 @@ bool ClusterResourceScheduler::SubtractNodeAvailableResources(
task_req.predefined_resources[i].demand);
}
for (const auto &task_req_custom_resource : task_req.custom_resources) {
auto it = resources.custom_resources.find(task_req_custom_resource.id);
for (size_t i = 0; i < task_req.custom_resources.size(); i++) {
auto it = resources.custom_resources.find(task_req.custom_resources[i].id);
if (it != resources.custom_resources.end()) {
it->second.available =
std::max(0., it->second.available - task_req_custom_resource.demand);
std::max(0., it->second.available - task_req.custom_resources[i].demand);
}
}
return true;
@@ -578,7 +429,8 @@ bool ClusterResourceScheduler::SubtractNodeAvailableResources(
bool ClusterResourceScheduler::SubtractNodeAvailableResources(
const std::string &node_id,
const std::unordered_map<std::string, double> &resource_map) {
TaskRequest task_request = ResourceMapToTaskRequest(string_to_int_map_, resource_map);
TaskRequest task_request;
ResourceMapToTaskRequest(resource_map, &task_request);
return SubtractNodeAvailableResources(string_to_int_map_.Get(node_id), task_request);
}
@@ -597,11 +449,11 @@ bool ClusterResourceScheduler::AddNodeAvailableResources(int64_t node_id,
resources.predefined_resources[i].total);
}
for (const auto &task_req_custom_resource : task_req.custom_resources) {
auto it = resources.custom_resources.find(task_req_custom_resource.id);
for (size_t i = 0; i < task_req.custom_resources.size(); i++) {
auto it = resources.custom_resources.find(task_req.custom_resources[i].id);
if (it != resources.custom_resources.end()) {
it->second.available = std::min(
it->second.available + task_req_custom_resource.demand, it->second.total);
it->second.available + task_req.custom_resources[i].demand, it->second.total);
}
}
return true;
@@ -610,7 +462,8 @@ bool ClusterResourceScheduler::AddNodeAvailableResources(int64_t node_id,
bool ClusterResourceScheduler::AddNodeAvailableResources(
const std::string &node_id,
const std::unordered_map<std::string, double> &resource_map) {
TaskRequest task_request = ResourceMapToTaskRequest(string_to_int_map_, resource_map);
TaskRequest task_request;
ResourceMapToTaskRequest(resource_map, &task_request);
return AddNodeAvailableResources(string_to_int_map_.Get(node_id), task_request);
}
@@ -627,19 +480,79 @@ bool ClusterResourceScheduler::GetNodeResources(int64_t node_id,
int64_t ClusterResourceScheduler::NumNodes() { return nodes_.size(); }
void ClusterResourceScheduler::ResourceMapToNodeResources(
const std::unordered_map<std::string, double> &resource_map_total,
const std::unordered_map<std::string, double> &resource_map_available,
NodeResources *node_resources) {
node_resources->predefined_resources.resize(PredefinedResources_MAX);
for (size_t i = 0; i < PredefinedResources_MAX; i++) {
node_resources->predefined_resources[i].total =
node_resources->predefined_resources[i].available = 0;
}
for (auto it = resource_map_total.begin(); it != resource_map_total.end(); ++it) {
ResourceCapacity resource_capacity;
resource_capacity.total = (int64_t)it->second;
auto it2 = resource_map_available.find(it->first);
if (it2 == resource_map_available.end()) {
resource_capacity.available = 0;
} else {
resource_capacity.available = (int64_t)it2->second;
}
if (it->first == ray::kCPU_ResourceLabel) {
node_resources->predefined_resources[CPU] = resource_capacity;
} else if (it->first == ray::kGPU_ResourceLabel) {
node_resources->predefined_resources[GPU] = resource_capacity;
} else if (it->first == ray::kTPU_ResourceLabel) {
node_resources->predefined_resources[TPU] = resource_capacity;
} else if (it->first == ray::kMemory_ResourceLabel) {
node_resources->predefined_resources[MEM] = resource_capacity;
} else {
// This is a custom resource.
node_resources->custom_resources.emplace(string_to_int_map_.Insert(it->first),
resource_capacity);
}
}
}
/// Fill a TaskRequest from a map of resource demands.
///
/// The CPU, GPU, TPU and memory labels are written to the corresponding
/// predefined slots; every other entry becomes a custom resource whose id is
/// obtained from string_to_int_map_.Insert(). All demands are hard (soft = false).
///
/// \param resource_map: Resource name -> demand.
/// \param task_request: Output; its predefined and custom resources are
/// overwritten.
void ClusterResourceScheduler::ResourceMapToTaskRequest(
    const std::unordered_map<std::string, double> &resource_map,
    TaskRequest *task_request) {
  // Reset every predefined slot (the previous code only ever touched slot 0).
  task_request->predefined_resources.resize(PredefinedResources_MAX);
  for (size_t i = 0; i < PredefinedResources_MAX; i++) {
    task_request->predefined_resources[i].demand = 0;
    task_request->predefined_resources[i].soft = false;
  }

  // Size for the worst case (every entry is custom); trimmed after the loop.
  task_request->custom_resources.resize(resource_map.size());
  size_t num_custom = 0;
  for (const auto &resource : resource_map) {
    if (resource.first == ray::kCPU_ResourceLabel) {
      task_request->predefined_resources[CPU].demand = resource.second;
    } else if (resource.first == ray::kGPU_ResourceLabel) {
      task_request->predefined_resources[GPU].demand = resource.second;
    } else if (resource.first == ray::kTPU_ResourceLabel) {
      task_request->predefined_resources[TPU].demand = resource.second;
    } else if (resource.first == ray::kMemory_ResourceLabel) {
      task_request->predefined_resources[MEM].demand = resource.second;
    } else {
      task_request->custom_resources[num_custom].id =
          string_to_int_map_.Insert(resource.first);
      task_request->custom_resources[num_custom].demand = resource.second;
      task_request->custom_resources[num_custom].soft = false;
      num_custom++;
    }
  }
  task_request->custom_resources.resize(num_custom);
}
void ClusterResourceScheduler::UpdateResourceCapacity(const std::string &client_id_string,
const std::string &resource_name,
int64_t resource_total) {
int64_t client_id = string_to_int_map_.Get(client_id_string);
auto it = nodes_.find(client_id);
if (it == nodes_.end()) {
NodeResources node_resources;
node_resources.predefined_resources.resize(PredefinedResources_MAX);
client_id = string_to_int_map_.Insert(client_id_string);
RAY_CHECK(nodes_.emplace(client_id, node_resources).second);
it = nodes_.find(client_id);
RAY_CHECK(it != nodes_.end());
return;
}
int idx = -1;
@@ -675,11 +588,10 @@ void ClusterResourceScheduler::UpdateResourceCapacity(const std::string &client_
if (itr->second.total < 0) {
itr->second.total = 0;
}
} else {
ResourceCapacity resource_capacity;
resource_capacity.total = resource_capacity.available = resource_total;
it->second.custom_resources.emplace(resource_id, resource_capacity);
}
ResourceCapacity resource_capacity;
resource_capacity.total = resource_capacity.available = resource_total;
it->second.custom_resources.emplace(resource_id, resource_capacity);
}
}
@@ -713,19 +625,19 @@ void ClusterResourceScheduler::DeleteResource(const std::string &client_id_strin
}
}
std::string ClusterResourceScheduler::DebugString(void) const {
std::string ClusterResourceScheduler::DebugString(void) {
std::stringstream buffer;
buffer << "\n Local id: " << local_node_id_;
buffer << " Local resources: " << local_resources_.DebugString(string_to_int_map_);
for (auto &node : nodes_) {
buffer << " node id: " << node.first;
buffer << node.second.DebugString(string_to_int_map_);
buffer << std::endl << "local node id: " << local_node_id_ << std::endl;
for (auto it = nodes_.begin(); it != nodes_.end(); ++it) {
buffer << "node id: " << it->first << std::endl;
buffer << it->second.DebugString();
}
return buffer.str();
}
void ClusterResourceScheduler::InitResourceInstances(
double total, bool unit_instances, ResourceInstanceCapacities *instance_list) {
double total, bool unit_instances,
ResourceInstanceCapacities *instance_list /* return */) {
if (unit_instances) {
size_t num_instances = static_cast<size_t>(total);
instance_list->total.resize(num_instances);
@@ -766,38 +678,29 @@ void ClusterResourceScheduler::InitLocalResources(const NodeResources &node_reso
}
}
std::vector<double> ClusterResourceScheduler::AddAvailableResourceInstances(
std::vector<double> available, ResourceInstanceCapacities *resource_instances) {
std::vector<double> overflow(available.size(), 0.);
void ClusterResourceScheduler::AddAvailableResourceInstances(
std::vector<double> available,
ResourceInstanceCapacities *resource_instances /* return */) {
for (size_t i = 0; i < available.size(); i++) {
resource_instances->available[i] = resource_instances->available[i] + available[i];
if (resource_instances->available[i] > resource_instances->total[i]) {
overflow[i] = resource_instances->available[i] - resource_instances->total[i];
resource_instances->available[i] = resource_instances->total[i];
}
resource_instances->available[i] = std::min(
resource_instances->available[i] + available[i], resource_instances->total[i]);
}
return overflow;
}
std::vector<double> ClusterResourceScheduler::SubtractAvailableResourceInstances(
std::vector<double> available, ResourceInstanceCapacities *resource_instances) {
void ClusterResourceScheduler::SubtractAvailableResourceInstances(
std::vector<double> available,
ResourceInstanceCapacities *resource_instances /* return */) {
RAY_CHECK(available.size() == resource_instances->available.size());
std::vector<double> underflow(available.size(), 0.);
for (size_t i = 0; i < available.size(); i++) {
resource_instances->available[i] = resource_instances->available[i] - available[i];
if (resource_instances->available[i] < 0) {
underflow[i] = -resource_instances->available[i];
resource_instances->available[i] = 0;
}
resource_instances->available[i] =
std::max(resource_instances->available[i] - available[i], 0.);
}
return underflow;
}
bool ClusterResourceScheduler::AllocateResourceInstances(
double demand, bool soft, std::vector<double> &available,
std::vector<double> *allocation) {
std::vector<double> *allocation /* return */) {
allocation->resize(available.size());
double remaining_demand = demand;
@@ -887,9 +790,14 @@ bool ClusterResourceScheduler::AllocateResourceInstances(
}
bool ClusterResourceScheduler::AllocateTaskResourceInstances(
const TaskRequest &task_req, std::shared_ptr<TaskResourceInstances> task_allocation) {
RAY_CHECK(task_allocation != nullptr);
if (nodes_.find(local_node_id_) == nodes_.end()) {
const TaskRequest &task_req, TaskResourceInstances *task_allocation /* return */) {
auto it = nodes_.find(local_node_id_);
if (it == nodes_.end()) {
return false;
}
// Just double check this node can still schedule the task request.
if (IsSchedulable(task_req, local_node_id_, it->second) == -1) {
return false;
}
@@ -902,19 +810,19 @@ bool ClusterResourceScheduler::AllocateTaskResourceInstances(
&task_allocation->predefined_resources[i])) {
// Allocation failed. Restore node's local resources by freeing the resources
// of the failed allocation.
FreeTaskResourceInstances(task_allocation);
FreeTaskResourceInstances(*task_allocation);
return false;
}
}
}
for (const auto &task_req_custom_resource : task_req.custom_resources) {
auto it = local_resources_.custom_resources.find(task_req_custom_resource.id);
for (size_t i = 0; i < task_req.custom_resources.size(); i++) {
auto it = local_resources_.custom_resources.find(task_req.custom_resources[i].id);
if (it != local_resources_.custom_resources.end()) {
if (task_req_custom_resource.demand > 0) {
if (task_req.custom_resources[i].demand > 0) {
std::vector<double> allocation;
bool success = AllocateResourceInstances(task_req_custom_resource.demand,
task_req_custom_resource.soft,
bool success = AllocateResourceInstances(task_req.custom_resources[i].demand,
task_req.custom_resources[i].soft,
it->second.available, &allocation);
// Even if allocation failed we need to remember partial allocations to correctly
// free resources.
@@ -922,7 +830,7 @@ bool ClusterResourceScheduler::AllocateTaskResourceInstances(
if (!success) {
// Allocation failed. Restore node's local resources by freeing the resources
// of the failed allocation.
FreeTaskResourceInstances(task_allocation);
FreeTaskResourceInstances(*task_allocation);
return false;
}
}
@@ -933,128 +841,30 @@ bool ClusterResourceScheduler::AllocateTaskResourceInstances(
return true;
}
void ClusterResourceScheduler::UpdateLocalAvailableResourcesFromResourceInstances() {
auto it_local_node = nodes_.find(local_node_id_);
RAY_CHECK(it_local_node != nodes_.end());
for (size_t i = 0; i < PredefinedResources_MAX; i++) {
it_local_node->second.predefined_resources[i].available = 0;
for (size_t j = 0; j < local_resources_.predefined_resources[i].available.size();
j++) {
it_local_node->second.predefined_resources[i].available +=
local_resources_.predefined_resources[i].available[j];
}
}
for (auto &custom_resource : it_local_node->second.custom_resources) {
auto it = local_resources_.custom_resources.find(custom_resource.first);
if (it != local_resources_.custom_resources.end()) {
custom_resource.second.available = 0;
for (const auto available : it->second.available) {
custom_resource.second.available += available;
}
}
}
}
void ClusterResourceScheduler::FreeTaskResourceInstances(
std::shared_ptr<TaskResourceInstances> task_allocation) {
RAY_CHECK(task_allocation != nullptr);
TaskResourceInstances &task_allocation) {
for (size_t i = 0; i < PredefinedResources_MAX; i++) {
AddAvailableResourceInstances(task_allocation->predefined_resources[i],
AddAvailableResourceInstances(task_allocation.predefined_resources[i],
&local_resources_.predefined_resources[i]);
}
for (const auto task_allocation_custom_resource : task_allocation->custom_resources) {
auto it =
local_resources_.custom_resources.find(task_allocation_custom_resource.first);
if (it != local_resources_.custom_resources.end()) {
AddAvailableResourceInstances(task_allocation_custom_resource.second, &it->second);
for (auto it = task_allocation.custom_resources.begin();
it != task_allocation.custom_resources.end(); it++) {
auto it_local = local_resources_.custom_resources.find(it->first);
if (it_local != local_resources_.custom_resources.end()) {
AddAvailableResourceInstances(it->second, &it_local->second);
}
}
}
std::vector<double> ClusterResourceScheduler::AddCPUResourceInstances(
void ClusterResourceScheduler::AddCPUResourceInstances(
std::vector<double> &cpu_instances) {
if (cpu_instances.size() == 0) {
return cpu_instances; // No oveerflow.
}
RAY_CHECK(nodes_.find(local_node_id_) != nodes_.end());
auto overflow = AddAvailableResourceInstances(
cpu_instances, &local_resources_.predefined_resources[CPU]);
UpdateLocalAvailableResourcesFromResourceInstances();
return overflow;
AddAvailableResourceInstances(cpu_instances,
&local_resources_.predefined_resources[CPU]);
}
std::vector<double> ClusterResourceScheduler::SubtractCPUResourceInstances(
void ClusterResourceScheduler::SubtractCPUResourceInstances(
std::vector<double> &cpu_instances) {
if (cpu_instances.size() == 0) {
return cpu_instances; // No underflow.
}
RAY_CHECK(nodes_.find(local_node_id_) != nodes_.end());
auto underflow = SubtractAvailableResourceInstances(
cpu_instances, &local_resources_.predefined_resources[CPU]);
UpdateLocalAvailableResourcesFromResourceInstances();
return underflow;
}
/// Allocate the resources requested by a task on the given node.
///
/// For the local node, resource instances are allocated into task_allocation
/// and, on success, the local node's available capacities are refreshed. For
/// any other node, the request is subtracted from that node's available
/// resources.
///
/// \param node_id: Node on which to allocate.
/// \param task_req: Resources requested by the task.
/// \param task_allocation: Receives the instance allocation; must be non-null
/// when node_id is the local node (enforced with RAY_CHECK).
/// \return true if the allocation succeeded, false otherwise.
bool ClusterResourceScheduler::AllocateTaskResources(
    int64_t node_id, const TaskRequest &task_req,
    std::shared_ptr<TaskResourceInstances> task_allocation) {
  if (node_id != local_node_id_) {
    return SubtractNodeAvailableResources(node_id, task_req);
  }

  RAY_CHECK(task_allocation != nullptr);
  if (!AllocateTaskResourceInstances(task_req, task_allocation)) {
    return false;
  }
  UpdateLocalAvailableResourcesFromResourceInstances();
  return true;
}
bool ClusterResourceScheduler::AllocateLocalTaskResources(
const std::unordered_map<std::string, double> &task_resources,
std::shared_ptr<TaskResourceInstances> task_allocation) {
RAY_CHECK(task_allocation != nullptr);
TaskRequest task_request = ResourceMapToTaskRequest(string_to_int_map_, task_resources);
return AllocateTaskResources(local_node_id_, task_request, task_allocation);
}
/// Translate a resource index into its resource name.
///
/// \param res_idx: CPU, GPU, TPU or MEM for a predefined resource; any other
/// value is looked up in string_to_int_map_.
/// \return The matching resource label, or the result of the map lookup.
std::string ClusterResourceScheduler::GetResourceNameFromIndex(int64_t res_idx) {
  if (res_idx == CPU) {
    return ray::kCPU_ResourceLabel;
  }
  if (res_idx == GPU) {
    return ray::kGPU_ResourceLabel;
  }
  if (res_idx == TPU) {
    return ray::kTPU_ResourceLabel;
  }
  if (res_idx == MEM) {
    return ray::kMemory_ResourceLabel;
  }
  // Not a predefined resource: resolve the name through the id map.
  return string_to_int_map_.Get((uint64_t)res_idx);
}
void ClusterResourceScheduler::AllocateRemoteTaskResources(
std::string &node_string,
const std::unordered_map<std::string, double> &task_resources) {
TaskRequest task_request = ResourceMapToTaskRequest(string_to_int_map_, task_resources);
auto node_id = string_to_int_map_.Insert(node_string);
RAY_CHECK(node_id != local_node_id_);
AllocateTaskResources(node_id, task_request, nullptr);
}
void ClusterResourceScheduler::FreeLocalTaskResources(
std::shared_ptr<TaskResourceInstances> task_allocation) {
if (task_allocation == nullptr || task_allocation->IsEmpty()) {
return;
}
FreeTaskResourceInstances(task_allocation);
UpdateLocalAvailableResourcesFromResourceInstances();
SubtractAvailableResourceInstances(cpu_instances,
&local_resources_.predefined_resources[CPU]);
}
@@ -60,7 +60,6 @@ struct ResourceRequestWithId : ResourceRequest {
int64_t id;
};
// Data structure specifying the capacity of each resource requested by a task.
class TaskRequest {
public:
/// List of predefined resources required by the task.
@@ -73,11 +72,10 @@ class TaskRequest {
/// nodes in this list can schedule this task.
absl::flat_hash_set<int64_t> placement_hints;
/// Returns human-readable string for this task request.
std::string DebugString() const;
std::string DebugString();
};
// Data structure specifying the capacity of each instance of each resource
// allocated to a task.
// Task request specifying instances for each resource.
class TaskResourceInstances {
public:
/// The list of instances of each predefined resource allocated to a task.
@@ -85,20 +83,10 @@ class TaskResourceInstances {
/// The list of instances of each custom resource allocated to a task.
absl::flat_hash_map<int64_t, std::vector<double>> custom_resources;
bool operator==(const TaskResourceInstances &other);
/// For each resource of this request aggregate its instances.
TaskRequest ToTaskRequest() const;
/// Get CPU instances only.
std::vector<double> GetCPUInstances() const {
if (!this->predefined_resources.empty()) {
return this->predefined_resources[CPU];
} else {
return {};
}
};
/// Check whether there are no resource instances.
bool IsEmpty() const;
std::vector<double> GetCPUInstances() { return this->predefined_resources[CPU]; };
/// Returns human-readable string for these resources.
std::string DebugString() const;
std::string DebugString();
};
/// Total and available capacities of each resource of a node.
@@ -112,7 +100,7 @@ class NodeResources {
/// Returns if this equals another node resources.
bool operator==(const NodeResources &other);
/// Returns human-readable string for these resources.
std::string DebugString(StringIdMap string_to_int_map) const;
std::string DebugString();
};
/// Total and available capacities of each resource instance.
@@ -129,7 +117,7 @@ class NodeResourceInstances {
/// Returns if this equals another node resources.
bool operator==(const NodeResourceInstances &other);
/// Returns human-readable string for these resources.
std::string DebugString(StringIdMap string_to_int_map) const;
std::string DebugString();
};
/// Class encapsulating the cluster resources and the logic to assign
@@ -161,19 +149,6 @@ class ClusterResourceScheduler {
const absl::flat_hash_map<int64_t, ResourceCapacity> &new_custom_resources,
absl::flat_hash_map<int64_t, ResourceCapacity> *old_custom_resources);
/// Subtract the resources required by a given task request (task_req) from
/// a given node (node_id).
///
/// \param node_id Node whose resources we allocate. Can be the local or a remote node.
/// \param task_req Task for which we allocate resources.
/// \param task_allocation Resources allocated to the task at instance granularity.
/// This is a return parameter.
///
/// \return True if the node has enough resources to satisfy the task request.
/// False otherwise.
bool AllocateTaskResources(int64_t node_id, const TaskRequest &task_req,
std::shared_ptr<TaskResourceInstances> task_allocation);
public:
ClusterResourceScheduler(void){};
@@ -188,9 +163,6 @@ class ClusterResourceScheduler {
const std::string &local_node_id,
const std::unordered_map<std::string, double> &local_node_resources);
// Mapping from predefined resource indexes to resource strings
std::string GetResourceNameFromIndex(int64_t res_idx);
/// Add a new node or overwrite the resources of an existing node.
///
/// \param node_id: Node ID.
@@ -292,19 +264,24 @@ class ClusterResourceScheduler {
/// Get number of nodes in the cluster.
int64_t NumNodes();
/// Update total capacity of a given resource of a given node.
///
/// \param node_name: Node whose resource we want to update.
/// \param resource_name: Resource which we want to update.
/// \param resource_total: New capacity of the resource.
void UpdateResourceCapacity(const std::string &node_name,
/// Convert a map of resources to a TaskRequest data structure.
void ResourceMapToTaskRequest(
const std::unordered_map<std::string, double> &resource_map,
TaskRequest *task_request);
/// Convert maps of total and available resources to a NodeResources data structure.
void ResourceMapToNodeResources(
const std::unordered_map<std::string, double> &resource_map_total,
const std::unordered_map<std::string, double> &resource_map_available,
NodeResources *node_resources);
/// Update total capacity of resource resource_name at node client_id.
void UpdateResourceCapacity(const std::string &client_id,
const std::string &resource_name, int64_t resource_total);
/// Delete a given resource from a given node.
///
/// \param node_name: Node whose resource we want to delete.
/// \param resource_name: Resource we want to delete
void DeleteResource(const std::string &node_name, const std::string &resource_name);
/// Delete resource resource_name from node client_id_string.
void DeleteResource(const std::string &client_id_string,
const std::string &resource_name);
/// Return local resources.
NodeResourceInstances GetLocalResources() { return local_resources_; };
@@ -328,27 +305,25 @@ class ClusterResourceScheduler {
ResourceInstanceCapacities *instance_list);
/// Allocate enough capacity across the instances of a resource to satisfy "demand".
/// If resource has multiple unit-capacity instances, we consider two cases.
/// If resource has multiple unit-capacity instances, we consider two cases.
///
/// 1) If the constraint is hard, allocate full unit-capacity instances until
/// demand becomes fractional, and then satisfy the fractional demand using the
/// demand becomes fractional, and then satisfy the fractional demand using the
/// instance with the smallest available capacity that can satisfy the fractional
/// demand. For example, assume a resource consisting of 4 instances, with available
/// capacities: (1., 1., .7, 0.5) and demand of 1.2. Then we allocate one full
/// instance and then allocate 0.2 of the 0.5 instance (as this is the instance
/// with the smallest available capacity that can satisfy the remaining demand of 0.2).
/// As a result remaining available capacities will be (0., 1., .7, .3).
/// Thus, if the constraint is hard, we will allocate a bunch of full instances and
/// at most a fractional instance.
/// As a result remaining available capacities will be (0., 1., .7, .3).
/// Thus, if the constraint is hard, we will allocate at most a fractional resource.
///
/// 2) If the constraint is soft, we can allocate multiple fractional resources,
/// and even overallocate the resource. For example, in the previous case, if we
/// have a demand of 1.8, we can allocate one full instance, the 0.5 instance, and
/// 0.3 from the 0.7 instance. Furthermore, if the demand is 3.5, then we allocate
/// 0.3 from the 0.7 instance. Furthermore, if the demand is 3.5, then we allocate
/// all instances, and return success (true), despite the fact that the total
/// available capacity of the resource is 3.2 (= 1. + 1. + .7 + .5), which is less
/// than the demand, 3.5. In this case, the remaining available resource is
/// (0., 0., 0., 0.)
/// than the demand, 3.5.
///
/// \param demand: The resource amount to be allocated.
/// \param soft: Specifies whether this demand has soft or hard constraints.
@@ -365,94 +340,45 @@ class ClusterResourceScheduler {
///
/// \param task_req: Resources requested by a task.
/// \param task_allocation: Local resources allocated to satisfy task_req demand.
/// This is an output argument.
///
/// \return true, if allocation successful. If false, the caller needs to free the
/// allocated resources, i.e., task_allocation.
bool AllocateTaskResourceInstances(
const TaskRequest &task_req,
std::shared_ptr<TaskResourceInstances> task_allocation);
bool AllocateTaskResourceInstances(const TaskRequest &task_req,
TaskResourceInstances *task_allocation);
/// Free resources which were allocated with a task. The freed resources are
/// added back to the node's local available resources.
///
/// \param task_allocation: Task's resources to be freed.
void FreeTaskResourceInstances(std::shared_ptr<TaskResourceInstances> task_allocation);
void FreeTaskResourceInstances(TaskResourceInstances &task_allocation);
/// Increase the available capacities of the instances of a given resource.
///
/// \param available A list of available capacities for resource's instances.
/// \param resource_instances List of the resource instances being updated.
///
/// \return Overflow capacities of "resource_instances" after adding instance
/// capacities in "available", i.e.,
/// min(available + resource_instances.available, resource_instances.total)
std::vector<double> AddAvailableResourceInstances(
std::vector<double> available, ResourceInstanceCapacities *resource_instances);
void AddAvailableResourceInstances(std::vector<double> available,
ResourceInstanceCapacities *resource_instances);
/// Decrease the available capacities of the instances of a given resource.
///
/// \param free A list of capacities for resource's instances to be freed.
/// \param resource_instances List of the resource instances being updated.
/// \return Underflow of "resource_instances" after subtracting instance
/// capacities in "available", i.e.,
/// max(available - resource_instances.available, 0)
std::vector<double> SubtractAvailableResourceInstances(
std::vector<double> available, ResourceInstanceCapacities *resource_instances);
void SubtractAvailableResourceInstances(std::vector<double> free,
ResourceInstanceCapacities *resource_instances);
/// Increase the available CPU instances of this node.
///
/// \param cpu_instances CPU instances to be added to available cpus.
///
/// \return Overflow capacities of CPU instances after adding CPU
/// capacities in cpu_instances.
std::vector<double> AddCPUResourceInstances(std::vector<double> &cpu_instances);
void AddCPUResourceInstances(std::vector<double> &cpu_instances);
/// Decrease the available CPU instances of this node.
/// Decrease the available cpu instances of this node.
///
/// \param cpu_instances CPU instances to be removed from available cpus.
///
/// \return Underflow capacities of CPU instances after subtracting CPU
/// capacities in cpu_instances.
std::vector<double> SubtractCPUResourceInstances(std::vector<double> &cpu_instances);
/// Subtract the resources required by a given task request (task_req) from the
/// local node. This function also updates the local node resources
/// at the instance granularity.
///
/// \param task_req Task for which we allocate resources.
/// \param task_allocation Resources allocated to the task at instance granularity.
/// This is a return parameter.
///
/// \return True if local node has enough resources to satisfy the task request.
/// False otherwise.
bool AllocateLocalTaskResources(
const std::unordered_map<std::string, double> &task_resources,
std::shared_ptr<TaskResourceInstances> task_allocation);
/// Subtract the resources required by a given task request (task_req) from a given
/// remote node.
///
/// \param node_id Remote node whose resources we allocate.
/// \param task_req Task for which we allocate resources.
void AllocateRemoteTaskResources(
std::string &node_id,
const std::unordered_map<std::string, double> &task_resources);
void FreeLocalTaskResources(std::shared_ptr<TaskResourceInstances> task_allocation);
/// Update the available resources of the local node given
/// the available instances of each resource of the local node.
/// Basically, this means computing the available resources
/// by adding up the available quantities of each instance of that
/// resources.
///
/// Example: Assume the local node has four GPU instances with the
/// following availabilities: 0.2, 0.3, 0.1, 1. Then the total GPU
/// resources available at that node is 0.2 + 0.3 + 0.1 + 1. = 1.6
void UpdateLocalAvailableResourcesFromResourceInstances();
/// \param cpu_instances Cpu instances to be removed from available cpus.
void SubtractCPUResourceInstances(std::vector<double> &cpu_instances);
/// Return human-readable string for this scheduler state.
std::string DebugString() const;
std::string DebugString();
};
#endif // RAY_COMMON_SCHEDULING_SCHEDULING_H
+12 -150
View File
@@ -596,10 +596,9 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest) {
EmptyBoolVector, EmptyIntVector);
NodeResourceInstances old_local_resources = cluster_resources.GetLocalResources();
std::shared_ptr<TaskResourceInstances> task_allocation =
std::make_shared<TaskResourceInstances>();
TaskResourceInstances task_allocation;
bool success =
cluster_resources.AllocateTaskResourceInstances(task_req, task_allocation);
cluster_resources.AllocateTaskResourceInstances(task_req, &task_allocation);
ASSERT_EQ(success, true);
@@ -622,10 +621,9 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest) {
EmptyBoolVector, EmptyIntVector);
NodeResourceInstances old_local_resources = cluster_resources.GetLocalResources();
std::shared_ptr<TaskResourceInstances> task_allocation =
std::make_shared<TaskResourceInstances>();
TaskResourceInstances task_allocation;
bool success =
cluster_resources.AllocateTaskResourceInstances(task_req, task_allocation);
cluster_resources.AllocateTaskResourceInstances(task_req, &task_allocation);
ASSERT_EQ(success, false);
ASSERT_EQ((cluster_resources.GetLocalResources() == old_local_resources), true);
@@ -644,10 +642,9 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest) {
EmptyBoolVector, EmptyIntVector);
NodeResourceInstances old_local_resources = cluster_resources.GetLocalResources();
std::shared_ptr<TaskResourceInstances> task_allocation =
std::make_shared<TaskResourceInstances>();
TaskResourceInstances task_allocation;
bool success =
cluster_resources.AllocateTaskResourceInstances(task_req, task_allocation);
cluster_resources.AllocateTaskResourceInstances(task_req, &task_allocation);
ASSERT_EQ(success, true);
@@ -680,10 +677,9 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest) {
EmptyIntVector);
NodeResourceInstances old_local_resources = cluster_resources.GetLocalResources();
std::shared_ptr<TaskResourceInstances> task_allocation =
std::make_shared<TaskResourceInstances>();
TaskResourceInstances task_allocation;
bool success =
cluster_resources.AllocateTaskResourceInstances(task_req, task_allocation);
cluster_resources.AllocateTaskResourceInstances(task_req, &task_allocation);
ASSERT_EQ(success, true);
@@ -710,10 +706,9 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest) {
EmptyIntVector);
NodeResourceInstances old_local_resources = cluster_resources.GetLocalResources();
std::shared_ptr<TaskResourceInstances> task_allocation =
std::make_shared<TaskResourceInstances>();
TaskResourceInstances task_allocation;
bool success =
cluster_resources.AllocateTaskResourceInstances(task_req, task_allocation);
cluster_resources.AllocateTaskResourceInstances(task_req, &task_allocation);
ASSERT_EQ(success, false);
ASSERT_EQ((cluster_resources.GetLocalResources() == old_local_resources), true);
@@ -737,10 +732,9 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest) {
EmptyIntVector);
NodeResourceInstances old_local_resources = cluster_resources.GetLocalResources();
std::shared_ptr<TaskResourceInstances> task_allocation =
std::make_shared<TaskResourceInstances>();
TaskResourceInstances task_allocation;
bool success =
cluster_resources.AllocateTaskResourceInstances(task_req, task_allocation);
cluster_resources.AllocateTaskResourceInstances(task_req, &task_allocation);
ASSERT_EQ(success, true);
@@ -758,138 +752,6 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest) {
}
}
// Adding a task's CPU instances back and then subtracting them again must leave
// the local resources exactly as they were right after the allocation.
TEST_F(SchedulingTest, TaskResourceInstancesTest2) {
  // Local node: 4 CPU, 4 MEM, 5 GPU and two custom resources of capacity 4 each.
  NodeResources node_resources;
  vector<int64_t> pred_capacities{4 /* CPU */, 4 /* MEM */, 5 /* GPU */};
  vector<int64_t> cust_ids{1, 2};
  vector<int64_t> cust_capacities{4, 4};
  initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
  ClusterResourceScheduler cluster_resources(0, node_resources);

  // Task request that is expected to fit on the node.
  TaskRequest task_req;
  vector<double> pred_demands{2. /* CPU */, 2. /* MEM */, 1.5 /* GPU */};
  vector<bool> pred_soft{false};
  vector<double> cust_demands{3, 2};
  vector<bool> cust_soft{false, false};
  initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft,
                  EmptyIntVector);

  auto task_allocation = std::make_shared<TaskResourceInstances>();
  const bool allocated =
      cluster_resources.AllocateTaskResourceInstances(task_req, task_allocation);
  // Snapshot of the local resources taken after the allocation.
  NodeResourceInstances resources_after_allocation =
      cluster_resources.GetLocalResources();
  ASSERT_TRUE(allocated);

  // Add followed by subtract of the same CPU instances should be a no-op.
  std::vector<double> cpu_instances = task_allocation->GetCPUInstances();
  cluster_resources.AddCPUResourceInstances(cpu_instances);
  cluster_resources.SubtractCPUResourceInstances(cpu_instances);

  ASSERT_TRUE(cluster_resources.GetLocalResources() == resources_after_allocation);
}
// Exercises AddCPUResourceInstances()/SubtractCPUResourceInstances(), including
// the underflow and overflow vectors they return.
TEST_F(SchedulingTest, TaskCPUResourceInstancesTest) {
  NodeResources node_resources;
  vector<int64_t> pred_capacities{4 /* CPU */, 1 /* MEM */, 1 /* GPU */};
  vector<int64_t> cust_ids{1};
  vector<int64_t> cust_capacities{8};
  initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
  ClusterResourceScheduler cluster_resources(0, node_resources);

  // Fetches the currently available CPU instances of the local node.
  auto available_cpus = [&cluster_resources]() -> std::vector<double> {
    return cluster_resources.GetLocalResources()
        .GetAvailableResourceInstances()
        .GetCPUInstances();
  };

  // Step 1: take half of every CPU instance.
  std::vector<double> half_each{0.5, 0.5, 0.5, 0.5};
  cluster_resources.SubtractCPUResourceInstances(half_each);
  {
    std::vector<double> actual = available_cpus();
    std::vector<double> expected{0.5, 0.5, 0.5, 0.5};
    ASSERT_TRUE(std::equal(actual.begin(), actual.end(), expected.begin()));
  }

  // Step 2: give the halves back; every instance is full again.
  cluster_resources.AddCPUResourceInstances(half_each);
  {
    std::vector<double> actual = available_cpus();
    std::vector<double> expected{1., 1., 1., 1.};
    ASSERT_TRUE(std::equal(actual.begin(), actual.end(), expected.begin()));
  }

  // Step 3: subtract more than is available on three instances and check the
  // returned underflow.
  std::vector<double> too_much{1.5, 1.5, .5, 1.5};
  std::vector<double> underflow =
      cluster_resources.SubtractCPUResourceInstances(too_much);
  std::vector<double> expected_underflow{.5, .5, 0., .5};
  ASSERT_TRUE(
      std::equal(underflow.begin(), underflow.end(), expected_underflow.begin()));
  {
    std::vector<double> actual = available_cpus();
    std::vector<double> expected{0., 0., 0.5, 0.};
    ASSERT_TRUE(std::equal(actual.begin(), actual.end(), expected.begin()));
  }

  // Step 4: add back more than fits on the third instance and check the
  // returned overflow.
  std::vector<double> give_back{1.0, .5, 1., .5};
  std::vector<double> overflow = cluster_resources.AddCPUResourceInstances(give_back);
  std::vector<double> expected_overflow{.0, .0, .5, 0.};
  ASSERT_TRUE(std::equal(overflow.begin(), overflow.end(), expected_overflow.begin()));
  {
    std::vector<double> actual = available_cpus();
    std::vector<double> expected{1., .5, 1., .5};
    ASSERT_TRUE(std::equal(actual.begin(), actual.end(), expected.begin()));
  }
}
// Checks that changing the CPU instances of the local node is reflected in the
// node-level available CPU reported by GetNodeResources().
TEST_F(SchedulingTest, UpdateLocalAvailableResourcesFromResourceInstancesTest) {
  NodeResources node_resources;
  vector<int64_t> pred_capacities{4 /* CPU */, 1 /* MEM */, 1 /* GPU */};
  vector<int64_t> cust_ids{1};
  vector<int64_t> cust_capacities{8};
  initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
  ClusterResourceScheduler cluster_resources(0, node_resources);

  // Fetches the currently available CPU instances of the local node.
  auto available_cpus = [&cluster_resources]() -> std::vector<double> {
    return cluster_resources.GetLocalResources()
        .GetAvailableResourceInstances()
        .GetCPUInstances();
  };

  {
    // SubtractCPUResourceInstances() is expected to refresh the node-level
    // availability as well; the GetNodeResources() check below relies on it.
    std::vector<double> to_subtract{0.5, 0.5, 2, 0.5};
    cluster_resources.SubtractCPUResourceInstances(to_subtract);

    std::vector<double> actual = available_cpus();
    std::vector<double> expected{0.5, 0.5, 0., 0.5};
    ASSERT_TRUE(std::equal(actual.begin(), actual.end(), expected.begin()));

    NodeResources nr;
    cluster_resources.GetNodeResources(0, &nr);
    ASSERT_TRUE(nr.predefined_resources[0].available == 1.5);
  }

  {
    // AddCPUResourceInstances() is expected to refresh the node-level
    // availability as well; the GetNodeResources() check below relies on it.
    std::vector<double> to_add{1.5, 0.5, 2, 0.3};
    cluster_resources.AddCPUResourceInstances(to_add);

    std::vector<double> actual = available_cpus();
    std::vector<double> expected{1., 1., 1., 0.8};
    ASSERT_TRUE(std::equal(actual.begin(), actual.end(), expected.begin()));

    NodeResources nr;
    cluster_resources.GetNodeResources(0, &nr);
    ASSERT_TRUE(nr.predefined_resources[0].available == 3.8);
  }
}
#ifdef UNORDERED_VS_ABSL_MAPS_EVALUATION
TEST_F(SchedulingTest, SchedulingMapPerformanceTest) {
size_t map_len = 1000000;
+165 -263
View File
@@ -84,41 +84,6 @@ namespace ray {
namespace raylet {
// A helper function to print the leased workers.
std::string LeasedWorkersSring(
const std::unordered_map<WorkerID, std::shared_ptr<Worker>> &leased_workers) {
std::stringstream buffer;
buffer << " @leased_workers: (";
for (const auto &pair : leased_workers) {
auto &worker = pair.second;
buffer << worker->WorkerId() << ", ";
}
buffer << ")";
return buffer.str();
}
// A helper function to print the workers in worker_pool_.
std::string WorkerPoolString(const std::vector<std::shared_ptr<Worker>> &worker_pool) {
std::stringstream buffer;
buffer << " @worker_pool: (";
for (const auto &worker : worker_pool) {
buffer << worker->WorkerId() << ", ";
}
buffer << ")";
return buffer.str();
}
// Helper function to print a worker's ID together with the worker and node that
// own it, as recorded in the worker's owner address.
// NOTE(review): the owner's raylet_id is decoded with WorkerID::FromBinary --
// confirm that this is the intended ID type.
std::string WorkerOwnerString(std::shared_ptr<Worker> &worker) {
  const auto owner_worker =
      WorkerID::FromBinary(worker->GetOwnerAddress().worker_id());
  const auto owner_node = WorkerID::FromBinary(worker->GetOwnerAddress().raylet_id());

  std::stringstream out;
  out << "leased_worker Lease " << worker->WorkerId();
  out << " owned by " << owner_worker;
  out << " / " << owner_node;
  return out.str();
}
NodeManager::NodeManager(boost::asio::io_service &io_service,
const ClientID &self_node_id, const NodeManagerConfig &config,
ObjectManager &object_manager,
@@ -702,6 +667,7 @@ void NodeManager::ResourceDeleted(const ClientID &client_id,
new_resource_scheduler_->DeleteResource(client_id.Binary(), resource_label);
}
}
RAY_LOG(DEBUG) << "[ResourceDeleted] Updated cluster_resource_map.";
return;
}
@@ -735,6 +701,7 @@ void NodeManager::HeartbeatAdded(const ClientID &client_id,
<< client_id;
return;
}
// Trigger local GC at the next heartbeat interval.
if (heartbeat_data.should_global_gc()) {
should_local_gc_ = true;
@@ -909,7 +876,6 @@ void NodeManager::DispatchTasks(
// one class of tasks become stuck behind others in the queue, causing Ray to start
// many workers. See #3644 for a more detailed description of this issue.
std::vector<const std::pair<const SchedulingClass, ordered_set<TaskID>> *> fair_order;
RAY_CHECK(new_scheduler_enabled_ == false);
for (auto &it : tasks_by_class) {
fair_order.emplace_back(&it);
}
@@ -966,7 +932,6 @@ void NodeManager::ProcessClientMessage(
<< (registered_worker
? std::to_string(registered_worker->GetProcess().GetId())
: "nil");
if (registered_worker && registered_worker->IsDead()) {
// For a worker that is marked as dead (because the job has died already),
// all the messages are ignored except DisconnectClient.
@@ -1081,6 +1046,8 @@ void NodeManager::ProcessRegisterClientRequestMessage(
static_cast<int64_t>(protocol::MessageType::RegisterClientReply), fbb.GetSize(),
fbb.GetBufferPointer(), [this, client](const ray::Status &status) {
if (!status.ok()) {
RAY_LOG(WARNING)
<< "Failed to send RegisterClientReply to client, so disconnecting";
ProcessDisconnectClientMessage(client);
}
});
@@ -1179,7 +1146,6 @@ void NodeManager::HandleWorkerAvailable(
void NodeManager::HandleWorkerAvailable(const std::shared_ptr<Worker> &worker) {
RAY_CHECK(worker);
bool worker_idle = true;
// If the worker was assigned a task, mark it as finished.
if (!worker->GetAssignedTaskId().IsNil()) {
worker_idle = FinishAssignedTask(*worker);
@@ -1190,10 +1156,10 @@ void NodeManager::HandleWorkerAvailable(const std::shared_ptr<Worker> &worker) {
worker_pool_.PushWorker(worker);
}
// Local resource availability changed: invoke scheduling policy for local node.
if (new_scheduler_enabled_) {
NewSchedulerSchedulePendingTasks();
DispatchScheduledTasksToWorkers();
} else {
// Local resource availability changed: invoke scheduling policy for local node.
cluster_resource_map_[self_node_id_].SetLoadResources(
local_queues_.GetResourceLoad());
// Call task dispatch to assign work to the new worker.
@@ -1216,10 +1182,10 @@ void NodeManager::ProcessDisconnectClientMessage(
} else {
RAY_LOG(INFO) << "Ignoring client disconnect because the client has already "
<< "been disconnected.";
return;
}
}
RAY_CHECK(!(is_worker && is_driver));
// If the client has any blocked tasks, mark them as unblocked. In
// particular, we are no longer waiting for their dependencies.
if (worker) {
@@ -1238,7 +1204,6 @@ void NodeManager::ProcessDisconnectClientMessage(
// Clean up any open ray.wait calls that the worker made.
task_dependency_manager_.UnsubscribeWaitDependencies(worker->WorkerId());
}
// Erase any lease metadata.
leased_workers_.erase(worker->WorkerId());
@@ -1296,34 +1261,24 @@ void NodeManager::ProcessDisconnectClientMessage(
worker_pool_.DisconnectWorker(worker);
// Return the resources that were being used by this worker.
if (new_scheduler_enabled_) {
new_resource_scheduler_->SubtractCPUResourceInstances(
worker->GetBorrowedCPUInstances());
new_resource_scheduler_->FreeLocalTaskResources(worker->GetAllocatedInstances());
worker->ClearAllocatedInstances();
new_resource_scheduler_->FreeLocalTaskResources(
worker->GetLifetimeAllocatedInstances());
worker->ClearLifetimeAllocatedInstances();
} else {
auto const &task_resources = worker->GetTaskResourceIds();
local_available_resources_.ReleaseConstrained(
task_resources, cluster_resource_map_[self_node_id_].GetTotalResources());
cluster_resource_map_[self_node_id_].Release(task_resources.ToResourceSet());
worker->ResetTaskResourceIds();
auto const &task_resources = worker->GetTaskResourceIds();
local_available_resources_.ReleaseConstrained(
task_resources, cluster_resource_map_[self_node_id_].GetTotalResources());
cluster_resource_map_[self_node_id_].Release(task_resources.ToResourceSet());
worker->ResetTaskResourceIds();
auto const &lifetime_resources = worker->GetLifetimeResourceIds();
local_available_resources_.ReleaseConstrained(
lifetime_resources, cluster_resource_map_[self_node_id_].GetTotalResources());
cluster_resource_map_[self_node_id_].Release(lifetime_resources.ToResourceSet());
worker->ResetLifetimeResourceIds();
}
auto const &lifetime_resources = worker->GetLifetimeResourceIds();
local_available_resources_.ReleaseConstrained(
lifetime_resources, cluster_resource_map_[self_node_id_].GetTotalResources());
cluster_resource_map_[self_node_id_].Release(lifetime_resources.ToResourceSet());
worker->ResetLifetimeResourceIds();
// Since some resources may have been released, we can try to dispatch more tasks.
if (new_scheduler_enabled_) {
NewSchedulerSchedulePendingTasks();
} else {
DispatchTasks(local_queues_.GetReadyTasksByClass());
}
RAY_LOG(DEBUG) << "Worker (pid=" << worker->GetProcess().GetId()
<< ") is disconnected. "
<< "job_id: " << worker->GetAssignedJobId();
// Since some resources may have been released, we can try to dispatch more tasks.
DispatchTasks(local_queues_.GetReadyTasksByClass());
} else if (is_driver) {
// The client is a driver.
const auto job_id = worker->GetAssignedJobId();
@@ -1335,7 +1290,7 @@ void NodeManager::ProcessDisconnectClientMessage(
RAY_LOG(DEBUG) << "Driver (pid=" << worker->GetProcess().GetId()
<< ") is disconnected. "
<< "job_id: " << worker->GetAssignedJobId();
<< "job_id: " << job_id;
}
client->Close();
@@ -1421,6 +1376,9 @@ void NodeManager::ProcessWaitRequestMessage(
}
} else {
// We failed to write to the client, so disconnect the client.
RAY_LOG(WARNING)
<< "Failed to send WaitReply to client, so disconnecting client";
// We failed to send the reply to the client, so disconnect the worker.
ProcessDisconnectClientMessage(client);
}
});
@@ -1449,7 +1407,7 @@ void NodeManager::ProcessWaitForDirectActorCallArgsRequestMessage(
[this, client, tag](std::vector<ObjectID> found, std::vector<ObjectID> remaining) {
RAY_CHECK(remaining.empty());
std::shared_ptr<Worker> worker = worker_pool_.GetRegisteredWorker(client);
if (!worker) {
if (worker == nullptr) {
RAY_LOG(ERROR) << "Lost worker for wait request " << client;
} else {
worker->DirectActorCallArgWaitComplete(tag);
@@ -1535,67 +1493,38 @@ void NodeManager::ProcessSubmitTaskMessage(const uint8_t *message_data) {
void NodeManager::DispatchScheduledTasksToWorkers() {
RAY_CHECK(new_scheduler_enabled_);
// Check every task in task_to_dispatch queue to see
// whether it can be dispatched and ran. This avoids head-of-line
// blocking where a task which cannot be dispatched because
// there are not enough available resources blocks other
// tasks from being dispatched.
for (size_t queue_size = tasks_to_dispatch_.size(); queue_size > 0; queue_size--) {
while (!tasks_to_dispatch_.empty()) {
auto task = tasks_to_dispatch_.front();
auto reply = task.first;
auto spec = task.second.GetTaskSpecification();
tasks_to_dispatch_.pop_front();
std::shared_ptr<Worker> worker = worker_pool_.PopWorker(spec);
if (!worker) {
// No worker available to schedule this task.
// Put the task back in the dispatch queue.
tasks_to_dispatch_.push_front(task);
if (worker == nullptr) {
return;
}
std::shared_ptr<TaskResourceInstances> allocated_instances(
new TaskResourceInstances());
bool schedulable = new_resource_scheduler_->AllocateLocalTaskResources(
spec.GetRequiredResources().GetResourceMap(), allocated_instances);
bool schedulable = new_resource_scheduler_->SubtractNodeAvailableResources(
self_node_id_.Binary(), spec.GetRequiredResources().GetResourceMap());
if (!schedulable) {
// Not enough resources to schedule this task.
// Put it back at the end of the dispatch queue.
tasks_to_dispatch_.push_back(task);
worker_pool_.PushWorker(worker);
// Try next task in the dispatch queue.
continue;
return;
}
worker->SetOwnerAddress(spec.CallerAddress());
// Handle the allocation to specific resource IDs.
auto acquired_resources =
local_available_resources_.Acquire(spec.GetRequiredResources());
cluster_resource_map_[self_node_id_].Acquire(spec.GetRequiredResources());
if (spec.IsActorCreationTask()) {
worker->SetLifetimeAllocatedInstances(allocated_instances);
worker->SetLifetimeResourceIds(acquired_resources);
} else {
worker->SetAllocatedInstances(allocated_instances);
worker->SetTaskResourceIds(acquired_resources);
}
worker->AssignTaskId(spec.TaskId());
worker->AssignJobId(spec.JobId());
worker->SetAssignedTask(task.second);
reply(worker, ClientID::Nil(), "", -1);
tasks_to_dispatch_.pop_front();
}
}
void NodeManager::NewSchedulerSchedulePendingTasks() {
RAY_CHECK(new_scheduler_enabled_);
size_t queue_size = tasks_to_schedule_.size();
// Check every task in task_to_schedule queue to see
// whether it can be scheduled. This avoids head-of-line
// blocking where a task which cannot be scheduled because
// there are not enough available resources blocks other
// tasks from being scheduled.
while (queue_size > 0) {
if (queue_size == 0) {
return;
} else {
queue_size--;
}
while (!tasks_to_schedule_.empty()) {
auto work = tasks_to_schedule_.front();
auto task = work.second;
auto request_resources =
@@ -1605,16 +1534,13 @@ void NodeManager::NewSchedulerSchedulePendingTasks() {
new_resource_scheduler_->GetBestSchedulableNode(request_resources, &violations);
if (node_id_string.empty()) {
/// There is no node that has available resources to run the request.
tasks_to_schedule_.pop_front();
tasks_to_schedule_.push_back(work);
continue;
break;
} else {
if (node_id_string == self_node_id_.Binary()) {
WaitForTaskArgsRequests(work);
} else {
new_resource_scheduler_->AllocateRemoteTaskResources(node_id_string,
request_resources);
new_resource_scheduler_->SubtractNodeAvailableResources(node_id_string,
request_resources);
ClientID node_id = ClientID::FromBinary(node_id_string);
auto node_info_opt = gcs_client_->Nodes().Get(node_id);
RAY_CHECK(node_info_opt)
@@ -1631,19 +1557,17 @@ void NodeManager::NewSchedulerSchedulePendingTasks() {
void NodeManager::WaitForTaskArgsRequests(std::pair<ScheduleFn, Task> &work) {
RAY_CHECK(new_scheduler_enabled_);
const Task &task = work.second;
std::vector<ObjectID> object_ids = task.GetTaskSpecification().GetDependencies();
std::vector<ObjectID> object_ids = work.second.GetTaskSpecification().GetDependencies();
if (object_ids.size() > 0) {
bool args_ready = task_dependency_manager_.SubscribeGetDependencies(
task.GetTaskSpecification().TaskId(), task.GetDependencies());
if (args_ready) {
task_dependency_manager_.UnsubscribeGetDependencies(
task.GetTaskSpecification().TaskId());
tasks_to_dispatch_.push_back(work);
} else {
waiting_tasks_[task.GetTaskSpecification().TaskId()] = work;
}
ray::Status status = object_manager_.Wait(
object_ids, -1, object_ids.size(), false,
[this, work](std::vector<ObjectID> found, std::vector<ObjectID> remaining) {
RAY_CHECK(remaining.empty());
tasks_to_dispatch_.push_back(work);
DispatchScheduledTasksToWorkers();
});
RAY_CHECK_OK(status);
} else {
tasks_to_dispatch_.push_back(work);
}
@@ -1657,7 +1581,6 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest
Task task(task_message);
bool is_actor_creation_task = task.GetTaskSpecification().IsActorCreationTask();
ActorID actor_id = ActorID::Nil();
if (is_actor_creation_task) {
actor_id = task.GetTaskSpecification().ActorCreationId();
@@ -1670,11 +1593,11 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest
}
if (new_scheduler_enabled_) {
auto task_spec = task.GetTaskSpecification();
auto request_resources = task.GetTaskSpecification().GetRequiredResources();
auto work = std::make_pair(
[this, task_spec, reply, send_reply_callback](std::shared_ptr<Worker> worker,
ClientID spillback_to,
std::string address, int port) {
[this, request_resources, reply, send_reply_callback](
std::shared_ptr<Worker> worker, ClientID spillback_to, std::string address,
int port) {
if (worker != nullptr) {
reply->mutable_worker_address()->set_ip_address(
initial_config_.node_manager_address);
@@ -1683,52 +1606,7 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest
reply->mutable_worker_address()->set_raylet_id(self_node_id_.Binary());
RAY_CHECK(leased_workers_.find(worker->WorkerId()) == leased_workers_.end());
leased_workers_[worker->WorkerId()] = worker;
// TODO (Ion): Fix handling floating point errors, maybe by moving to integers.
#define ZERO_CAPACITY 1.0e-5
std::shared_ptr<TaskResourceInstances> allocated_resources;
if (task_spec.IsActorCreationTask()) {
allocated_resources = worker->GetLifetimeAllocatedInstances();
} else {
allocated_resources = worker->GetAllocatedInstances();
}
auto predefined_resources = allocated_resources->predefined_resources;
::ray::rpc::ResourceMapEntry *resource;
for (size_t res_idx = 0; res_idx < predefined_resources.size(); res_idx++) {
bool first = true; // Set resource name only if at least one of its
// instances has available capacity.
for (size_t inst_idx = 0; inst_idx < predefined_resources[res_idx].size();
inst_idx++) {
if (std::abs(predefined_resources[res_idx][inst_idx]) > ZERO_CAPACITY) {
if (first) {
resource = reply->add_resource_mapping();
resource->set_name(
new_resource_scheduler_->GetResourceNameFromIndex(res_idx));
first = false;
}
auto rid = resource->add_resource_ids();
rid->set_index(inst_idx);
rid->set_quantity(predefined_resources[res_idx][inst_idx]);
}
}
}
auto custom_resources = allocated_resources->custom_resources;
for (auto it = custom_resources.begin(); it != custom_resources.end(); ++it) {
bool first = true; // Set resource name only if at least one of its
// instances has available capacity.
for (size_t inst_idx = 0; inst_idx < it->second.size(); inst_idx++) {
if (std::abs(it->second[inst_idx]) > ZERO_CAPACITY) {
if (first) {
resource = reply->add_resource_mapping();
resource->set_name(
new_resource_scheduler_->GetResourceNameFromIndex(it->first));
first = false;
}
auto rid = resource->add_resource_ids();
rid->set_index(inst_idx);
rid->set_quantity(it->second[inst_idx]);
}
}
}
leased_worker_resources_[worker->WorkerId()] = request_resources;
} else {
reply->mutable_retry_at_raylet_address()->set_ip_address(address);
reply->mutable_retry_at_raylet_address()->set_port(port);
@@ -1771,6 +1649,7 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest
}
}
send_reply_callback(Status::OK(), nullptr, nullptr);
RAY_CHECK(leased_workers_.find(worker_id) == leased_workers_.end())
<< "Worker is already leased out " << worker_id;
@@ -1794,12 +1673,47 @@ void NodeManager::HandleReturnWorker(const rpc::ReturnWorkerRequest &request,
rpc::SendReplyCallback send_reply_callback) {
// Read the resource spec submitted by the client.
auto worker_id = WorkerID::FromBinary(request.worker_id());
RAY_LOG(DEBUG) << "Return worker " << worker_id;
std::shared_ptr<Worker> worker = leased_workers_[worker_id];
if (new_scheduler_enabled_) {
if (worker->IsBlocked()) {
// If the worker is blocked, unblock it to return the cpu resources back to the worker.
HandleDirectCallTaskUnblocked(worker);
}
auto it = leased_worker_resources_.find(worker_id);
RAY_CHECK(it != leased_worker_resources_.end());
new_resource_scheduler_->AddNodeAvailableResources(self_node_id_.Binary(),
it->second.GetResourceMap());
if (worker->borrowed_cpu_resources_.GetResourceMap().size()) {
// This machine is oversubscribed, so the worker didn't get back cpus when
// unblocked. Thus we need to subtract these cpus, as the previous
// "AddNodeAvailableResources" call assumed they were allocated to this worker.
new_resource_scheduler_->SubtractNodeAvailableResources(
self_node_id_.Binary(), worker->borrowed_cpu_resources_.GetResourceMap());
worker->borrowed_cpu_resources_ = ResourceSet();
}
leased_worker_resources_.erase(it);
// Update resource ids.
auto const &task_resources = worker->GetTaskResourceIds();
local_available_resources_.ReleaseConstrained(
task_resources, cluster_resource_map_[self_node_id_].GetTotalResources());
cluster_resource_map_[self_node_id_].Release(task_resources.ToResourceSet());
worker->ResetTaskResourceIds();
// TODO (ion): Handle ProcessDisconnectClientMessage()
HandleWorkerAvailable(worker);
leased_workers_.erase(worker_id);
send_reply_callback(Status::OK(), nullptr, nullptr);
return;
}
leased_workers_.erase(worker_id);
Status status;
if (worker) {
leased_workers_.erase(worker_id);
if (request.disconnect_worker()) {
ProcessDisconnectClientMessage(worker->Connection());
} else {
@@ -1808,12 +1722,6 @@ void NodeManager::HandleReturnWorker(const rpc::ReturnWorkerRequest &request,
if (worker->IsBlocked()) {
HandleDirectCallTaskUnblocked(worker);
}
if (new_scheduler_enabled_) {
new_resource_scheduler_->SubtractCPUResourceInstances(
worker->GetBorrowedCPUInstances());
new_resource_scheduler_->FreeLocalTaskResources(worker->GetAllocatedInstances());
worker->ClearAllocatedInstances();
}
HandleWorkerAvailable(worker);
}
} else {
@@ -2192,16 +2100,14 @@ void NodeManager::HandleDirectCallTaskBlocked(const std::shared_ptr<Worker> &wor
if (!worker) {
return;
}
std::vector<double> cpu_instances;
if (worker->GetAllocatedInstances() != nullptr) {
cpu_instances = worker->GetAllocatedInstances()->GetCPUInstances();
}
if (cpu_instances.size() > 0) {
std::vector<double> borrowed_cpu_instances =
new_resource_scheduler_->AddCPUResourceInstances(cpu_instances);
worker->SetBorrowedCPUInstances(borrowed_cpu_instances);
worker->MarkBlocked();
}
auto const cpu_resource_ids = worker->ReleaseTaskCpuResources();
local_available_resources_.Release(cpu_resource_ids);
cluster_resource_map_[self_node_id_].Release(cpu_resource_ids.ToResourceSet());
new_resource_scheduler_->AddNodeAvailableResources(
self_node_id_.Binary(), // A
cpu_resource_ids.ToResourceSet().GetResourceMap());
worker->MarkBlocked();
NewSchedulerSchedulePendingTasks();
return;
}
@@ -2221,23 +2127,43 @@ void NodeManager::HandleDirectCallTaskUnblocked(const std::shared_ptr<Worker> &w
if (!worker) {
return;
}
std::vector<double> cpu_instances;
if (worker->GetAllocatedInstances() != nullptr) {
cpu_instances = worker->GetAllocatedInstances()->GetCPUInstances();
}
if (cpu_instances.size() > 0) {
new_resource_scheduler_->SubtractCPUResourceInstances(cpu_instances);
new_resource_scheduler_->AddCPUResourceInstances(worker->GetBorrowedCPUInstances());
worker->MarkUnblocked();
auto it = leased_worker_resources_.find(worker->WorkerId());
RAY_CHECK(it != leased_worker_resources_.end());
const auto cpu_resources = it->second.GetNumCpus();
bool oversubscribed = !local_available_resources_.Contains(cpu_resources);
if (!oversubscribed) {
// Reacquire the CPU resources for the worker. Note that care needs to be
// taken if the user is using the specific CPU IDs since the IDs that we
// reacquire here may be different from the ones that the task started with.
auto const resource_ids = local_available_resources_.Acquire(cpu_resources);
worker->AcquireTaskCpuResources(resource_ids);
cluster_resource_map_[self_node_id_].Acquire(cpu_resources);
new_resource_scheduler_->SubtractNodeAvailableResources(
self_node_id_.Binary(), cpu_resources.GetResourceMap());
worker->borrowed_cpu_resources_ = ResourceSet();
} else {
// Remember these are borrowed cpu resources, i.e., we did not return them to the
// worker.
worker->borrowed_cpu_resources_ = cpu_resources;
}
worker->MarkUnblocked();
NewSchedulerSchedulePendingTasks();
return;
}
if (!worker || worker->GetAssignedTaskId().IsNil() || !worker->IsBlocked()) {
if (!worker || worker->GetAssignedTaskId().IsNil()) {
return; // The worker may have died or is no longer processing the task.
}
TaskID task_id = worker->GetAssignedTaskId();
// First, always release task dependencies. This ensures we don't leak resources even
// if we don't need to unblock the worker below.
task_dependency_manager_.UnsubscribeGetDependencies(task_id);
if (!worker->IsBlocked()) {
return; // Don't need to unblock the worker.
}
Task task = local_queues_.GetTaskOfState(task_id, TaskState::RUNNING);
const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
const ResourceSet cpu_resources = required_resources.GetNumCpus();
@@ -2258,7 +2184,6 @@ void NodeManager::HandleDirectCallTaskUnblocked(const std::shared_ptr<Worker> &w
<< cluster_resource_map_[self_node_id_].GetAvailableResources().ToString();
}
worker->MarkUnblocked();
task_dependency_manager_.UnsubscribeGetDependencies(task_id);
}
void NodeManager::AsyncResolveObjects(
@@ -2488,29 +2413,18 @@ bool NodeManager::FinishAssignedTask(Worker &worker) {
TaskID task_id = worker.GetAssignedTaskId();
RAY_LOG(DEBUG) << "Finished task " << task_id;
// (See design_docs/task_states.rst for the state transition diagram.)
Task task;
if (new_scheduler_enabled_) {
task = worker.GetAssignedTask();
// leased_workers_.erase(worker.WorkerId()); // Maybe RAY_CHECK ???
if (worker.GetAllocatedInstances() != nullptr) {
new_resource_scheduler_->SubtractCPUResourceInstances(
worker.GetBorrowedCPUInstances());
new_resource_scheduler_->FreeLocalTaskResources(worker.GetAllocatedInstances());
worker.ClearAllocatedInstances();
}
} else {
// (See design_docs/task_states.rst for the state transition diagram.)
RAY_CHECK(local_queues_.RemoveTask(task_id, &task));
RAY_CHECK(local_queues_.RemoveTask(task_id, &task));
// Release task's resources. The worker's lifetime resources are still held.
auto const &task_resources = worker.GetTaskResourceIds();
local_available_resources_.ReleaseConstrained(
task_resources, cluster_resource_map_[self_node_id_].GetTotalResources());
cluster_resource_map_[self_node_id_].Release(task_resources.ToResourceSet());
worker.ResetTaskResourceIds();
}
// Release task's resources. The worker's lifetime resources are still held.
auto const &task_resources = worker.GetTaskResourceIds();
local_available_resources_.ReleaseConstrained(
task_resources, cluster_resource_map_[self_node_id_].GetTotalResources());
cluster_resource_map_[self_node_id_].Release(task_resources.ToResourceSet());
worker.ResetTaskResourceIds();
const auto &spec = task.GetTaskSpecification(); //
const auto &spec = task.GetTaskSpecification();
if ((spec.IsActorCreationTask() || spec.IsActorTask())) {
// If this was an actor or actor creation task, handle the actor's new
// state.
@@ -2843,43 +2757,31 @@ void NodeManager::HandleObjectLocal(const ObjectID &object_id) {
<< " on " << self_node_id_ << ", " << ready_task_ids.size()
<< " tasks ready";
// Transition the tasks whose dependencies are now fulfilled to the ready state.
if (new_scheduler_enabled_) {
for (auto task_id : ready_task_ids) {
auto it = waiting_tasks_.find(task_id);
if (it != waiting_tasks_.end()) {
task_dependency_manager_.UnsubscribeGetDependencies(task_id);
tasks_to_dispatch_.push_back(it->second);
waiting_tasks_.erase(it);
}
if (ready_task_ids.size() > 0) {
std::unordered_set<TaskID> ready_task_id_set(ready_task_ids.begin(),
ready_task_ids.end());
// First filter out the tasks that should not be moved to READY.
local_queues_.FilterState(ready_task_id_set, TaskState::BLOCKED);
local_queues_.FilterState(ready_task_id_set, TaskState::RUNNING);
local_queues_.FilterState(ready_task_id_set, TaskState::DRIVER);
local_queues_.FilterState(ready_task_id_set, TaskState::WAITING_FOR_ACTOR_CREATION);
// Make sure that the remaining tasks are all WAITING or direct call
// actors.
auto ready_task_id_set_copy = ready_task_id_set;
local_queues_.FilterState(ready_task_id_set_copy, TaskState::WAITING);
// Filter out direct call actors. These are not tracked by the raylet and
// their assigned task ID is the actor ID.
for (const auto &id : ready_task_id_set_copy) {
RAY_CHECK(actor_registry_.count(id.ActorId()) > 0);
ready_task_id_set.erase(id);
}
NewSchedulerSchedulePendingTasks();
} else {
if (ready_task_ids.size() > 0) {
std::unordered_set<TaskID> ready_task_id_set(ready_task_ids.begin(),
ready_task_ids.end());
// First filter out the tasks that should not be moved to READY.
local_queues_.FilterState(ready_task_id_set, TaskState::BLOCKED);
local_queues_.FilterState(ready_task_id_set, TaskState::RUNNING);
local_queues_.FilterState(ready_task_id_set, TaskState::DRIVER);
local_queues_.FilterState(ready_task_id_set, TaskState::WAITING_FOR_ACTOR_CREATION);
// Make sure that the remaining tasks are all WAITING or direct call
// actors.
auto ready_task_id_set_copy = ready_task_id_set;
local_queues_.FilterState(ready_task_id_set_copy, TaskState::WAITING);
// Filter out direct call actors. These are not tracked by the raylet and
// their assigned task ID is the actor ID.
for (const auto &id : ready_task_id_set_copy) {
RAY_CHECK(actor_registry_.count(id.ActorId()) > 0);
ready_task_id_set.erase(id);
}
// Queue and dispatch the tasks that are ready to run (i.e., WAITING).
auto ready_tasks = local_queues_.RemoveTasks(ready_task_id_set);
local_queues_.QueueTasks(ready_tasks, TaskState::READY);
DispatchTasks(MakeTasksByClass(ready_tasks));
}
// Queue and dispatch the tasks that are ready to run (i.e., WAITING).
auto ready_tasks = local_queues_.RemoveTasks(ready_task_id_set);
local_queues_.QueueTasks(ready_tasks, TaskState::READY);
DispatchTasks(MakeTasksByClass(ready_tasks));
}
}
+3 -2
View File
@@ -720,6 +720,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// The new resource scheduler for direct task calls.
std::shared_ptr<ClusterResourceScheduler> new_resource_scheduler_;
/// Map of leased workers to their current resource usage.
/// TODO(ion): Check whether we can track these resources in the worker.
std::unordered_map<WorkerID, ResourceSet> leased_worker_resources_;
typedef std::function<void(std::shared_ptr<Worker>, ClientID spillback_to,
std::string address, int port)>
@@ -730,8 +733,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
std::deque<std::pair<ScheduleFn, Task>> tasks_to_schedule_;
/// Queue of lease requests that should be scheduled onto workers.
std::deque<std::pair<ScheduleFn, Task>> tasks_to_dispatch_;
/// Queue tasks waiting for arguments to be transferred locally.
absl::flat_hash_map<TaskID, std::pair<ScheduleFn, Task>> waiting_tasks_;
/// Cache of gRPC clients to workers (not necessarily running on this node).
/// Also includes the number of inflight requests to each worker - when this
+5 -52
View File
@@ -19,8 +19,6 @@
#include "ray/common/client_connection.h"
#include "ray/common/id.h"
#include "ray/common/scheduling/cluster_resource_scheduler.h"
#include "ray/common/scheduling/scheduling_ids.h"
#include "ray/common/task/scheduling_resources.h"
#include "ray/common/task/task.h"
#include "ray/common/task/task_common.h"
@@ -87,40 +85,11 @@ class Worker {
void DirectActorCallArgWaitComplete(int64_t tag);
void WorkerLeaseGranted(const std::string &address, int port);
// Setter, getter, and clear methods for allocated_instances_.
/// Record the resource instances allocated to this worker for the task it is
/// currently running. The worker keeps a shared reference to the allocation;
/// the argument itself is not modified, so it is taken by const reference
/// (existing callers that pass an lvalue are unaffected).
void SetAllocatedInstances(
    const std::shared_ptr<TaskResourceInstances> &allocated_instances) {
  allocated_instances_ = allocated_instances;
}
std::shared_ptr<TaskResourceInstances> GetAllocatedInstances() {
return allocated_instances_;
};
/// Drop this worker's reference to its per-task allocated instances, leaving
/// allocated_instances_ null.
void ClearAllocatedInstances() {
  allocated_instances_.reset();
}
/// Record the resource instances allocated to this worker when it runs as an
/// actor. The worker keeps a shared reference to the allocation; the argument
/// is not modified, so it is taken by const reference (existing callers that
/// pass an lvalue are unaffected).
void SetLifetimeAllocatedInstances(
    const std::shared_ptr<TaskResourceInstances> &allocated_instances) {
  lifetime_allocated_instances_ = allocated_instances;
}
std::shared_ptr<TaskResourceInstances> GetLifetimeAllocatedInstances() {
return lifetime_allocated_instances_;
};
/// Drop this worker's reference to its actor-lifetime allocated instances,
/// leaving lifetime_allocated_instances_ null.
void ClearLifetimeAllocatedInstances() {
  lifetime_allocated_instances_.reset();
}
/// Record the CPU instances this worker has borrowed, i.e. the amount by which
/// the node is oversubscribed on its behalf. The vector is copied into the
/// worker and not modified, so it is taken by const reference (existing
/// callers that pass an lvalue are unaffected).
void SetBorrowedCPUInstances(const std::vector<double> &cpu_instances) {
  borrowed_cpu_instances_ = cpu_instances;
}
/// Access the CPU instances borrowed by this worker. The returned reference
/// aliases the worker's own vector, so callers can both read and modify it.
std::vector<double> &GetBorrowedCPUInstances() {
  return borrowed_cpu_instances_;
}
/// Empty the record of CPU instances borrowed by this worker.
void ClearBorrowedCPUInstances() {
  // clear() returns void; call it as a plain statement rather than
  // `return`-ing a void expression.
  borrowed_cpu_instances_.clear();
}
/// Access the task assigned to this worker. The returned reference aliases
/// the worker's stored Task, so callers can both read and modify it.
Task &GetAssignedTask() {
  return assigned_task_;
}
void SetAssignedTask(Task &assigned_task) { assigned_task_ = assigned_task; };
/// Cpus borrowed by the worker. This happens when the machine is oversubscribed
/// and the worker does not get back the cpu resources when unblocked.
/// TODO (ion): Add methods to access this variable.
/// TODO (ion): Investigate a more intuitive alternative to track these Cpus.
ResourceSet borrowed_cpu_resources_;
rpc::CoreWorkerClient *rpc_client() { return rpc_client_.get(); }
@@ -165,22 +134,6 @@ class Worker {
/// The address of this worker's owner. The owner is the worker that
/// currently holds the lease on this worker, if any.
rpc::Address owner_address_;
/// The capacity of each resource instance allocated to this worker in order
/// to satisfy the resource requests of the task is currently running.
std::shared_ptr<TaskResourceInstances> allocated_instances_;
/// The capacity of each resource instance allocated to this worker
/// when running as an actor.
std::shared_ptr<TaskResourceInstances> lifetime_allocated_instances_;
/// CPUs borrowed by the worker. This happens in the following scenario:
/// 1) Worker A is blocked, so it donates its CPUs back to the node.
/// 2) Other workers are scheduled and are allocated some of the CPUs donated by A.
/// 3) Task A is unblocked, but it cannot get all CPUs back. At this point,
/// the node is oversubscribed. borrowed_cpu_instances_ represents the number
/// of CPUs this node is oversubscribed by.
/// TODO (Ion): Investigate a more intuitive alternative to track these Cpus.
std::vector<double> borrowed_cpu_instances_;
/// Task being assigned to this worker.
Task assigned_task_;
};
} // namespace raylet