New scheduler integration (#6321)

This commit is contained in:
Ion
2019-12-02 14:42:16 -08:00
committed by Eric Liang
parent 43d20fff62
commit 2a3adf2d70
9 changed files with 578 additions and 10 deletions
+5
View File
@@ -40,6 +40,11 @@ RAY_CONFIG(int64_t, debug_dump_period_milliseconds, 10000)
/// type of task from starving other types (see issue #3664).
RAY_CONFIG(bool, fair_queueing_enabled, true)
/// Whether to enable the new scheduler. The new scheduler is designed
/// only to work with direct calls. Once direct calls become
/// the default, this scheduler will also become the default.
RAY_CONFIG(bool, new_scheduler_enabled, false)
// The max allowed size in bytes of a return object from direct actor calls.
// Objects larger than this size will be spilled to plasma.
RAY_CONFIG(int64_t, max_direct_call_object_size, 100 * 1024)
@@ -6,6 +6,22 @@ ClusterResourceScheduler::ClusterResourceScheduler(
AddOrUpdateNode(local_node_id_, local_node_resources);
}
/// Construct a scheduler whose local node is identified by a string ID.
///
/// \param local_node_id: String ID of the local node. It is registered in
/// string_to_int_map_ and the resulting integer ID is stored in local_node_id_.
/// \param local_node_resources: Resource map of the local node. The same map is
/// passed as both the total and the available resources.
ClusterResourceScheduler::ClusterResourceScheduler(
const std::string &local_node_id,
const std::unordered_map<std::string, double> &local_node_resources) {
// NOTE(review): this is assigned in the body rather than in a member
// initializer. In the class declaration string_to_int_map_ appears after
// local_node_id_, so an initializer for local_node_id_ would run before the
// map is constructed.
local_node_id_ = string_to_int_map_.Insert(local_node_id);
// The string overload converts the maps and registers the node.
AddOrUpdateNode(local_node_id, local_node_resources, local_node_resources);
}
void ClusterResourceScheduler::AddOrUpdateNode(
const std::string &node_id,
const std::unordered_map<std::string, double> &resources_total,
const std::unordered_map<std::string, double> &resources_available) {
NodeResources node_resources;
ResourceMapToNodeResources(resources_total, resources_available, &node_resources);
AddOrUpdateNode(string_to_int_map_.Insert(node_id), node_resources);
}
void ClusterResourceScheduler::SetPredefinedResources(const NodeResources &new_resources,
NodeResources *old_resources) {
for (size_t i = 0; i < PredefinedResources_MAX; i++) {
@@ -45,6 +61,7 @@ bool ClusterResourceScheduler::RemoveNode(int64_t node_id) {
} else {
it->second.custom_resources.clear();
nodes_.erase(it);
string_to_int_map_.Remove(node_id);
return true;
}
}
@@ -136,8 +153,9 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode(const TaskRequest &task
int64_t violations;
if ((violations = IsSchedulable(task_req, it->first, it->second)) == -1) {
break;
continue;
}
// Update the node with the smallest number of soft constraints violated.
if (min_violations > violations) {
min_violations = violations;
@@ -152,6 +170,20 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode(const TaskRequest &task
return best_node;
}
/// String-keyed variant of GetBestSchedulableNode.
///
/// Converts the resource map into a TaskRequest, delegates to the integer-ID
/// overload and maps the chosen node back to its string ID.
///
/// \param task_resources: Resource demands of the task, keyed by resource name.
/// \param total_violations: Forwarded to the integer-ID overload, which reports
/// the number of soft constraint violations through it.
///
/// \return "" if the integer-ID overload returns -1 (no schedulable node);
/// otherwise the string ID of the selected node.
std::string ClusterResourceScheduler::GetBestSchedulableNode(
    const std::unordered_map<std::string, double> &task_resources,
    int64_t *total_violations) {
  TaskRequest task_request;
  ResourceMapToTaskRequest(task_resources, &task_request);

  const int64_t node_id = GetBestSchedulableNode(task_request, total_violations);
  if (node_id == -1) {
    return "";
  }
  return string_to_int_map_.Get(node_id);
}
bool ClusterResourceScheduler::SubtractNodeAvailableResources(
int64_t node_id, const TaskRequest &task_req) {
auto it = nodes_.find(node_id);
@@ -182,6 +214,45 @@ bool ClusterResourceScheduler::SubtractNodeAvailableResources(
return true;
}
bool ClusterResourceScheduler::SubtractNodeAvailableResources(
const std::string &node_id,
const std::unordered_map<std::string, double> &resource_map) {
TaskRequest task_request;
ResourceMapToTaskRequest(resource_map, &task_request);
return SubtractNodeAvailableResources(string_to_int_map_.Get(node_id), task_request);
}
bool ClusterResourceScheduler::AddNodeAvailableResources(int64_t node_id,
const TaskRequest &task_req) {
auto it = nodes_.find(node_id);
if (it == nodes_.end()) {
return false;
}
NodeResources &resources = it->second;
for (size_t i = 0; i < PredefinedResources_MAX; i++) {
resources.capacities[i].available =
resources.capacities[i].available + task_req.predefined_resources[i].demand;
}
for (size_t i = 0; i < task_req.custom_resources.size(); i++) {
auto it = resources.custom_resources.find(task_req.custom_resources[i].id);
if (it != resources.custom_resources.end()) {
it->second.available =
it->second.available + task_req.custom_resources[i].req.demand;
}
}
return true;
}
bool ClusterResourceScheduler::AddNodeAvailableResources(
const std::string &node_id,
const std::unordered_map<std::string, double> &resource_map) {
TaskRequest task_request;
ResourceMapToTaskRequest(resource_map, &task_request);
return AddNodeAvailableResources(string_to_int_map_.Get(node_id), task_request);
}
bool ClusterResourceScheduler::GetNodeResources(int64_t node_id,
NodeResources *ret_resources) {
auto it = nodes_.find(node_id);
@@ -194,3 +265,228 @@ bool ClusterResourceScheduler::GetNodeResources(int64_t node_id,
}
/// Number of nodes currently tracked in nodes_.
int64_t ClusterResourceScheduler::NumNodes() {
  return static_cast<int64_t>(nodes_.size());
}
void ClusterResourceScheduler::ResourceMapToNodeResources(
const std::unordered_map<std::string, double> &resource_map_total,
const std::unordered_map<std::string, double> &resource_map_available,
NodeResources *node_resources) {
node_resources->capacities.resize(PredefinedResources_MAX);
for (size_t i = 0; i < PredefinedResources_MAX; i++) {
node_resources->capacities[i].total = node_resources->capacities[i].available = 0;
}
for (auto it = resource_map_total.begin(); it != resource_map_total.end(); ++it) {
ResourceCapacity resource_capacity;
resource_capacity.total = (int64_t)it->second;
auto it2 = resource_map_available.find(it->first);
if (it2 == resource_map_available.end()) {
resource_capacity.available = 0;
} else {
resource_capacity.available = (int64_t)it2->second;
}
if (it->first == ray::kCPU_ResourceLabel) {
node_resources->capacities[CPU] = resource_capacity;
} else if (it->first == ray::kGPU_ResourceLabel) {
node_resources->capacities[GPU] = resource_capacity;
} else if (it->first == ray::kTPU_ResourceLabel) {
node_resources->capacities[TPU] = resource_capacity;
} else if (it->first == ray::kMemory_ResourceLabel) {
node_resources->capacities[MEM] = resource_capacity;
} else {
// This is a custom resource.
node_resources->custom_resources.emplace(string_to_int_map_.Insert(it->first),
resource_capacity);
}
}
}
/// Convert a string-keyed resource map into a TaskRequest.
///
/// Every predefined resource request is reset to a hard demand of 0. CPU, GPU,
/// TPU and memory labels then set the demand of the corresponding predefined
/// slot; any other label becomes a hard custom resource request keyed by the
/// integer ID obtained from string_to_int_map_. On return, custom_resources
/// holds exactly one entry per custom label in resource_map.
///
/// \param resource_map: Resource demands keyed by resource name.
/// \param task_request: Output structure to fill.
void ClusterResourceScheduler::ResourceMapToTaskRequest(
    const std::unordered_map<std::string, double> &resource_map,
    TaskRequest *task_request) {
  task_request->predefined_resources.resize(PredefinedResources_MAX);
  for (size_t i = 0; i < PredefinedResources_MAX; i++) {
    // Reset every slot, not just slot 0.
    task_request->predefined_resources[i].demand = 0;
    task_request->predefined_resources[i].soft = false;
  }

  // Make room for the worst case (every entry is a custom resource) so the
  // indexed writes below stay in bounds; the vector is trimmed afterwards.
  task_request->custom_resources.resize(resource_map.size());
  size_t num_custom = 0;

  for (auto it = resource_map.begin(); it != resource_map.end(); ++it) {
    if (it->first == ray::kCPU_ResourceLabel) {
      task_request->predefined_resources[CPU].demand = it->second;
    } else if (it->first == ray::kGPU_ResourceLabel) {
      task_request->predefined_resources[GPU].demand = it->second;
    } else if (it->first == ray::kTPU_ResourceLabel) {
      task_request->predefined_resources[TPU].demand = it->second;
    } else if (it->first == ray::kMemory_ResourceLabel) {
      task_request->predefined_resources[MEM].demand = it->second;
    } else {
      // This is a custom resource.
      auto &custom = task_request->custom_resources[num_custom];
      custom.id = string_to_int_map_.Insert(it->first);
      custom.req.demand = it->second;
      custom.req.soft = false;
      num_custom++;
    }
  }

  // Drop the slots reserved for entries that turned out to be predefined.
  task_request->custom_resources.resize(num_custom);
}
void ClusterResourceScheduler::UpdateResourceCapacity(const std::string &client_id_string,
const std::string &resource_name,
int64_t resource_total) {
int64_t client_id = string_to_int_map_.Get(client_id_string);
auto it = nodes_.find(client_id);
if (it == nodes_.end()) {
return;
}
int idx = -1;
if (resource_name == ray::kCPU_ResourceLabel) {
idx = (int)CPU;
} else if (resource_name == ray::kGPU_ResourceLabel) {
idx = (int)GPU;
} else if (resource_name == ray::kTPU_ResourceLabel) {
idx = (int)TPU;
} else if (resource_name == ray::kMemory_ResourceLabel) {
idx = (int)MEM;
};
if (idx != -1) {
int64_t diff_capacity = resource_total - it->second.capacities[idx].total;
it->second.capacities[idx].total += diff_capacity;
it->second.capacities[idx].available += diff_capacity;
if (it->second.capacities[idx].available < 0) {
it->second.capacities[idx].available = 0;
}
if (it->second.capacities[idx].total < 0) {
it->second.capacities[idx].total = 0;
}
} else {
int64_t resource_id = string_to_int_map_.Insert(resource_name);
auto itr = it->second.custom_resources.find(resource_id);
if (itr != it->second.custom_resources.end()) {
int64_t diff_capacity = resource_total - itr->second.total;
itr->second.total += diff_capacity;
itr->second.available += diff_capacity;
if (itr->second.available < 0) {
itr->second.available = 0;
}
if (itr->second.total < 0) {
itr->second.total = 0;
}
}
ResourceCapacity resource_capacity;
resource_capacity.total = resource_capacity.available = resource_total;
it->second.custom_resources.emplace(resource_id, resource_capacity);
}
}
/// Delete a resource from a node.
///
/// A predefined resource (CPU, GPU, TPU, memory) keeps its slot but has both
/// its total and its available amount set to 0. A custom resource is erased
/// from the node and its ID is removed from string_to_int_map_. Unknown nodes
/// and unknown custom resources are ignored.
///
/// \param client_id_string: String ID of the node.
/// \param resource_name: Name of the resource to delete.
void ClusterResourceScheduler::DeleteResource(const std::string &client_id_string,
                                              const std::string &resource_name) {
  int64_t client_id = string_to_int_map_.Get(client_id_string);
  auto it = nodes_.find(client_id);
  if (it == nodes_.end()) {
    return;
  }

  int idx = -1;
  if (resource_name == ray::kCPU_ResourceLabel) {
    idx = (int)CPU;
  } else if (resource_name == ray::kGPU_ResourceLabel) {
    idx = (int)GPU;
  } else if (resource_name == ray::kTPU_ResourceLabel) {
    idx = (int)TPU;
  } else if (resource_name == ray::kMemory_ResourceLabel) {
    idx = (int)MEM;
  }

  if (idx != -1) {
    // Clear the available amount together with the total; otherwise a deleted
    // resource would still report a non-zero available amount.
    it->second.capacities[idx].total = 0;
    it->second.capacities[idx].available = 0;
  } else {
    int64_t resource_id = string_to_int_map_.Get(resource_name);
    auto itr = it->second.custom_resources.find(resource_id);
    if (itr != it->second.custom_resources.end()) {
      string_to_int_map_.Remove(resource_id);
      it->second.custom_resources.erase(itr);
    }
  }
}
/// Check whether two NodeResources describe identical resources.
///
/// Two values are equal when every predefined resource has the same total and
/// available amounts, and both hold the same set of custom resources with the
/// same total and available amounts.
///
/// \param node_resources1: First value to compare.
/// \param node_resources2: Second value to compare.
///
/// \return true if the two values are identical, false otherwise.
bool ClusterResourceScheduler::EqualNodeResources(const NodeResources &node_resources1,
                                                  const NodeResources &node_resources2) {
  for (size_t i = 0; i < PredefinedResources_MAX; i++) {
    if (node_resources1.capacities[i].total != node_resources2.capacities[i].total) {
      return false;
    }
    if (node_resources1.capacities[i].available !=
        node_resources2.capacities[i].available) {
      return false;
    }
  }

  // A different number of custom resources means the two cannot be equal.
  if (node_resources1.custom_resources.size() !=
      node_resources2.custom_resources.size()) {
    return false;
  }

  // Sizes match, so checking that every entry of the first map has an equal
  // counterpart in the second is sufficient.
  for (auto it1 = node_resources1.custom_resources.begin();
       it1 != node_resources1.custom_resources.end(); ++it1) {
    auto it2 = node_resources2.custom_resources.find(it1->first);
    if (it2 == node_resources2.custom_resources.end()) {
      return false;
    }
    if (it1->second.total != it2->second.total) {
      return false;
    }
    if (it1->second.available != it2->second.available) {
      return false;
    }
  }
  return true;
}
/// Render a NodeResources as text for debugging.
///
/// Produces two lines: the predefined resources as "(total:available)" pairs,
/// then the custom resources as "id:(total:available)" entries.
///
/// \param node_resources: The resources to render.
/// \return The formatted text.
std::string ClusterResourceScheduler::NodeResourcesDebugString(
    const NodeResources &node_resources) {
  std::stringstream out;

  out << " node predefined resources {";
  for (const auto &capacity : node_resources.capacities) {
    out << "(" << capacity.total << ":" << capacity.available << ") ";
  }
  out << "}" << '\n';

  out << " node custom resources {";
  for (const auto &custom : node_resources.custom_resources) {
    out << custom.first << ":(" << custom.second.total << ":"
        << custom.second.available << ") ";
  }
  out << "}" << '\n';

  return out.str();
}
/// Render a TaskRequest as text for debugging.
///
/// After a leading newline, produces two lines: the predefined requests as
/// "(demand:soft)" pairs, then the custom requests as "id:(demand:soft)".
///
/// \param task_request: The request to render.
/// \return The formatted text.
std::string ClusterResourceScheduler::TaskRequestDebugString(
    const TaskRequest &task_request) {
  std::stringstream out;

  out << '\n' << " request predefined resources {";
  for (const auto &predefined : task_request.predefined_resources) {
    out << "(" << predefined.demand << ":" << predefined.soft << ") ";
  }
  out << "}" << '\n';

  out << " request custom resources {";
  for (const auto &custom : task_request.custom_resources) {
    out << custom.id << ":"
        << "(" << custom.req.demand << ":" << custom.req.soft << ") ";
  }
  out << "}" << '\n';

  return out.str();
}
std::string ClusterResourceScheduler::DebugString(void) {
std::stringstream buffer;
buffer << std::endl << "local node id: " << local_node_id_ << std::endl;
for (auto it = nodes_.begin(); it != nodes_.end(); ++it) {
buffer << "node id: " << it->first << std::endl;
buffer << NodeResourcesDebugString(it->second);
}
return buffer.str();
}
@@ -3,8 +3,12 @@
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "ray/common/scheduling/scheduling_ids.h"
#include "ray/common/task/scheduling_resources.h"
#include "ray/util/logging.h"
#include <iostream>
#include <sstream>
#include <vector>
/// List of predefined resources.
@@ -62,6 +66,9 @@ class ClusterResourceScheduler {
absl::flat_hash_map<int64_t, NodeResources> nodes_;
/// ID of local node.
int64_t local_node_id_;
/// Keep the mapping between node and resource IDs in string representation
/// to integer representation. Used for improving map performance.
StringIdMap string_to_int_map_;
/// Set predefined resources.
///
@@ -77,21 +84,33 @@ class ClusterResourceScheduler {
const absl::flat_hash_map<int64_t, ResourceCapacity> &new_custom_resources,
absl::flat_hash_map<int64_t, ResourceCapacity> *old_custom_resources);
std::string TaskRequestDebugString(const TaskRequest &task_request);
std::string NodeResourcesDebugString(const NodeResources &node_resources);
std::string DebugString(void);
public:
ClusterResourceScheduler(void){};
/// Constructor initializing the resources associated with the local node.
///
/// \param local_node_id: ID of local node,
/// \param node_resources: The total and the available resources associated
/// \param local_node_resources: The total and the available resources associated
/// with the local node.
ClusterResourceScheduler(int64_t local_node_id, const NodeResources &node_resources);
ClusterResourceScheduler(int64_t local_node_id,
const NodeResources &local_node_resources);
ClusterResourceScheduler(
const std::string &local_node_id,
const std::unordered_map<std::string, double> &local_node_resources);
/// Add a new node or overwrite the resources of an existing node.
///
/// \param node_id: Node ID.
/// \param node_resources: Up to date total and available resources of the node.
void AddOrUpdateNode(int64_t node_id, const NodeResources &node_resources);
void AddOrUpdateNode(
const std::string &node_id,
const std::unordered_map<std::string, double> &resource_map_total,
const std::unordered_map<std::string, double> &resource_map_available);
/// Remove node from the cluster data structure. This happens
/// when a node fails or it is removed from the cluster.
@@ -132,16 +151,26 @@ class ClusterResourceScheduler {
///
/// Finally, if no such node exists, return -1.
///
/// \param task_req: Task to be scheduled.
/// \param task_request: Task to be scheduled.
/// \param violations: The number of soft constraint violations associated
/// with the node returned by this function (assuming
/// a node that can schedule task_req is found).
///
/// \return -1, if no node can schedule the current request; otherwise,
/// return the ID of a node that can schedule the task request.
int64_t GetBestSchedulableNode(const TaskRequest &task_req, int64_t *violations);
int64_t GetBestSchedulableNode(const TaskRequest &task_request, int64_t *violations);
/// Update the available resources of a node when a task request is
/// Similar to
/// int64_t GetBestSchedulableNode(const TaskRequest &task_request, int64_t
/// *violations)
/// but the return value is different:
/// \return "", if no node can schedule the current request; otherwise,
/// return the ID in string format of a node that can schedule the
/// task request.
std::string GetBestSchedulableNode(
const std::unordered_map<std::string, double> &task_request, int64_t *violations);
/// Decrease the available resources of a node when a task request is
/// scheduled on the given node.
///
/// \param node_id: ID of node on which request is being scheduled.
@@ -149,7 +178,23 @@ class ClusterResourceScheduler {
///
/// \return true, if task_req can be indeed scheduled on the node,
/// and false otherwise.
bool SubtractNodeAvailableResources(int64_t node_id, const TaskRequest &task_req);
bool SubtractNodeAvailableResources(int64_t node_id, const TaskRequest &task_request);
bool SubtractNodeAvailableResources(
const std::string &node_id,
const std::unordered_map<std::string, double> &task_request);
/// Increase the available resources of a node when a worker has finished
/// a task.
///
/// \param node_id: ID of the node whose resources are being released.
/// \param task_request: resource requests of the task finishing execution.
///
/// \return true, if the node was found and its available resources were
/// increased, and false otherwise.
bool AddNodeAvailableResources(int64_t node_id, const TaskRequest &task_request);
bool AddNodeAvailableResources(
const std::string &node_id,
const std::unordered_map<std::string, double> &task_request);
/// Return resources associated to the given node_id in ret_resources.
/// If node_id not found, return false; otherwise return true.
@@ -157,6 +202,29 @@ class ClusterResourceScheduler {
/// Get number of nodes in the cluster.
int64_t NumNodes();
/// Convert a map of resources to a TaskRequest data structure.
void ResourceMapToTaskRequest(
const std::unordered_map<std::string, double> &resource_map,
TaskRequest *task_request);
/// Convert maps of total and available resources to a NodeResources data structure.
void ResourceMapToNodeResources(
const std::unordered_map<std::string, double> &resource_map_total,
const std::unordered_map<std::string, double> &resource_map_available,
NodeResources *node_resources);
/// Update total capacity of resource resource_name at node client_id.
void UpdateResourceCapacity(const std::string &client_id,
const std::string &resource_name, int64_t resource_total);
/// Delete resource resource_name from node client_id_string.
void DeleteResource(const std::string &client_id_string,
const std::string &resource_name);
/// Check whether two node resources are identical.
bool EqualNodeResources(const NodeResources &node_resources1,
const NodeResources &node_resources2);
};
#endif // RAY_COMMON_SCHEDULING_SCHEDULING_H
@@ -9,6 +9,17 @@ int64_t StringIdMap::Get(const std::string &string_id) {
}
};
/// Look up the string ID registered for an integer ID.
///
/// \param id: Integer ID to look up.
/// \return The associated string ID, or the string "-1" if the integer ID is
/// not in the map.
std::string StringIdMap::Get(uint64_t id) {
  const auto found = int_to_string_.find(id);
  if (found == int_to_string_.end()) {
    return std::to_string(-1);
  }
  return found->second;
}
int64_t StringIdMap::Insert(const std::string &string_id, uint8_t max_id) {
auto sit = string_to_int_.find(string_id);
if (sit == string_to_int_.end()) {
@@ -24,6 +24,12 @@ class StringIdMap {
/// \return The integer ID associated with the given string ID.
int64_t Get(const std::string &string_id);
/// Get string ID associated with an existing integer ID.
///
/// \param Integer ID.
/// \return The string ID associated with the given integer ID.
std::string Get(uint64_t id);
/// Insert a string ID and get the associated integer ID.
///
/// \param String ID to be inserted.
@@ -16,6 +16,9 @@ namespace ray {
constexpr double kResourceConversionFactor = 10000;
const std::string kCPU_ResourceLabel = "CPU";
const std::string kGPU_ResourceLabel = "GPU";
const std::string kTPU_ResourceLabel = "TPU";
const std::string kMemory_ResourceLabel = "memory";
/// \class FractionalResourceQuantity
/// \brief Converts the resource quantities to an internal representation to
+3
View File
@@ -56,6 +56,9 @@ Status RedisAsyncContext::RedisAsyncCommand(redisCallbackFn *fn, void *privdata,
{
// `redisvAsyncCommand` will mutate `redis_async_context_`, use a lock to protect it.
std::lock_guard<std::mutex> lock(mutex_);
if (!redis_async_context_) {
return Status::NotImplemented("...");
}
ret_code = redisvAsyncCommand(redis_async_context_, fn, privdata, format, ap);
}
+149 -3
View File
@@ -108,7 +108,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
actor_registry_(),
node_manager_server_("NodeManager", config.node_manager_port),
node_manager_service_(io_service, *this),
client_call_manager_(io_service) {
client_call_manager_(io_service),
new_scheduler_enabled_(RayConfig::instance().new_scheduler_enabled()) {
RAY_CHECK(heartbeat_period_.count() > 0);
// Initialize the resource map with own cluster resource configuration.
ClientID local_client_id = gcs_client_->client_table().GetLocalClientId();
@@ -123,6 +124,13 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
RAY_CHECK_OK(object_manager_.SubscribeObjDeleted(
[this](const ObjectID &object_id) { HandleObjectMissing(object_id); }));
if (new_scheduler_enabled_) {
SchedulingResources &local_resources = cluster_resource_map_[local_client_id];
new_resource_scheduler_ =
std::shared_ptr<ClusterResourceScheduler>(new ClusterResourceScheduler(
client_id_.Binary(), local_resources.GetTotalResources().GetResourceMap()));
}
RAY_ARROW_CHECK_OK(store_client_.Connect(config.store_socket_name.c_str()));
// Run the node manager rpc server.
node_manager_server_.RegisterService(node_manager_service_);
@@ -573,6 +581,10 @@ void NodeManager::ResourceCreateUpdated(const ClientID &client_id,
local_available_resources_.AddOrUpdateResource(resource_label,
new_resource_capacity);
}
if (new_scheduler_enabled_) {
new_resource_scheduler_->UpdateResourceCapacity(client_id.Binary(), resource_label,
new_resource_capacity);
}
}
RAY_LOG(DEBUG) << "[ResourceCreateUpdated] Updated cluster_resource_map.";
@@ -605,6 +617,9 @@ void NodeManager::ResourceDeleted(const ClientID &client_id,
if (client_id == local_client_id) {
local_available_resources_.DeleteResource(resource_label);
}
if (new_scheduler_enabled_) {
new_resource_scheduler_->DeleteResource(client_id.Binary(), resource_label);
}
}
RAY_LOG(DEBUG) << "[ResourceDeleted] Updated cluster_resource_map.";
return;
@@ -643,6 +658,8 @@ void NodeManager::HeartbeatAdded(const ClientID &client_id,
}
SchedulingResources &remote_resources = it->second;
ResourceSet remote_total(VectorFromProtobuf(heartbeat_data.resources_total_label()),
VectorFromProtobuf(heartbeat_data.resources_total_capacity()));
ResourceSet remote_available(
VectorFromProtobuf(heartbeat_data.resources_available_label()),
VectorFromProtobuf(heartbeat_data.resources_available_capacity()));
@@ -652,6 +669,15 @@ void NodeManager::HeartbeatAdded(const ClientID &client_id,
remote_resources.SetAvailableResources(std::move(remote_available));
// Extract the load information and save it locally.
remote_resources.SetLoadResources(std::move(remote_load));
if (new_scheduler_enabled_ && client_id != client_id_) {
new_resource_scheduler_->AddOrUpdateNode(client_id.Binary(),
remote_total.GetResourceMap(),
remote_available.GetResourceMap());
NewSchedulerSchedulePendingTasks();
return;
}
// Extract decision for this raylet.
auto decision = scheduling_policy_.SpillOver(remote_resources);
std::unordered_set<TaskID> local_task_ids;
@@ -1087,6 +1113,12 @@ void NodeManager::HandleWorkerAvailable(const std::shared_ptr<Worker> &worker) {
// Return the worker to the idle pool.
worker_pool_.PushWorker(std::move(worker));
if (new_scheduler_enabled_) {
DispatchScheduledTasksToWorkers();
return;
}
// Local resource availability changed: invoke scheduling policy for local node.
const ClientID &local_client_id = gcs_client_->client_table().GetLocalClientId();
cluster_resource_map_[local_client_id].SetLoadResources(
@@ -1187,12 +1219,20 @@ void NodeManager::ProcessDisconnectClientMessage(
local_available_resources_.ReleaseConstrained(
task_resources, cluster_resource_map_[client_id].GetTotalResources());
cluster_resource_map_[client_id].Release(task_resources.ToResourceSet());
if (new_scheduler_enabled_) {
new_resource_scheduler_->AddNodeAvailableResources(
client_id_.Binary(), task_resources.ToResourceSet().GetResourceMap());
}
worker->ResetTaskResourceIds();
auto const &lifetime_resources = worker->GetLifetimeResourceIds();
local_available_resources_.ReleaseConstrained(
lifetime_resources, cluster_resource_map_[client_id].GetTotalResources());
cluster_resource_map_[client_id].Release(lifetime_resources.ToResourceSet());
if (new_scheduler_enabled_) {
new_resource_scheduler_->AddNodeAvailableResources(
client_id_.Binary(), lifetime_resources.ToResourceSet().GetResourceMap());
}
worker->ResetLifetimeResourceIds();
RAY_LOG(DEBUG) << "Worker (pid=" << worker->Pid() << ") is disconnected. "
@@ -1440,15 +1480,94 @@ void NodeManager::ProcessSubmitTaskMessage(const uint8_t *message_data) {
SubmitTask(Task(task_message), Lineage());
}
/// Hand queued lease requests in tasks_to_dispatch_ to workers, in FIFO order.
///
/// For each request at the head of the queue a worker is popped from the
/// worker pool and the request's callback is invoked with it. Dispatching
/// stops, leaving the request queued, as soon as no worker is available.
/// Must only be called when the new scheduler is enabled.
void NodeManager::DispatchScheduledTasksToWorkers() {
  RAY_CHECK(new_scheduler_enabled_);

  while (!tasks_to_dispatch_.empty()) {
    auto pending = tasks_to_dispatch_.front();
    auto grant_lease = pending.first;

    std::shared_ptr<Worker> leased_worker =
        worker_pool_.PopWorker(pending.second.GetTaskSpecification());
    if (leased_worker == nullptr) {
      // No worker for the head of the queue; keep it queued and stop.
      return;
    }

    // A non-null worker tells the callback that the lease is granted locally;
    // the spillback arguments are placeholders.
    grant_lease(leased_worker, ClientID::Nil(), "", -1);
    tasks_to_dispatch_.pop_front();
  }
}
/// Pick a node for each lease request waiting in tasks_to_schedule_, in FIFO
/// order, then dispatch whatever was assigned to the local node.
///
/// For the request at the head of the queue the resource scheduler is asked
/// for the best node. If there is none, scheduling stops and the request stays
/// queued. Otherwise the request's resources are subtracted from the chosen
/// node; a local request moves to tasks_to_dispatch_, a remote one is answered
/// right away with the address of the chosen node manager.
/// Must only be called when the new scheduler is enabled.
void NodeManager::NewSchedulerSchedulePendingTasks() {
  RAY_CHECK(new_scheduler_enabled_);

  while (!tasks_to_schedule_.empty()) {
    auto work = tasks_to_schedule_.front();
    auto request_resources =
        work.second.GetTaskSpecification().GetRequiredResources().GetResourceMap();

    int64_t violations = 0;
    std::string node_id_string =
        new_resource_scheduler_->GetBestSchedulableNode(request_resources, &violations);
    if (node_id_string.empty()) {
      /// There is no node that has available resources to run the request.
      break;
    }

    new_resource_scheduler_->SubtractNodeAvailableResources(node_id_string,
                                                            request_resources);

    const bool schedule_locally = (node_id_string == client_id_.Binary());
    if (schedule_locally) {
      tasks_to_dispatch_.push_back(work);
    } else {
      // Spill back: reply with the address of the remote node manager.
      ClientID node_id = ClientID::FromBinary(node_id_string);
      GcsNodeInfo node_info;
      bool found = gcs_client_->client_table().GetClient(node_id, &node_info);
      RAY_CHECK(found)
          << "Spilling back to a node manager, but no GCS info found for node "
          << node_id;
      work.first(nullptr, node_id, node_info.node_manager_address(),
                 node_info.node_manager_port());
    }
    tasks_to_schedule_.pop_front();
  }

  DispatchScheduledTasksToWorkers();
}
void NodeManager::HandleWorkerLeaseRequest(const rpc::WorkerLeaseRequest &request,
rpc::WorkerLeaseReply *reply,
rpc::SendReplyCallback send_reply_callback) {
rpc::Task task_message;
task_message.mutable_task_spec()->CopyFrom(request.resource_spec());
Task task(task_message);
if (new_scheduler_enabled_) {
auto request_resources =
task.GetTaskSpecification().GetRequiredResources().GetResourceMap();
auto work = std::make_pair(
[this, request_resources, reply, send_reply_callback](
std::shared_ptr<Worker> worker, ClientID spillback_to, std::string address,
int port) {
if (worker != nullptr) {
reply->mutable_worker_address()->set_ip_address(
initial_config_.node_manager_address);
reply->mutable_worker_address()->set_port(worker->Port());
reply->mutable_worker_address()->set_raylet_id(
gcs_client_->client_table().GetLocalClientId().Binary());
RAY_CHECK(leased_workers_.find(worker->Port()) == leased_workers_.end());
leased_workers_[worker->Port()] = worker;
leased_worker_resources_[worker->Port()] = request_resources;
} else {
reply->mutable_retry_at_raylet_address()->set_ip_address(address);
reply->mutable_retry_at_raylet_address()->set_port(port);
reply->mutable_retry_at_raylet_address()->set_raylet_id(
spillback_to.Binary());
}
send_reply_callback(Status::OK(), nullptr, nullptr);
},
task);
tasks_to_schedule_.push_back(work);
NewSchedulerSchedulePendingTasks();
return;
}
// Override the task dispatch to call back to the client instead of executing the
// task directly on the worker.
Task task(task_message);
RAY_LOG(DEBUG) << "Worker lease request " << task.GetTaskSpecification().TaskId();
TaskID task_id = task.GetTaskSpecification().TaskId();
task.OnDispatchInstead(
@@ -1485,6 +1604,15 @@ void NodeManager::HandleReturnWorker(const rpc::ReturnWorkerRequest &request,
// Read the resource spec submitted by the client.
auto worker_port = request.worker_port();
std::shared_ptr<Worker> worker = std::move(leased_workers_[worker_port]);
if (new_scheduler_enabled_) {
auto it = leased_worker_resources_.find(worker_port);
RAY_CHECK(it != leased_worker_resources_.end());
new_resource_scheduler_->AddNodeAvailableResources(client_id_.Binary(), it->second);
leased_worker_resources_.erase(it);
NewSchedulerSchedulePendingTasks();
}
leased_workers_.erase(worker_port);
Status status;
if (worker) {
@@ -1883,6 +2011,13 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag
}
void NodeManager::HandleDirectCallTaskBlocked(const std::shared_ptr<Worker> &worker) {
if (new_scheduler_enabled_) {
// TODO (ion): replace this hard coded # of CPUs.
std::unordered_map<std::string, double> task_request;
task_request.emplace(kCPU_ResourceLabel, 1.);
new_resource_scheduler_->AddNodeAvailableResources(client_id_.Binary(), task_request);
return;
}
if (!worker || worker->GetAssignedTaskId().IsNil() || worker->IsBlocked()) {
return; // The worker may have died or is no longer processing the task.
}
@@ -1947,7 +2082,6 @@ void NodeManager::AsyncResolveObjects(
cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release(
cpu_resource_ids.ToResourceSet());
worker->MarkBlocked();
// Try dispatching tasks since we may have released some resources.
DispatchTasks(local_queues_.GetReadyTasksByClass());
}
@@ -2015,6 +2149,10 @@ void NodeManager::AsyncResolveObjectsFinish(
worker->AcquireTaskCpuResources(resource_ids);
cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Acquire(
cpu_resources);
if (new_scheduler_enabled_) {
new_resource_scheduler_->SubtractNodeAvailableResources(
client_id_.Binary(), cpu_resources.GetResourceMap());
}
} else {
// In this case, we simply don't reacquire the CPU resources for the worker.
// The worker can keep running and when the task finishes, it will simply
@@ -2099,6 +2237,10 @@ bool NodeManager::AssignTask(const Task &task,
local_available_resources_.Acquire(spec.GetRequiredResources());
const auto &my_client_id = gcs_client_->client_table().GetLocalClientId();
cluster_resource_map_[my_client_id].Acquire(spec.GetRequiredResources());
if (new_scheduler_enabled_) {
new_resource_scheduler_->AddNodeAvailableResources(
client_id_.Binary(), spec.GetRequiredResources().GetResourceMap());
}
if (spec.IsActorCreationTask()) {
// Check that the actor's placement resource requirements are satisfied.
@@ -2161,6 +2303,10 @@ void NodeManager::FinishAssignedTask(Worker &worker) {
task_resources, cluster_resource_map_[client_id].GetTotalResources());
cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release(
task_resources.ToResourceSet());
if (new_scheduler_enabled_) {
new_resource_scheduler_->AddNodeAvailableResources(
client_id_.Binary(), task_resources.ToResourceSet().GetResourceMap());
}
worker.ResetTaskResourceIds();
const auto &spec = task.GetTaskSpecification();
+30
View File
@@ -11,6 +11,8 @@
#include "ray/common/client_connection.h"
#include "ray/common/task/task_common.h"
#include "ray/common/task/scheduling_resources.h"
#include "ray/common/scheduling/scheduling_ids.h"
#include "ray/common/scheduling/cluster_resource_scheduler.h"
#include "ray/object_manager/object_manager.h"
#include "ray/raylet/actor_registration.h"
#include "ray/raylet/lineage_cache.h"
@@ -544,6 +546,15 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// unable to schedule new tasks or actors at all.
void WarnResourceDeadlock();
/// Dispatch tasks to available workers.
void DispatchScheduledTasksToWorkers();
/// For the pending task at the head of tasks_to_schedule_, return a node
/// in the system (local or remote) that has enough resources available to
/// run the task, if any such node exist.
/// Repeat the process as long as we can schedule a task.
void NewSchedulerSchedulePendingTasks();
// GCS client ID for this node.
ClientID client_id_;
boost::asio::io_service &io_service_;
@@ -619,6 +630,25 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// Map of workers leased out to direct call clients.
std::unordered_map<int, std::shared_ptr<Worker>> leased_workers_;
/// Whether the new scheduler is enabled.
const bool new_scheduler_enabled_;
/// The new resource scheduler for direct task calls.
std::shared_ptr<ClusterResourceScheduler> new_resource_scheduler_;
/// Map of leased workers to their current resource usage.
std::unordered_map<int, std::unordered_map<std::string, double>>
leased_worker_resources_;
typedef std::function<void(std::shared_ptr<Worker>, ClientID spillback_to,
std::string address, int port)>
ScheduleFn;
/// Queue of lease requests that are waiting for resources to become available.
/// TODO this should be a queue for each SchedulingClass
std::deque<std::pair<ScheduleFn, Task>> tasks_to_schedule_;
/// Queue of lease requests that should be scheduled onto workers.
std::deque<std::pair<ScheduleFn, Task>> tasks_to_dispatch_;
};
} // namespace raylet