diff --git a/python/ray/test_utils.py b/python/ray/test_utils.py index dc4562be2..648b4f4f9 100644 --- a/python/ray/test_utils.py +++ b/python/ray/test_utils.py @@ -439,3 +439,7 @@ def format_web_url(url): if not url.startswith("http://"): return "http://" + url return url + + +def new_scheduler_enabled(): + return os.environ.get("RAY_ENABLE_NEW_SCHEDULER") == "1" diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 90faadfb0..d9676022c 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -11,7 +11,13 @@ SRCS = [] + select({ py_test_module_list( files = [ "test_async.py", + "test_actor.py", + "test_actor_failures.py", + "test_actor_advanced.py", + "test_advanced.py", + "test_advanced_2.py", "test_basic.py", + "test_basic_2.py", "test_cli.py", "test_component_failures_3.py", "test_error_ray_not_initialized.py", @@ -27,15 +33,9 @@ py_test_module_list( py_test_module_list( files = [ - "test_actor_advanced.py", - "test_actor_failures.py", - "test_actor.py", "test_actor_resources.py", - "test_advanced_2.py", "test_advanced_3.py", - "test_advanced.py", "test_array.py", - "test_basic_2.py", "test_cancel.py", "test_component_failures_2.py", "test_dynres.py", diff --git a/python/ray/tests/test_actor_advanced.py b/python/ray/tests/test_actor_advanced.py index b599ec5c1..2eec9fa56 100644 --- a/python/ray/tests/test_actor_advanced.py +++ b/python/ray/tests/test_actor_advanced.py @@ -12,7 +12,7 @@ import ray import ray.test_utils import ray.cluster_utils from ray.test_utils import (run_string_as_driver, get_non_head_nodes, - wait_for_condition) + wait_for_condition, new_scheduler_enabled) from ray.experimental.internal_kv import _internal_kv_get, _internal_kv_put @@ -91,6 +91,7 @@ def test_actor_load_balancing(ray_start_cluster): ray.get(results) +@pytest.mark.skipif(new_scheduler_enabled(), reason="multi node broken") def test_actor_lifetime_load_balancing(ray_start_cluster): cluster = ray_start_cluster cluster.add_node(num_cpus=0) @@ -943,6 +944,7 @@ def test_actor_creation_task_crash(ray_start_regular): } }], indirect=True) +@pytest.mark.skipif(new_scheduler_enabled(), reason="todo hangs") def test_pending_actor_removed_by_owner(ray_start_regular): # Verify when an owner of pending actors is killed, the actor resources # are correctly returned. diff --git a/python/ray/tests/test_actor_failures.py b/python/ray/tests/test_actor_failures.py index 7ddd4b26b..a6b9c381b 100644 --- a/python/ray/tests/test_actor_failures.py +++ b/python/ray/tests/test_actor_failures.py @@ -14,6 +14,7 @@ from ray.test_utils import ( wait_for_pid_to_exit, generate_system_config_map, get_other_nodes, + new_scheduler_enabled, SignalActor, ) @@ -265,6 +266,7 @@ def test_actor_restart_on_node_failure(ray_start_cluster): assert result == 1 or result == results[-1] + 1 +@pytest.mark.skipif(new_scheduler_enabled(), reason="dynamic resources todo") def test_actor_restart_without_task(ray_start_regular): """Test a dead actor can be restarted without sending task to it.""" @@ -483,6 +485,7 @@ def test_decorated_method(ray_start_regular): "num_cpus": 1, "num_nodes": 3, }], indirect=True) +@pytest.mark.skipif(new_scheduler_enabled(), reason="dynamic resources todo") def test_ray_wait_dead_actor(ray_start_cluster): """Tests that methods completed by dead actors are returned as ready""" cluster = ray_start_cluster diff --git a/python/ray/tests/test_advanced_2.py b/python/ray/tests/test_advanced_2.py index f1f96031d..60fccec7e 100644 --- a/python/ray/tests/test_advanced_2.py +++ b/python/ray/tests/test_advanced_2.py @@ -14,6 +14,7 @@ import ray.test_utils from ray.test_utils import ( RayTestTimeoutException, wait_for_condition, + new_scheduler_enabled, ) logger = logging.getLogger(__name__) @@ -251,6 +252,7 @@ def test_zero_cpus(shutdown_only): ray.get(x) +@pytest.mark.skipif(new_scheduler_enabled(), reason="zero cpu handling") def test_zero_cpus_actor(ray_start_cluster): cluster = ray_start_cluster cluster.add_node(num_cpus=0) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index d7ef3674b..310a73e05 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2970,6 +2970,10 @@ std::string NodeManager::DebugString() const { uint64_t now_ms = current_time_ms(); result << "NodeManager:"; result << "\nInitialConfigResources: " << initial_config_.resource_config.ToString(); + if (cluster_task_manager_ != nullptr) { + result << "\nClusterTaskManager:\n"; + result << cluster_task_manager_->DebugString(); + } result << "\nClusterResources:"; for (auto &pair : cluster_resource_map_) { result << "\n" << pair.first.Hex() << ": " << pair.second.DebugString(); diff --git a/src/ray/raylet/scheduling/cluster_resource_data.cc b/src/ray/raylet/scheduling/cluster_resource_data.cc index 926aa4305..91856c5b8 100644 --- a/src/ray/raylet/scheduling/cluster_resource_data.cc +++ b/src/ray/raylet/scheduling/cluster_resource_data.cc @@ -311,6 +311,20 @@ TaskResourceInstances NodeResourceInstances::GetAvailableResourceInstances() { return task_resources; }; +bool TaskRequest::IsEmpty() const { + for (size_t i = 0; i < this->predefined_resources.size(); i++) { + if (this->predefined_resources[i].demand != 0) { + return false; + } + } + for (size_t i = 0; i < this->custom_resources.size(); i++) { + if (this->custom_resources[i].demand != 0) { + return false; + } + } + return true; +} + std::string TaskRequest::DebugString() const { std::stringstream buffer; buffer << " {"; diff --git a/src/ray/raylet/scheduling/cluster_resource_data.h b/src/ray/raylet/scheduling/cluster_resource_data.h index ef2bdf714..1d1f8c592 100644 --- a/src/ray/raylet/scheduling/cluster_resource_data.h +++ b/src/ray/raylet/scheduling/cluster_resource_data.h @@ -79,6 +79,8 @@ class TaskRequest { /// the task will run on a different node in the cluster, if none of the /// nodes in this list can schedule this task. absl::flat_hash_set placement_hints; + /// Check whether the request contains no resources. + bool IsEmpty() const; /// Returns human-readable string for this task request. std::string DebugString() const; }; diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc index 1e48ab1fc..3646fabae 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc @@ -159,6 +159,7 @@ int64_t ClusterResourceScheduler::IsSchedulable(const TaskRequest &task_req, } int64_t ClusterResourceScheduler::GetBestSchedulableNode(const TaskRequest &task_req, + bool actor_creation, int64_t *total_violations) { // Minimum number of soft violations across all nodes that can schedule the request. // We will pick the node with the smallest number of soft violations. @@ -167,6 +168,25 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode(const TaskRequest &task int64_t best_node = -1; *total_violations = 0; + if (actor_creation && task_req.IsEmpty()) { + // This an actor which requires no resources. + // Pick a random node to to avoid all scheduling all actors on the local node. + if (nodes_.size() > 0) { + int idx = std::rand() % nodes_.size(); + for (auto &node : nodes_) { + if (idx == 0) { + best_node = node.first; + break; + } + idx--; + } + } + RAY_LOG(DEBUG) << "GetBestSchedulableNode, best_node = " << best_node + << ", # nodes = " << nodes_.size() + << ", task_req = " << task_req.DebugString(); + return best_node; + } + // Check whether local node is schedulable. We return immediately // the local node only if there are zero violations. auto it = nodes_.find(local_node_id_); @@ -211,10 +231,11 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode(const TaskRequest &task } std::string ClusterResourceScheduler::GetBestSchedulableNode( - const std::unordered_map &task_resources, + const std::unordered_map &task_resources, bool actor_creation, int64_t *total_violations) { TaskRequest task_request = ResourceMapToTaskRequest(string_to_int_map_, task_resources); - int64_t node_id = GetBestSchedulableNode(task_request, total_violations); + int64_t node_id = + GetBestSchedulableNode(task_request, actor_creation, total_violations); std::string id_string; if (node_id == -1) { diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h index 1b4d584b5..c71859f3e 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.h +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h @@ -145,13 +145,15 @@ class ClusterResourceScheduler { /// Finally, if no such node exists, return -1. /// /// \param task_request: Task to be scheduled. + /// \param actor_creation: True if this is an actor creation task. /// \param violations: The number of soft constraint violations associated /// with the node returned by this function (assuming /// a node that can schedule task_req is found). /// /// \return -1, if no node can schedule the current request; otherwise, /// return the ID of a node that can schedule the task request. - int64_t GetBestSchedulableNode(const TaskRequest &task_request, int64_t *violations); + int64_t GetBestSchedulableNode(const TaskRequest &task_request, bool actor_creation, + int64_t *violations); /// Similar to /// int64_t GetBestSchedulableNode(const TaskRequest &task_request, int64_t @@ -161,7 +163,8 @@ class ClusterResourceScheduler { /// return the ID in string format of a node that can schedule the // task request. std::string GetBestSchedulableNode( - const std::unordered_map &task_request, int64_t *violations); + const std::unordered_map &task_request, bool actor_creation, + int64_t *violations); /// Decrease the available resources of a node when a task request is /// scheduled on the given node. diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc index 31e252dad..08a8c7d30 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc @@ -331,7 +331,8 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest) { initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft, EmptyIntVector); int64_t violations; - int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations); + int64_t node_id = + cluster_resources.GetBestSchedulableNode(task_req, false, &violations); ASSERT_TRUE(node_id != -1); ASSERT_TRUE(violations > 0); @@ -428,7 +429,8 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector, EmptyFixedPointVector, EmptyBoolVector, EmptyIntVector); int64_t violations; - int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations); + int64_t node_id = + cluster_resources.GetBestSchedulableNode(task_req, false, &violations); ASSERT_EQ(node_id, -1); } // Predefined resources, soft constraint violation @@ -439,7 +441,8 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector, EmptyFixedPointVector, EmptyBoolVector, EmptyIntVector); int64_t violations; - int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations); + int64_t node_id = + cluster_resources.GetBestSchedulableNode(task_req, false, &violations); ASSERT_TRUE(node_id != -1); ASSERT_TRUE(violations > 0); } @@ -452,7 +455,8 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector, EmptyFixedPointVector, EmptyBoolVector, EmptyIntVector); int64_t violations; - int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations); + int64_t node_id = + cluster_resources.GetBestSchedulableNode(task_req, false, &violations); ASSERT_TRUE(node_id != -1); ASSERT_TRUE(violations == 0); } @@ -467,7 +471,8 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft, EmptyIntVector); int64_t violations; - int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations); + int64_t node_id = + cluster_resources.GetBestSchedulableNode(task_req, false, &violations); ASSERT_TRUE(node_id == -1); } // Custom resources, soft constraint violation. @@ -481,7 +486,8 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft, EmptyIntVector); int64_t violations; - int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations); + int64_t node_id = + cluster_resources.GetBestSchedulableNode(task_req, false, &violations); ASSERT_TRUE(node_id != -1); ASSERT_TRUE(violations > 0); } @@ -496,7 +502,8 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft, EmptyIntVector); int64_t violations; - int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations); + int64_t node_id = + cluster_resources.GetBestSchedulableNode(task_req, false, &violations); ASSERT_TRUE(node_id != -1); ASSERT_TRUE(violations == 0); } @@ -511,7 +518,8 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft, EmptyIntVector); int64_t violations; - int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations); + int64_t node_id = + cluster_resources.GetBestSchedulableNode(task_req, false, &violations); ASSERT_TRUE(node_id == -1); } // Custom resource missing, soft constraint violation. @@ -525,7 +533,8 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft, EmptyIntVector); int64_t violations; - int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations); + int64_t node_id = + cluster_resources.GetBestSchedulableNode(task_req, false, &violations); ASSERT_TRUE(node_id != -1); ASSERT_TRUE(violations > 0); } @@ -541,7 +550,8 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft, placement_hints); int64_t violations; - int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations); + int64_t node_id = + cluster_resources.GetBestSchedulableNode(task_req, false, &violations); ASSERT_TRUE(node_id != -1); ASSERT_TRUE(violations > 0); } @@ -557,7 +567,8 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft, placement_hints); int64_t violations; - int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations); + int64_t node_id = + cluster_resources.GetBestSchedulableNode(task_req, false, &violations); ASSERT_TRUE(node_id != -1); ASSERT_TRUE(violations == 0); } diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc index 5d1e4e419..38fa403ac 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager.cc @@ -36,7 +36,7 @@ bool ClusterTaskManager::SchedulePendingTasks() { // TODO (Alex): We should distinguish between infeasible tasks and a fully // utilized cluster. std::string node_id_string = cluster_resource_scheduler_->GetBestSchedulableNode( - request_resources, &_unused); + request_resources, task.GetTaskSpecification().IsActorCreationTask(), &_unused); if (node_id_string.empty()) { // There is no node that has available resources to run the request. // Move on to the next shape. @@ -178,12 +178,20 @@ void ClusterTaskManager::HandleTaskFinished(std::shared_ptr wor worker->ClearAllocatedInstances(); } +void ReplyCancelled(Work &work) { + auto reply = std::get<1>(work); + auto callback = std::get<2>(work); + reply->set_canceled(true); + callback(); +} + bool ClusterTaskManager::CancelTask(const TaskID &task_id) { for (auto shapes_it = tasks_to_schedule_.begin(); shapes_it != tasks_to_schedule_.end(); shapes_it++) { auto &work_queue = shapes_it->second; for (auto work_it = work_queue.begin(); work_it != work_queue.end(); work_it++) { if (std::get<0>(*work_it).GetTaskSpecification().TaskId() == task_id) { + ReplyCancelled(*work_it); work_queue.erase(work_it); if (work_queue.empty()) { tasks_to_schedule_.erase(shapes_it); @@ -197,6 +205,7 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id) { auto &work_queue = shapes_it->second; for (auto work_it = work_queue.begin(); work_it != work_queue.end(); work_it++) { if (std::get<0>(*work_it).GetTaskSpecification().TaskId() == task_id) { + ReplyCancelled(*work_it); work_queue.erase(work_it); if (work_queue.empty()) { tasks_to_dispatch_.erase(shapes_it); @@ -208,6 +217,7 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id) { auto iter = waiting_tasks_.find(task_id); if (iter != waiting_tasks_.end()) { + ReplyCancelled(iter->second); waiting_tasks_.erase(iter); return true; } diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc index 11ad743b4..d9cee05e2 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc @@ -266,10 +266,13 @@ TEST_F(ClusterTaskManagerTest, TaskCancellationTest) { task_manager_.QueueTask(task, &reply, callback); // Task is now queued so cancellation works. + callback_called = false; + reply.Clear(); ASSERT_TRUE(task_manager_.CancelTask(task.GetTaskSpecification().TaskId())); task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); // Task will not execute. - ASSERT_FALSE(callback_called); + ASSERT_TRUE(callback_called); + ASSERT_TRUE(reply.canceled()); ASSERT_EQ(leased_workers_.size(), 0); ASSERT_EQ(pool_.workers.size(), 1); @@ -277,9 +280,12 @@ TEST_F(ClusterTaskManagerTest, TaskCancellationTest) { task_manager_.SchedulePendingTasks(); // We can still cancel the task if it's on the dispatch queue. + callback_called = false; + reply.Clear(); ASSERT_TRUE(task_manager_.CancelTask(task.GetTaskSpecification().TaskId())); // Task will not execute. - ASSERT_FALSE(callback_called); + ASSERT_TRUE(reply.canceled()); + ASSERT_TRUE(callback_called); ASSERT_EQ(leased_workers_.size(), 0); ASSERT_EQ(pool_.workers.size(), 1); @@ -288,9 +294,12 @@ TEST_F(ClusterTaskManagerTest, TaskCancellationTest) { task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); // Task is now running so we can't cancel it. + callback_called = false; + reply.Clear(); ASSERT_FALSE(task_manager_.CancelTask(task.GetTaskSpecification().TaskId())); // Task will not execute. - ASSERT_TRUE(callback_called); + ASSERT_FALSE(reply.canceled()); + ASSERT_FALSE(callback_called); ASSERT_EQ(pool_.workers.size(), 0); ASSERT_EQ(leased_workers_.size(), 1); } diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 30dd33ffe..ccf0303a6 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -1048,7 +1048,7 @@ std::string WorkerPool::DebugString() const { result << "\n- num " << Language_Name(entry.first) << " drivers: " << entry.second.registered_drivers.size(); } - result << "- num idle workers: " << idle_of_all_languages_.size(); + result << "\n- num idle workers: " << idle_of_all_languages_.size(); return result.str(); }