From 2a3adf2d70934b362fb2d6bec102f4ec6c01969d Mon Sep 17 00:00:00 2001 From: Ion Date: Mon, 2 Dec 2019 14:42:16 -0800 Subject: [PATCH] New scheduler integration (#6321) --- src/ray/common/ray_config_def.h | 5 + .../scheduling/cluster_resource_scheduler.cc | 298 +++++++++++++++++- .../scheduling/cluster_resource_scheduler.h | 80 ++++- src/ray/common/scheduling/scheduling_ids.cc | 11 + src/ray/common/scheduling/scheduling_ids.h | 6 + src/ray/common/task/scheduling_resources.h | 3 + src/ray/gcs/redis_async_context.cc | 3 + src/ray/raylet/node_manager.cc | 152 ++++++++- src/ray/raylet/node_manager.h | 30 ++ 9 files changed, 578 insertions(+), 10 deletions(-) diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index cd3b03182..fe64479d0 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -40,6 +40,11 @@ RAY_CONFIG(int64_t, debug_dump_period_milliseconds, 10000) /// type of task from starving other types (see issue #3664). RAY_CONFIG(bool, fair_queueing_enabled, true) +/// Whether to enable the new scheduler. The new scheduler is designed +/// only to work with direct calls. Once direct calls become +/// the default, this scheduler will also become the default. +RAY_CONFIG(bool, new_scheduler_enabled, false) + // The max allowed size in bytes of a return object from direct actor calls. // Objects larger than this size will be spilled to plasma. 
RAY_CONFIG(int64_t, max_direct_call_object_size, 100 * 1024) diff --git a/src/ray/common/scheduling/cluster_resource_scheduler.cc b/src/ray/common/scheduling/cluster_resource_scheduler.cc index 00069055a..6200ee92a 100644 --- a/src/ray/common/scheduling/cluster_resource_scheduler.cc +++ b/src/ray/common/scheduling/cluster_resource_scheduler.cc @@ -6,6 +6,22 @@ ClusterResourceScheduler::ClusterResourceScheduler( AddOrUpdateNode(local_node_id_, local_node_resources); } +ClusterResourceScheduler::ClusterResourceScheduler( + const std::string &local_node_id, + const std::unordered_map &local_node_resources) { + local_node_id_ = string_to_int_map_.Insert(local_node_id); + AddOrUpdateNode(local_node_id, local_node_resources, local_node_resources); +} + +void ClusterResourceScheduler::AddOrUpdateNode( + const std::string &node_id, + const std::unordered_map &resources_total, + const std::unordered_map &resources_available) { + NodeResources node_resources; + ResourceMapToNodeResources(resources_total, resources_available, &node_resources); + AddOrUpdateNode(string_to_int_map_.Insert(node_id), node_resources); +} + void ClusterResourceScheduler::SetPredefinedResources(const NodeResources &new_resources, NodeResources *old_resources) { for (size_t i = 0; i < PredefinedResources_MAX; i++) { @@ -45,6 +61,7 @@ bool ClusterResourceScheduler::RemoveNode(int64_t node_id) { } else { it->second.custom_resources.clear(); nodes_.erase(it); + string_to_int_map_.Remove(node_id); return true; } } @@ -136,8 +153,9 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode(const TaskRequest &task int64_t violations; if ((violations = IsSchedulable(task_req, it->first, it->second)) == -1) { - break; + continue; } + // Update the node with the smallest number of soft constraints violated. 
if (min_violations > violations) { min_violations = violations; @@ -152,6 +170,20 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode(const TaskRequest &task return best_node; } +std::string ClusterResourceScheduler::GetBestSchedulableNode( + const std::unordered_map &task_resources, + int64_t *total_violations) { + TaskRequest task_request; + ResourceMapToTaskRequest(task_resources, &task_request); + int64_t node_id = GetBestSchedulableNode(task_request, total_violations); + + // -1 is the integer overload's "no schedulable node" result; report it as "". + if (node_id == -1) { + return ""; + } + return string_to_int_map_.Get(node_id); +} + bool ClusterResourceScheduler::SubtractNodeAvailableResources( int64_t node_id, const TaskRequest &task_req) { auto it = nodes_.find(node_id); @@ -182,6 +214,45 @@ bool ClusterResourceScheduler::SubtractNodeAvailableResources( return true; } +bool ClusterResourceScheduler::SubtractNodeAvailableResources( + const std::string &node_id, + const std::unordered_map &resource_map) { + TaskRequest task_request; + ResourceMapToTaskRequest(resource_map, &task_request); + return SubtractNodeAvailableResources(string_to_int_map_.Get(node_id), task_request); +} + +bool ClusterResourceScheduler::AddNodeAvailableResources(int64_t node_id, + const TaskRequest &task_req) { + auto it = nodes_.find(node_id); + if (it == nodes_.end()) { + return false; + } + NodeResources &resources = it->second; + + for (size_t i = 0; i < PredefinedResources_MAX; i++) { + resources.capacities[i].available = + resources.capacities[i].available + task_req.predefined_resources[i].demand; + } + + // Custom resources the node does not have are skipped rather than created. + for (size_t i = 0; i < task_req.custom_resources.size(); i++) { + auto custom_it = resources.custom_resources.find(task_req.custom_resources[i].id); + if (custom_it != resources.custom_resources.end()) { + custom_it->second.available = + custom_it->second.available + task_req.custom_resources[i].req.demand; + } + } + return true; +} + +bool ClusterResourceScheduler::AddNodeAvailableResources( + const std::string &node_id, + const std::unordered_map 
&resource_map) { + TaskRequest task_request; + ResourceMapToTaskRequest(resource_map, &task_request); + return AddNodeAvailableResources(string_to_int_map_.Get(node_id), task_request); +} + bool ClusterResourceScheduler::GetNodeResources(int64_t node_id, NodeResources *ret_resources) { auto it = nodes_.find(node_id); @@ -194,3 +265,228 @@ bool ClusterResourceScheduler::GetNodeResources(int64_t node_id, } int64_t ClusterResourceScheduler::NumNodes() { return nodes_.size(); } + +void ClusterResourceScheduler::ResourceMapToNodeResources( + const std::unordered_map &resource_map_total, + const std::unordered_map &resource_map_available, + NodeResources *node_resources) { + node_resources->capacities.resize(PredefinedResources_MAX); + for (size_t i = 0; i < PredefinedResources_MAX; i++) { + node_resources->capacities[i].total = node_resources->capacities[i].available = 0; + } + + for (auto it = resource_map_total.begin(); it != resource_map_total.end(); ++it) { + ResourceCapacity resource_capacity; + resource_capacity.total = (int64_t)it->second; + auto it2 = resource_map_available.find(it->first); + if (it2 == resource_map_available.end()) { + resource_capacity.available = 0; + } else { + resource_capacity.available = (int64_t)it2->second; + } + if (it->first == ray::kCPU_ResourceLabel) { + node_resources->capacities[CPU] = resource_capacity; + } else if (it->first == ray::kGPU_ResourceLabel) { + node_resources->capacities[GPU] = resource_capacity; + } else if (it->first == ray::kTPU_ResourceLabel) { + node_resources->capacities[TPU] = resource_capacity; + } else if (it->first == ray::kMemory_ResourceLabel) { + node_resources->capacities[MEM] = resource_capacity; + } else { + // This is a custom resource. 
+ node_resources->custom_resources.emplace(string_to_int_map_.Insert(it->first), + resource_capacity); + } + } +} + +void ClusterResourceScheduler::ResourceMapToTaskRequest( + const std::unordered_map &resource_map, + TaskRequest *task_request) { + // Number of custom (non-predefined) resources written so far. + size_t num_custom = 0; + task_request->predefined_resources.resize(PredefinedResources_MAX); + // Make room for the worst case (every entry is custom) so the indexed + // writes below stay in bounds; the vector is trimmed after the loop. + task_request->custom_resources.resize(resource_map.size()); + for (size_t i = 0; i < PredefinedResources_MAX; i++) { + task_request->predefined_resources[i].demand = 0; + task_request->predefined_resources[i].soft = false; + } + + for (auto it = resource_map.begin(); it != resource_map.end(); ++it) { + if (it->first == ray::kCPU_ResourceLabel) { + task_request->predefined_resources[CPU].demand = it->second; + } else if (it->first == ray::kGPU_ResourceLabel) { + task_request->predefined_resources[GPU].demand = it->second; + } else if (it->first == ray::kTPU_ResourceLabel) { + task_request->predefined_resources[TPU].demand = it->second; + } else if (it->first == ray::kMemory_ResourceLabel) { + task_request->predefined_resources[MEM].demand = it->second; + } else { + // This is a custom resource. 
+ task_request->custom_resources[num_custom].id = string_to_int_map_.Insert(it->first); + task_request->custom_resources[num_custom].req.demand = it->second; + task_request->custom_resources[num_custom].req.soft = false; + num_custom++; + } + } + // Drop the slots that were reserved for entries that turned out to be predefined. + task_request->custom_resources.resize(num_custom); +} + +void ClusterResourceScheduler::UpdateResourceCapacity(const std::string &client_id_string, + const std::string &resource_name, + int64_t resource_total) { + int64_t client_id = string_to_int_map_.Get(client_id_string); + auto it = nodes_.find(client_id); + if (it == nodes_.end()) { + return; + } + + int idx = -1; + if (resource_name == ray::kCPU_ResourceLabel) { + idx = (int)CPU; + } else if (resource_name == ray::kGPU_ResourceLabel) { + idx = (int)GPU; + } else if (resource_name == ray::kTPU_ResourceLabel) { + idx = (int)TPU; + } else if (resource_name == ray::kMemory_ResourceLabel) { + idx = (int)MEM; + }; + if (idx != -1) { + int64_t diff_capacity = resource_total - it->second.capacities[idx].total; + it->second.capacities[idx].total += diff_capacity; + it->second.capacities[idx].available += diff_capacity; + if (it->second.capacities[idx].available < 0) { + it->second.capacities[idx].available = 0; + } + if (it->second.capacities[idx].total < 0) { + it->second.capacities[idx].total = 0; + } + } else { + int64_t resource_id = string_to_int_map_.Insert(resource_name); + auto itr = it->second.custom_resources.find(resource_id); + if (itr != it->second.custom_resources.end()) { + int64_t diff_capacity = resource_total - itr->second.total; + itr->second.total += diff_capacity; + itr->second.available += diff_capacity; + if (itr->second.available < 0) { + itr->second.available = 0; + } + if (itr->second.total < 0) { + itr->second.total = 0; + } + } + ResourceCapacity resource_capacity; + resource_capacity.total = resource_capacity.available = resource_total; + it->second.custom_resources.emplace(resource_id, resource_capacity); + } +} + +void ClusterResourceScheduler::DeleteResource(const std::string &client_id_string, + const std::string &resource_name) { 
+ int64_t client_id = string_to_int_map_.Get(client_id_string); + auto it = nodes_.find(client_id); + if (it == nodes_.end()) { + return; + } + + int idx = -1; + if (resource_name == ray::kCPU_ResourceLabel) { + idx = (int)CPU; + } else if (resource_name == ray::kGPU_ResourceLabel) { + idx = (int)GPU; + } else if (resource_name == ray::kTPU_ResourceLabel) { + idx = (int)TPU; + } else if (resource_name == ray::kMemory_ResourceLabel) { + idx = (int)MEM; + }; + if (idx != -1) { + // Predefined slots cannot be erased, so zero both fields; leaving + // `available` untouched would keep it above the new total of 0. + it->second.capacities[idx].total = 0; + it->second.capacities[idx].available = 0; + } else { + int64_t resource_id = string_to_int_map_.Get(resource_name); + auto itr = it->second.custom_resources.find(resource_id); + if (itr != it->second.custom_resources.end()) { + string_to_int_map_.Remove(resource_id); + it->second.custom_resources.erase(itr); + } + } +} + +bool ClusterResourceScheduler::EqualNodeResources(const NodeResources &node_resources1, + const NodeResources &node_resources2) { + for (size_t i = 0; i < PredefinedResources_MAX; i++) { + if (node_resources1.capacities[i].total != node_resources2.capacities[i].total) { + return false; + } + if (node_resources1.capacities[i].available != + node_resources2.capacities[i].available) { + return false; + } + } + + // Different numbers of custom resources can never be identical. With equal + // sizes, the one-directional lookup below is enough to prove equality. + if (node_resources1.custom_resources.size() != + node_resources2.custom_resources.size()) { + return false; + } + + for (auto it1 = node_resources1.custom_resources.begin(); + it1 != node_resources1.custom_resources.end(); ++it1) { + auto it2 = node_resources2.custom_resources.find(it1->first); + if (it2 == node_resources2.custom_resources.end()) { + return false; + } + if (it1->second.total != it2->second.total) { + return false; + } + if (it1->second.available != it2->second.available) { + return false; + } + } + return true; +} + +std::string ClusterResourceScheduler::NodeResourcesDebugString( + const NodeResources &node_resources) { + std::stringstream buffer; + buffer << " node predefined resources {"; + for (size_t i = 0; i < node_resources.capacities.size(); i++) { + 
buffer << "(" << node_resources.capacities[i].total << ":" + << node_resources.capacities[i].available << ") "; + } + buffer << "}" << std::endl; + + buffer << " node custom resources {"; + for (auto it = node_resources.custom_resources.begin(); + it != node_resources.custom_resources.end(); ++it) { + buffer << it->first << ":(" << it->second.total << ":" << it->second.available + << ") "; + } + buffer << "}" << std::endl; + return buffer.str(); +} + +std::string ClusterResourceScheduler::TaskRequestDebugString( + const TaskRequest &task_request) { + std::stringstream buffer; + buffer << std::endl << " request predefined resources {"; + for (size_t i = 0; i < task_request.predefined_resources.size(); i++) { + buffer << "(" << task_request.predefined_resources[i].demand << ":" + << task_request.predefined_resources[i].soft << ") "; + } + buffer << "}" << std::endl; + + buffer << " request custom resources {"; + for (size_t i = 0; i < task_request.custom_resources.size(); i++) { + buffer << task_request.custom_resources[i].id << ":" + << "(" << task_request.custom_resources[i].req.demand << ":" + << task_request.custom_resources[i].req.soft << ") "; + } + buffer << "}" << std::endl; + return buffer.str(); +} + +std::string ClusterResourceScheduler::DebugString(void) { + std::stringstream buffer; + buffer << std::endl << "local node id: " << local_node_id_ << std::endl; + for (auto it = nodes_.begin(); it != nodes_.end(); ++it) { + buffer << "node id: " << it->first << std::endl; + buffer << NodeResourcesDebugString(it->second); + } + return buffer.str(); +} diff --git a/src/ray/common/scheduling/cluster_resource_scheduler.h b/src/ray/common/scheduling/cluster_resource_scheduler.h index b90fff6c8..8614a3014 100644 --- a/src/ray/common/scheduling/cluster_resource_scheduler.h +++ b/src/ray/common/scheduling/cluster_resource_scheduler.h @@ -3,8 +3,12 @@ #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" +#include 
"ray/common/scheduling/scheduling_ids.h" +#include "ray/common/task/scheduling_resources.h" +#include "ray/util/logging.h" #include +#include #include /// List of predefined resources. @@ -62,6 +66,9 @@ class ClusterResourceScheduler { absl::flat_hash_map nodes_; /// ID of local node. int64_t local_node_id_; + /// Keep the mapping between node and resource IDs in string representation + /// to integer representation. Used for improving map performance. + StringIdMap string_to_int_map_; /// Set predefined resources. /// @@ -77,21 +84,33 @@ class ClusterResourceScheduler { const absl::flat_hash_map &new_custom_resources, absl::flat_hash_map *old_custom_resources); + std::string TaskRequestDebugString(const TaskRequest &task_request); + std::string NodeResourcesDebugString(const NodeResources &node_resources); + std::string DebugString(void); + public: ClusterResourceScheduler(void){}; /// Constructor initializing the resources associated with the local node. /// /// \param local_node_id: ID of local node, - /// \param node_resources: The total and the available resources associated + /// \param local_node_resources: The total and the available resources associated /// with the local node. - ClusterResourceScheduler(int64_t local_node_id, const NodeResources &node_resources); + ClusterResourceScheduler(int64_t local_node_id, + const NodeResources &local_node_resources); + ClusterResourceScheduler( + const std::string &local_node_id, + const std::unordered_map &local_node_resources); /// Add a new node or overwrite the resources of an existing node. /// /// \param node_id: Node ID. /// \param node_resources: Up to date total and available resources of the node. void AddOrUpdateNode(int64_t node_id, const NodeResources &node_resources); + void AddOrUpdateNode( + const std::string &node_id, + const std::unordered_map &resource_map_total, + const std::unordered_map &resource_map_available); /// Remove node from the cluster data structure. 
This happens /// when a node fails or it is removed from the cluster. @@ -132,16 +151,26 @@ class ClusterResourceScheduler { /// /// Finally, if no such node exists, return -1. /// - /// \param task_req: Task to be scheduled. + /// \param task_request: Task to be scheduled. /// \param violations: The number of soft constraint violations associated /// with the node returned by this function (assuming /// a node that can schedule task_req is found). /// /// \return -1, if no node can schedule the current request; otherwise, /// return the ID of a node that can schedule the task request. - int64_t GetBestSchedulableNode(const TaskRequest &task_req, int64_t *violations); + int64_t GetBestSchedulableNode(const TaskRequest &task_request, int64_t *violations); - /// Update the available resources of a node when a task request is + /// Similar to + /// int64_t GetBestSchedulableNode(const TaskRequest &task_request, int64_t + /// *violations) + /// but the return value is different: + /// \return "", if no node can schedule the current request; otherwise, + /// return the ID in string format of a node that can schedule the + // task request. + std::string GetBestSchedulableNode( + const std::unordered_map &task_request, int64_t *violations); + + /// Decrease the available resources of a node when a task request is /// scheduled on the given node. /// /// \param node_id: ID of node on which request is being scheduled. @@ -149,7 +178,23 @@ class ClusterResourceScheduler { /// /// \return true, if task_req can be indeed scheduled on the node, /// and false otherwise. - bool SubtractNodeAvailableResources(int64_t node_id, const TaskRequest &task_req); + bool SubtractNodeAvailableResources(int64_t node_id, const TaskRequest &task_request); + bool SubtractNodeAvailableResources( + const std::string &node_id, + const std::unordered_map &task_request); + + /// Increase available resources of a node when a worker has Finished + /// a task. 
+ /// + /// \param node_id: ID of node whose available resources are being increased. + /// \param task_request: resource requests of the task finishing execution. + /// + /// \return true, if node_id was found and its available resources were + /// increased, and false otherwise. + bool AddNodeAvailableResources(int64_t node_id, const TaskRequest &task_request); + bool AddNodeAvailableResources( + const std::string &node_id, + const std::unordered_map &task_request); /// Return resources associated to the given node_id in ret_resources. /// If node_id not found, return false; otherwise return true. @@ -157,6 +202,29 @@ class ClusterResourceScheduler { /// Get number of nodes in the cluster. int64_t NumNodes(); + + /// Convert a map of resources to a TaskRequest data structure. + void ResourceMapToTaskRequest( + const std::unordered_map &resource_map, + TaskRequest *task_request); + + /// Convert maps of total and available resources to a NodeResources data structure. + void ResourceMapToNodeResources( + const std::unordered_map &resource_map_total, + const std::unordered_map &resource_map_available, + NodeResources *node_resources); + + /// Update total capacity of resource resource_name at node client_id. + void UpdateResourceCapacity(const std::string &client_id, + const std::string &resource_name, int64_t resource_total); + + /// Delete resource resource_name from node client_id_string. + void DeleteResource(const std::string &client_id_string, + const std::string &resource_name); + + /// Check whether two node resources are identical. 
+ bool EqualNodeResources(const NodeResources &node_resources1, + const NodeResources &node_resources2); }; #endif // RAY_COMMON_SCHEDULING_SCHEDULING_H diff --git a/src/ray/common/scheduling/scheduling_ids.cc b/src/ray/common/scheduling/scheduling_ids.cc index c43f38049..95e7a6de9 100644 --- a/src/ray/common/scheduling/scheduling_ids.cc +++ b/src/ray/common/scheduling/scheduling_ids.cc @@ -9,6 +9,17 @@ int64_t StringIdMap::Get(const std::string &string_id) { } }; +std::string StringIdMap::Get(uint64_t id) { + std::string id_string; + auto it = int_to_string_.find(id); + if (it == int_to_string_.end()) { + // Unknown integer ID: the result is the string "-1", not an empty string. + id_string = std::to_string(-1); + } else { + id_string = it->second; + } + return id_string; +}; + int64_t StringIdMap::Insert(const std::string &string_id, uint8_t max_id) { auto sit = string_to_int_.find(string_id); if (sit == string_to_int_.end()) { diff --git a/src/ray/common/scheduling/scheduling_ids.h b/src/ray/common/scheduling/scheduling_ids.h index 469aac7f8..8110cf18d 100644 --- a/src/ray/common/scheduling/scheduling_ids.h +++ b/src/ray/common/scheduling/scheduling_ids.h @@ -24,6 +24,12 @@ class StringIdMap { /// \return The integer ID associated with the given string ID. int64_t Get(const std::string &string_id); + /// Get string ID associated with an existing integer ID. + /// + /// \param Integer ID. + /// \return The string ID associated with the given integer ID, or the + /// string "-1" if the integer ID is not in the map. + std::string Get(uint64_t id); + /// Insert a string ID and get the associated integer ID. /// /// \param String ID to be inserted. 
diff --git a/src/ray/common/task/scheduling_resources.h b/src/ray/common/task/scheduling_resources.h index cee29d2fd..95f4b3d6e 100644 --- a/src/ray/common/task/scheduling_resources.h +++ b/src/ray/common/task/scheduling_resources.h @@ -16,6 +16,9 @@ namespace ray { constexpr double kResourceConversionFactor = 10000; const std::string kCPU_ResourceLabel = "CPU"; +const std::string kGPU_ResourceLabel = "GPU"; +const std::string kTPU_ResourceLabel = "TPU"; +const std::string kMemory_ResourceLabel = "memory"; /// \class FractionalResourceQuantity /// \brief Converts the resource quantities to an internal representation to diff --git a/src/ray/gcs/redis_async_context.cc b/src/ray/gcs/redis_async_context.cc index 64b7656f7..602e22ab5 100644 --- a/src/ray/gcs/redis_async_context.cc +++ b/src/ray/gcs/redis_async_context.cc @@ -56,6 +56,9 @@ Status RedisAsyncContext::RedisAsyncCommand(redisCallbackFn *fn, void *privdata, { // `redisvAsyncCommand` will mutate `redis_async_context_`, use a lock to protect it. std::lock_guard lock(mutex_); + if (!redis_async_context_) { + return Status::NotImplemented("..."); + } ret_code = redisvAsyncCommand(redis_async_context_, fn, privdata, format, ap); } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index c450f2007..3e252c31e 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -108,7 +108,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, actor_registry_(), node_manager_server_("NodeManager", config.node_manager_port), node_manager_service_(io_service, *this), - client_call_manager_(io_service) { + client_call_manager_(io_service), + new_scheduler_enabled_(RayConfig::instance().new_scheduler_enabled()) { RAY_CHECK(heartbeat_period_.count() > 0); // Initialize the resource map with own cluster resource configuration. 
ClientID local_client_id = gcs_client_->client_table().GetLocalClientId(); @@ -123,6 +124,13 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, RAY_CHECK_OK(object_manager_.SubscribeObjDeleted( [this](const ObjectID &object_id) { HandleObjectMissing(object_id); })); + if (new_scheduler_enabled_) { + SchedulingResources &local_resources = cluster_resource_map_[local_client_id]; + new_resource_scheduler_ = + std::shared_ptr(new ClusterResourceScheduler( + client_id_.Binary(), local_resources.GetTotalResources().GetResourceMap())); + } + RAY_ARROW_CHECK_OK(store_client_.Connect(config.store_socket_name.c_str())); // Run the node manger rpc server. node_manager_server_.RegisterService(node_manager_service_); @@ -573,6 +581,10 @@ void NodeManager::ResourceCreateUpdated(const ClientID &client_id, local_available_resources_.AddOrUpdateResource(resource_label, new_resource_capacity); } + if (new_scheduler_enabled_) { + new_resource_scheduler_->UpdateResourceCapacity(client_id.Binary(), resource_label, + new_resource_capacity); + } } RAY_LOG(DEBUG) << "[ResourceCreateUpdated] Updated cluster_resource_map."; @@ -605,6 +617,9 @@ void NodeManager::ResourceDeleted(const ClientID &client_id, if (client_id == local_client_id) { local_available_resources_.DeleteResource(resource_label); } + if (new_scheduler_enabled_) { + new_resource_scheduler_->DeleteResource(client_id.Binary(), resource_label); + } } RAY_LOG(DEBUG) << "[ResourceDeleted] Updated cluster_resource_map."; return; @@ -643,6 +658,8 @@ void NodeManager::HeartbeatAdded(const ClientID &client_id, } SchedulingResources &remote_resources = it->second; + ResourceSet remote_total(VectorFromProtobuf(heartbeat_data.resources_total_label()), + VectorFromProtobuf(heartbeat_data.resources_total_capacity())); ResourceSet remote_available( VectorFromProtobuf(heartbeat_data.resources_available_label()), VectorFromProtobuf(heartbeat_data.resources_available_capacity())); @@ -652,6 +669,15 @@ void 
NodeManager::HeartbeatAdded(const ClientID &client_id, remote_resources.SetAvailableResources(std::move(remote_available)); // Extract the load information and save it locally. remote_resources.SetLoadResources(std::move(remote_load)); + + if (new_scheduler_enabled_ && client_id != client_id_) { + new_resource_scheduler_->AddOrUpdateNode(client_id.Binary(), + remote_total.GetResourceMap(), + remote_available.GetResourceMap()); + NewSchedulerSchedulePendingTasks(); + return; + } + // Extract decision for this raylet. auto decision = scheduling_policy_.SpillOver(remote_resources); std::unordered_set local_task_ids; @@ -1087,6 +1113,12 @@ void NodeManager::HandleWorkerAvailable(const std::shared_ptr &worker) { // Return the worker to the idle pool. worker_pool_.PushWorker(std::move(worker)); + + if (new_scheduler_enabled_) { + DispatchScheduledTasksToWorkers(); + return; + } + // Local resource availability changed: invoke scheduling policy for local node. const ClientID &local_client_id = gcs_client_->client_table().GetLocalClientId(); cluster_resource_map_[local_client_id].SetLoadResources( @@ -1187,12 +1219,20 @@ void NodeManager::ProcessDisconnectClientMessage( local_available_resources_.ReleaseConstrained( task_resources, cluster_resource_map_[client_id].GetTotalResources()); cluster_resource_map_[client_id].Release(task_resources.ToResourceSet()); + if (new_scheduler_enabled_) { + new_resource_scheduler_->AddNodeAvailableResources( + client_id_.Binary(), task_resources.ToResourceSet().GetResourceMap()); + } worker->ResetTaskResourceIds(); auto const &lifetime_resources = worker->GetLifetimeResourceIds(); local_available_resources_.ReleaseConstrained( lifetime_resources, cluster_resource_map_[client_id].GetTotalResources()); cluster_resource_map_[client_id].Release(lifetime_resources.ToResourceSet()); + if (new_scheduler_enabled_) { + new_resource_scheduler_->AddNodeAvailableResources( + client_id_.Binary(), lifetime_resources.ToResourceSet().GetResourceMap()); + 
} worker->ResetLifetimeResourceIds(); RAY_LOG(DEBUG) << "Worker (pid=" << worker->Pid() << ") is disconnected. " @@ -1440,15 +1480,94 @@ void NodeManager::ProcessSubmitTaskMessage(const uint8_t *message_data) { SubmitTask(Task(task_message), Lineage()); } +void NodeManager::DispatchScheduledTasksToWorkers() { + RAY_CHECK(new_scheduler_enabled_); + while (!tasks_to_dispatch_.empty()) { + auto task = tasks_to_dispatch_.front(); + auto reply = task.first; + std::shared_ptr worker = + worker_pool_.PopWorker(task.second.GetTaskSpecification()); + if (worker == nullptr) { + return; + } + reply(worker, ClientID::Nil(), "", -1); + tasks_to_dispatch_.pop_front(); + } +} + +void NodeManager::NewSchedulerSchedulePendingTasks() { + RAY_CHECK(new_scheduler_enabled_); + while (!tasks_to_schedule_.empty()) { + auto work = tasks_to_schedule_.front(); + auto task = work.second; + auto request_resources = + task.GetTaskSpecification().GetRequiredResources().GetResourceMap(); + int64_t violations = 0; + std::string node_id_string = + new_resource_scheduler_->GetBestSchedulableNode(request_resources, &violations); + if (node_id_string.empty()) { + /// There is no node that has available resources to run the request. 
+ break; + } else { + new_resource_scheduler_->SubtractNodeAvailableResources(node_id_string, + request_resources); + if (node_id_string == client_id_.Binary()) { + tasks_to_dispatch_.push_back(work); + } else { + ClientID node_id = ClientID::FromBinary(node_id_string); + GcsNodeInfo node_info; + bool found = gcs_client_->client_table().GetClient(node_id, &node_info); + RAY_CHECK(found) + << "Spilling back to a node manager, but no GCS info found for node " + << node_id; + work.first(nullptr, node_id, node_info.node_manager_address(), + node_info.node_manager_port()); + } + tasks_to_schedule_.pop_front(); + } + } + DispatchScheduledTasksToWorkers(); +} + void NodeManager::HandleWorkerLeaseRequest(const rpc::WorkerLeaseRequest &request, rpc::WorkerLeaseReply *reply, rpc::SendReplyCallback send_reply_callback) { rpc::Task task_message; task_message.mutable_task_spec()->CopyFrom(request.resource_spec()); + Task task(task_message); + + if (new_scheduler_enabled_) { + auto request_resources = + task.GetTaskSpecification().GetRequiredResources().GetResourceMap(); + auto work = std::make_pair( + [this, request_resources, reply, send_reply_callback]( + std::shared_ptr worker, ClientID spillback_to, std::string address, + int port) { + if (worker != nullptr) { + reply->mutable_worker_address()->set_ip_address( + initial_config_.node_manager_address); + reply->mutable_worker_address()->set_port(worker->Port()); + reply->mutable_worker_address()->set_raylet_id( + gcs_client_->client_table().GetLocalClientId().Binary()); + RAY_CHECK(leased_workers_.find(worker->Port()) == leased_workers_.end()); + leased_workers_[worker->Port()] = worker; + leased_worker_resources_[worker->Port()] = request_resources; + } else { + reply->mutable_retry_at_raylet_address()->set_ip_address(address); + reply->mutable_retry_at_raylet_address()->set_port(port); + reply->mutable_retry_at_raylet_address()->set_raylet_id( + spillback_to.Binary()); + } + send_reply_callback(Status::OK(), nullptr, 
nullptr); + }, + task); + tasks_to_schedule_.push_back(work); + NewSchedulerSchedulePendingTasks(); + return; + } // Override the task dispatch to call back to the client instead of executing the // task directly on the worker. - Task task(task_message); RAY_LOG(DEBUG) << "Worker lease request " << task.GetTaskSpecification().TaskId(); TaskID task_id = task.GetTaskSpecification().TaskId(); task.OnDispatchInstead( @@ -1485,6 +1604,15 @@ void NodeManager::HandleReturnWorker(const rpc::ReturnWorkerRequest &request, // Read the resource spec submitted by the client. auto worker_port = request.worker_port(); std::shared_ptr worker = std::move(leased_workers_[worker_port]); + + if (new_scheduler_enabled_) { + auto it = leased_worker_resources_.find(worker_port); + RAY_CHECK(it != leased_worker_resources_.end()); + new_resource_scheduler_->AddNodeAvailableResources(client_id_.Binary(), it->second); + leased_worker_resources_.erase(it); + NewSchedulerSchedulePendingTasks(); + } + leased_workers_.erase(worker_port); Status status; if (worker) { @@ -1883,6 +2011,13 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag } void NodeManager::HandleDirectCallTaskBlocked(const std::shared_ptr &worker) { + if (new_scheduler_enabled_) { + // TODO (ion): replace this hard coded # of CPUs. + std::unordered_map task_request; + task_request.emplace(kCPU_ResourceLabel, 1.); + new_resource_scheduler_->AddNodeAvailableResources(client_id_.Binary(), task_request); + return; + } if (!worker || worker->GetAssignedTaskId().IsNil() || worker->IsBlocked()) { return; // The worker may have died or is no longer processing the task. } @@ -1947,7 +2082,6 @@ void NodeManager::AsyncResolveObjects( cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release( cpu_resource_ids.ToResourceSet()); worker->MarkBlocked(); - // Try dispatching tasks since we may have released some resources. 
DispatchTasks(local_queues_.GetReadyTasksByClass()); } @@ -2015,6 +2149,10 @@ void NodeManager::AsyncResolveObjectsFinish( worker->AcquireTaskCpuResources(resource_ids); cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Acquire( cpu_resources); + if (new_scheduler_enabled_) { + new_resource_scheduler_->SubtractNodeAvailableResources( + client_id_.Binary(), cpu_resources.GetResourceMap()); + } } else { // In this case, we simply don't reacquire the CPU resources for the worker. // The worker can keep running and when the task finishes, it will simply @@ -2099,6 +2237,10 @@ bool NodeManager::AssignTask(const Task &task, local_available_resources_.Acquire(spec.GetRequiredResources()); const auto &my_client_id = gcs_client_->client_table().GetLocalClientId(); cluster_resource_map_[my_client_id].Acquire(spec.GetRequiredResources()); + if (new_scheduler_enabled_) { + // The task's resources were just acquired above, so the new scheduler's + // view must shrink here; FinishAssignedTask adds them back on release. + new_resource_scheduler_->SubtractNodeAvailableResources( + client_id_.Binary(), spec.GetRequiredResources().GetResourceMap()); + } if (spec.IsActorCreationTask()) { // Check that the actor's placement resource requirements are satisfied. 
@@ -2161,6 +2303,10 @@ void NodeManager::FinishAssignedTask(Worker &worker) { task_resources, cluster_resource_map_[client_id].GetTotalResources()); cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release( task_resources.ToResourceSet()); + if (new_scheduler_enabled_) { + new_resource_scheduler_->AddNodeAvailableResources( + client_id_.Binary(), task_resources.ToResourceSet().GetResourceMap()); + } worker.ResetTaskResourceIds(); const auto &spec = task.GetTaskSpecification(); diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index d57169dc3..37c9d2ccd 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -11,6 +11,8 @@ #include "ray/common/client_connection.h" #include "ray/common/task/task_common.h" #include "ray/common/task/scheduling_resources.h" +#include "ray/common/scheduling/scheduling_ids.h" +#include "ray/common/scheduling/cluster_resource_scheduler.h" #include "ray/object_manager/object_manager.h" #include "ray/raylet/actor_registration.h" #include "ray/raylet/lineage_cache.h" @@ -544,6 +546,15 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// unable to schedule new tasks or actors at all. void WarnResourceDeadlock(); + /// Dispatch tasks to available workers. + void DispatchScheduledTasksToWorkers(); + + /// For the pending task at the head of tasks_to_schedule_, return a node + /// in the system (local or remote) that has enough resources available to + /// run the task, if any such node exist. + /// Repeat the process as long as we can schedule a task. + void NewSchedulerSchedulePendingTasks(); + // GCS client ID for this node. ClientID client_id_; boost::asio::io_service &io_service_; @@ -619,6 +630,25 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// Map of workers leased out to direct call clients. std::unordered_map> leased_workers_; + + /// Whether new schedule is enabled. 
+ const bool new_scheduler_enabled_; + + /// The new resource scheduler for direct task calls. + std::shared_ptr new_resource_scheduler_; + /// Map of leased workers to their current resource usage. + std::unordered_map> + leased_worker_resources_; + + typedef std::function, ClientID spillback_to, + std::string address, int port)> + ScheduleFn; + + /// Queue of lease requests that are waiting for resources to become available. + /// TODO this should be a queue for each SchedulingClass + std::deque> tasks_to_schedule_; + /// Queue of lease requests that should be scheduled onto workers. + std::deque> tasks_to_dispatch_; }; } // namespace raylet