From aa64cd45343427ca03f4f997ecf163d84d0154bb Mon Sep 17 00:00:00 2001 From: Alex Wu Date: Fri, 11 Dec 2020 21:47:01 -0800 Subject: [PATCH] [New scheduler] Fix test_global_state (#12586) --- python/ray/tests/test_global_state.py | 7 +- src/ray/common/task/task.cc | 2 + src/ray/common/task/task.h | 2 + .../scheduling/cluster_resource_scheduler.cc | 9 ++ .../scheduling/cluster_resource_scheduler.h | 7 + .../raylet/scheduling/cluster_task_manager.cc | 129 ++++++++++++++++-- .../raylet/scheduling/cluster_task_manager.h | 11 +- .../scheduling/cluster_task_manager_test.cc | 57 ++++++++ 8 files changed, 208 insertions(+), 16 deletions(-) diff --git a/python/ray/tests/test_global_state.py b/python/ray/tests/test_global_state.py index 21808eb2b..c201b6bc3 100644 --- a/python/ray/tests/test_global_state.py +++ b/python/ray/tests/test_global_state.py @@ -144,7 +144,6 @@ def test_global_state_actor_entry(ray_start_regular): @pytest.mark.parametrize("max_shapes", [0, 2, -1]) -@pytest.mark.skipif(new_scheduler_enabled(), reason="broken") def test_load_report(shutdown_only, max_shapes): resource1 = "A" resource2 = "B" @@ -196,6 +195,8 @@ def test_load_report(shutdown_only, max_shapes): if max_shapes != -1: assert len(checker.report) <= max_shapes + print(checker.report) + if max_shapes > 0: # Check that we always include the 1-CPU resource shape. one_cpu_shape = {"CPU": 1} @@ -216,7 +217,8 @@ def test_load_report(shutdown_only, max_shapes): global_state_accessor.disconnect() -@pytest.mark.skipif(new_scheduler_enabled(), reason="broken") +@pytest.mark.skipif( + new_scheduler_enabled(), reason="requires placement groups") def test_placement_group_load_report(ray_start_cluster): cluster = ray_start_cluster # Add a head node that doesn't have gpu resource. 
@@ -285,7 +287,6 @@ def test_placement_group_load_report(ray_start_cluster): global_state_accessor.disconnect() -@pytest.mark.skipif(new_scheduler_enabled(), reason="broken") def test_backlog_report(shutdown_only): cluster = ray.init( num_cpus=1, _system_config={ diff --git a/src/ray/common/task/task.cc b/src/ray/common/task/task.cc index c8ec17a17..bf0b8dfb1 100644 --- a/src/ray/common/task/task.cc +++ b/src/ray/common/task/task.cc @@ -35,6 +35,8 @@ void Task::CopyTaskExecutionSpec(const Task &task) { task_execution_spec_ = task.task_execution_spec_; } +void Task::SetBacklogSize(int64_t backlog_size) { backlog_size_ = backlog_size; } + int64_t Task::BacklogSize() const { return backlog_size_; } std::string Task::DebugString() const { diff --git a/src/ray/common/task/task.h b/src/ray/common/task/task.h index 1fe9e24fc..cc4010dcd 100644 --- a/src/ray/common/task/task.h +++ b/src/ray/common/task/task.h @@ -89,6 +89,8 @@ class Task { /// Returns the cancellation task callback, or nullptr. const CancelTaskCallback &OnCancellation() const { return on_cancellation_; } + void SetBacklogSize(int64_t backlog_size); + int64_t BacklogSize() const; std::string DebugString() const; diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc index 70173c016..2245760c2 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc @@ -76,6 +76,15 @@ bool ClusterResourceScheduler::RemoveNode(const std::string &node_id_string) { return RemoveNode(node_id); } +bool ClusterResourceScheduler::IsLocallyFeasible( + const std::unordered_map shape) { + const TaskRequest task_req = ResourceMapToTaskRequest(string_to_int_map_, shape); + RAY_CHECK(nodes_.contains(local_node_id_)); + const auto &it = nodes_.find(local_node_id_); + RAY_CHECK(it != nodes_.end()); + return IsFeasible(task_req, it->second.GetLocalView()); +} + bool 
ClusterResourceScheduler::IsFeasible(const TaskRequest &task_req, const NodeResources &resources) const { // First, check predefined resources. diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h index 05686c4ae..dfaa61fbd 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.h +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h @@ -73,6 +73,13 @@ class ClusterResourceScheduler { bool RemoveNode(int64_t node_id); bool RemoveNode(const std::string &node_id_string); + /// Check whether a resource demand shape is feasible on the local node. The + /// local node is feasible if it has the total resources needed to eventually + /// satisfy the demand, even if those resources are currently allocated. + /// + /// \param shape The resource demand's shape. + bool IsLocallyFeasible(const std::unordered_map shape); + /// Check whether a task request is feasible on a given node. A node is /// feasible if it has the total resources needed to eventually execute the /// task, even if those resources are currently allocated. 
diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc index 1e936254e..4b6e5e0d7 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager.cc @@ -17,7 +17,10 @@ ClusterTaskManager::ClusterTaskManager( cluster_resource_scheduler_(cluster_resource_scheduler), fulfills_dependencies_func_(fulfills_dependencies_func), is_owner_alive_(is_owner_alive), - get_node_info_(get_node_info) {} + get_node_info_(get_node_info), + max_resource_shapes_per_load_report_( + RayConfig::instance().max_resource_shapes_per_load_report()), + report_worker_backlog_(RayConfig::instance().report_worker_backlog()) {} bool ClusterTaskManager::SchedulePendingTasks() { bool did_schedule = false; @@ -134,7 +137,7 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers( if (worker_leased) { auto reply = std::get<1>(*work_it); auto callback = std::get<2>(*work_it); - Dispatch(worker, leased_workers, spec, reply, callback); + Dispatch(worker, leased_workers, task, reply, callback); } else { worker_pool.PushWorker(worker); } @@ -199,6 +202,7 @@ void ClusterTaskManager::QueueTask(const Task &task, rpc::RequestWorkerLeaseRepl Work work = std::make_tuple(task, reply, callback); const auto &scheduling_class = task.GetTaskSpecification().GetSchedulingClass(); tasks_to_schedule_[scheduling_class].push_back(work); + AddToBacklogTracker(task); } void ClusterTaskManager::TasksUnblocked(const std::vector ready_ids) { @@ -236,7 +240,9 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id) { shapes_it++) { auto &work_queue = shapes_it->second; for (auto work_it = work_queue.begin(); work_it != work_queue.end(); work_it++) { - if (std::get<0>(*work_it).GetTaskSpecification().TaskId() == task_id) { + const auto &task = std::get<0>(*work_it); + if (task.GetTaskSpecification().TaskId() == task_id) { + RemoveFromBacklogTracker(task); RAY_LOG(DEBUG) << "Canceling task " << task_id; 
ReplyCancelled(*work_it); work_queue.erase(work_it); @@ -251,7 +257,9 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id) { shapes_it++) { auto &work_queue = shapes_it->second; for (auto work_it = work_queue.begin(); work_it != work_queue.end(); work_it++) { - if (std::get<0>(*work_it).GetTaskSpecification().TaskId() == task_id) { + const auto &task = std::get<0>(*work_it); + if (task.GetTaskSpecification().TaskId() == task_id) { + RemoveFromBacklogTracker(task); ReplyCancelled(*work_it); work_queue.erase(work_it); if (work_queue.empty()) { @@ -264,6 +272,8 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id) { auto iter = waiting_tasks_.find(task_id); if (iter != waiting_tasks_.end()) { + const auto &task = std::get<0>(iter->second); + RemoveFromBacklogTracker(task); ReplyCancelled(iter->second); waiting_tasks_.erase(iter); return true; @@ -275,6 +285,9 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id) { void ClusterTaskManager::FillResourceUsage( bool light_report_resource_usage_enabled, std::shared_ptr data) const { + if (max_resource_shapes_per_load_report_ == 0) { + return; + } // TODO (WangTao): Find a way to check if load changed and combine it with light // heartbeat. Now we just report it every time. data->set_resource_load_changed(true); @@ -282,9 +295,59 @@ void ClusterTaskManager::FillResourceUsage( auto resource_load_by_shape = data->mutable_resource_load_by_shape()->mutable_resource_demands(); - // TODO (Alex): Implement the 1-CPU task optimization. 
+ int num_reported = 0; + + // 1-CPU optimization + static const ResourceSet one_cpu_resource_set( + std::unordered_map({{kCPU_ResourceLabel, 1}})); + static const SchedulingClass one_cpu_scheduling_cls( + TaskSpecification::GetSchedulingClass(one_cpu_resource_set)); + { + num_reported++; + int count = 0; + auto it = tasks_to_schedule_.find(one_cpu_scheduling_cls); + if (it != tasks_to_schedule_.end()) { + count += it->second.size(); + } + it = tasks_to_dispatch_.find(one_cpu_scheduling_cls); + if (it != tasks_to_dispatch_.end()) { + count += it->second.size(); + } + + if (count > 0) { + auto by_shape_entry = resource_load_by_shape->Add(); + + for (const auto &resource : one_cpu_resource_set.GetResourceMap()) { + // Add to `resource_loads`. + const auto &label = resource.first; + const auto &quantity = resource.second; + (*resource_loads)[label] += quantity * count; + + // Add to `resource_load_by_shape`. + (*by_shape_entry->mutable_shape())[label] = quantity; + } + + int num_ready = by_shape_entry->num_ready_requests_queued(); + by_shape_entry->set_num_ready_requests_queued(num_ready + count); + + auto backlog_it = backlog_tracker_.find(one_cpu_scheduling_cls); + if (backlog_it != backlog_tracker_.end()) { + by_shape_entry->set_backlog_size(backlog_it->second); + } + } + } + for (const auto &pair : tasks_to_schedule_) { const auto &scheduling_class = pair.first; + if (scheduling_class == one_cpu_scheduling_cls) { + continue; + } + if (num_reported++ >= max_resource_shapes_per_load_report_ && + max_resource_shapes_per_load_report_ >= 0) { + // TODO (Alex): It's possible that we skip a different scheduling key which contains + // the same resources. + break; + } const auto &resources = TaskSpecification::GetSchedulingClassDescriptor(scheduling_class) .GetResourceMap(); @@ -301,14 +364,35 @@ void ClusterTaskManager::FillResourceUsage( // Add to `resource_load_by_shape`. 
(*by_shape_entry->mutable_shape())[label] = quantity; - // TODO (Alex): Technically being on `tasks_to_schedule` could also mean - // that the entire cluster is utilized. - by_shape_entry->set_num_infeasible_requests_queued(count); + } + + // If a task is not feasible on the local node it will not be feasible on any other + // node in the cluster. See the scheduling policy defined by + // ClusterResourceScheduler::GetBestSchedulableNode for more details. + if (cluster_resource_scheduler_->IsLocallyFeasible(resources)) { + int num_ready = by_shape_entry->num_ready_requests_queued(); + by_shape_entry->set_num_ready_requests_queued(num_ready + count); + } else { + int num_infeasible = by_shape_entry->num_infeasible_requests_queued(); + by_shape_entry->set_num_infeasible_requests_queued(num_infeasible + count); + } + auto backlog_it = backlog_tracker_.find(scheduling_class); + if (backlog_it != backlog_tracker_.end()) { + by_shape_entry->set_backlog_size(backlog_it->second); } } for (const auto &pair : tasks_to_dispatch_) { const auto &scheduling_class = pair.first; + if (scheduling_class == one_cpu_scheduling_cls) { + continue; + } + if (num_reported++ >= max_resource_shapes_per_load_report_ && + max_resource_shapes_per_load_report_ >= 0) { + // TODO (Alex): It's possible that we skip a different scheduling key which contains + // the same resources. + break; + } const auto &resources = TaskSpecification::GetSchedulingClassDescriptor(scheduling_class) .GetResourceMap(); @@ -325,9 +409,12 @@ void ClusterTaskManager::FillResourceUsage( // Add to `resource_load_by_shape`. (*by_shape_entry->mutable_shape())[label] = quantity; - // TODO (Alex): Technically being on `tasks_to_schedule` could also mean - // that the entire cluster is utilized. 
- by_shape_entry->set_num_ready_requests_queued(count); + } + int num_ready = by_shape_entry->num_ready_requests_queued(); + by_shape_entry->set_num_ready_requests_queued(num_ready + count); + auto backlog_it = backlog_tracker_.find(scheduling_class); + if (backlog_it != backlog_tracker_.end()) { + by_shape_entry->set_backlog_size(backlog_it->second); } } } @@ -347,8 +434,9 @@ std::string ClusterTaskManager::DebugString() const { void ClusterTaskManager::Dispatch( std::shared_ptr worker, std::unordered_map> &leased_workers, - const TaskSpecification &task_spec, rpc::RequestWorkerLeaseReply *reply, + const Task &task, rpc::RequestWorkerLeaseReply *reply, std::function send_reply_callback) { + const auto &task_spec = task.GetTaskSpecification(); RAY_LOG(DEBUG) << "Dispatching task " << task_spec.TaskId(); // Pass the contact info of the worker to use. reply->mutable_worker_address()->set_ip_address(worker->IpAddress()); @@ -433,5 +521,22 @@ void ClusterTaskManager::Spillback(const NodeID &spillback_to, const Work &work) send_reply_callback(); } +void ClusterTaskManager::AddToBacklogTracker(const Task &task) { + if (report_worker_backlog_) { + auto cls = task.GetTaskSpecification().GetSchedulingClass(); + backlog_tracker_[cls] += task.BacklogSize(); + } +} + +void ClusterTaskManager::RemoveFromBacklogTracker(const Task &task) { + if (report_worker_backlog_) { + SchedulingClass cls = task.GetTaskSpecification().GetSchedulingClass(); + backlog_tracker_[cls] -= task.BacklogSize(); + if (backlog_tracker_[cls] == 0) { + backlog_tracker_.erase(backlog_tracker_.find(cls)); + } + } +} + } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/scheduling/cluster_task_manager.h b/src/ray/raylet/scheduling/cluster_task_manager.h index 6e15e0e35..6ec2db994 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.h +++ b/src/ray/raylet/scheduling/cluster_task_manager.h @@ -128,6 +128,9 @@ class ClusterTaskManager { std::function is_owner_alive_; NodeInfoGetter 
get_node_info_; + const int max_resource_shapes_per_load_report_; + const bool report_worker_backlog_; + /// Queue of lease requests that are waiting for resources to become available. /// Tasks move from scheduled -> dispatch | waiting. std::unordered_map> tasks_to_schedule_; @@ -140,6 +143,9 @@ class ClusterTaskManager { /// Tasks move from waiting -> dispatch. absl::flat_hash_map waiting_tasks_; + /// Track the cumulative backlog of all workers requesting a lease to this raylet. + std::unordered_map backlog_tracker_; + /// Determine whether a task should be immediately dispatched, /// or placed on a wait queue. /// @@ -149,10 +155,13 @@ class ClusterTaskManager { void Dispatch( std::shared_ptr worker, std::unordered_map> &leased_workers_, - const TaskSpecification &task_spec, rpc::RequestWorkerLeaseReply *reply, + const Task &task, rpc::RequestWorkerLeaseReply *reply, std::function send_reply_callback); void Spillback(const NodeID &spillback_to, const Work &work); + + void AddToBacklogTracker(const Task &task); + void RemoveFromBacklogTracker(const Task &task); }; } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc index 2e9f54705..ddda8fed5 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc @@ -480,6 +480,63 @@ TEST_F(ClusterTaskManagerTest, HeartbeatTest) { } } +TEST_F(ClusterTaskManagerTest, BacklogReportTest) { + /* + Test basic scheduler functionality: + 1. Queue and attempt to schedule/dispatch a task with no workers available + 2. A worker becomes available, dispatch again. 
+ */ + rpc::RequestWorkerLeaseReply reply; + bool callback_occurred = false; + bool *callback_occurred_ptr = &callback_occurred; + auto callback = [callback_occurred_ptr]() { *callback_occurred_ptr = true; }; + + std::shared_ptr worker = + std::make_shared(WorkerID::FromRandom(), 1234); + pool_.PushWorker(std::dynamic_pointer_cast(worker)); + + std::vector to_cancel; + + for (int i = 0; i < 10; i++) { + Task task = CreateTask({{ray::kCPU_ResourceLabel, 100}}); + task.SetBacklogSize(i); + task_manager_.QueueTask(task, &reply, callback); + to_cancel.push_back(task.GetTaskSpecification().TaskId()); + } + task_manager_.SchedulePendingTasks(); + task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + + ASSERT_FALSE(callback_occurred); + ASSERT_EQ(leased_workers_.size(), 0); + ASSERT_EQ(pool_.workers.size(), 1); + ASSERT_EQ(fulfills_dependencies_calls_, 0); + ASSERT_EQ(node_info_calls_, 0); + + auto data = std::make_shared(); + task_manager_.FillResourceUsage(false, data); + + auto resource_load_by_shape = data->resource_load_by_shape(); + auto shape1 = resource_load_by_shape.resource_demands()[0]; + + ASSERT_EQ(shape1.backlog_size(), 45); + ASSERT_EQ(shape1.num_infeasible_requests_queued(), 10); + ASSERT_EQ(shape1.num_ready_requests_queued(), 0); + + for (auto &task_id : to_cancel) { + ASSERT_TRUE(task_manager_.CancelTask(task_id)); + } + + data = std::make_shared(); + task_manager_.FillResourceUsage(false, data); + + resource_load_by_shape = data->resource_load_by_shape(); + shape1 = resource_load_by_shape.resource_demands()[0]; + + ASSERT_EQ(shape1.backlog_size(), 0); + ASSERT_EQ(shape1.num_infeasible_requests_queued(), 0); + ASSERT_EQ(shape1.num_ready_requests_queued(), 0); +} + TEST_F(ClusterTaskManagerTest, OwnerDeadTest) { /* Test the race condition in which the owner of a task dies while the task is pending.