From 50f28811ac12dc53515726452d20d87070480735 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Tue, 8 Dec 2020 13:46:58 -0500 Subject: [PATCH] [new scheduler] Always spill back to a feasible node if the local node is not feasible (#12557) * fix lint * feasible nodes * Enable test, cleanup * Revert "fix" This reverts commit aef81d04c0b4560b758f846e1afdafbdb5552efe. * unit test * doc --- python/ray/tests/test_reconstruction.py | 2 - .../scheduling/cluster_resource_scheduler.cc | 51 ++++++++++++++++--- .../scheduling/cluster_resource_scheduler.h | 36 +++++++++---- .../cluster_resource_scheduler_test.cc | 28 ++++++++++ .../raylet/scheduling/cluster_task_manager.cc | 5 ++ 5 files changed, 102 insertions(+), 20 deletions(-) diff --git a/python/ray/tests/test_reconstruction.py b/python/ray/tests/test_reconstruction.py index b471964c2..382225cea 100644 --- a/python/ray/tests/test_reconstruction.py +++ b/python/ray/tests/test_reconstruction.py @@ -9,7 +9,6 @@ import ray from ray.test_utils import ( wait_for_condition, wait_for_pid_to_exit, - new_scheduler_enabled, ) SIGKILL = signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM @@ -488,7 +487,6 @@ def test_reconstruction_chain(ray_start_cluster, reconstruction_enabled): raise e.as_instanceof_cause() -@pytest.mark.skipif(new_scheduler_enabled(), reason="hangs") def test_reconstruction_stress(ray_start_cluster): config = { "num_heartbeats_timeout": 10, diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc index 8154a858f..6b53f212b 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc @@ -98,9 +98,34 @@ bool ClusterResourceScheduler::RemoveNode(const std::string &node_id_string) { return RemoveNode(node_id); } +bool ClusterResourceScheduler::IsFeasible(const TaskRequest &task_req, + const NodeResources &resources) const { + // First, check predefined resources. + for (size_t i = 0; i < PredefinedResources_MAX; i++) { + if (task_req.predefined_resources[i].demand > + resources.predefined_resources[i].total) { + return false; + } + } + + // Now check custom resources. + for (const auto &task_req_custom_resource : task_req.custom_resources) { + auto it = resources.custom_resources.find(task_req_custom_resource.id); + + if (it == resources.custom_resources.end()) { + return false; + } + if (task_req_custom_resource.demand > it->second.total) { + return false; + } + } + + return true; +} + int64_t ClusterResourceScheduler::IsSchedulable(const TaskRequest &task_req, int64_t node_id, - const NodeResources &resources) { + const NodeResources &resources) const { int violations = 0; // First, check predefined resources. @@ -189,9 +214,9 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode(const TaskRequest &task // Check whether local node is schedulable. We return immediately // the local node only if there are zero violations. - auto it = nodes_.find(local_node_id_); - if (it != nodes_.end()) { - if (IsSchedulable(task_req, it->first, it->second) == 0) { + const auto local_node_it = nodes_.find(local_node_id_); + if (local_node_it != nodes_.end()) { + if (IsSchedulable(task_req, local_node_it->first, local_node_it->second) == 0) { return local_node_id_; } } @@ -207,12 +232,24 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode(const TaskRequest &task } } + bool local_node_feasible = IsFeasible(task_req, local_node_it->second); + for (const auto &node : nodes_) { // Return -1 if node not schedulable. otherwise return the number // of soft constraint violations. - int64_t violations; - - if ((violations = IsSchedulable(task_req, node.first, node.second)) == -1) { + int64_t violations = IsSchedulable(task_req, node.first, node.second); + if (violations == -1) { + if (!local_node_feasible && best_node == -1 && IsFeasible(task_req, node.second)) { + // If the local node is not feasible, and a better node has not yet + // been found, and this node does not currently have the resources + // available but is feasible, then schedule to this node. + // NOTE(swang): This is needed to make sure that tasks that are not + // feasible on this node are spilled back to a node that does have the + // appropriate total resources in a timely manner. If there are + // multiple feasible nodes, this algorithm can still introduce delays + // because of inefficient load-balancing. + best_node = node.first; + } continue; } diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h index c476e174a..f8ef7307e 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.h +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h @@ -113,6 +113,14 @@ class ClusterResourceScheduler { bool RemoveNode(int64_t node_id); bool RemoveNode(const std::string &node_id_string); + /// Check whether a task request is feasible on a given node. A node is + /// feasible if it has the total resources needed to eventually execute the + /// task, even if those resources are currently allocated. + /// + /// \param task_req Task request to be scheduled. + /// \param resources Node's resources. + bool IsFeasible(const TaskRequest &task_req, const NodeResources &resources) const; + /// Check whether a task request can be scheduled given a node. /// /// \param task_req: Task request to be scheduled. @@ -128,23 +136,29 @@ class ClusterResourceScheduler { /// >= 0, the number soft constraint violations. If 0, no /// constraint is violated. int64_t IsSchedulable(const TaskRequest &task_req, int64_t node_id, - const NodeResources &resources); + const NodeResources &resources) const; /// Find a node in the cluster on which we can schedule a given task request. /// - /// First, this function checks whether the local node can schedule - /// the request without violating any constraints. If yes, it returns the - /// ID of the local node. + /// Ignoring soft constraints, this policy prioritizes nodes in the + /// following order: /// - /// If not, this function checks whether there is another node in the cluster - /// that satisfies all request's constraints (both soft and hard). + /// 1. Local node if resources available. + /// 2. Any remote node if resources available. + /// 3. If the local node is not feasible, any remote node if feasible. /// - /// If no such node exists, the function checks whether there are nodes - /// that satisfy all the request's hard constraints, but might violate some - /// soft constraints. Among these nodes, it returns a node which violates - /// the least number of soft constraints. + /// If soft constraints are specified, then this policy will prioritize: + /// 1. Local node if resources available and does not violate soft + /// constraints. + /// 2. Any remote node if resources available and does not violate soft + /// constraints. + /// 3. Out of all the nodes, including the local node, pick the one that + /// has resources available and violates the fewest soft constraints. + /// 4. If the local node is not feasible, any remote node if feasible. /// - /// Finally, if no such node exists, return -1. + /// If no node can meet any of these, returns -1, in which case the caller + /// should queue the task and try again once resource availability has been + /// updated. /// /// \param task_request: Task to be scheduled. /// \param actor_creation: True if this is an actor creation task. diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc index 08a8c7d30..fbd24185e 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc @@ -981,6 +981,34 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstanceWithHardRequestTest) { ASSERT_TRUE(EqualVectors(cpu_instances, expect_cpu_instance)); } +TEST_F(ClusterResourceSchedulerTest, TestAlwaysSpillInfeasibleTask) { + std::unordered_map resource_spec({{"CPU", 1}}); + ClusterResourceScheduler cluster_resources("local", {}); + for (int i = 0; i < 100; i++) { + cluster_resources.AddOrUpdateNode(std::to_string(i), {}, {}); + } + + // No feasible nodes. + int64_t total_violations; + ASSERT_EQ( + cluster_resources.GetBestSchedulableNode(resource_spec, false, &total_violations), + ""); + + // Feasible remote node, but doesn't currently have resources available. We + // should spill there. + cluster_resources.AddOrUpdateNode("remote_feasible", resource_spec, {{"CPU", 0.}}); + ASSERT_EQ( + cluster_resources.GetBestSchedulableNode(resource_spec, false, &total_violations), + "remote_feasible"); + + // Feasible remote node, and it currently has resources available. We should + // prefer to spill there. + cluster_resources.AddOrUpdateNode("remote_available", resource_spec, resource_spec); + ASSERT_EQ( + cluster_resources.GetBestSchedulableNode(resource_spec, false, &total_violations), + "remote_available"); +} + TEST_F(ClusterResourceSchedulerTest, HeartbeatTest) { vector cust_ids{1, 2, 3, 4, 5}; diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc index c77a89726..9cd6832f0 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager.cc @@ -45,6 +45,8 @@ bool ClusterTaskManager::SchedulePendingTasks() { if (node_id_string.empty()) { // There is no node that has available resources to run the request. // Move on to the next shape. + RAY_LOG(DEBUG) << "No feasible node found for task " + << task.GetTaskSpecification().TaskId(); break; } else { if (node_id_string == self_node_id_.Binary()) { @@ -81,6 +83,8 @@ bool ClusterTaskManager::WaitForTaskArgsRequests(Work work) { << task.GetTaskSpecification().TaskId(); tasks_to_dispatch_[scheduling_key].push_back(work); } else { + RAY_LOG(DEBUG) << "Waiting for args for task: " + << task.GetTaskSpecification().TaskId(); can_dispatch = false; TaskID task_id = task.GetTaskSpecification().TaskId(); waiting_tasks_[task_id] = work; @@ -236,6 +240,7 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id) { auto &work_queue = shapes_it->second; for (auto work_it = work_queue.begin(); work_it != work_queue.end(); work_it++) { if (std::get<0>(*work_it).GetTaskSpecification().TaskId() == task_id) { + RAY_LOG(DEBUG) << "Canceling task " << task_id; ReplyCancelled(*work_it); work_queue.erase(work_it); if (work_queue.empty()) {