diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index c4a8e02a6..5a71bdd4e 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -105,6 +105,31 @@ def test_simple_serialization(ray_start_regular): assert type(obj) == type(new_obj_2) +def test_fair_queueing(shutdown_only): + ray.init( + num_cpus=1, _internal_config=json.dumps({ + "fair_queueing_enabled": 1 + })) + + @ray.remote + def h(): + return 0 + + @ray.remote + def g(): + return ray.get(h.remote()) + + @ray.remote + def f(): + return ray.get(g.remote()) + + # This will never finish without fair queueing of {f, g, h}: + # https://github.com/ray-project/ray/issues/3644 + ready, _ = ray.wait( + [f.remote() for _ in range(1000)], timeout=60.0, num_returns=1000) + assert len(ready) == 1000, len(ready) + + def test_complex_serialization(ray_start_regular): def assert_equal(obj1, obj2): module_numpy = (type(obj1).__module__ == np.__name__ diff --git a/python/ray/tests/test_signal.py b/python/ray/tests/test_signal.py index 85e6cc6a5..066281d15 100644 --- a/python/ray/tests/test_signal.py +++ b/python/ray/tests/test_signal.py @@ -335,7 +335,9 @@ def test_receiving_on_two_returns(ray_start_regular): or (x == results[1][0] and y == results[0][0])) -def test_serial_tasks_reading_same_signal(ray_start_regular): +def test_serial_tasks_reading_same_signal(shutdown_only): + ray.init(num_cpus=2) + @ray.remote def send_signal(value): signal.send(UserSignal(value)) diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index e0720006a..db6547d3d 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -34,6 +34,12 @@ RAY_CONFIG(uint64_t, num_heartbeats_warning, 5) /// The duration between dumping debug info to logs, or -1 to disable. RAY_CONFIG(int64_t, debug_dump_period_milliseconds, 10000) +/// Whether to enable fair queueing between task classes in raylet. When +/// fair queueing is enabled, the raylet will try to balance the number +/// of running tasks by class (i.e., function name). This prevents one +/// type of task from starving other types (see issue #3664). +RAY_CONFIG(bool, fair_queueing_enabled, true) + /// The initial period for a task execution lease. The lease will expire this /// many milliseconds after the first acquisition of the lease. Nodes that /// require an object will not try to reconstruct the task until at least diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index 4512d4020..41b4f3542 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -5,6 +5,19 @@ namespace ray { +std::unordered_map + TaskSpecification::sched_cls_to_id_; +std::unordered_map + TaskSpecification::sched_id_to_cls_; +int TaskSpecification::next_sched_id_; + +SchedulingClassDescriptor &TaskSpecification::GetSchedulingClassDescriptor( + SchedulingClass id) { + auto it = sched_id_to_cls_.find(id); + RAY_CHECK(it != sched_id_to_cls_.end()) << "invalid id: " << id; + return it->second; +} + void TaskSpecification::ComputeResources() { auto required_resources = MapFromProtobuf(message_->required_resources()); auto required_placement_resources = @@ -14,6 +27,25 @@ void TaskSpecification::ComputeResources() { } required_resources_.reset(new ResourceSet(required_resources)); required_placement_resources_.reset(new ResourceSet(required_placement_resources)); + + // Map the scheduling class descriptor to an integer for performance. + auto sched_cls = std::make_pair(GetRequiredResources(), FunctionDescriptor()); + auto it = sched_cls_to_id_.find(sched_cls); + if (it == sched_cls_to_id_.end()) { + sched_cls_id_ = ++next_sched_id_; + // TODO(ekl) we might want to try cleaning up task types in these cases + if (sched_cls_id_ > 100) { + RAY_LOG(WARNING) << "More than " << sched_cls_id_ + << " types of tasks seen, this may reduce performance."; + } else if (sched_cls_id_ > 1000) { + RAY_LOG(ERROR) << "More than " << sched_cls_id_ + << " types of tasks seen, this may reduce performance."; + } + sched_cls_to_id_[sched_cls] = sched_cls_id_; + sched_id_to_cls_[sched_cls_id_] = sched_cls; + } else { + sched_cls_id_ = it->second; + } } // Task specification getter methods. @@ -33,6 +65,11 @@ std::vector TaskSpecification::FunctionDescriptor() const { return VectorFromProtobuf(message_->function_descriptor()); } +const SchedulingClass TaskSpecification::GetSchedulingClass() const { + RAY_CHECK(sched_cls_id_ > 0); + return sched_cls_id_; +} + size_t TaskSpecification::NumArgs() const { return message_->args_size(); } size_t TaskSpecification::NumReturns() const { return message_->num_returns(); } diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index 31de3f704..e69241ec1 100644 --- a/src/ray/common/task/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -17,6 +17,10 @@ extern "C" { namespace ray { +typedef std::vector FunctionDescriptor; +typedef std::pair SchedulingClassDescriptor; +typedef int SchedulingClass; + /// Wrapper class of protobuf `TaskSpec`, see `common.proto` for details. class TaskSpecification : public MessageWrapper { public: @@ -78,6 +82,13 @@ class TaskSpecification : public MessageWrapper { size_t ArgMetadataSize(size_t arg_index) const; + /// Return the scheduling class of the task. The scheduler makes a best effort + /// attempt to fairly dispatch tasks of different classes, preventing + /// starvation of any single class of task. + /// + /// \return The scheduling class used for fair task queueing. + const SchedulingClass GetSchedulingClass() const; + /// Return the resources that are to be acquired during the execution of this /// task. /// @@ -137,16 +148,40 @@ class TaskSpecification : public MessageWrapper { std::string DebugString() const; + static SchedulingClassDescriptor &GetSchedulingClassDescriptor(SchedulingClass id); + private: void ComputeResources(); + /// Field storing required resources. Initalized in constructor. /// TODO(ekl) consider optimizing the representation of ResourceSet for fast copies /// instead of keeping shared ptrs here. std::shared_ptr required_resources_; /// Field storing required placement resources. Initalized in constructor. std::shared_ptr required_placement_resources_; + /// Cached scheduling class of this task. + SchedulingClass sched_cls_id_; + + /// Keep global static id mappings for SchedulingClass for performance. + static std::unordered_map sched_cls_to_id_; + static std::unordered_map sched_id_to_cls_; + static int next_sched_id_; }; } // namespace ray +/// We must define the hash since it's not auto-defined for vectors. +namespace std { +template <> +struct hash { + size_t operator()(ray::SchedulingClassDescriptor const &k) const { + size_t seed = std::hash()(k.first); + for (const auto &str : k.second) { + seed ^= std::hash()(str); + } + return seed; + } +}; +} // namespace std + #endif // RAY_COMMON_TASK_TASK_SPEC_H diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index bdc67ba4c..b4e4b744a 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -123,6 +123,8 @@ int main(int argc, char *argv[]) { RayConfig::instance().heartbeat_timeout_milliseconds(); node_manager_config.debug_dump_period_ms = RayConfig::instance().debug_dump_period_milliseconds(); + node_manager_config.fair_queueing_enabled = + RayConfig::instance().fair_queueing_enabled(); node_manager_config.max_lineage_size = RayConfig::instance().max_lineage_size(); node_manager_config.store_socket_name = store_socket_name; node_manager_config.temp_dir = temp_dir; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index fdd6b9fcf..1b2074a9f 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1,5 +1,6 @@ #include "ray/raylet/node_manager.h" +#include #include #include "ray/common/status.h" @@ -78,6 +79,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, heartbeat_timer_(io_service), heartbeat_period_(std::chrono::milliseconds(config.heartbeat_period_ms)), debug_dump_period_(config.debug_dump_period_ms), + fair_queueing_enabled_(config.fair_queueing_enabled), temp_dir_(config.temp_dir), object_manager_profile_timer_(io_service), initial_config_(config), @@ -755,24 +757,46 @@ void NodeManager::ProcessNewClient(LocalClientConnection &client) { client.ProcessMessages(); } -// A helper function to create a mapping from resource shapes to -// tasks with that resource shape from a given list of tasks. -std::unordered_map> MakeTasksWithResources( +// A helper function to create a mapping from task scheduling class to +// tasks with that class from a given list of tasks. +std::unordered_map> MakeTasksByClass( const std::vector &tasks) { - std::unordered_map> result; + std::unordered_map> result; for (const auto &task : tasks) { auto spec = task.GetTaskSpecification(); - result[spec.GetRequiredResources()].push_back(spec.TaskId()); + result[spec.GetSchedulingClass()].push_back(spec.TaskId()); } return result; } void NodeManager::DispatchTasks( - const std::unordered_map> &tasks_with_resources) { + const std::unordered_map> &tasks_by_class) { std::unordered_set removed_task_ids; - for (const auto &it : tasks_with_resources) { - const auto &task_resources = it.first; - for (const auto &task_id : it.second) { + + // Dispatch tasks in priority order by class. This avoids starvation problems where + // one class of tasks become stuck behind others in the queue, causing Ray to start + // many workers. See #3644 for a more detailed description of this issue. + std::vector> *> fair_order; + for (auto &it : tasks_by_class) { + fair_order.emplace_back(&it); + } + // Prioritize classes that have fewer currently running tasks. Note that we only + // sort once per round of task dispatch, which is less fair then it could be, but + // is simpler and faster. + if (fair_queueing_enabled_) { + std::sort( + std::begin(fair_order), std::end(fair_order), + [this](const std::pair> *&a, + const std::pair> *&b) { + return local_queues_.NumRunning(a->first) < local_queues_.NumRunning(b->first); + }); + } + // Approximate fair round robin between classes. + for (const auto &it : fair_order) { + const auto &task_resources = + TaskSpecification::GetSchedulingClassDescriptor(it->first).first; + // FIFO order within each class. + for (const auto &task_id : it->second) { const auto &task = local_queues_.GetTaskOfState(task_id, TaskState::READY); if (!local_available_resources_.Contains(task_resources)) { // All the tasks in it.second have the same resource shape, so @@ -985,7 +1009,7 @@ void NodeManager::HandleWorkerAvailable( cluster_resource_map_[local_client_id].SetLoadResources( local_queues_.GetResourceLoad()); // Call task dispatch to assign work to the new worker. - DispatchTasks(local_queues_.GetReadyTasksWithResources()); + DispatchTasks(local_queues_.GetReadyTasksByClass()); } void NodeManager::ProcessDisconnectClientMessage( @@ -1090,7 +1114,7 @@ void NodeManager::ProcessDisconnectClientMessage( << "job_id: " << worker->GetAssignedJobId(); // Since some resources may have been released, we can try to dispatch more tasks. - DispatchTasks(local_queues_.GetReadyTasksWithResources()); + DispatchTasks(local_queues_.GetReadyTasksByClass()); } else if (is_driver) { // The client is a driver. const auto job_id = worker->GetAssignedJobId(); @@ -1684,7 +1708,7 @@ void NodeManager::HandleTaskBlocked(const std::shared_ptr worker->MarkBlocked(); // Try dispatching tasks since we may have released some resources. - DispatchTasks(local_queues_.GetReadyTasksWithResources()); + DispatchTasks(local_queues_.GetReadyTasksByClass()); } } else { // The client is a driver. Drivers do not hold resources, so we simply mark @@ -1782,7 +1806,7 @@ void NodeManager::EnqueuePlaceableTask(const Task &task) { // (See design_docs/task_states.rst for the state transition diagram.) if (args_ready) { local_queues_.QueueTasks({task}, TaskState::READY); - DispatchTasks(MakeTasksWithResources({task})); + DispatchTasks(MakeTasksByClass({task})); } else { local_queues_.QueueTasks({task}, TaskState::WAITING); } @@ -1881,8 +1905,8 @@ void NodeManager::FinishAssignedTask(Worker &worker) { task_resources.ToResourceSet()); worker.ResetTaskResourceIds(); - if (task.GetTaskSpecification().IsActorCreationTask() || - task.GetTaskSpecification().IsActorTask()) { + const auto &spec = task.GetTaskSpecification(); + if (spec.IsActorCreationTask() || spec.IsActorTask()) { // If this was an actor or actor creation task, handle the actor's new // state. FinishAssignedActorTask(worker, task); @@ -1898,8 +1922,7 @@ void NodeManager::FinishAssignedTask(Worker &worker) { // Unset the worker's assigned task. worker.AssignTaskId(TaskID::Nil()); // Unset the worker's assigned job Id if this is not an actor. - if (!task.GetTaskSpecification().IsActorCreationTask() && - !task.GetTaskSpecification().IsActorTask()) { + if (!spec.IsActorCreationTask() && !spec.IsActorTask()) { worker.AssignJobId(JobID::Nil()); } } @@ -2227,7 +2250,7 @@ void NodeManager::HandleObjectLocal(const ObjectID &object_id) { // Queue and dispatch the tasks that are ready to run (i.e., WAITING). auto ready_tasks = local_queues_.RemoveTasks(ready_task_id_set); local_queues_.QueueTasks(ready_tasks, TaskState::READY); - DispatchTasks(MakeTasksWithResources(ready_tasks)); + DispatchTasks(MakeTasksByClass(ready_tasks)); } } @@ -2464,7 +2487,7 @@ void NodeManager::FinishAssignTask(const TaskID &task_id, Worker &worker, bool s // assigned to a worker once one becomes available. // (See design_docs/task_states.rst for the state transition diagram.) local_queues_.QueueTasks({assigned_task}, TaskState::READY); - DispatchTasks(MakeTasksWithResources({assigned_task})); + DispatchTasks(MakeTasksByClass({assigned_task})); } } diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 2afd90460..58b4b381a 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -52,6 +52,8 @@ struct NodeManagerConfig { uint64_t heartbeat_period_ms; /// The time between debug dumps in milliseconds, or -1 to disable. uint64_t debug_dump_period_ms; + /// Whether to enable fair queueing between task classes in raylet. + bool fair_queueing_enabled; /// the maximum lineage size. uint64_t max_lineage_size; /// The store socket name. @@ -304,7 +306,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// \param tasks_with_resources Mapping from resource shapes to tasks with /// that resource shape. void DispatchTasks( - const std::unordered_map> &tasks_with_resources); + const std::unordered_map> &tasks_by_class); /// Handle a task that is blocked. This could be a task assigned to a worker, /// an out-of-band task (e.g., a thread created by the application), or a @@ -512,6 +514,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler { std::chrono::milliseconds heartbeat_period_; /// The period between debug state dumps. int64_t debug_dump_period_; + /// Whether to enable fair queueing between task classes in raylet. + bool fair_queueing_enabled_; /// Whether we have printed out a resource deadlock warning. bool resource_deadlock_warned_ = false; /// The path to the ray temp dir. diff --git a/src/ray/raylet/scheduling_queue.cc b/src/ray/raylet/scheduling_queue.cc index 8e20aacfa..85b2efba1 100644 --- a/src/ray/raylet/scheduling_queue.cc +++ b/src/ray/raylet/scheduling_queue.cc @@ -95,23 +95,23 @@ const ResourceSet &TaskQueue::GetCurrentResourceLoad() const { } bool ReadyQueue::AppendTask(const TaskID &task_id, const Task &task) { - const auto &resources = task.GetTaskSpecification().GetRequiredResources(); - tasks_with_resources_[resources].push_back(task_id); + const auto &scheduling_class = task.GetTaskSpecification().GetSchedulingClass(); + tasks_by_class_[scheduling_class].push_back(task_id); return TaskQueue::AppendTask(task_id, task); } bool ReadyQueue::RemoveTask(const TaskID &task_id, std::vector *removed_tasks) { if (task_map_.find(task_id) != task_map_.end()) { - const auto &resources = - task_map_[task_id]->GetTaskSpecification().GetRequiredResources(); - tasks_with_resources_[resources].erase(task_id); + const auto &scheduling_class = + task_map_[task_id]->GetTaskSpecification().GetSchedulingClass(); + tasks_by_class_[scheduling_class].erase(task_id); } return TaskQueue::RemoveTask(task_id, removed_tasks); } -const std::unordered_map> - &ReadyQueue::GetTasksWithResources() const { - return tasks_with_resources_; +const std::unordered_map> + &ReadyQueue::GetTasksByClass() const { + return tasks_by_class_; } const std::list &SchedulingQueue::GetTasks(TaskState task_state) const { @@ -119,9 +119,9 @@ const std::list &SchedulingQueue::GetTasks(TaskState task_state) const { return queue->GetTasks(); } -const std::unordered_map> - &SchedulingQueue::GetReadyTasksWithResources() const { - return ready_queue_->GetTasksWithResources(); +const std::unordered_map> + &SchedulingQueue::GetReadyTasksByClass() const { + return ready_queue_->GetTasksByClass(); } const Task &SchedulingQueue::GetTaskOfState(const TaskID &task_id, @@ -223,6 +223,10 @@ void SchedulingQueue::RemoveTasksFromQueue(ray::raylet::TaskState task_state, if (queue->RemoveTask(task_id, removed_tasks)) { RAY_LOG(DEBUG) << "Removed task " << task_id << " from " << GetTaskStateString(task_state) << " queue"; + if (task_state == TaskState::RUNNING) { + num_running_tasks_ + [removed_tasks->back().GetTaskSpecification().GetSchedulingClass()] -= 1; + } it = task_ids.erase(it); } else { it++; @@ -351,6 +355,9 @@ void SchedulingQueue::QueueTasks(const std::vector &tasks, TaskState task_ for (const auto &task : tasks) { RAY_LOG(DEBUG) << "Added task " << task.GetTaskSpecification().TaskId() << " to " << GetTaskStateString(task_state) << " queue"; + if (task_state == TaskState::RUNNING) { + num_running_tasks_[task.GetTaskSpecification().GetSchedulingClass()] += 1; + } queue->AppendTask(task.GetTaskSpecification().TaskId(), task); } } @@ -417,6 +424,15 @@ const std::unordered_set &SchedulingQueue::GetDriverTaskIds() const { return driver_task_ids_; } +int SchedulingQueue::NumRunning(const SchedulingClass &cls) const { + auto it = num_running_tasks_.find(cls); + if (it == num_running_tasks_.end()) { + return 0; + } else { + return it->second; + } +} + std::string SchedulingQueue::DebugString() const { std::stringstream result; result << "SchedulingQueue:"; @@ -426,6 +442,30 @@ std::string SchedulingQueue::DebugString() const { << " tasks: " << GetTaskQueue(task_state)->GetTasks().size(); } result << "\n- num tasks blocked: " << blocked_task_ids_.size(); + result << "\nScheduledTaskCounts:"; + size_t total = 0; + for (const auto &pair : num_running_tasks_) { + result << "\n- "; + auto desc = TaskSpecification::GetSchedulingClassDescriptor(pair.first); + for (const auto &str : desc.second) { + // Only print the ASCII parts of the function descriptor. + bool ok = str.size() > 0; + for (char c : str) { + if (!isprint(c)) { + ok = false; + } + } + if (ok) { + result << str; + result << "."; + } + } + result << desc.first.ToString(); + result << ": " << pair.second; + total += pair.second; + } + RAY_CHECK(total == GetTaskQueue(TaskState::RUNNING)->GetTasks().size()) + << total << " vs " << GetTaskQueue(TaskState::RUNNING)->GetTasks().size(); return result.str(); } diff --git a/src/ray/raylet/scheduling_queue.h b/src/ray/raylet/scheduling_queue.h index 2c503dc38..3bea64911 100644 --- a/src/ray/raylet/scheduling_queue.h +++ b/src/ray/raylet/scheduling_queue.h @@ -132,12 +132,11 @@ class ReadyQueue : public TaskQueue { /// \brief Get a mapping from resource shape to tasks. /// /// \return Mapping from resource set to task IDs with these resource requirements. - const std::unordered_map> &GetTasksWithResources() - const; + const std::unordered_map> &GetTasksByClass() const; private: - /// Index from resource shape to tasks that require these resources. - std::unordered_map> tasks_with_resources_; + /// Index from task description to tasks queued of that type. + std::unordered_map> tasks_by_class_; }; /// \class SchedulingQueue @@ -183,7 +182,7 @@ class SchedulingQueue { /// Get a reference to the queue of ready tasks. /// /// \return A reference to the queue of ready tasks. - const std::unordered_map> &GetReadyTasksWithResources() + const std::unordered_map> &GetReadyTasksByClass() const; /// Get a task from the queue of a given state. The caller must ensure that @@ -302,6 +301,11 @@ class SchedulingQueue { /// \return Aggregate resource demand from ready tasks. ResourceSet GetReadyQueueResources() const; + /// Returns the number of running tasks in this class. + /// + /// \return int. + int NumRunning(const SchedulingClass &cls) const; + /// Returns debug string for class. /// /// \return string. @@ -331,6 +335,8 @@ class SchedulingQueue { // A pointer to the ready queue. const std::shared_ptr ready_queue_; + /// Track the breakdown of tasks by class in the RUNNING queue. + std::unordered_map num_running_tasks_; // A pointer to the task queues. These contain all tasks that have a task // state < TaskState::kNumTaskQueues. std::array, static_cast(TaskState::kNumTaskQueues)>