From ca36827f01d97e90756779fbdc962b589c75e4de Mon Sep 17 00:00:00 2001 From: Zhijun Fu <37800433+zhijunfu@users.noreply.github.com> Date: Thu, 2 Aug 2018 05:41:20 +0800 Subject: [PATCH] [Issues 2403][xray] Fix raylet performance issues on scheduling queue (#2438) * merge from ray * Revert "merge from ray" This reverts commit 32b181ebbb1fa184026631e1a7368112c4c3118d. * fix raylet performance regression * address comments * Update code after merging latest changes * fix lint * address comments --- src/ray/raylet/scheduling_queue.cc | 84 +++++++++++++++++++++++------- src/ray/raylet/scheduling_queue.h | 58 ++++++++++++++++++--- 2 files changed, 116 insertions(+), 26 deletions(-) diff --git a/src/ray/raylet/scheduling_queue.cc b/src/ray/raylet/scheduling_queue.cc index b6cd51b92..1fd9f59c5 100644 --- a/src/ray/raylet/scheduling_queue.cc +++ b/src/ray/raylet/scheduling_queue.cc @@ -6,15 +6,12 @@ namespace { // Helper function to remove tasks in the given set of task_ids from a // queue, and append them to the given vector removed_tasks. -void RemoveTasksFromQueue(std::list &queue, +void RemoveTasksFromQueue(ray::raylet::SchedulingQueue::TaskQueue &queue, std::unordered_set &task_ids, std::vector &removed_tasks) { - for (auto it = queue.begin(); it != queue.end();) { - auto task_id = task_ids.find(it->GetTaskSpecification().TaskId()); - if (task_id != task_ids.end()) { - task_ids.erase(task_id); - removed_tasks.push_back(std::move(*it)); - it = queue.erase(it); + for (auto it = task_ids.begin(); it != task_ids.end();) { + if (queue.RemoveTask(*it, removed_tasks)) { + it = task_ids.erase(it); } else { it++; } @@ -22,19 +19,22 @@ void RemoveTasksFromQueue(std::list &queue, } // Helper function to queue the given tasks to the given queue. -inline void QueueTasks(std::list &queue, +inline void QueueTasks(ray::raylet::SchedulingQueue::TaskQueue &queue, const std::vector &tasks) { - queue.insert(queue.end(), tasks.begin(), tasks.end()); + for (const auto &task : tasks) { + queue.AppendTask(task.GetTaskSpecification().TaskId(), task); + } } // Helper function to filter out tasks of a given state. -inline void FilterStateFromQueue(const std::list &queue, +inline void FilterStateFromQueue(const ray::raylet::SchedulingQueue::TaskQueue &queue, std::unordered_set &task_ids, ray::raylet::TaskState filter_state) { - for (auto it = queue.begin(); it != queue.end(); it++) { - auto task_id = task_ids.find(it->GetTaskSpecification().TaskId()); - if (task_id != task_ids.end()) { - task_ids.erase(task_id); + for (auto it = task_ids.begin(); it != task_ids.end();) { + if (queue.HasTask(*it)) { + it = task_ids.erase(it); + } else { + it++; } } } @@ -45,28 +45,72 @@ namespace ray { namespace raylet { +SchedulingQueue::TaskQueue::~TaskQueue() { + task_map_.clear(); + task_list_.clear(); +} + +bool SchedulingQueue::TaskQueue::AppendTask(const TaskID &task_id, const Task &task) { + RAY_CHECK(task_map_.find(task_id) == task_map_.end()); + auto list_iterator = task_list_.insert(task_list_.end(), task); + task_map_[task_id] = list_iterator; + return true; +} + +bool SchedulingQueue::TaskQueue::RemoveTask(const TaskID &task_id) { + auto task_found_iterator = task_map_.find(task_id); + if (task_found_iterator == task_map_.end()) { + return false; + } + + auto list_iterator = task_found_iterator->second; + task_map_.erase(task_found_iterator); + task_list_.erase(list_iterator); + return true; +} + +bool SchedulingQueue::TaskQueue::RemoveTask(const TaskID &task_id, + std::vector &removed_tasks) { + auto task_found_iterator = task_map_.find(task_id); + if (task_found_iterator == task_map_.end()) { + return false; + } + + auto list_iterator = task_found_iterator->second; + removed_tasks.push_back(std::move(*list_iterator)); + task_map_.erase(task_found_iterator); + task_list_.erase(list_iterator); + return true; +} + +bool SchedulingQueue::TaskQueue::HasTask(const TaskID &task_id) const { + return task_map_.find(task_id) != task_map_.end(); +} + +const std::list &SchedulingQueue::TaskQueue::GetTasks() const { return task_list_; } + const std::list &SchedulingQueue::GetMethodsWaitingForActorCreation() const { - return this->methods_waiting_for_actor_creation_; + return this->methods_waiting_for_actor_creation_.GetTasks(); } const std::list &SchedulingQueue::GetWaitingTasks() const { - return this->waiting_tasks_; + return this->waiting_tasks_.GetTasks(); } const std::list &SchedulingQueue::GetPlaceableTasks() const { - return this->placeable_tasks_; + return this->placeable_tasks_.GetTasks(); } const std::list &SchedulingQueue::GetReadyTasks() const { - return this->ready_tasks_; + return this->ready_tasks_.GetTasks(); } const std::list &SchedulingQueue::GetRunningTasks() const { - return this->running_tasks_; + return this->running_tasks_.GetTasks(); } const std::list &SchedulingQueue::GetBlockedTasks() const { - return this->blocked_tasks_; + return this->blocked_tasks_.GetTasks(); } void SchedulingQueue::FilterState(std::unordered_set &task_ids, diff --git a/src/ray/raylet/scheduling_queue.h b/src/ray/raylet/scheduling_queue.h index 768bff29e..7845dfb6a 100644 --- a/src/ray/raylet/scheduling_queue.h +++ b/src/ray/raylet/scheduling_queue.h @@ -147,21 +147,67 @@ class SchedulingQueue { /// \param filter_state The task state to filter out. void FilterState(std::unordered_set &task_ids, TaskState filter_state) const; + class TaskQueue { + public: + /// Creating a task queue. + TaskQueue() {} + + /// Destructor for task queue. + ~TaskQueue(); + + /// \brief Append a task to queue. + /// + /// \param task_id The task ID for the task to append. + /// \param task The task to append to the queue. + /// \return Whether the append operation succeeds. + bool AppendTask(const TaskID &task_id, const Task &task); + + /// \brief Remove a task from queue. + /// + /// \param task_id The task ID for the task to remove from the queue. + /// \return Whether the removal succeeds. + bool RemoveTask(const TaskID &task_id); + + /// \brief Remove a task from queue. + /// + /// \param task_id The task ID for the task to remove from the queue. + /// \param removed_tasks If the task specified by task_id is successfully + // removed from the queue, the task data is appended to the vector. + /// \return Whether the removal succeeds. + bool RemoveTask(const TaskID &task_id, std::vector &removed_tasks); + + /// \brief Check if the queue contains a specific task id. + /// + /// \param task_id The task ID for the task. + /// \return Whether the task_id exists in this queue. + bool HasTask(const TaskID &task_id) const; + + /// \brief Remove the task list of the queue. + /// \return A list of tasks contained in this queue. + const std::list &GetTasks() const; + + private: + // A list of tasks. + std::list task_list_; + // A hash to speed up looking up a task. + std::unordered_map::iterator> task_map_; + }; + private: /// Tasks that are destined for actors that have not yet been created. - std::list methods_waiting_for_actor_creation_; + TaskQueue methods_waiting_for_actor_creation_; /// Tasks that are waiting for an object dependency to appear locally. - std::list waiting_tasks_; + TaskQueue waiting_tasks_; /// Tasks whose object dependencies are locally available, but that are /// waiting to be scheduled. - std::list placeable_tasks_; + TaskQueue placeable_tasks_; /// Tasks ready for dispatch, but that are waiting for a worker. - std::list ready_tasks_; + TaskQueue ready_tasks_; /// Tasks that are running on a worker. - std::list running_tasks_; + TaskQueue running_tasks_; /// Tasks that were dispatched to a worker but are blocked on a data /// dependency that was missing at runtime. - std::list blocked_tasks_; + TaskQueue blocked_tasks_; /// The set of currently running driver tasks. These are empty tasks that are /// started by a driver process on initialization. std::unordered_set driver_task_ids_;