From 1c965fcfebe5b38bc55da5ddea7c1d28ed204f3a Mon Sep 17 00:00:00 2001 From: Alexey Tumanov Date: Wed, 18 Apr 2018 10:58:11 -0700 Subject: [PATCH] Raylet task dispatch and throttling worker startup (#1912) * separate task placement and task dispatch; throttle task dispatch with locally available resources * keep track of workers being started/in flight and suppress starting extraneous workers * cleanup comments * remove early termination in task dispatch to support zero-resource actor tasks * info -> debug * add documentation * linting * mock the worker pool for testing * some linting * kill all workers in flight; clear the worker pool in dtor * remove fixed todo * lint --- python/ray/scripts/scripts.py | 2 +- src/ray/raylet/node_manager.cc | 63 ++++++++++++++++---------- src/ray/raylet/node_manager.h | 3 ++ src/ray/raylet/scheduling_resources.cc | 11 +++++ src/ray/raylet/scheduling_resources.h | 5 ++ src/ray/raylet/worker_pool.cc | 40 +++++++++++++++- src/ray/raylet/worker_pool.h | 33 +++++++++++++- src/ray/raylet/worker_pool_test.cc | 22 ++++++++- 8 files changed, 147 insertions(+), 32 deletions(-) diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index f91ae0b76..d06d46068 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -306,7 +306,7 @@ def stop(): subprocess.call( [ "killall global_scheduler plasma_store plasma_manager " - "local_scheduler raylet" + "local_scheduler raylet raylet_monitor" ], shell=True) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index a2e971440..1757737a3 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -261,6 +261,31 @@ void NodeManager::ProcessNewClient(std::shared_ptr client client->ProcessMessages(); } +void NodeManager::DispatchTasks() { + // Work with a copy of scheduled tasks. + auto scheduled_tasks = local_queues_.GetScheduledTasks(); + // Return if there are no tasks to schedule. 
+ if (scheduled_tasks.empty()) { + return; + } + const ClientID &my_client_id = gcs_client_->client_table().GetLocalClientId(); + + for (const auto &task : scheduled_tasks) { + const auto &local_resources = + cluster_resource_map_[my_client_id].GetAvailableResources(); + const auto &task_resources = task.GetTaskSpecification().GetRequiredResources(); + if (!task_resources.IsSubset(local_resources)) { + // Not enough local resources for this task right now, skip this task. + continue; + } + // We have enough resources for this task. Assign task. + // TODO(atumanov): perform the task state/queue transition inside AssignTask. + auto dispatched_task = + local_queues_.RemoveTasks({task.GetTaskSpecification().TaskId()}); + AssignTask(dispatched_task.front()); + } +} + void NodeManager::ProcessClientMessage(std::shared_ptr client, int64_t message_type, const uint8_t *message_data) { @@ -285,21 +310,9 @@ void NodeManager::ProcessClientMessage(std::shared_ptr cl } // Return the worker to the idle pool. worker_pool_.PushWorker(worker); - // Check if there is a scheduled task that can now be assigned to the newly - // idle worker. - auto scheduled_tasks = local_queues_.GetScheduledTasks(); - if (!scheduled_tasks.empty()) { - // Find a scheduled task that whose actor ID matches that of the newly - // idle worker. - auto worker_actor_id = worker->GetActorId(); - for (const auto &task : scheduled_tasks) { - if (task.GetTaskSpecification().ActorId() == worker_actor_id) { - auto scheduled_tasks = - local_queues_.RemoveTasks({task.GetTaskSpecification().TaskId()}); - AssignTask(scheduled_tasks.front()); - } - } - } + // Call task dispatch to assign work to the new worker. + DispatchTasks(); + } break; case protocol::MessageType_DisconnectClient: { // Remove the dead worker from the pool and stop listening for messages. 
@@ -374,6 +387,7 @@ void NodeManager::HandleWaitingTaskReady(const TaskID &task_id) { } void NodeManager::ScheduleTasks() { + // This method performs the transition of tasks from PENDING to SCHEDULED. auto policy_decision = scheduling_policy_.Schedule( cluster_resource_map_, gcs_client_->client_table().GetLocalClientId(), remote_clients_); @@ -386,7 +400,7 @@ void NodeManager::ScheduleTasks() { // Extract decision for this local scheduler. std::unordered_set local_task_ids; - // Iterate over (taskid, clientid) pairs, extract tasks to run on the local client. + // Iterate over (taskid, clientid) pairs, extract tasks assigned to the local node. for (const auto &task_schedule : policy_decision) { TaskID task_id = task_schedule.first; ClientID client_id = task_schedule.second; @@ -402,11 +416,10 @@ void NodeManager::ScheduleTasks() { } } - // Assign the tasks to workers. + // Transition locally scheduled tasks to SCHEDULED and dispatch scheduled tasks. std::vector tasks = local_queues_.RemoveTasks(local_task_ids); - for (auto &task : tasks) { - AssignTask(task); - } + local_queues_.QueueScheduledTasks(tasks); + DispatchTasks(); } void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineage) { @@ -481,11 +494,6 @@ void NodeManager::AssignTask(Task &task) { } } - // Resource accounting: acquire resources for the scheduled task. - const ClientID &my_client_id = gcs_client_->client_table().GetLocalClientId(); - RAY_CHECK( - this->cluster_resource_map_[my_client_id].Acquire(spec.GetRequiredResources())); - // Try to get an idle worker that can execute this task. std::shared_ptr worker = worker_pool_.PopWorker(spec.ActorId()); if (worker == nullptr) { @@ -509,6 +517,11 @@ void NodeManager::AssignTask(Task &task) { auto status = worker->Connection()->WriteMessage(protocol::MessageType_ExecuteTask, fbb.GetSize(), fbb.GetBufferPointer()); if (status.ok()) { + // Resource accounting: acquire resources for the assigned task. 
+ const ClientID &my_client_id = gcs_client_->client_table().GetLocalClientId(); + RAY_CHECK( + this->cluster_resource_map_[my_client_id].Acquire(spec.GetRequiredResources())); + // We successfully assigned the task to the worker. worker->AssignTaskId(spec.TaskId()); // If the task was an actor task, then record this execution to guarantee diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index de9d65a0b..e63aa1f40 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -87,6 +87,9 @@ class NodeManager { /// Handler for a heartbeat notification from the GCS. void HeartbeatAdded(gcs::AsyncGcsClient *client, const ClientID &id, const HeartbeatTableDataT &data); + /// Dispatch locally scheduled tasks. This attempts the transition from "scheduled" to + /// "running" task state. + void DispatchTasks(); boost::asio::io_service &io_service_; ObjectManager &object_manager_; diff --git a/src/ray/raylet/scheduling_resources.cc b/src/ray/raylet/scheduling_resources.cc index c5bfa6435..adb8a7e6a 100644 --- a/src/ray/raylet/scheduling_resources.cc +++ b/src/ray/raylet/scheduling_resources.cc @@ -27,6 +27,17 @@ bool ResourceSet::operator==(const ResourceSet &rhs) const { return (this->IsSubset(rhs) && rhs.IsSubset(*this)); } +bool ResourceSet::IsEmpty() const { + // Check whether the capacity of each resource type is zero. Exit early if not. + if (resource_capacity_.empty()) return true; + for (const auto &resource_pair : resource_capacity_) { + if (resource_pair.second > 0) { + return false; + } + } + return true; +} + bool ResourceSet::IsSubset(const ResourceSet &other) const { // Check to make sure all keys of this are in other. 
for (const auto &resource_pair : resource_capacity_) { diff --git a/src/ray/raylet/scheduling_resources.h b/src/ray/raylet/scheduling_resources.h index febcbb392..7474a2413 100644 --- a/src/ray/raylet/scheduling_resources.h +++ b/src/ray/raylet/scheduling_resources.h @@ -97,6 +97,11 @@ class ResourceSet { /// False otherwise. bool GetResource(const std::string &resource_name, double *value) const; + /// Return true if the resource set is empty. False otherwise. + /// + /// \return True if the resource capacity is zero. False otherwise. + bool IsEmpty() const; + // TODO(atumanov): implement const_iterator class for the ResourceSet container. const std::unordered_map &GetResourceMap() const; diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 08a4f5ddd..4b58b5a63 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -16,10 +16,15 @@ WorkerPool::WorkerPool(int num_workers, const std::vector &worker_c // become zombies instead of dying gracefully. signal(SIGCHLD, SIG_IGN); for (int i = 0; i < num_workers; i++) { - StartWorker(); + // Force-start num_workers workers. + StartWorker(true); } } +/// A constructor that initializes an empty worker pool with zero workers. +WorkerPool::WorkerPool(const std::vector &worker_command) + : worker_command_(worker_command) {} + WorkerPool::~WorkerPool() { // Kill all registered workers. NOTE(swang): This assumes that the registered // workers were started by the pool. @@ -28,15 +33,39 @@ WorkerPool::~WorkerPool() { kill(worker->Pid(), SIGKILL); waitpid(worker->Pid(), NULL, 0); } + // Kill all the workers that have been started but not registered. 
+ for (const auto &pid : started_worker_pids_) { + RAY_CHECK(pid > 0); + kill(pid, SIGKILL); + waitpid(pid, NULL, 0); + } + + pool_.clear(); + actor_pool_.clear(); + registered_workers_.clear(); + started_worker_pids_.clear(); } -void WorkerPool::StartWorker() { +uint32_t WorkerPool::Size() const { + return static_cast(actor_pool_.size() + pool_.size()); +} + +void WorkerPool::StartWorker(bool force_start) { RAY_CHECK(!worker_command_.empty()) << "No worker command provided"; + if (!started_worker_pids_.empty() && !force_start) { + // Workers have been started, but not registered. Force start disabled -- returning. + RAY_LOG(DEBUG) << started_worker_pids_.size() << " workers pending registration"; + return; + } + // Either there are no workers pending registration or the worker start is being forced. + RAY_LOG(DEBUG) << "starting worker, actor pool " << actor_pool_.size() << " task pool " + << pool_.size(); // Launch the process to create the worker. pid_t pid = fork(); if (pid != 0) { RAY_LOG(DEBUG) << "Started worker with pid " << pid; + started_worker_pids_.insert(pid); return; } @@ -60,6 +89,8 @@ void WorkerPool::StartWorker() { void WorkerPool::RegisterWorker(std::shared_ptr worker) { RAY_LOG(DEBUG) << "Registering worker with pid " << worker->Pid(); registered_workers_.push_back(worker); + RAY_CHECK(started_worker_pids_.count(worker->Pid()) > 0); + started_worker_pids_.erase(worker->Pid()); } std::shared_ptr WorkerPool::GetRegisteredWorker( @@ -119,6 +150,11 @@ bool WorkerPool::DisconnectWorker(std::shared_ptr worker) { return removeWorker(pool_, worker); } +// Protected WorkerPool methods. 
+void WorkerPool::AddStartedWorker(pid_t pid) { started_worker_pids_.insert(pid); } + +uint32_t WorkerPool::NumStartedWorkers() const { return started_worker_pids_.size(); } + } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 2486c57c6..5cb5bb2bf 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -4,6 +4,7 @@ #include #include #include +#include #include "ray/common/client_connection.h" #include "ray/raylet/worker.h" @@ -26,16 +27,26 @@ class WorkerPool { /// pool. /// /// \param num_workers The number of workers to start. + /// \param worker_command The command used to start the worker process. WorkerPool(int num_workers, const std::vector &worker_command); + /// Create a pool with zero workers. + /// + /// \param num_workers The number of workers to start. + /// \param worker_command The command used to start the worker process. + WorkerPool(const std::vector &worker_command); + /// Destructor responsible for freeing a set of workers owned by this class. - ~WorkerPool(); + virtual ~WorkerPool(); /// Asynchronously start a new worker process. Once the worker process has /// registered with an external server, the process should create and /// register a new Worker, then add itself to the pool. Failure to start /// the worker process is a fatal error. - void StartWorker(); + /// + /// \param force_start Controls whether to force starting a worker regardless of any + /// workers that have already been started but not yet registered. + void StartWorker(bool force_start = false); /// Register a new worker. The Worker should be added by the caller to the /// pool after it becomes idle (e.g., requests a work assignment). @@ -70,6 +81,23 @@ class WorkerPool { /// such worker exists. std::shared_ptr PopWorker(const ActorID &actor_id); + /// Return the current size of the worker pool. Counts only the workers that registered + /// and requested a task. 
+ /// + /// \return The total count of all workers (actor and non-actor) in the pool. + uint32_t Size() const; + + protected: + /// Add started worker PID to the internal list of started workers (for testing). + /// + /// \param pid A process identifier for the worker being started. + void AddStartedWorker(pid_t pid); + + /// Return a number of workers currently started but not registered. + /// + /// \return The number of worker PIDs stored for started workers. + uint32_t NumStartedWorkers() const; + private: std::vector worker_command_; /// The pool of idle workers. @@ -80,6 +108,7 @@ class WorkerPool { /// idle and executing. // TODO(swang): Make this a map to make GetRegisteredWorker faster. std::list> registered_workers_; + std::unordered_set started_worker_pids_; }; } // namespace raylet diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index b7aa199e4..8d6c526b4 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -8,9 +8,26 @@ namespace ray { namespace raylet { +class WorkerPoolMock : public WorkerPool { + public: + WorkerPoolMock(const std::vector &worker_command) + : WorkerPool(worker_command) {} + + void StartWorker(pid_t pid, bool force_start = false) { + if (NumStartedWorkers() > 0 && !force_start) { + // Workers have been started, but not registered. Force start disabled -- returning. + RAY_LOG(DEBUG) << NumStartedWorkers() << " workers pending registration"; + return; + } + // Either no workers are pending registration or the worker start is being forced. 
+ RAY_LOG(DEBUG) << "starting worker, worker pool size " << Size(); + AddStartedWorker(pid); + } +}; + class WorkerPoolTest : public ::testing::Test { public: - WorkerPoolTest() : worker_pool_(0, {}), io_service_() {} + WorkerPoolTest() : worker_pool_({}), io_service_() {} std::shared_ptr CreateWorker(pid_t pid) { std::function)> client_handler = [this]( @@ -23,11 +40,12 @@ class WorkerPoolTest : public ::testing::Test { boost::asio::local::stream_protocol::socket socket(io_service_); auto client = LocalClientConnection::Create(client_handler, message_handler, std::move(socket)); + worker_pool_.StartWorker(pid); return std::shared_ptr(new Worker(pid, client)); } protected: - WorkerPool worker_pool_; + WorkerPoolMock worker_pool_; boost::asio::io_service io_service_; private: