[xray] Workers blocked in a ray.get release their resources (#1920)

* [xray] Throttle task dispatch by required resources
* Pass in number of initial workers into raylet command
* Workers blocked in a ray.get release resources
This commit is contained in:
Stephanie Wang
2018-04-18 20:59:58 -07:00
committed by Alexey Tumanov
parent 1c965fcfeb
commit aa07f1ce4e
10 changed files with 131 additions and 18 deletions
+11 -2
View File
@@ -910,6 +910,7 @@ def start_raylet(redis_address,
plasma_store_name,
worker_path,
resources=None,
num_workers=0,
stdout_file=None,
stderr_file=None,
cleanup=True):
@@ -956,8 +957,15 @@ def start_raylet(redis_address,
plasma_store_name, raylet_name, redis_address))
command = [
RAYLET_EXECUTABLE, raylet_name, plasma_store_name, node_ip_address,
gcs_ip_address, gcs_port, start_worker_command, resource_argument
RAYLET_EXECUTABLE,
raylet_name,
plasma_store_name,
node_ip_address,
gcs_ip_address,
gcs_port,
str(num_workers),
start_worker_command,
resource_argument,
]
pid = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file)
@@ -1471,6 +1479,7 @@ def start_ray_processes(address_info=None,
object_store_addresses[i].name,
worker_path,
resources=resources[i],
num_workers=workers_per_local_scheduler[i],
stdout_file=raylet_stdout_file,
stderr_file=raylet_stderr_file,
cleanup=cleanup))
+1 -1
View File
@@ -504,7 +504,7 @@ class Worker(object):
# If there were objects that we weren't able to get locally, let the
# local scheduler know that we're now unblocked.
if was_blocked and not self.use_raylet:
if was_blocked:
self.local_scheduler_client.notify_unblocked()
assert len(final_results) == len(object_ids)
+5 -4
View File
@@ -6,15 +6,16 @@
#ifndef RAYLET_TEST
int main(int argc, char *argv[]) {
RAY_CHECK(argc == 8);
RAY_CHECK(argc == 9);
const std::string raylet_socket_name = std::string(argv[1]);
const std::string store_socket_name = std::string(argv[2]);
const std::string node_ip_address = std::string(argv[3]);
const std::string redis_address = std::string(argv[4]);
int redis_port = std::stoi(argv[5]);
const std::string worker_command = std::string(argv[6]);
const std::string static_resource_list = std::string(argv[7]);
int num_initial_workers = std::stoi(argv[6]);
const std::string worker_command = std::string(argv[7]);
const std::string static_resource_list = std::string(argv[8]);
// Configuration for the node manager.
ray::raylet::NodeManagerConfig node_manager_config;
@@ -33,7 +34,7 @@ int main(int argc, char *argv[]) {
ray::raylet::ResourceSet(std::move(static_resource_conf));
RAY_LOG(INFO) << "Starting raylet with static resource configuration: "
<< node_manager_config.resource_config.ToString();
node_manager_config.num_initial_workers = 0;
node_manager_config.num_initial_workers = num_initial_workers;
// Use a default worker that can execute empty tasks with dependencies.
std::stringstream worker_command_stream(worker_command);
+59
View File
@@ -344,6 +344,65 @@ void NodeManager::ProcessClientMessage(std::shared_ptr<LocalClientConnection> cl
ObjectID object_id = from_flatbuf(*message->object_id());
RAY_LOG(DEBUG) << "reconstructing object " << object_id;
RAY_CHECK_OK(object_manager_.Pull(object_id));
// If the blocked client is a worker, and the worker isn't already blocked,
// then release any CPU resources that it acquired for its assigned task
// while it is blocked. The resources will be acquired again once the
// worker is unblocked.
std::shared_ptr<Worker> worker = worker_pool_.GetRegisteredWorker(client);
if (worker && !worker->IsBlocked()) {
RAY_CHECK(!worker->GetAssignedTaskId().is_nil());
auto tasks = local_queues_.RemoveTasks({worker->GetAssignedTaskId()});
const auto &task = tasks.front();
// Get the CPU resources required by the running task.
const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
double required_cpus = 0;
RAY_CHECK(required_resources.GetResource(kCPU_ResourceLabel, &required_cpus));
const std::unordered_map<std::string, double> cpu_resources = {
{kCPU_ResourceLabel, required_cpus}};
// Release the CPU resources.
RAY_CHECK(
cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release(
ResourceSet(cpu_resources)));
// Mark the task as blocked.
local_queues_.QueueBlockedTasks(tasks);
worker->MarkBlocked();
// Try to dispatch more tasks since the blocked worker released some
// resources.
DispatchTasks();
}
} break;
case protocol::MessageType_NotifyUnblocked: {
std::shared_ptr<Worker> worker = worker_pool_.GetRegisteredWorker(client);
// Re-acquire the CPU resources for the task that was assigned to the
// unblocked worker.
if (worker) {
RAY_CHECK(worker->IsBlocked());
RAY_CHECK(!worker->GetAssignedTaskId().is_nil());
auto tasks = local_queues_.RemoveTasks({worker->GetAssignedTaskId()});
const auto &task = tasks.front();
// Get the CPU resources required by the running task.
const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
double required_cpus = 0;
RAY_CHECK(required_resources.GetResource(kCPU_ResourceLabel, &required_cpus));
const std::unordered_map<std::string, double> cpu_resources = {
{kCPU_ResourceLabel, required_cpus}};
// Acquire the CPU resources.
bool oversubscribed =
!cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Acquire(
ResourceSet(cpu_resources));
if (oversubscribed) {
const SchedulingResources &local_resources =
cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()];
RAY_LOG(WARNING) << "Resources oversubscribed: "
<< local_resources.GetAvailableResources().ToString();
}
// Mark the task as running again.
local_queues_.QueueRunningTasks(tasks);
worker->MarkUnblocked();
}
} break;
default:
+9
View File
@@ -26,6 +26,10 @@ const std::list<Task> &SchedulingQueue::GetRunningTasks() const {
return this->running_tasks_;
}
const std::list<Task> &SchedulingQueue::GetBlockedTasks() const {
return this->blocked_tasks_;
}
const std::list<Task> &SchedulingQueue::GetReadyMethods() const {
throw std::runtime_error("Method not implemented");
}
@@ -65,6 +69,7 @@ std::vector<Task> SchedulingQueue::RemoveTasks(
removeTasksFromQueue(ready_tasks_, task_ids, removed_tasks);
removeTasksFromQueue(scheduled_tasks_, task_ids, removed_tasks);
removeTasksFromQueue(running_tasks_, task_ids, removed_tasks);
removeTasksFromQueue(blocked_tasks_, task_ids, removed_tasks);
// TODO(swang): Remove from running methods.
RAY_CHECK(task_ids.size() == 0);
@@ -91,6 +96,10 @@ void SchedulingQueue::QueueRunningTasks(const std::vector<Task> &tasks) {
queueTasks(running_tasks_, tasks);
}
void SchedulingQueue::QueueBlockedTasks(const std::vector<Task> &tasks) {
queueTasks(blocked_tasks_, tasks);
}
} // namespace raylet
} // namespace ray
+19 -1
View File
@@ -65,6 +65,13 @@ class SchedulingQueue {
/// executing on a worker.
const std::list<Task> &GetRunningTasks() const;
/// Get the tasks in the blocked state.
///
/// \return A const reference to the queue of tasks that have been dispatched
/// to a worker but are blocked on a data dependency discovered to be missing
/// at runtime.
const std::list<Task> &GetBlockedTasks() const;
/// Remove tasks from the task queue.
///
/// \param tasks The set of task IDs to remove from the queue. The
@@ -77,7 +84,8 @@ class SchedulingQueue {
/// \param tasks The tasks to queue.
void QueueUncreatedActorMethods(const std::vector<Task> &tasks);
/// Queue tasks in the waiting state.
/// Queue tasks in the waiting state. These are tasks that cannot yet be
/// scheduled since they are blocked on a missing data dependency.
///
/// \param tasks The tasks to queue.
void QueueWaitingTasks(const std::vector<Task> &tasks);
@@ -97,6 +105,13 @@ class SchedulingQueue {
/// \param tasks The tasks to queue.
void QueueRunningTasks(const std::vector<Task> &tasks);
/// Queue tasks in the blocked state. These are tasks that have been
/// dispatched to a worker but are blocked on a data dependency that was
/// discovered to be missing at runtime.
///
/// \param tasks The tasks to queue.
void QueueBlockedTasks(const std::vector<Task> &tasks);
private:
/// Tasks that are destined for actors that have not yet been created.
std::list<Task> uncreated_actor_methods_;
@@ -109,6 +124,9 @@ class SchedulingQueue {
std::list<Task> scheduled_tasks_;
/// Tasks that are running on a worker.
std::list<Task> running_tasks_;
/// Tasks that were dispatched to a worker but are blocked on a data
/// dependency that was missing at runtime.
std::list<Task> blocked_tasks_;
};
} // namespace raylet
+8 -8
View File
@@ -73,19 +73,19 @@ bool ResourceSet::RemoveResource(const std::string &resource_name) {
throw std::runtime_error("Method not implemented");
}
bool ResourceSet::SubtractResources(const ResourceSet &other) {
// Return failure if attempting to perform vector subtraction with unknown labels.
// TODO(atumanov): make the implementation atomic. Currently, if false is returned
// the resource capacity may be partially mutated. To reverse, call AddResources.
// Subtract the resources and track whether a resource goes below zero.
bool oversubscribed = false;
for (const auto &resource_pair : other.GetResourceMap()) {
const std::string &resource_label = resource_pair.first;
const double &resource_capacity = resource_pair.second;
if (resource_capacity_.count(resource_label) == 0) {
return false;
} else {
resource_capacity_[resource_label] -= resource_capacity;
RAY_CHECK(resource_capacity_.count(resource_label) == 1)
<< "Attempt to acquire unknown resource: " << resource_label;
resource_capacity_[resource_label] -= resource_capacity;
if (resource_capacity_[resource_label] < 0) {
oversubscribed = true;
}
}
return true;
return !oversubscribed;
}
bool ResourceSet::AddResources(const ResourceSet &other) {
+5 -1
View File
@@ -10,6 +10,8 @@ namespace ray {
namespace raylet {
const std::string kCPU_ResourceLabel = "CPU";
/// Resource availability status reports whether the resource requirement is
/// (1) infeasible, (2) feasible but currently unavailable, or (3) available.
typedef enum {
@@ -160,7 +162,9 @@ class SchedulingResources {
/// \brief Acquire the amount of resources specified.
///
/// \param resources: the amount of resources to be acquired.
/// \return True if resources were successfully acquired. False otherwise.
/// \return True if resources were acquired without oversubscription. If this
/// returns false, then the resources were still acquired, but we are now at
/// negative resources.
bool Acquire(const ResourceSet &resources);
private:
+8 -1
View File
@@ -15,7 +15,14 @@ Worker::Worker(pid_t pid, std::shared_ptr<LocalClientConnection> connection)
: pid_(pid),
connection_(connection),
assigned_task_id_(TaskID::nil()),
actor_id_(ActorID::nil()) {}
actor_id_(ActorID::nil()),
blocked_(false) {}
void Worker::MarkBlocked() { blocked_ = true; }
void Worker::MarkUnblocked() { blocked_ = false; }
bool Worker::IsBlocked() const { return blocked_; }
pid_t Worker::Pid() const { return pid_; }
+6
View File
@@ -19,6 +19,9 @@ class Worker {
Worker(pid_t pid, std::shared_ptr<LocalClientConnection> connection);
/// A destructor responsible for freeing all worker state.
~Worker() {}
void MarkBlocked();
void MarkUnblocked();
bool IsBlocked() const;
/// Return the worker's PID.
pid_t Pid() const;
void AssignTaskId(const TaskID &task_id);
@@ -37,6 +40,9 @@ class Worker {
TaskID assigned_task_id_;
/// The worker's actor ID. If this is nil, then the worker is not an actor.
ActorID actor_id_;
/// Whether the worker is blocked. Workers become blocked in a `ray.get`, if
/// they require a data dependency while executing a task.
bool blocked_;
};
} // namespace raylet