From 9a40d7b4eec7d4ba4da87bdb6e901a30d3bd68ac Mon Sep 17 00:00:00 2001 From: Alex Wu Date: Wed, 27 Jan 2021 15:30:58 -0800 Subject: [PATCH] [Core/Autoscaler] Properly clean up resource backlog from (#13727) --- .../raylet/scheduling/cluster_task_manager.cc | 21 ++++-- .../scheduling/cluster_task_manager_test.cc | 71 ++++++++++++------- 2 files changed, 62 insertions(+), 30 deletions(-) diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc index a395e51b5..43c6ce1cc 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager.cc @@ -618,12 +618,21 @@ bool ClusterTaskManager::AnyPendingTasks(Task *exemplar, bool *any_pending, std::string ClusterTaskManager::DebugStr() const { // TODO(Shanly): This method will be replaced with `DebugString` once we remove the // legacy scheduler. + auto accumulator = [](int state, const std::pair> &pair) { + return state + pair.second.size(); + }; + int num_infeasible_tasks = + std::accumulate(infeasible_tasks_.begin(), infeasible_tasks_.end(), 0, accumulator); + int num_tasks_to_schedule = std::accumulate(tasks_to_schedule_.begin(), + tasks_to_schedule_.end(), 0, accumulator); + int num_tasks_to_dispatch = std::accumulate(tasks_to_dispatch_.begin(), + tasks_to_dispatch_.end(), 0, accumulator); std::stringstream buffer; buffer << "========== Node: " << self_node_id_ << " =================\n"; - buffer << "Schedule queue length: " << tasks_to_schedule_.size() << "\n"; - buffer << "Dispatch queue length: " << tasks_to_dispatch_.size() << "\n"; + buffer << "Infeasible queue length: " << num_infeasible_tasks << "\n"; + buffer << "Schedule queue length: " << num_tasks_to_schedule << "\n"; + buffer << "Dispatch queue length: " << num_tasks_to_dispatch << "\n"; buffer << "Waiting tasks size: " << waiting_tasks_.size() << "\n"; - buffer << "infeasible queue length size: " << infeasible_tasks_.size() << "\n"; buffer << "cluster_resource_scheduler state: " << cluster_resource_scheduler_->DebugString() << "\n"; buffer << "=================================================="; @@ -673,7 +682,6 @@ void ClusterTaskManager::Dispatch( const Task &task, rpc::RequestWorkerLeaseReply *reply, std::function send_reply_callback) { const auto &task_spec = task.GetTaskSpecification(); - RAY_LOG(DEBUG) << "Dispatching task " << task_spec.TaskId(); // Pass the contact info of the worker to use. reply->set_worker_pid(worker->GetProcess().GetId()); reply->mutable_worker_address()->set_ip_address(worker->IpAddress()); @@ -683,6 +691,7 @@ void ClusterTaskManager::Dispatch( RAY_CHECK(leased_workers.find(worker->WorkerId()) == leased_workers.end()); leased_workers[worker->WorkerId()] = worker; + RemoveFromBacklogTracker(task); // Update our internal view of the cluster state. std::shared_ptr allocated_resources; @@ -734,7 +743,9 @@ void ClusterTaskManager::Dispatch( } void ClusterTaskManager::Spillback(const NodeID &spillback_to, const Work &work) { - const auto &task_spec = std::get<0>(work).GetTaskSpecification(); + const auto &task = std::get<0>(work); + const auto &task_spec = task.GetTaskSpecification(); + RemoveFromBacklogTracker(task); RAY_LOG(DEBUG) << "Spilling task " << task_spec.TaskId() << " to node " << spillback_to; if (!cluster_resource_scheduler_->AllocateRemoteTaskResources( diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc index 7c5f00820..776e7fc53 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc @@ -554,48 +554,69 @@ TEST_F(ClusterTaskManagerTest, BacklogReportTest) { *callback_occurred_ptr = true; }; - std::shared_ptr worker = - std::make_shared(WorkerID::FromRandom(), 1234); - pool_.PushWorker(std::dynamic_pointer_cast(worker)); - std::vector to_cancel; - for (int i = 0; i < 10; i++) { - Task task = CreateTask({{ray::kCPU_ResourceLabel, 100}}); - task.SetBacklogSize(i); + // Don't add these fist 2 tasks to `to_cancel`. + for (int i = 0; i < 1; i++) { + Task task = CreateTask({{ray::kCPU_ResourceLabel, 8}}); + task.SetBacklogSize(10 - i); + task_manager_.QueueAndScheduleTask(task, &reply, callback); + } + + for (int i = 1; i < 10; i++) { + Task task = CreateTask({{ray::kCPU_ResourceLabel, 8}}); + task.SetBacklogSize(10 - i); task_manager_.QueueAndScheduleTask(task, &reply, callback); to_cancel.push_back(task.GetTaskSpecification().TaskId()); } ASSERT_FALSE(callback_occurred); ASSERT_EQ(leased_workers_.size(), 0); - ASSERT_EQ(pool_.workers.size(), 1); + ASSERT_EQ(pool_.workers.size(), 0); ASSERT_EQ(node_info_calls_, 0); - auto data = std::make_shared(); - task_manager_.FillResourceUsage(data); + { // No tasks can run because the worker pool is empty. + auto data = std::make_shared(); + task_manager_.FillResourceUsage(data); + auto resource_load_by_shape = data->resource_load_by_shape(); + auto shape1 = resource_load_by_shape.resource_demands()[0]; - auto resource_load_by_shape = data->resource_load_by_shape(); - auto shape1 = resource_load_by_shape.resource_demands()[0]; + ASSERT_EQ(shape1.backlog_size(), 55); + ASSERT_EQ(shape1.num_infeasible_requests_queued(), 0); + ASSERT_EQ(shape1.num_ready_requests_queued(), 10); + } - ASSERT_EQ(shape1.backlog_size(), 45); - ASSERT_EQ(shape1.num_infeasible_requests_queued(), 10); - ASSERT_EQ(shape1.num_ready_requests_queued(), 0); + // Push a worker so the first task can run. + std::shared_ptr worker = + std::make_shared(WorkerID::FromRandom(), 1234); + pool_.PushWorker(worker); + task_manager_.ScheduleAndDispatchTasks(); + { + auto data = std::make_shared(); + task_manager_.FillResourceUsage(data); + auto resource_load_by_shape = data->resource_load_by_shape(); + auto shape1 = resource_load_by_shape.resource_demands()[0]; + + ASSERT_TRUE(callback_occurred); + ASSERT_EQ(shape1.backlog_size(), 45); + ASSERT_EQ(shape1.num_infeasible_requests_queued(), 0); + ASSERT_EQ(shape1.num_ready_requests_queued(), 9); + } + + // Cancel the rest. for (auto &task_id : to_cancel) { ASSERT_TRUE(task_manager_.CancelTask(task_id)); } + RAY_LOG(ERROR) << "Finished cancelling tasks"; - data = std::make_shared(); - task_manager_.FillResourceUsage(data); - - resource_load_by_shape = data->resource_load_by_shape(); - shape1 = resource_load_by_shape.resource_demands()[0]; - - ASSERT_EQ(shape1.backlog_size(), 0); - ASSERT_EQ(shape1.num_infeasible_requests_queued(), 0); - ASSERT_EQ(shape1.num_ready_requests_queued(), 0); - AssertNoLeaks(); + { + auto data = std::make_shared(); + task_manager_.FillResourceUsage(data); + auto resource_load_by_shape = data->resource_load_by_shape(); + ASSERT_EQ(resource_load_by_shape.resource_demands().size(), 0); + AssertNoLeaks(); + } } TEST_F(ClusterTaskManagerTest, OwnerDeadTest) {