diff --git a/python/ray/tests/test_advanced_3.py b/python/ray/tests/test_advanced_3.py index 7a82fd713..bf94a8f18 100644 --- a/python/ray/tests/test_advanced_3.py +++ b/python/ray/tests/test_advanced_3.py @@ -19,7 +19,7 @@ from ray import resource_spec import setproctitle from ray.test_utils import (check_call_ray, RayTestTimeoutException, - wait_for_num_actors) + wait_for_condition, wait_for_num_actors) logger = logging.getLogger(__name__) @@ -63,6 +63,34 @@ def test_load_balancing(ray_start_cluster): attempt_to_load_balance(f, [], 1000, num_nodes, 100) +def test_local_scheduling_first(ray_start_cluster): + cluster = ray_start_cluster + num_cpus = 8 + # Disable worker caching. + cluster.add_node( + num_cpus=num_cpus, + _internal_config=json.dumps({ + "worker_lease_timeout_milliseconds": 0, + })) + cluster.add_node(num_cpus=num_cpus) + ray.init(address=cluster.address) + + @ray.remote + def f(): + time.sleep(0.01) + return ray.worker.global_worker.node.unique_id + + def local(): + return ray.get(f.remote()) == ray.worker.global_worker.node.unique_id + + # Wait for a worker to get started. + wait_for_condition(local) + + # Check that we are scheduling locally while there are resources available. + for i in range(20): + assert local() + + def test_load_balancing_with_dependencies(ray_start_cluster): # This test ensures that tasks are being assigned to all raylets in a # roughly equal manner even when the tasks have dependencies. diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 094236e91..060c0ff1c 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -919,7 +919,8 @@ void NodeManager::TryLocalInfeasibleTaskScheduling() { SchedulingResources &new_local_resources = cluster_resource_map_[self_node_id_]; // SpillOver locally to figure out which infeasible tasks can be placed now - std::vector decision = scheduling_policy_.SpillOver(new_local_resources); + std::vector decision = + scheduling_policy_.SpillOverInfeasibleTasks(new_local_resources); std::unordered_set local_task_ids(decision.begin(), decision.end()); @@ -986,7 +987,8 @@ void NodeManager::HeartbeatAdded(const ClientID &client_id, } // Extract decision for this raylet. - auto decision = scheduling_policy_.SpillOver(remote_resources); + auto decision = scheduling_policy_.SpillOver(remote_resources, + cluster_resource_map_[self_node_id_]); std::unordered_set local_task_ids; for (const auto &task_id : decision) { // (See design_docs/task_states.rst for the state transition diagram.) diff --git a/src/ray/raylet/scheduling_policy.cc b/src/ray/raylet/scheduling_policy.cc index 43521e553..cd6d33b9b 100644 --- a/src/ray/raylet/scheduling_policy.cc +++ b/src/ray/raylet/scheduling_policy.cc @@ -53,7 +53,25 @@ std::unordered_map SchedulingPolicy::Schedule( const auto &resource_demand = spec.GetRequiredPlacementResources(); const TaskID &task_id = spec.TaskId(); - // TODO(atumanov): try to place tasks locally first. + // Try to place tasks locally first. + const auto &local_resources = cluster_resources[local_client_id]; + ResourceSet available_local_resources = + ResourceSet(local_resources.GetAvailableResources()); + // We have to subtract the current "load" because we set the current "load" + // to be the resources used by tasks that are in the + // `SchedulingQueue::ready_queue_` in NodeManager::HandleWorkerAvailable's + // call to SchedulingQueue::GetResourceLoad. + available_local_resources.SubtractResources(local_resources.GetLoadResources()); + if (resource_demand.IsSubset(available_local_resources)) { + // This node is a feasible candidate. + decision[task_id] = local_client_id; + + ResourceSet new_load(cluster_resources[local_client_id].GetLoadResources()); + new_load.AddResources(resource_demand); + cluster_resources[local_client_id].SetLoadResources(std::move(new_load)); + continue; + } + // Construct a set of viable node candidates and randomly pick between them. // Get all the client id keys and randomly pick. std::vector client_keys; @@ -165,37 +183,67 @@ bool SchedulingPolicy::ScheduleBundle( return resource_demand.IsSubset(available_node_resources); } -std::vector SchedulingPolicy::SpillOver( - SchedulingResources &remote_scheduling_resources) const { +std::vector SchedulingPolicy::SpillOverInfeasibleTasks( + SchedulingResources &node_resources) const { // The policy decision to be returned. std::vector decision; - - ResourceSet new_load(remote_scheduling_resources.GetLoadResources()); + ResourceSet new_load(node_resources.GetLoadResources()); // Check if we can accommodate infeasible tasks. for (const auto &task : scheduling_queue_.GetTasks(TaskState::INFEASIBLE)) { const auto &spec = task.GetTaskSpecification(); const auto &placement_resources = spec.GetRequiredPlacementResources(); - if (placement_resources.IsSubset(remote_scheduling_resources.GetTotalResources())) { + if (placement_resources.IsSubset(node_resources.GetTotalResources())) { decision.push_back(spec.TaskId()); new_load.AddResources(spec.GetRequiredResources()); } } + node_resources.SetLoadResources(std::move(new_load)); + return decision; +} +std::vector SchedulingPolicy::SpillOver( + SchedulingResources &remote_resources, SchedulingResources &local_resources) const { + // First try to spill infeasible tasks. + auto decision = SpillOverInfeasibleTasks(remote_resources); + + // Get local available resources. + ResourceSet available_local_resources = + ResourceSet(local_resources.GetAvailableResources()); + available_local_resources.SubtractResources(local_resources.GetLoadResources()); // Try to accommodate up to a single ready task. - for (const auto &task : scheduling_queue_.GetTasks(TaskState::READY)) { - const auto &spec = task.GetTaskSpecification(); - if (!spec.IsActorTask()) { + bool task_spilled = false; + for (const auto &queue : scheduling_queue_.GetReadyTasksByClass()) { + // Skip tasks for which there are resources available locally. + const auto &task_resources = + TaskSpecification::GetSchedulingClassDescriptor(queue.first); + if (task_resources.IsSubset(available_local_resources)) { + continue; + } + // Try to spill one task. + for (const auto &task_id : queue.second) { + const auto &task = scheduling_queue_.GetTaskOfState(task_id, TaskState::READY); + const auto &spec = task.GetTaskSpecification(); // Make sure the node has enough available resources to prevent forwarding cycles. if (spec.GetRequiredPlacementResources().IsSubset( - remote_scheduling_resources.GetAvailableResources())) { + remote_resources.GetAvailableResources())) { + // Update the scheduling resources. + ResourceSet new_remote_load(remote_resources.GetLoadResources()); + new_remote_load.AddResources(spec.GetRequiredResources()); + remote_resources.SetLoadResources(std::move(new_remote_load)); + ResourceSet new_local_load(local_resources.GetLoadResources()); + new_local_load.SubtractResources(spec.GetRequiredResources()); + local_resources.SetLoadResources(std::move(new_local_load)); + decision.push_back(spec.TaskId()); - new_load.AddResources(spec.GetRequiredResources()); + task_spilled = true; break; } } + if (task_spilled) { + break; + } } - remote_scheduling_resources.SetLoadResources(std::move(new_load)); return decision; } diff --git a/src/ray/raylet/scheduling_policy.h b/src/ray/raylet/scheduling_policy.h index d921ad1d7..ea10218de 100644 --- a/src/ray/raylet/scheduling_policy.h +++ b/src/ray/raylet/scheduling_policy.h @@ -61,14 +61,23 @@ class SchedulingPolicy { std::unordered_map &cluster_resources, const ClientID &local_client_id, const ray::BundleSpecification &bundle_spec); + /// \brief Given a set of cluster resources, try to spillover infeasible tasks. + /// + /// \param node_resources The resource information for a node. This may be + /// the local node. + /// \return Tasks that should be spilled to this node. + std::vector SpillOverInfeasibleTasks(SchedulingResources &node_resources) const; + /// \brief Given a set of cluster resources perform a spill-over scheduling operation. /// - /// \param cluster_resources: a set of cluster resources containing resource and load - /// information for some subset of the cluster. For all client IDs in the returned - /// placement map, the corresponding SchedulingResources::resources_load_ is - /// incremented by the aggregate resource demand of the tasks assigned to it. - /// \return Scheduling decision, mapping tasks to raylets for placement. - std::vector SpillOver(SchedulingResources &remote_scheduling_resources) const; + /// \param remote_resources The resource information for a remote node. This + /// is guaranteed to not be the local node. The load info is updated if a + /// task is spilled. + /// \param local_resources The resource information for the local node. The + /// load info is updated if a task is spilled. + /// \return Tasks that should be spilled to this node. + std::vector SpillOver(SchedulingResources &remote_resources, + SchedulingResources &local_resources) const; /// \brief SchedulingPolicy destructor. virtual ~SchedulingPolicy();