From 6ece291f352e1531f29fa8573e21aa47f290b485 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 18 Dec 2020 16:00:54 -0800 Subject: [PATCH] Clean up block/unblock handling of resources in new scheduler (#12963) --- python/ray/tests/test_failure.py | 5 +-- python/ray/tests/test_gcs_fault_tolerance.py | 2 -- python/ray/tests/test_global_state.py | 3 -- python/ray/tests/test_reference_counting.py | 2 +- src/ray/raylet/node_manager.cc | 14 +++++--- .../scheduling/cluster_resource_scheduler.cc | 35 +++++++++++++------ .../scheduling/cluster_resource_scheduler.h | 8 +++-- .../raylet/scheduling/cluster_task_manager.cc | 12 +++---- .../raylet/scheduling/cluster_task_manager.h | 4 +-- src/ray/raylet/test/util.h | 8 ----- src/ray/raylet/worker.h | 22 ------------ src/ray/raylet/worker_pool.cc | 4 +-- 12 files changed, 51 insertions(+), 68 deletions(-) diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index b3505bbf5..f01868989 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -17,8 +17,7 @@ import ray.ray_constants as ray_constants from ray.exceptions import RayTaskError from ray.cluster_utils import Cluster from ray.test_utils import (wait_for_condition, SignalActor, init_error_pubsub, - get_error_message, Semaphore, - new_scheduler_enabled) + get_error_message, Semaphore) def test_failed_task(ray_start_regular, error_pubsub): @@ -633,8 +632,6 @@ def test_export_large_objects(ray_start_regular, error_pubsub): assert errors[0].type == ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR -@pytest.mark.skipif( - new_scheduler_enabled(), reason="Supposed to deadlock, but it doesn't") def test_warning_all_tasks_blocked(shutdown_only): ray.init( num_cpus=1, _system_config={"debug_dump_period_milliseconds": 500}) diff --git a/python/ray/tests/test_gcs_fault_tolerance.py b/python/ray/tests/test_gcs_fault_tolerance.py index 32f20d42a..642cf4dd3 100644 --- a/python/ray/tests/test_gcs_fault_tolerance.py +++ b/python/ray/tests/test_gcs_fault_tolerance.py @@ -6,7 +6,6 @@ from ray.test_utils import ( generate_system_config_map, wait_for_condition, wait_for_pid_to_exit, - new_scheduler_enabled, ) @@ -21,7 +20,6 @@ def increase(x): return x + 1 -@pytest.mark.skipif(new_scheduler_enabled(), reason="notimpl") @pytest.mark.parametrize( "ray_start_regular", [ generate_system_config_map( diff --git a/python/ray/tests/test_global_state.py b/python/ray/tests/test_global_state.py index c201b6bc3..3dcd64c1e 100644 --- a/python/ray/tests/test_global_state.py +++ b/python/ray/tests/test_global_state.py @@ -8,7 +8,6 @@ import time import ray import ray.ray_constants import ray.test_utils -from ray.test_utils import new_scheduler_enabled from ray._raylet import GlobalStateAccessor @@ -217,8 +216,6 @@ def test_load_report(shutdown_only, max_shapes): global_state_accessor.disconnect() -@pytest.mark.skipif( - new_scheduler_enabled(), reason="requires placement groups") def test_placement_group_load_report(ray_start_cluster): cluster = ray_start_cluster # Add a head node that doesn't have gpu resource. diff --git a/python/ray/tests/test_reference_counting.py b/python/ray/tests/test_reference_counting.py index b9f3b0906..b93ee4221 100644 --- a/python/ray/tests/test_reference_counting.py +++ b/python/ray/tests/test_reference_counting.py @@ -167,7 +167,7 @@ def test_dependency_refcounts(ray_start_regular): check_refcounts({}) -@pytest.mark.skipif(new_scheduler_enabled(), reason="hangs") +@pytest.mark.skipif(new_scheduler_enabled(), reason="dynamic res todo") def test_actor_creation_task(ray_start_regular): @ray.remote def large_object(): diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 2e13bbb91..fe975d79b 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2158,9 +2158,11 @@ void NodeManager::HandleDirectCallTaskBlocked( cpu_instances = worker->GetAllocatedInstances()->GetCPUInstancesDouble(); } if (cpu_instances.size() > 0) { - std::vector borrowed_cpu_instances = + std::vector overflow_cpu_instances = new_resource_scheduler_->AddCPUResourceInstances(cpu_instances); - worker->SetBorrowedCPUInstances(borrowed_cpu_instances); + for (unsigned int i = 0; i < overflow_cpu_instances.size(); i++) { + RAY_CHECK(overflow_cpu_instances[i] == 0) << "Should not be overflow"; + } worker->MarkBlocked(); } ScheduleAndDispatch(); @@ -2199,9 +2201,11 @@ void NodeManager::HandleDirectCallTaskUnblocked( cpu_instances = worker->GetAllocatedInstances()->GetCPUInstancesDouble(); } if (cpu_instances.size() > 0) { - new_resource_scheduler_->SubtractCPUResourceInstances(cpu_instances); - new_resource_scheduler_->AddCPUResourceInstances(worker->GetBorrowedCPUInstances()); - worker->ClearBorrowedCPUInstances(); + // Important: we allow going negative here, since otherwise you can use infinite + // CPU resources by repeatedly blocking / unblocking a task. By allowing it to go + // negative, at most one task can "borrow" this worker's resources. + new_resource_scheduler_->SubtractCPUResourceInstances( + cpu_instances, /*allow_going_negative=*/true); worker->MarkUnblocked(); } ScheduleAndDispatch(); diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc index bcca8862a..10eae694c 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc @@ -565,15 +565,25 @@ std::vector ClusterResourceScheduler::AddAvailableResourceInstances( } std::vector ClusterResourceScheduler::SubtractAvailableResourceInstances( - std::vector available, ResourceInstanceCapacities *resource_instances) { + std::vector available, ResourceInstanceCapacities *resource_instances, + bool allow_going_negative) { RAY_CHECK(available.size() == resource_instances->available.size()); std::vector underflow(available.size(), 0.); for (size_t i = 0; i < available.size(); i++) { - resource_instances->available[i] = resource_instances->available[i] - available[i]; if (resource_instances->available[i] < 0) { - underflow[i] = -resource_instances->available[i]; - resource_instances->available[i] = 0; + if (allow_going_negative) { + resource_instances->available[i] = + resource_instances->available[i] - available[i]; + } else { + underflow[i] = available[i]; // No change in the value in this case. + } + } else { + resource_instances->available[i] = resource_instances->available[i] - available[i]; + if (resource_instances->available[i] < 0 && !allow_going_negative) { + underflow[i] = -resource_instances->available[i]; + resource_instances->available[i] = 0; + } } } return underflow; @@ -777,7 +787,7 @@ std::vector ClusterResourceScheduler::AddCPUResourceInstances( } std::vector ClusterResourceScheduler::SubtractCPUResourceInstances( - std::vector &cpu_instances) { + std::vector &cpu_instances, bool allow_going_negative) { std::vector cpu_instances_fp = VectorDoubleToVectorFixedPoint(cpu_instances); @@ -787,7 +797,8 @@ std::vector ClusterResourceScheduler::SubtractCPUResourceInstances( RAY_CHECK(nodes_.find(local_node_id_) != nodes_.end()); auto underflow = SubtractAvailableResourceInstances( - cpu_instances_fp, &local_resources_.predefined_resources[CPU]); + cpu_instances_fp, &local_resources_.predefined_resources[CPU], + allow_going_negative); UpdateLocalAvailableResourcesFromResourceInstances(); return VectorFixedPointToVectorDouble(underflow); @@ -916,7 +927,8 @@ void ClusterResourceScheduler::FillResourceUsage( const auto &label = ResourceEnumToString((PredefinedResources)i); const auto &capacity = resources.predefined_resources[i]; const auto &last_capacity = last_report_resources_->predefined_resources[i]; - if (capacity.available != last_capacity.available) { + // Note: available may be negative, but only report positive to GCS. + if (capacity.available != last_capacity.available && capacity.available > 0) { resources_data->set_resources_available_changed(true); (*resources_data->mutable_resources_available())[label] = capacity.available.Double(); @@ -931,7 +943,8 @@ void ClusterResourceScheduler::FillResourceUsage( const auto &capacity = it->second; const auto &last_capacity = last_report_resources_->custom_resources[custom_id]; const auto &label = string_to_int_map_.Get(custom_id); - if (capacity.available != last_capacity.available) { + // Note: available may be negative, but only report positive to GCS. + if (capacity.available != last_capacity.available && capacity.available > 0) { resources_data->set_resources_available_changed(true); (*resources_data->mutable_resources_available())[label] = capacity.available.Double(); @@ -947,7 +960,8 @@ void ClusterResourceScheduler::FillResourceUsage( for (int i = 0; i < PredefinedResources_MAX; i++) { const auto &label = ResourceEnumToString((PredefinedResources)i); const auto &capacity = resources.predefined_resources[i]; - if (capacity.available != 0) { + // Note: available may be negative, but only report positive to GCS. + if (capacity.available > 0) { (*resources_data->mutable_resources_available())[label] = capacity.available.Double(); } @@ -960,7 +974,8 @@ void ClusterResourceScheduler::FillResourceUsage( uint64_t custom_id = it->first; const auto &capacity = it->second; const auto &label = string_to_int_map_.Get(custom_id); - if (capacity.available != 0) { + // Note: available may be negative, but only report positive to GCS. + if (capacity.available > 0) { (*resources_data->mutable_resources_available())[label] = capacity.available.Double(); } diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h index 9e480b4c8..7d1f5253c 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.h +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h @@ -279,11 +279,13 @@ class ClusterResourceScheduler { /// /// \param free A list of capacities for resource's instances to be freed. /// \param resource_instances List of the resource instances being updated. + /// \param allow_going_negative Allow the values to go negative (disable underflow). /// \return Underflow of "resource_instances" after subtracting instance /// capacities in "available", i.e.,. /// max(available - reasource_instances.available, 0) std::vector SubtractAvailableResourceInstances( - std::vector available, ResourceInstanceCapacities *resource_instances); + std::vector available, ResourceInstanceCapacities *resource_instances, + bool allow_going_negative = false); /// Increase the available CPU instances of this node. /// @@ -296,10 +298,12 @@ class ClusterResourceScheduler { /// Decrease the available CPU instances of this node. /// /// \param cpu_instances CPU instances to be removed from available cpus. + /// \param allow_going_negative Allow the values to go negative (disable underflow). /// /// \return Underflow capacities of CPU instances after subtracting CPU /// capacities in cpu_instances. - std::vector SubtractCPUResourceInstances(std::vector &cpu_instances); + std::vector SubtractCPUResourceInstances(std::vector &cpu_instances, + bool allow_going_negative = false); /// Increase the available GPU instances of this node. /// diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc index c11d818ef..09db70f1b 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager.cc @@ -1,6 +1,7 @@ #include "ray/raylet/scheduling/cluster_task_manager.h" #include +#include #include "ray/util/logging.h" @@ -242,10 +243,7 @@ void ClusterTaskManager::TasksUnblocked(const std::vector ready_ids) { const auto &scheduling_key = task.GetTaskSpecification().GetSchedulingClass(); RAY_LOG(DEBUG) << "Args ready, task can be dispatched " << task.GetTaskSpecification().TaskId(); - // Note: we transition tasks back to the scheduling queue instead of directly - // to dispatch. This allows AnyPendingTasks() to simply check the scheduling - // queue to see if any tasks are blocked on resource availability: see #12438 - tasks_to_schedule_[scheduling_key].push_back(work); + tasks_to_dispatch_[scheduling_key].push_back(work); waiting_tasks_.erase(it); } } @@ -507,9 +505,9 @@ bool ClusterTaskManager::AnyPendingTasks(Task *exemplar, bool *any_pending, int *num_pending_actor_creation, int *num_pending_tasks) const { // We are guaranteed that these tasks are blocked waiting for resources after a - // call to ScheduleAndDispatch(). Note that tasks that transition to waiting - // move back to the tasks_to_schedule_ queue after their deps are satisfied. - for (const auto &shapes_it : tasks_to_schedule_) { + // call to ScheduleAndDispatch(). They may be waiting for workers as well, but + // this should be a transient condition only. + for (const auto &shapes_it : boost::join(tasks_to_dispatch_, tasks_to_schedule_)) { auto &work_queue = shapes_it.second; for (const auto &work_it : work_queue) { const auto &task = std::get<0>(work_it); diff --git a/src/ray/raylet/scheduling/cluster_task_manager.h b/src/ray/raylet/scheduling/cluster_task_manager.h index b71593f8a..aabc0a6fb 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.h +++ b/src/ray/raylet/scheduling/cluster_task_manager.h @@ -157,11 +157,11 @@ class ClusterTaskManager { std::unordered_map> tasks_to_schedule_; /// Queue of lease requests that should be scheduled onto workers. - /// Tasks move from scheduled -> dispatch. + /// Tasks move from scheduled | waiting -> dispatch. std::unordered_map> tasks_to_dispatch_; /// Tasks waiting for arguments to be transferred locally. - /// Tasks move (back) from waiting -> scheduled. + /// Tasks move from waiting -> dispatch. absl::flat_hash_map waiting_tasks_; /// Queue of lease requests that are infeasible. diff --git a/src/ray/raylet/test/util.h b/src/ray/raylet/test/util.h index 90fc9b158..a39bdd1c0 100644 --- a/src/ray/raylet/test/util.h +++ b/src/ray/raylet/test/util.h @@ -161,10 +161,6 @@ class MockWorker : public WorkerInterface { void ClearLifetimeAllocatedInstances() { lifetime_allocated_instances_ = nullptr; } - void SetBorrowedCPUInstances(std::vector &cpu_instances) { - borrowed_cpu_instances_ = cpu_instances; - } - const BundleID &GetBundleId() const { RAY_CHECK(false) << "Method unused"; return bundle_id_; @@ -172,10 +168,6 @@ class MockWorker : public WorkerInterface { void SetBundleId(const BundleID &bundle_id) { bundle_id_ = bundle_id; } - std::vector &GetBorrowedCPUInstances() { return borrowed_cpu_instances_; } - - void ClearBorrowedCPUInstances() { RAY_CHECK(false) << "Method unused"; } - Task &GetAssignedTask() { RAY_CHECK(false) << "Method unused"; auto *t = new Task(); diff --git a/src/ray/raylet/worker.h b/src/ray/raylet/worker.h index 7cd19868e..b4c0b34ce 100644 --- a/src/ray/raylet/worker.h +++ b/src/ray/raylet/worker.h @@ -98,12 +98,6 @@ class WorkerInterface { virtual void ClearLifetimeAllocatedInstances() = 0; - virtual void SetBorrowedCPUInstances(std::vector &cpu_instances) = 0; - - virtual std::vector &GetBorrowedCPUInstances() = 0; - - virtual void ClearBorrowedCPUInstances() = 0; - virtual Task &GetAssignedTask() = 0; virtual void SetAssignedTask(const Task &assigned_task) = 0; @@ -196,14 +190,6 @@ class Worker : public WorkerInterface { void ClearLifetimeAllocatedInstances() { lifetime_allocated_instances_ = nullptr; }; - void SetBorrowedCPUInstances(std::vector &cpu_instances) { - borrowed_cpu_instances_ = cpu_instances; - }; - - std::vector &GetBorrowedCPUInstances() { return borrowed_cpu_instances_; }; - - void ClearBorrowedCPUInstances() { return borrowed_cpu_instances_.clear(); }; - Task &GetAssignedTask() { return assigned_task_; }; void SetAssignedTask(const Task &assigned_task) { assigned_task_ = assigned_task; }; @@ -273,14 +259,6 @@ class Worker : public WorkerInterface { /// The capacity of each resource instance allocated to this worker /// when running as an actor. std::shared_ptr lifetime_allocated_instances_; - /// CPUs borrowed by the worker. This happens in the following scenario: - /// 1) Worker A is blocked, so it donates its CPUs back to the node. - /// 2) Other workers are scheduled and are allocated some of the CPUs donated by A. - /// 3) Task A is unblocked, but it cannot get all CPUs back. At this point, - /// the node is oversubscribed. borrowed_cpu_instances_ represents the number - /// of CPUs this node is oversubscribed by. - /// TODO (Ion): Investigate a more intuitive alternative to track these Cpus. - std::vector borrowed_cpu_instances_; /// Task being assigned to this worker. Task assigned_task_; }; diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index a7ad5c245..93a568748 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -956,8 +956,8 @@ const std::vector> WorkerPool::GetAllRegistered } void WorkerPool::WarnAboutSize() { - for (const auto &entry : states_by_lang_) { - auto state = entry.second; + for (auto &entry : states_by_lang_) { + auto &state = entry.second; int64_t num_workers_started_or_registered = 0; num_workers_started_or_registered += static_cast(state.registered_workers.size());