diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index 89167f475..84904a5e1 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -16,14 +16,8 @@ import ray.utils import ray.ray_constants as ray_constants from ray.exceptions import RayTaskError from ray.cluster_utils import Cluster -from ray.test_utils import ( - wait_for_condition, - SignalActor, - init_error_pubsub, - get_error_message, - Semaphore, - new_scheduler_enabled, -) +from ray.test_utils import (wait_for_condition, SignalActor, init_error_pubsub, + get_error_message, Semaphore) def test_failed_task(ray_start_regular, error_pubsub): @@ -663,7 +657,6 @@ def test_warning_for_resource_deadlock(error_pubsub, shutdown_only): assert errors[0].type == ray_constants.RESOURCE_DEADLOCK_ERROR -@pytest.mark.skipif(new_scheduler_enabled(), reason="broken") def test_warning_for_infeasible_tasks(ray_start_regular, error_pubsub): p = error_pubsub # Check that we get warning messages for infeasible tasks. @@ -689,7 +682,6 @@ def test_warning_for_infeasible_tasks(ray_start_regular, error_pubsub): assert errors[0].type == ray_constants.INFEASIBLE_TASK_ERROR -@pytest.mark.skipif(new_scheduler_enabled(), reason="broken") def test_warning_for_infeasible_zero_cpu_actor(shutdown_only): # Check that we cannot place an actor on a 0 CPU machine and that we get an # infeasibility warning (even though the actor creation task itself @@ -956,7 +948,6 @@ def test_raylet_crash_when_get(ray_start_regular): thread.join() -@pytest.mark.skipif(new_scheduler_enabled(), reason="broken") def test_connect_with_disconnected_node(shutdown_only): config = { "num_heartbeats_timeout": 50, diff --git a/src/ray/object_manager/plasma/store.cc b/src/ray/object_manager/plasma/store.cc index 1ff2dd29d..0e1fa3af2 100644 --- a/src/ray/object_manager/plasma/store.cc +++ b/src/ray/object_manager/plasma/store.cc @@ -224,7 +224,7 @@ uint8_t *PlasmaStore::AllocateMemory(size_t size, bool evict_if_full, MEMFD_TYPE // make room. if (space_needed > 0) { if (spill_objects_callback_) { - // If the space needed is too small, we'd like to bump up to the minimum spilling + // If the space needed is too small, we'd like to bump up to the minimum // size. Cap the max size to be lower than the plasma store limit. int64_t byte_to_spill = std::min(PlasmaAllocator::GetFootprintLimit(), diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 31ef907e0..e86975ba0 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -221,9 +221,12 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self return !(failed_workers_cache_.count(owner_worker_id) > 0 || failed_nodes_cache_.count(owner_node_id) > 0); }; + auto announce_infeasible_task = [this](const Task &task) { + PublishInfeasibleTaskError(task); + }; cluster_task_manager_ = std::shared_ptr(new ClusterTaskManager( self_node_id_, new_resource_scheduler_, fulfills_dependencies_func, - is_owner_alive, get_node_info_func)); + is_owner_alive, get_node_info_func, announce_infeasible_task)); } RAY_CHECK_OK(store_client_.Connect(config.store_socket_name.c_str())); @@ -930,6 +933,7 @@ void NodeManager::ResourceDeleted(const NodeID &node_id, void NodeManager::TryLocalInfeasibleTaskScheduling() { RAY_LOG(DEBUG) << "[LocalResourceUpdateRescheduler] The resource update is on the " "local node, check if we can reschedule tasks"; + SchedulingResources &new_local_resources = cluster_resource_map_[self_node_id_]; // SpillOver locally to figure out which infeasible tasks can be placed now @@ -2006,41 +2010,7 @@ void NodeManager::ScheduleTasks( for (const auto &task : local_queues_.GetTasks(TaskState::PLACEABLE)) { task_dependency_manager_.TaskPending(task); move_task_set.insert(task.GetTaskSpecification().TaskId()); - - // This block is used to suppress infeasible task warning. - bool suppress_warning = false; - const auto &required_resources = task.GetTaskSpecification().GetRequiredResources(); - const auto &resources_map = required_resources.GetResourceMap(); - const auto &it = resources_map.begin(); - // It is a hack to suppress infeasible task warning. - // If the first resource of a task requires this magic number, infeasible warning is - // suppressed. It is currently only used by placement group ready API. We don't want - // to have this in ray_config_def.h because the use case is very narrow, and we don't - // want to expose this anywhere. - double INFEASIBLE_TASK_SUPPRESS_MAGIC_NUMBER = 0.0101; - if (it != resources_map.end() && - it->second == INFEASIBLE_TASK_SUPPRESS_MAGIC_NUMBER) { - suppress_warning = true; - } - - // Push a warning to the task's driver that this task is currently infeasible. - if (!suppress_warning) { - // TODO(rkn): Define this constant somewhere else. - std::string type = "infeasible_task"; - std::ostringstream error_message; - error_message - << "The actor or task with ID " << task.GetTaskSpecification().TaskId() - << " cannot be scheduled right now. It requires " - << task.GetTaskSpecification().GetRequiredPlacementResources().ToString() - << " for placement, however the cluster currently cannot provide the requested " - "resources. The required resources may be added as autoscaling takes place " - "or placement groups are scheduled. Otherwise, consider reducing the " - "resource requirements of the task."; - auto error_data_ptr = - gcs::CreateErrorTableData(type, error_message.str(), current_time_ms(), - task.GetTaskSpecification().JobId()); - RAY_CHECK_OK(gcs_client_->Errors().AsyncReportJobError(error_data_ptr, nullptr)); - } + PublishInfeasibleTaskError(task); // Assert that this placeable task is not feasible locally (necessary but not // sufficient). RAY_CHECK(!task.GetTaskSpecification().GetRequiredPlacementResources().IsSubset( @@ -3198,6 +3168,42 @@ void NodeManager::RecordMetrics() { local_queues_.RecordMetrics(); } +void NodeManager::PublishInfeasibleTaskError(const Task &task) const { + // This block is used to suppress infeasible task warning. + bool suppress_warning = false; + const auto &required_resources = task.GetTaskSpecification().GetRequiredResources(); + const auto &resources_map = required_resources.GetResourceMap(); + const auto &it = resources_map.begin(); + // It is a hack to suppress infeasible task warning. + // If the first resource of a task requires this magic number, infeasible warning is + // suppressed. It is currently only used by placement group ready API. We don't want + // to have this in ray_config_def.h because the use case is very narrow, and we don't + // want to expose this anywhere. + double INFEASIBLE_TASK_SUPPRESS_MAGIC_NUMBER = 0.0101; + if (it != resources_map.end() && it->second == INFEASIBLE_TASK_SUPPRESS_MAGIC_NUMBER) { + suppress_warning = true; + } + + // Push a warning to the task's driver that this task is currently infeasible. + if (!suppress_warning) { + // TODO(rkn): Define this constant somewhere else. + std::string type = "infeasible_task"; + std::ostringstream error_message; + error_message + << "The actor or task with ID " << task.GetTaskSpecification().TaskId() + << " cannot be scheduled right now. It requires " + << task.GetTaskSpecification().GetRequiredPlacementResources().ToString() + << " for placement, however the cluster currently cannot provide the requested " + "resources. The required resources may be added as autoscaling takes place " + "or placement groups are scheduled. Otherwise, consider reducing the " + "resource requirements of the task."; + auto error_data_ptr = + gcs::CreateErrorTableData(type, error_message.str(), current_time_ms(), + task.GetTaskSpecification().JobId()); + RAY_CHECK_OK(gcs_client_->Errors().AsyncReportJobError(error_data_ptr, nullptr)); + } +} + } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index f1e9ffad0..bdc8f5c47 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -649,6 +649,12 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// \return Whether the resource is returned successfully. bool ReturnBundleResources(const BundleSpecification &bundle_spec); + /// Publish the infeasible task error to GCS so that drivers can subscribe to it and + /// print. + /// + /// \param task Task that is infeasible + void PublishInfeasibleTaskError(const Task &task) const; + /// ID of this node. NodeID self_node_id_; boost::asio::io_service &io_service_; diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc index 2245760c2..2590ed98f 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc @@ -172,7 +172,8 @@ int64_t ClusterResourceScheduler::IsSchedulable(const TaskRequest &task_req, int64_t ClusterResourceScheduler::GetBestSchedulableNode(const TaskRequest &task_req, bool actor_creation, - int64_t *total_violations) { + int64_t *total_violations, + bool *is_infeasible) { // Minimum number of soft violations across all nodes that can schedule the request. // We will pick the node with the smallest number of soft violations. int64_t min_violations = INT_MAX; @@ -248,20 +249,23 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode(const TaskRequest &task best_node = node.first; } if (violations == 0) { - *total_violations = 0; - return best_node; + // If violation is 0, we can schedule the task. So just break the loop + break; } } *total_violations = min_violations; + // If there's no best node, and the task is not feasible locally, + // it means the task is infeasible. + *is_infeasible = best_node == -1 && !local_node_feasible; return best_node; } std::string ClusterResourceScheduler::GetBestSchedulableNode( const std::unordered_map &task_resources, bool actor_creation, - int64_t *total_violations) { + int64_t *total_violations, bool *is_infeasible) { TaskRequest task_request = ResourceMapToTaskRequest(string_to_int_map_, task_resources); - int64_t node_id = - GetBestSchedulableNode(task_request, actor_creation, total_violations); + int64_t node_id = GetBestSchedulableNode(task_request, actor_creation, total_violations, + is_infeasible); std::string id_string; if (node_id == -1) { diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h index dfaa61fbd..c4058e586 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.h +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h @@ -132,11 +132,13 @@ class ClusterResourceScheduler { /// \param violations: The number of soft constraint violations associated /// with the node returned by this function (assuming /// a node that can schedule task_req is found). + /// \param is_infeasible[in]: It is set true if the task is not schedulable because it + /// is infeasible. /// /// \return -1, if no node can schedule the current request; otherwise, /// return the ID of a node that can schedule the task request. int64_t GetBestSchedulableNode(const TaskRequest &task_request, bool actor_creation, - int64_t *violations); + int64_t *violations, bool *is_infeasible); /// Similar to /// int64_t GetBestSchedulableNode(const TaskRequest &task_request, int64_t @@ -147,7 +149,7 @@ class ClusterResourceScheduler { // task request. std::string GetBestSchedulableNode( const std::unordered_map &task_request, bool actor_creation, - int64_t *violations); + int64_t *violations, bool *is_infeasible); /// Return resources associated to the given node_id in ret_resources. /// If node_id not found, return false; otherwise return true. diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc index a43ed2e4c..37bf4b3da 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc @@ -346,8 +346,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest) { initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft, EmptyIntVector); int64_t violations; - int64_t node_id = - cluster_resources.GetBestSchedulableNode(task_req, false, &violations); + bool is_infeasible; + int64_t node_id = cluster_resources.GetBestSchedulableNode( + task_req, false, &violations, &is_infeasible); ASSERT_TRUE(node_id != -1); ASSERT_EQ(node_id, 1); ASSERT_TRUE(violations > 0); @@ -446,8 +447,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector, EmptyFixedPointVector, EmptyBoolVector, EmptyIntVector); int64_t violations; - int64_t node_id = - cluster_resources.GetBestSchedulableNode(task_req, false, &violations); + bool is_infeasible; + int64_t node_id = cluster_resources.GetBestSchedulableNode( + task_req, false, &violations, &is_infeasible); ASSERT_EQ(node_id, -1); } // Predefined resources, soft constraint violation @@ -458,8 +460,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector, EmptyFixedPointVector, EmptyBoolVector, EmptyIntVector); int64_t violations; - int64_t node_id = - cluster_resources.GetBestSchedulableNode(task_req, false, &violations); + bool is_infeasible; + int64_t node_id = cluster_resources.GetBestSchedulableNode( + task_req, false, &violations, &is_infeasible); ASSERT_TRUE(node_id != -1); ASSERT_TRUE(violations > 0); } @@ -472,8 +475,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector, EmptyFixedPointVector, EmptyBoolVector, EmptyIntVector); int64_t violations; - int64_t node_id = - cluster_resources.GetBestSchedulableNode(task_req, false, &violations); + bool is_infeasible; + int64_t node_id = cluster_resources.GetBestSchedulableNode( + task_req, false, &violations, &is_infeasible); ASSERT_TRUE(node_id != -1); ASSERT_TRUE(violations == 0); } @@ -488,8 +492,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft, EmptyIntVector); int64_t violations; - int64_t node_id = - cluster_resources.GetBestSchedulableNode(task_req, false, &violations); + bool is_infeasible; + int64_t node_id = cluster_resources.GetBestSchedulableNode( + task_req, false, &violations, &is_infeasible); ASSERT_TRUE(node_id == -1); } // Custom resources, soft constraint violation. @@ -503,8 +508,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft, EmptyIntVector); int64_t violations; - int64_t node_id = - cluster_resources.GetBestSchedulableNode(task_req, false, &violations); + bool is_infeasible; + int64_t node_id = cluster_resources.GetBestSchedulableNode( + task_req, false, &violations, &is_infeasible); ASSERT_TRUE(node_id != -1); ASSERT_TRUE(violations > 0); } @@ -519,8 +525,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft, EmptyIntVector); int64_t violations; - int64_t node_id = - cluster_resources.GetBestSchedulableNode(task_req, false, &violations); + bool is_infeasible; + int64_t node_id = cluster_resources.GetBestSchedulableNode( + task_req, false, &violations, &is_infeasible); ASSERT_TRUE(node_id != -1); ASSERT_TRUE(violations == 0); } @@ -535,8 +542,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft, EmptyIntVector); int64_t violations; - int64_t node_id = - cluster_resources.GetBestSchedulableNode(task_req, false, &violations); + bool is_infeasible; + int64_t node_id = cluster_resources.GetBestSchedulableNode( + task_req, false, &violations, &is_infeasible); ASSERT_TRUE(node_id == -1); } // Custom resource missing, soft constraint violation. @@ -550,8 +558,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft, EmptyIntVector); int64_t violations; - int64_t node_id = - cluster_resources.GetBestSchedulableNode(task_req, false, &violations); + bool is_infeasible; + int64_t node_id = cluster_resources.GetBestSchedulableNode( + task_req, false, &violations, &is_infeasible); ASSERT_TRUE(node_id != -1); ASSERT_TRUE(violations > 0); } @@ -567,8 +576,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft, placement_hints); int64_t violations; - int64_t node_id = - cluster_resources.GetBestSchedulableNode(task_req, false, &violations); + bool is_infeasible; + int64_t node_id = cluster_resources.GetBestSchedulableNode( + task_req, false, &violations, &is_infeasible); ASSERT_TRUE(node_id != -1); ASSERT_TRUE(violations > 0); } @@ -584,8 +594,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft, placement_hints); int64_t violations; - int64_t node_id = - cluster_resources.GetBestSchedulableNode(task_req, false, &violations); + bool is_infeasible; + int64_t node_id = cluster_resources.GetBestSchedulableNode( + task_req, false, &violations, &is_infeasible); ASSERT_TRUE(node_id != -1); ASSERT_TRUE(violations == 0); } @@ -1007,23 +1018,24 @@ TEST_F(ClusterResourceSchedulerTest, TestAlwaysSpillInfeasibleTask) { // No feasible nodes. int64_t total_violations; - ASSERT_EQ( - cluster_resources.GetBestSchedulableNode(resource_spec, false, &total_violations), - ""); + bool is_infeasible; + ASSERT_EQ(cluster_resources.GetBestSchedulableNode(resource_spec, false, + &total_violations, &is_infeasible), + ""); // Feasible remote node, but doesn't currently have resources available. We // should spill there. cluster_resources.AddOrUpdateNode("remote_feasible", resource_spec, {{"CPU", 0.}}); - ASSERT_EQ( - cluster_resources.GetBestSchedulableNode(resource_spec, false, &total_violations), - "remote_feasible"); + ASSERT_EQ(cluster_resources.GetBestSchedulableNode(resource_spec, false, + &total_violations, &is_infeasible), + "remote_feasible"); // Feasible remote node, and it currently has resources available. We should // prefer to spill there. cluster_resources.AddOrUpdateNode("remote_available", resource_spec, resource_spec); - ASSERT_EQ( - cluster_resources.GetBestSchedulableNode(resource_spec, false, &total_violations), - "remote_available"); + ASSERT_EQ(cluster_resources.GetBestSchedulableNode(resource_spec, false, + &total_violations, &is_infeasible), + "remote_available"); } TEST_F(ClusterResourceSchedulerTest, HeartbeatTest) { @@ -1156,18 +1168,22 @@ TEST_F(ClusterResourceSchedulerTest, TestDirtyLocalView) { {{"CPU", num_slots_available}}); auto data = std::make_shared(); int64_t t; + bool is_infeasible; for (int i = 0; i < 3; i++) { // Resource usage report tick should reset the remote node's resources. cluster_resources.FillResourceUsage(true, data); for (int j = 0; j < num_slots_available; j++) { - ASSERT_EQ(cluster_resources.GetBestSchedulableNode(task_spec, false, &t), + ASSERT_EQ(cluster_resources.GetBestSchedulableNode(task_spec, false, &t, + &is_infeasible), "remote"); // Allocate remote resources. ASSERT_TRUE(cluster_resources.AllocateRemoteTaskResources("remote", task_spec)); } // Our local view says there are not enough resources on the remote node to // schedule another task. - ASSERT_EQ(cluster_resources.GetBestSchedulableNode(task_spec, false, &t), ""); + ASSERT_EQ( + cluster_resources.GetBestSchedulableNode(task_spec, false, &t, &is_infeasible), + ""); ASSERT_FALSE( cluster_resources.AllocateLocalTaskResources(task_spec, task_allocation)); ASSERT_FALSE(cluster_resources.AllocateRemoteTaskResources("remote", task_spec)); @@ -1180,25 +1196,31 @@ TEST_F(ClusterResourceSchedulerTest, DynamicResourceTest) { std::unordered_map task_request = {{"CPU", 1}, {"custom123", 2}}; int64_t t; + bool is_infeasible; - std::string result = cluster_resources.GetBestSchedulableNode(task_request, false, &t); + std::string result = + cluster_resources.GetBestSchedulableNode(task_request, false, &t, &is_infeasible); ASSERT_TRUE(result.empty()); cluster_resources.AddLocalResource("custom123", 5); - result = cluster_resources.GetBestSchedulableNode(task_request, false, &t); + result = + cluster_resources.GetBestSchedulableNode(task_request, false, &t, &is_infeasible); ASSERT_FALSE(result.empty()); task_request["custom123"] = 6; - result = cluster_resources.GetBestSchedulableNode(task_request, false, &t); + result = + cluster_resources.GetBestSchedulableNode(task_request, false, &t, &is_infeasible); ASSERT_TRUE(result.empty()); cluster_resources.AddLocalResource("custom123", 5); - result = cluster_resources.GetBestSchedulableNode(task_request, false, &t); + result = + cluster_resources.GetBestSchedulableNode(task_request, false, &t, &is_infeasible); ASSERT_FALSE(result.empty()); cluster_resources.DeleteLocalResource("custom123"); - result = cluster_resources.GetBestSchedulableNode(task_request, false, &t); + result = + cluster_resources.GetBestSchedulableNode(task_request, false, &t, &is_infeasible); ASSERT_TRUE(result.empty()); } diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc index 4b6e5e0d7..bc86e280f 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager.cc @@ -12,21 +12,26 @@ ClusterTaskManager::ClusterTaskManager( std::shared_ptr cluster_resource_scheduler, std::function fulfills_dependencies_func, std::function is_owner_alive, - NodeInfoGetter get_node_info) + NodeInfoGetter get_node_info, + std::function announce_infeasible_task) : self_node_id_(self_node_id), cluster_resource_scheduler_(cluster_resource_scheduler), fulfills_dependencies_func_(fulfills_dependencies_func), is_owner_alive_(is_owner_alive), get_node_info_(get_node_info), + announce_infeasible_task_(announce_infeasible_task), max_resource_shapes_per_load_report_( RayConfig::instance().max_resource_shapes_per_load_report()), report_worker_backlog_(RayConfig::instance().report_worker_backlog()) {} bool ClusterTaskManager::SchedulePendingTasks() { + // Always try to schedule infeasible tasks in case they are now feasible. + TryLocalInfeasibleTaskScheduling(); bool did_schedule = false; for (auto shapes_it = tasks_to_schedule_.begin(); shapes_it != tasks_to_schedule_.end();) { auto &work_queue = shapes_it->second; + bool is_infeasible = false; for (auto work_it = work_queue.begin(); work_it != work_queue.end();) { // Check every task in task_to_schedule queue to see // whether it can be scheduled. This avoids head-of-line @@ -39,33 +44,46 @@ bool ClusterTaskManager::SchedulePendingTasks() { << task.GetTaskSpecification().TaskId(); auto placement_resources = task.GetTaskSpecification().GetRequiredPlacementResources().GetResourceMap(); + // This argument is used to set violation, which is an unsupported feature now. int64_t _unused; - // TODO (Alex): We should distinguish between infeasible tasks and a fully - // utilized cluster. std::string node_id_string = cluster_resource_scheduler_->GetBestSchedulableNode( placement_resources, task.GetTaskSpecification().IsActorCreationTask(), - &_unused); + &_unused, &is_infeasible); + + // There is no node that has available resources to run the request. + // Move on to the next shape. if (node_id_string.empty()) { - // There is no node that has available resources to run the request. - // Move on to the next shape. - RAY_LOG(DEBUG) << "No feasible node found for task " - << task.GetTaskSpecification().TaskId(); + RAY_LOG(DEBUG) << "No node found to schedule a task " + << task.GetTaskSpecification().TaskId() << " is infeasible?" + << is_infeasible; break; - } else { - if (node_id_string == self_node_id_.Binary()) { - // Warning: WaitForTaskArgsRequests must execute (do not let it short - // circuit if did_schedule is true). - bool task_scheduled = WaitForTaskArgsRequests(work); - did_schedule = task_scheduled || did_schedule; - } else { - // Should spill over to a different node. - NodeID node_id = NodeID::FromBinary(node_id_string); - Spillback(node_id, work); - } - work_it = work_queue.erase(work_it); } + + if (node_id_string == self_node_id_.Binary()) { + // Warning: WaitForTaskArgsRequests must execute (do not let it short + // circuit if did_schedule is true). + bool task_scheduled = WaitForTaskArgsRequests(work); + did_schedule = task_scheduled || did_schedule; + } else { + // Should spill over to a different node. + NodeID node_id = NodeID::FromBinary(node_id_string); + Spillback(node_id, work); + } + work_it = work_queue.erase(work_it); } - if (work_queue.empty()) { + + if (is_infeasible) { + RAY_CHECK(!work_queue.empty()); + // Only announce the first item as infeasible. + auto &work_queue = shapes_it->second; + const auto &work = work_queue[0]; + const Task task = std::get<0>(work); + announce_infeasible_task_(task); + + // TODO(sang): Use a shared pointer deque to reduce copy overhead. + infeasible_tasks_[shapes_it->first] = shapes_it->second; + shapes_it = tasks_to_schedule_.erase(shapes_it); + } else if (work_queue.empty()) { shapes_it = tasks_to_schedule_.erase(shapes_it); } else { shapes_it++; @@ -172,9 +190,12 @@ bool ClusterTaskManager::AttemptDispatchWork(const Work &work, // Spill at most one task from this queue, then move on to the next // queue. int64_t _unused; + bool is_infeasible; auto placement_resources = spec.GetRequiredPlacementResources().GetResourceMap(); std::string node_id_string = cluster_resource_scheduler_->GetBestSchedulableNode( - placement_resources, spec.IsActorCreationTask(), &_unused); + placement_resources, spec.IsActorCreationTask(), &_unused, &is_infeasible); + RAY_CHECK(!is_infeasible) + << "Task cannot be infeasible when it is about to be dispatched"; if (node_id_string != self_node_id_.Binary() && !node_id_string.empty()) { NodeID node_id = NodeID::FromBinary(node_id_string); Spillback(node_id, work); @@ -201,7 +222,13 @@ void ClusterTaskManager::QueueTask(const Task &task, rpc::RequestWorkerLeaseRepl RAY_LOG(DEBUG) << "Queuing task " << task.GetTaskSpecification().TaskId(); Work work = std::make_tuple(task, reply, callback); const auto &scheduling_class = task.GetTaskSpecification().GetSchedulingClass(); - tasks_to_schedule_[scheduling_class].push_back(work); + // If the scheduling class is infeasible, just add the work to the infeasible queue + // directly. + if (infeasible_tasks_.count(scheduling_class) > 0) { + infeasible_tasks_[scheduling_class].push_back(work); + } else { + tasks_to_schedule_[scheduling_class].push_back(work); + } AddToBacklogTracker(task); } @@ -236,6 +263,8 @@ void ReplyCancelled(Work &work) { } bool ClusterTaskManager::CancelTask(const TaskID &task_id) { + // TODO(sang): There are lots of repetitive code around task backlogs. We should + // refactor them. for (auto shapes_it = tasks_to_schedule_.begin(); shapes_it != tasks_to_schedule_.end(); shapes_it++) { auto &work_queue = shapes_it->second; @@ -270,6 +299,23 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id) { } } + for (auto shapes_it = infeasible_tasks_.begin(); shapes_it != infeasible_tasks_.end(); + shapes_it++) { + auto &work_queue = shapes_it->second; + for (auto work_it = work_queue.begin(); work_it != work_queue.end(); work_it++) { + const auto &task = std::get<0>(*work_it); + if (task.GetTaskSpecification().TaskId() == task_id) { + RemoveFromBacklogTracker(task); + ReplyCancelled(*work_it); + work_queue.erase(work_it); + if (work_queue.empty()) { + infeasible_tasks_.erase(shapes_it); + } + return true; + } + } + } + auto iter = waiting_tasks_.find(task_id); if (iter != waiting_tasks_.end()) { const auto &task = std::get<0>(iter->second); @@ -369,13 +415,8 @@ void ClusterTaskManager::FillResourceUsage( // If a task is not feasible on the local node it will not be feasible on any other // node in the cluster. See the scheduling policy defined by // ClusterResourceScheduler::GetBestSchedulableNode for more details. - if (cluster_resource_scheduler_->IsLocallyFeasible(resources)) { - int num_ready = by_shape_entry->num_ready_requests_queued(); - by_shape_entry->set_num_ready_requests_queued(num_ready + count); - } else { - int num_infeasible = by_shape_entry->num_infeasible_requests_queued(); - by_shape_entry->set_num_infeasible_requests_queued(num_infeasible + count); - } + int num_ready = by_shape_entry->num_ready_requests_queued(); + by_shape_entry->set_num_ready_requests_queued(num_ready + count); auto backlog_it = backlog_tracker_.find(scheduling_class); if (backlog_it != backlog_tracker_.end()) { by_shape_entry->set_backlog_size(backlog_it->second); @@ -417,6 +458,45 @@ void ClusterTaskManager::FillResourceUsage( by_shape_entry->set_backlog_size(backlog_it->second); } } + + for (const auto &pair : infeasible_tasks_) { + const auto &scheduling_class = pair.first; + if (scheduling_class == one_cpu_scheduling_cls) { + continue; + } + if (num_reported++ >= max_resource_shapes_per_load_report_ && + max_resource_shapes_per_load_report_ >= 0) { + // TODO (Alex): It's possible that we skip a different scheduling key which contains + // the same resources. + break; + } + const auto &resources = + TaskSpecification::GetSchedulingClassDescriptor(scheduling_class) + .GetResourceMap(); + const auto &queue = pair.second; + const auto &count = queue.size(); + + auto by_shape_entry = resource_load_by_shape->Add(); + for (const auto &resource : resources) { + // Add to `resource_loads`. + const auto &label = resource.first; + const auto &quantity = resource.second; + (*resource_loads)[label] += quantity * count; + + // Add to `resource_load_by_shape`. + (*by_shape_entry->mutable_shape())[label] = quantity; + } + + // If a task is not feasible on the local node it will not be feasible on any other + // node in the cluster. See the scheduling policy defined by + // ClusterResourceScheduler::GetBestSchedulableNode for more details. + int num_infeasible = by_shape_entry->num_infeasible_requests_queued(); + by_shape_entry->set_num_infeasible_requests_queued(num_infeasible + count); + auto backlog_it = backlog_tracker_.find(scheduling_class); + if (backlog_it != backlog_tracker_.end()) { + by_shape_entry->set_backlog_size(backlog_it->second); + } + } } std::string ClusterTaskManager::DebugString() const { @@ -425,12 +505,50 @@ std::string ClusterTaskManager::DebugString() const { buffer << "Schedule queue length: " << tasks_to_schedule_.size() << "\n"; buffer << "Dispatch queue length: " << tasks_to_dispatch_.size() << "\n"; buffer << "Waiting tasks size: " << waiting_tasks_.size() << "\n"; + buffer << "infeasible queue length size: " << infeasible_tasks_.size() << "\n"; buffer << "cluster_resource_scheduler state: " << cluster_resource_scheduler_->DebugString() << "\n"; buffer << "=================================================="; return buffer.str(); } +void ClusterTaskManager::TryLocalInfeasibleTaskScheduling() { + for (auto shapes_it = infeasible_tasks_.begin(); + shapes_it != infeasible_tasks_.end();) { + auto &work_queue = shapes_it->second; + RAY_CHECK(!work_queue.empty()) + << "Empty work queue shouldn't have been added as a infeasible shape."; + // We only need to check the first item because every task has the same shape. + // If the first entry is infeasible, that means everything else is the same. + const auto work = work_queue[0]; + Task task = std::get<0>(work); + RAY_LOG(DEBUG) << "Check if the infeasible task is schedulable in any node. task_id:" + << task.GetTaskSpecification().TaskId(); + auto placement_resources = + task.GetTaskSpecification().GetRequiredPlacementResources().GetResourceMap(); + // This argument is used to set violation, which is an unsupported feature now. + int64_t _unused; + bool is_infeasible; + std::string node_id_string = cluster_resource_scheduler_->GetBestSchedulableNode( + placement_resources, task.GetTaskSpecification().IsActorCreationTask(), &_unused, + &is_infeasible); + + // There is no node that has available resources to run the request. + // Move on to the next shape. + if (is_infeasible) { + RAY_LOG(DEBUG) << "No feasible node found for task " + << task.GetTaskSpecification().TaskId(); + shapes_it++; + } else { + RAY_LOG(DEBUG) << "Infeasible task of task id " + << task.GetTaskSpecification().TaskId() + << " is now feasible. Move the entry back to tasks_to_schedule_"; + tasks_to_schedule_[shapes_it->first] = shapes_it->second; + shapes_it = infeasible_tasks_.erase(shapes_it); + } + } +} + void ClusterTaskManager::Dispatch( std::shared_ptr worker, std::unordered_map> &leased_workers, @@ -492,7 +610,6 @@ void ClusterTaskManager::Dispatch( } } } - // Send the result back. send_reply_callback(); } diff --git a/src/ray/raylet/scheduling/cluster_task_manager.h b/src/ray/raylet/scheduling/cluster_task_manager.h index 6ec2db994..995273ed5 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.h +++ b/src/ray/raylet/scheduling/cluster_task_manager.h @@ -49,12 +49,14 @@ class ClusterTaskManager { /// \param fulfills_dependencies_func: Returns true if all of a task's /// dependencies are fulfilled. /// \param is_owner_alive: A callback which returns if the owner process is alive - /// (according to our ownership model). \param gcs_client: A gcs client. + /// (according to our ownership model). + /// \param gcs_client: A gcs client. ClusterTaskManager(const NodeID &self_node_id, std::shared_ptr cluster_resource_scheduler, std::function fulfills_dependencies_func, std::function is_owner_alive, - NodeInfoGetter get_node_info); + NodeInfoGetter get_node_info, + std::function announce_infeasible_task); /// (Step 2) For each task in tasks_to_schedule_, pick a node in the system /// (local or remote) that has enough resources available to run the task, if @@ -122,11 +124,20 @@ class ClusterTaskManager { bool AttemptDispatchWork(const Work &work, std::shared_ptr &worker, bool *worker_leased); + /// Reiterate all local infeasible tasks and register them to task_to_schedule_ if it + /// becomes feasible to schedule. + void TryLocalInfeasibleTaskScheduling(); + const NodeID &self_node_id_; std::shared_ptr cluster_resource_scheduler_; + /// Function to make task dependencies to be local. std::function fulfills_dependencies_func_; + /// Function to check if the owner is alive on a given node. std::function is_owner_alive_; + /// Function to get the node information of a given node id. NodeInfoGetter get_node_info_; + /// Function to announce infeasible task to GCS. + std::function announce_infeasible_task_; const int max_resource_shapes_per_load_report_; const bool report_worker_backlog_; @@ -143,6 +154,10 @@ class ClusterTaskManager { /// Tasks move from waiting -> dispatch. absl::flat_hash_map waiting_tasks_; + /// Queue of lease requests that are infeasible. + /// Tasks go between scheduling <-> infeasible. + std::unordered_map> infeasible_tasks_; + /// Track the cumulative backlog of all workers requesting a lease to this raylet. std::unordered_map backlog_tracker_; diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc index ddda8fed5..023390632 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc @@ -101,6 +101,7 @@ class ClusterTaskManagerTest : public ::testing::Test { dependencies_fulfilled_(true), is_owner_alive_(true), node_info_calls_(0), + announce_infeasible_task_calls_(0), task_manager_(id_, scheduler_, [this](const Task &_task) { fulfills_dependencies_calls_++; @@ -112,7 +113,8 @@ class ClusterTaskManagerTest : public ::testing::Test { [this](const NodeID &node_id) { node_info_calls_++; return node_info_[node_id]; - }) {} + }, + [this](const Task &task) { announce_infeasible_task_calls_++; }) {} void SetUp() {} @@ -141,6 +143,7 @@ class ClusterTaskManagerTest : public ::testing::Test { bool is_owner_alive_; int node_info_calls_; + int announce_infeasible_task_calls_; std::unordered_map> node_info_; ClusterTaskManager task_manager_; @@ -371,6 +374,43 @@ TEST_F(ClusterTaskManagerTest, TaskCancellationTest) { ASSERT_EQ(leased_workers_.size(), 1); } +TEST_F(ClusterTaskManagerTest, TaskCancelInfeasibleTask) { + /* Make sure cancelTask works for infeasible tasks */ + std::shared_ptr worker = + std::make_shared(WorkerID::FromRandom(), 1234); + pool_.PushWorker(std::dynamic_pointer_cast(worker)); + + Task task = CreateTask({{ray::kCPU_ResourceLabel, 12}}); + rpc::RequestWorkerLeaseReply reply; + + bool callback_called = false; + bool *callback_called_ptr = &callback_called; + auto callback = [callback_called_ptr]() { *callback_called_ptr = true; }; + + task_manager_.QueueTask(task, &reply, callback); + + // Task is now queued so cancellation works. + ASSERT_TRUE(task_manager_.CancelTask(task.GetTaskSpecification().TaskId())); + task_manager_.SchedulePendingTasks(); + task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + // Task will not execute. + ASSERT_TRUE(callback_called); + ASSERT_TRUE(reply.canceled()); + ASSERT_EQ(leased_workers_.size(), 0); + ASSERT_EQ(pool_.workers.size(), 1); + + // Althoug the feasible node is added, task shouldn't be executed because it is + // cancelled. + auto remote_node_id = NodeID::FromRandom(); + AddNode(remote_node_id, 12); + task_manager_.SchedulePendingTasks(); + task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + ASSERT_TRUE(callback_called); + ASSERT_TRUE(reply.canceled()); + ASSERT_EQ(leased_workers_.size(), 0); + ASSERT_EQ(pool_.workers.size(), 1); +} + TEST_F(ClusterTaskManagerTest, HeartbeatTest) { std::shared_ptr worker = std::make_shared(WorkerID::FromRandom(), 1234); @@ -570,6 +610,72 @@ TEST_F(ClusterTaskManagerTest, OwnerDeadTest) { ASSERT_EQ(pool_.workers.size(), 1); } +TEST_F(ClusterTaskManagerTest, TestInfeasibleTaskWarning) { + /* + Test if infeasible tasks warnings are printed. + */ + // Create an infeasible task. + Task task = CreateTask({{ray::kCPU_ResourceLabel, 12}}); + rpc::RequestWorkerLeaseReply reply; + std::shared_ptr callback_occurred = std::make_shared(false); + auto callback = [callback_occurred]() { *callback_occurred = true; }; + task_manager_.QueueTask(task, &reply, callback); + task_manager_.SchedulePendingTasks(); + ASSERT_EQ(announce_infeasible_task_calls_, 1); + + // Infeasible warning shouldn't be reprinted when the previous task is still infeasible + // after adding a new node. + AddNode(NodeID::FromRandom(), 8); + task_manager_.SchedulePendingTasks(); + std::shared_ptr worker = + std::make_shared(WorkerID::FromRandom(), 1234); + pool_.PushWorker(std::dynamic_pointer_cast(worker)); + // Task shouldn't be scheduled yet. + task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + ASSERT_EQ(announce_infeasible_task_calls_, 1); + ASSERT_FALSE(*callback_occurred); + ASSERT_EQ(leased_workers_.size(), 0); + ASSERT_EQ(pool_.workers.size(), 1); + + // Now we have a node that is feasible to schedule the task. Make sure the infeasible + // task is spillbacked properly. + auto remote_node_id = NodeID::FromRandom(); + AddNode(remote_node_id, 12); + task_manager_.SchedulePendingTasks(); + task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + // Make sure nothing happens locally. + ASSERT_EQ(announce_infeasible_task_calls_, 1); + ASSERT_TRUE(*callback_occurred); + ASSERT_EQ(leased_workers_.size(), 0); + ASSERT_EQ(pool_.workers.size(), 1); + // Make sure the spillback callback is called. + ASSERT_EQ(reply.retry_at_raylet_address().raylet_id(), remote_node_id.Binary()); +} + +TEST_F(ClusterTaskManagerTest, TestMultipleInfeasibleTasksWarnOnce) { + /* + Test infeasible warning is printed only once when the same shape is queued again. + */ + + // Make sure the first infeasible task announces warning. + Task task = CreateTask({{ray::kCPU_ResourceLabel, 12}}); + rpc::RequestWorkerLeaseReply reply; + std::shared_ptr callback_occurred = std::make_shared(false); + auto callback = [callback_occurred]() { *callback_occurred = true; }; + task_manager_.QueueTask(task, &reply, callback); + task_manager_.SchedulePendingTasks(); + ASSERT_EQ(announce_infeasible_task_calls_, 1); + + // Make sure the same shape infeasible task won't be announced. + Task task2 = CreateTask({{ray::kCPU_ResourceLabel, 12}}); + rpc::RequestWorkerLeaseReply reply2; + std::shared_ptr callback_occurred2 = std::make_shared(false); + auto callback2 = [callback_occurred2]() { *callback_occurred2 = true; }; + task_manager_.QueueTask(task2, &reply2, callback2); + task_manager_.SchedulePendingTasks(); + ASSERT_EQ(announce_infeasible_task_calls_, 1); +} + int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS();