Raylet task dispatch and throttling worker startup (#1912)

* separate task placement and task dispatch; throttle task dispatch with locally available resournces

* keep track of worker's being started/in flight and suppress starting extraneous workers

* cleanup comments

* remove early termination in task dispatch to support zero-resource actor tasks

* info -> debug

* add documentation

* linting

* mock the worker pool for testing

* some linting

* kill all workers in flight; clear the worker pool in dtor

* remove fixed todo

* lint
This commit is contained in:
Alexey Tumanov
2018-04-18 10:58:11 -07:00
committed by Stephanie Wang
parent 0728d4719b
commit 1c965fcfeb
8 changed files with 147 additions and 32 deletions
+1 -1
View File
@@ -306,7 +306,7 @@ def stop():
subprocess.call(
[
"killall global_scheduler plasma_store plasma_manager "
"local_scheduler raylet"
"local_scheduler raylet raylet_monitor"
],
shell=True)
+38 -25
View File
@@ -261,6 +261,31 @@ void NodeManager::ProcessNewClient(std::shared_ptr<LocalClientConnection> client
client->ProcessMessages();
}
void NodeManager::DispatchTasks() {
// Work with a copy of scheduled tasks.
auto scheduled_tasks = local_queues_.GetScheduledTasks();
// Return if there are no tasks to schedule.
if (scheduled_tasks.empty()) {
return;
}
const ClientID &my_client_id = gcs_client_->client_table().GetLocalClientId();
for (const auto &task : scheduled_tasks) {
const auto &local_resources =
cluster_resource_map_[my_client_id].GetAvailableResources();
const auto &task_resources = task.GetTaskSpecification().GetRequiredResources();
if (!task_resources.IsSubset(local_resources)) {
// Not enough local resources for this task right now, skip this task.
continue;
}
// We have enough resources for this task. Assign task.
// TODO(atumanov): perform the task state/queue transition inside AssignTask.
auto dispatched_task =
local_queues_.RemoveTasks({task.GetTaskSpecification().TaskId()});
AssignTask(dispatched_task.front());
}
}
void NodeManager::ProcessClientMessage(std::shared_ptr<LocalClientConnection> client,
int64_t message_type,
const uint8_t *message_data) {
@@ -285,21 +310,9 @@ void NodeManager::ProcessClientMessage(std::shared_ptr<LocalClientConnection> cl
}
// Return the worker to the idle pool.
worker_pool_.PushWorker(worker);
// Check if there is a scheduled task that can now be assigned to the newly
// idle worker.
auto scheduled_tasks = local_queues_.GetScheduledTasks();
if (!scheduled_tasks.empty()) {
// Find a scheduled task that whose actor ID matches that of the newly
// idle worker.
auto worker_actor_id = worker->GetActorId();
for (const auto &task : scheduled_tasks) {
if (task.GetTaskSpecification().ActorId() == worker_actor_id) {
auto scheduled_tasks =
local_queues_.RemoveTasks({task.GetTaskSpecification().TaskId()});
AssignTask(scheduled_tasks.front());
}
}
}
// Call task dispatch to assign work to the new worker.
DispatchTasks();
} break;
case protocol::MessageType_DisconnectClient: {
// Remove the dead worker from the pool and stop listening for messages.
@@ -374,6 +387,7 @@ void NodeManager::HandleWaitingTaskReady(const TaskID &task_id) {
}
void NodeManager::ScheduleTasks() {
// This method performs the transition of tasks from PENDING to SCHEDULED.
auto policy_decision = scheduling_policy_.Schedule(
cluster_resource_map_, gcs_client_->client_table().GetLocalClientId(),
remote_clients_);
@@ -386,7 +400,7 @@ void NodeManager::ScheduleTasks() {
// Extract decision for this local scheduler.
std::unordered_set<TaskID, UniqueIDHasher> local_task_ids;
// Iterate over (taskid, clientid) pairs, extract tasks to run on the local client.
// Iterate over (taskid, clientid) pairs, extract tasks assigned to the local node.
for (const auto &task_schedule : policy_decision) {
TaskID task_id = task_schedule.first;
ClientID client_id = task_schedule.second;
@@ -402,11 +416,10 @@ void NodeManager::ScheduleTasks() {
}
}
// Assign the tasks to workers.
// Transition locally scheduled tasks to SCHEDULED and dispatch scheduled tasks.
std::vector<Task> tasks = local_queues_.RemoveTasks(local_task_ids);
for (auto &task : tasks) {
AssignTask(task);
}
local_queues_.QueueScheduledTasks(tasks);
DispatchTasks();
}
void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineage) {
@@ -481,11 +494,6 @@ void NodeManager::AssignTask(Task &task) {
}
}
// Resource accounting: acquire resources for the scheduled task.
const ClientID &my_client_id = gcs_client_->client_table().GetLocalClientId();
RAY_CHECK(
this->cluster_resource_map_[my_client_id].Acquire(spec.GetRequiredResources()));
// Try to get an idle worker that can execute this task.
std::shared_ptr<Worker> worker = worker_pool_.PopWorker(spec.ActorId());
if (worker == nullptr) {
@@ -509,6 +517,11 @@ void NodeManager::AssignTask(Task &task) {
auto status = worker->Connection()->WriteMessage(protocol::MessageType_ExecuteTask,
fbb.GetSize(), fbb.GetBufferPointer());
if (status.ok()) {
// Resource accounting: acquire resources for the assigned task.
const ClientID &my_client_id = gcs_client_->client_table().GetLocalClientId();
RAY_CHECK(
this->cluster_resource_map_[my_client_id].Acquire(spec.GetRequiredResources()));
// We successfully assigned the task to the worker.
worker->AssignTaskId(spec.TaskId());
// If the task was an actor task, then record this execution to guarantee
+3
View File
@@ -87,6 +87,9 @@ class NodeManager {
/// Handler for a heartbeat notification from the GCS.
void HeartbeatAdded(gcs::AsyncGcsClient *client, const ClientID &id,
const HeartbeatTableDataT &data);
/// Dispatch locally scheduled tasks. This attempts the transition from "scheduled" to
/// "running" task state.
void DispatchTasks();
boost::asio::io_service &io_service_;
ObjectManager &object_manager_;
+11
View File
@@ -27,6 +27,17 @@ bool ResourceSet::operator==(const ResourceSet &rhs) const {
return (this->IsSubset(rhs) && rhs.IsSubset(*this));
}
bool ResourceSet::IsEmpty() const {
// Check whether the capacity of each resource type is zero. Exit early if not.
if (resource_capacity_.empty()) return true;
for (const auto &resource_pair : resource_capacity_) {
if (resource_pair.second > 0) {
return false;
}
}
return true;
}
bool ResourceSet::IsSubset(const ResourceSet &other) const {
// Check to make sure all keys of this are in other.
for (const auto &resource_pair : resource_capacity_) {
+5
View File
@@ -97,6 +97,11 @@ class ResourceSet {
/// False otherwise.
bool GetResource(const std::string &resource_name, double *value) const;
/// Return true if the resource set is empty. False otherwise.
///
/// \return True if the resource capacity is zero. False otherwise.
bool IsEmpty() const;
// TODO(atumanov): implement const_iterator class for the ResourceSet container.
const std::unordered_map<std::string, double> &GetResourceMap() const;
+38 -2
View File
@@ -16,10 +16,15 @@ WorkerPool::WorkerPool(int num_workers, const std::vector<std::string> &worker_c
// become zombies instead of dying gracefully.
signal(SIGCHLD, SIG_IGN);
for (int i = 0; i < num_workers; i++) {
StartWorker();
// Force-start num_workers workers.
StartWorker(true);
}
}
/// A constructor that initializes an empty worker pool with zero workers.
WorkerPool::WorkerPool(const std::vector<std::string> &worker_command)
: worker_command_(worker_command) {}
WorkerPool::~WorkerPool() {
// Kill all registered workers. NOTE(swang): This assumes that the registered
// workers were started by the pool.
@@ -28,15 +33,39 @@ WorkerPool::~WorkerPool() {
kill(worker->Pid(), SIGKILL);
waitpid(worker->Pid(), NULL, 0);
}
// Kill all the workers that have been started but not registered.
for (const auto &pid : started_worker_pids_) {
RAY_CHECK(pid > 0);
kill(pid, SIGKILL);
waitpid(pid, NULL, 0);
}
pool_.clear();
actor_pool_.clear();
registered_workers_.clear();
started_worker_pids_.clear();
}
void WorkerPool::StartWorker() {
uint32_t WorkerPool::Size() const {
return static_cast<uint32_t>(actor_pool_.size() + pool_.size());
}
void WorkerPool::StartWorker(bool force_start) {
RAY_CHECK(!worker_command_.empty()) << "No worker command provided";
if (!started_worker_pids_.empty() && !force_start) {
// Workers have been started, but not registered. Force start disabled -- returning.
RAY_LOG(DEBUG) << started_worker_pids_.size() << " workers pending registration";
return;
}
// Either there are no workers pending registration or the worker start is being forced.
RAY_LOG(DEBUG) << "starting worker, actor pool " << actor_pool_.size() << " task pool "
<< pool_.size();
// Launch the process to create the worker.
pid_t pid = fork();
if (pid != 0) {
RAY_LOG(DEBUG) << "Started worker with pid " << pid;
started_worker_pids_.insert(pid);
return;
}
@@ -60,6 +89,8 @@ void WorkerPool::StartWorker() {
void WorkerPool::RegisterWorker(std::shared_ptr<Worker> worker) {
RAY_LOG(DEBUG) << "Registering worker with pid " << worker->Pid();
registered_workers_.push_back(worker);
RAY_CHECK(started_worker_pids_.count(worker->Pid()) > 0);
started_worker_pids_.erase(worker->Pid());
}
std::shared_ptr<Worker> WorkerPool::GetRegisteredWorker(
@@ -119,6 +150,11 @@ bool WorkerPool::DisconnectWorker(std::shared_ptr<Worker> worker) {
return removeWorker(pool_, worker);
}
// Protected WorkerPool methods.
void WorkerPool::AddStartedWorker(pid_t pid) { started_worker_pids_.insert(pid); }
uint32_t WorkerPool::NumStartedWorkers() const { return started_worker_pids_.size(); }
} // namespace raylet
} // namespace ray
+31 -2
View File
@@ -4,6 +4,7 @@
#include <inttypes.h>
#include <list>
#include <unordered_map>
#include <unordered_set>
#include "ray/common/client_connection.h"
#include "ray/raylet/worker.h"
@@ -26,16 +27,26 @@ class WorkerPool {
/// pool.
///
/// \param num_workers The number of workers to start.
/// \param worker_command The command used to start the worker process.
WorkerPool(int num_workers, const std::vector<std::string> &worker_command);
/// Create a pool with zero workers.
///
/// \param num_workers The number of workers to start.
/// \param worker_command The command used to start the worker process.
WorkerPool(const std::vector<std::string> &worker_command);
/// Destructor responsible for freeing a set of workers owned by this class.
~WorkerPool();
virtual ~WorkerPool();
/// Asynchronously start a new worker process. Once the worker process has
/// registered with an external server, the process should create and
/// register a new Worker, then add itself to the pool. Failure to start
/// the worker process is a fatal error.
void StartWorker();
///
/// \param force_start Controls whether to force starting a worker regardless of any
/// workers that have already been started but not yet registered.
void StartWorker(bool force_start = false);
/// Register a new worker. The Worker should be added by the caller to the
/// pool after it becomes idle (e.g., requests a work assignment).
@@ -70,6 +81,23 @@ class WorkerPool {
/// such worker exists.
std::shared_ptr<Worker> PopWorker(const ActorID &actor_id);
/// Return the current size of the worker pool. Counts only the workers that registered
/// and requested a task.
///
/// \return The total count of all workers (actor and non-actor) in the pool.
uint32_t Size() const;
protected:
/// Add started worker PID to the internal list of started workers (for testing).
///
/// \param pid A process identifier for the worker being started.
void AddStartedWorker(pid_t pid);
/// Return a number of workers currently started but not registered.
///
/// \return The number of worker PIDs stored for started workers.
uint32_t NumStartedWorkers() const;
private:
std::vector<std::string> worker_command_;
/// The pool of idle workers.
@@ -80,6 +108,7 @@ class WorkerPool {
/// idle and executing.
// TODO(swang): Make this a map to make GetRegisteredWorker faster.
std::list<std::shared_ptr<Worker>> registered_workers_;
std::unordered_set<pid_t> started_worker_pids_;
};
} // namespace raylet
+20 -2
View File
@@ -8,9 +8,26 @@ namespace ray {
namespace raylet {
class WorkerPoolMock : public WorkerPool {
public:
WorkerPoolMock(const std::vector<std::string> &worker_command)
: WorkerPool(worker_command) {}
void StartWorker(pid_t pid, bool force_start = false) {
if (NumStartedWorkers() > 0 && !force_start) {
// Workers have been started, but not registered. Force start disabled -- returning.
RAY_LOG(DEBUG) << NumStartedWorkers() << " workers pending registration";
return;
}
// Either no workers are pending registration or the worker start is being forced.
RAY_LOG(DEBUG) << "starting worker, worker pool size " << Size();
AddStartedWorker(pid);
}
};
class WorkerPoolTest : public ::testing::Test {
public:
WorkerPoolTest() : worker_pool_(0, {}), io_service_() {}
WorkerPoolTest() : worker_pool_({}), io_service_() {}
std::shared_ptr<Worker> CreateWorker(pid_t pid) {
std::function<void(std::shared_ptr<LocalClientConnection>)> client_handler = [this](
@@ -23,11 +40,12 @@ class WorkerPoolTest : public ::testing::Test {
boost::asio::local::stream_protocol::socket socket(io_service_);
auto client =
LocalClientConnection::Create(client_handler, message_handler, std::move(socket));
worker_pool_.StartWorker(pid);
return std::shared_ptr<Worker>(new Worker(pid, client));
}
protected:
WorkerPool worker_pool_;
WorkerPoolMock worker_pool_;
boost::asio::io_service io_service_;
private: