diff --git a/python/ray/services.py b/python/ray/services.py index 2e24eefe8..8cfd8aeee 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -910,6 +910,7 @@ def start_raylet(redis_address, plasma_store_name, worker_path, resources=None, + num_workers=0, stdout_file=None, stderr_file=None, cleanup=True): @@ -956,8 +957,15 @@ def start_raylet(redis_address, plasma_store_name, raylet_name, redis_address)) command = [ - RAYLET_EXECUTABLE, raylet_name, plasma_store_name, node_ip_address, - gcs_ip_address, gcs_port, start_worker_command, resource_argument + RAYLET_EXECUTABLE, + raylet_name, + plasma_store_name, + node_ip_address, + gcs_ip_address, + gcs_port, + str(num_workers), + start_worker_command, + resource_argument, ] pid = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file) @@ -1471,6 +1479,7 @@ def start_ray_processes(address_info=None, object_store_addresses[i].name, worker_path, resources=resources[i], + num_workers=workers_per_local_scheduler[i], stdout_file=raylet_stdout_file, stderr_file=raylet_stderr_file, cleanup=cleanup)) diff --git a/python/ray/worker.py b/python/ray/worker.py index edd788cfb..a12f93a54 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -504,7 +504,7 @@ class Worker(object): # If there were objects that we weren't able to get locally, let the # local scheduler know that we're now unblocked. 
- if was_blocked and not self.use_raylet: + if was_blocked: self.local_scheduler_client.notify_unblocked() assert len(final_results) == len(object_ids) diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 77a2d708c..453df9dc5 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -6,15 +6,16 @@ #ifndef RAYLET_TEST int main(int argc, char *argv[]) { - RAY_CHECK(argc == 8); + RAY_CHECK(argc == 9); const std::string raylet_socket_name = std::string(argv[1]); const std::string store_socket_name = std::string(argv[2]); const std::string node_ip_address = std::string(argv[3]); const std::string redis_address = std::string(argv[4]); int redis_port = std::stoi(argv[5]); - const std::string worker_command = std::string(argv[6]); - const std::string static_resource_list = std::string(argv[7]); + int num_initial_workers = std::stoi(argv[6]); + const std::string worker_command = std::string(argv[7]); + const std::string static_resource_list = std::string(argv[8]); // Configuration for the node manager. ray::raylet::NodeManagerConfig node_manager_config; @@ -33,7 +34,7 @@ int main(int argc, char *argv[]) { ray::raylet::ResourceSet(std::move(static_resource_conf)); RAY_LOG(INFO) << "Starting raylet with static resource configuration: " << node_manager_config.resource_config.ToString(); - node_manager_config.num_initial_workers = 0; + node_manager_config.num_initial_workers = num_initial_workers; // Use a default worker that can execute empty tasks with dependencies. 
std::stringstream worker_command_stream(worker_command); diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 1757737a3..14105e057 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -344,6 +344,65 @@ void NodeManager::ProcessClientMessage(std::shared_ptr<LocalClientConnection> cl ObjectID object_id = from_flatbuf(*message->object_id()); RAY_LOG(DEBUG) << "reconstructing object " << object_id; RAY_CHECK_OK(object_manager_.Pull(object_id)); + + // If the blocked client is a worker, and the worker isn't already blocked, + // then release any CPU resources that it acquired for its assigned task + // while it is blocked. The resources will be acquired again once the + // worker is unblocked. + std::shared_ptr<Worker> worker = worker_pool_.GetRegisteredWorker(client); + if (worker && !worker->IsBlocked()) { + RAY_CHECK(!worker->GetAssignedTaskId().is_nil()); + auto tasks = local_queues_.RemoveTasks({worker->GetAssignedTaskId()}); + const auto &task = tasks.front(); + // Get the CPU resources required by the running task. + const auto required_resources = task.GetTaskSpecification().GetRequiredResources(); + double required_cpus = 0; + RAY_CHECK(required_resources.GetResource(kCPU_ResourceLabel, &required_cpus)); + const std::unordered_map<std::string, double> cpu_resources = { + {kCPU_ResourceLabel, required_cpus}}; + // Release the CPU resources. + RAY_CHECK( + cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release( + ResourceSet(cpu_resources))); + // Mark the task as blocked. + local_queues_.QueueBlockedTasks(tasks); + worker->MarkBlocked(); + + // Try to dispatch more tasks since the blocked worker released some + // resources. + DispatchTasks(); + } + } break; + case protocol::MessageType_NotifyUnblocked: { + std::shared_ptr<Worker> worker = worker_pool_.GetRegisteredWorker(client); + // Re-acquire the CPU resources for the task that was assigned to the + // unblocked worker. 
+ if (worker) { + RAY_CHECK(worker->IsBlocked()); + RAY_CHECK(!worker->GetAssignedTaskId().is_nil()); + + auto tasks = local_queues_.RemoveTasks({worker->GetAssignedTaskId()}); + const auto &task = tasks.front(); + // Get the CPU resources required by the running task. + const auto required_resources = task.GetTaskSpecification().GetRequiredResources(); + double required_cpus = 0; + RAY_CHECK(required_resources.GetResource(kCPU_ResourceLabel, &required_cpus)); + const std::unordered_map<std::string, double> cpu_resources = { + {kCPU_ResourceLabel, required_cpus}}; + // Acquire the CPU resources. + bool oversubscribed = + !cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Acquire( + ResourceSet(cpu_resources)); + if (oversubscribed) { + const SchedulingResources &local_resources = + cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()]; + RAY_LOG(WARNING) << "Resources oversubscribed: " + << local_resources.GetAvailableResources().ToString(); + } + // Mark the task as running again. + local_queues_.QueueRunningTasks(tasks); + worker->MarkUnblocked(); + } } break; default: diff --git a/src/ray/raylet/scheduling_queue.cc b/src/ray/raylet/scheduling_queue.cc index 30919ded6..1ff6963cb 100644 --- a/src/ray/raylet/scheduling_queue.cc +++ b/src/ray/raylet/scheduling_queue.cc @@ -26,6 +26,10 @@ const std::list<Task> &SchedulingQueue::GetRunningTasks() const { return this->running_tasks_; } +const std::list<Task> &SchedulingQueue::GetBlockedTasks() const { + return this->blocked_tasks_; +} + const std::list<Task> &SchedulingQueue::GetReadyMethods() const { throw std::runtime_error("Method not implemented"); } @@ -65,6 +69,7 @@ std::vector<Task> SchedulingQueue::RemoveTasks( removeTasksFromQueue(ready_tasks_, task_ids, removed_tasks); removeTasksFromQueue(scheduled_tasks_, task_ids, removed_tasks); removeTasksFromQueue(running_tasks_, task_ids, removed_tasks); + removeTasksFromQueue(blocked_tasks_, task_ids, removed_tasks); // TODO(swang): Remove from running methods. 
RAY_CHECK(task_ids.size() == 0); @@ -91,6 +96,10 @@ void SchedulingQueue::QueueRunningTasks(const std::vector<Task> &tasks) { queueTasks(running_tasks_, tasks); } +void SchedulingQueue::QueueBlockedTasks(const std::vector<Task> &tasks) { + queueTasks(blocked_tasks_, tasks); +} + } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/scheduling_queue.h b/src/ray/raylet/scheduling_queue.h index ad47da4f7..068bfceb6 100644 --- a/src/ray/raylet/scheduling_queue.h +++ b/src/ray/raylet/scheduling_queue.h @@ -65,6 +65,13 @@ class SchedulingQueue { /// executing on a worker. const std::list<Task> &GetRunningTasks() const; + /// Get the tasks in the blocked state. + /// + /// \return A const reference to the queue of tasks that have been dispatched + /// to a worker but are blocked on a data dependency discovered to be missing + /// at runtime. + const std::list<Task> &GetBlockedTasks() const; + /// Remove tasks from the task queue. /// /// \param tasks The set of task IDs to remove from the queue. The @@ -77,7 +84,8 @@ class SchedulingQueue { /// \param tasks The tasks to queue. void QueueUncreatedActorMethods(const std::vector<Task> &tasks); - /// Queue tasks in the waiting state. + /// Queue tasks in the waiting state. These are tasks that cannot yet be + /// scheduled since they are blocked on a missing data dependency. /// /// \param tasks The tasks to queue. void QueueWaitingTasks(const std::vector<Task> &tasks); @@ -97,6 +105,13 @@ class SchedulingQueue { /// \param tasks The tasks to queue. void QueueRunningTasks(const std::vector<Task> &tasks); + /// Queue tasks in the blocked state. These are tasks that have been + /// dispatched to a worker but are blocked on a data dependency that was + /// discovered to be missing at runtime. + /// + /// \param tasks The tasks to queue. + void QueueBlockedTasks(const std::vector<Task> &tasks); + private: /// Tasks that are destined for actors that have not yet been created. 
std::list<Task> uncreated_actor_methods_; @@ -109,6 +124,9 @@ class SchedulingQueue { std::list<Task> scheduled_tasks_; /// Tasks that are running on a worker. std::list<Task> running_tasks_; + /// Tasks that were dispatched to a worker but are blocked on a data + /// dependency that was missing at runtime. + std::list<Task> blocked_tasks_; }; } // namespace raylet diff --git a/src/ray/raylet/scheduling_resources.cc b/src/ray/raylet/scheduling_resources.cc index adb8a7e6a..6354e2570 100644 --- a/src/ray/raylet/scheduling_resources.cc +++ b/src/ray/raylet/scheduling_resources.cc @@ -73,19 +73,19 @@ bool ResourceSet::RemoveResource(const std::string &resource_name) { throw std::runtime_error("Method not implemented"); } bool ResourceSet::SubtractResources(const ResourceSet &other) { - // Return failure if attempting to perform vector subtraction with unknown labels. - // TODO(atumanov): make the implementation atomic. Currently, if false is returned - // the resource capacity may be partially mutated. To reverse, call AddResources. + // Subtract the resources and track whether a resource goes below zero. 
+ bool oversubscribed = false; for (const auto &resource_pair : other.GetResourceMap()) { const std::string &resource_label = resource_pair.first; const double &resource_capacity = resource_pair.second; - if (resource_capacity_.count(resource_label) == 0) { - return false; - } else { - resource_capacity_[resource_label] -= resource_capacity; + RAY_CHECK(resource_capacity_.count(resource_label) == 1) + << "Attempt to acquire unknown resource: " << resource_label; + resource_capacity_[resource_label] -= resource_capacity; + if (resource_capacity_[resource_label] < 0) { + oversubscribed = true; } } - return true; + return !oversubscribed; } bool ResourceSet::AddResources(const ResourceSet &other) { diff --git a/src/ray/raylet/scheduling_resources.h b/src/ray/raylet/scheduling_resources.h index 7474a2413..435fe450b 100644 --- a/src/ray/raylet/scheduling_resources.h +++ b/src/ray/raylet/scheduling_resources.h @@ -10,6 +10,8 @@ namespace ray { namespace raylet { +const std::string kCPU_ResourceLabel = "CPU"; + /// Resource availability status reports whether the resource requirement is /// (1) infeasible, (2) feasible but currently unavailable, or (3) available. typedef enum { @@ -160,7 +162,9 @@ class SchedulingResources { /// \brief Acquire the amount of resources specified. /// /// \param resources: the amount of resources to be acquired. - /// \return True if resources were successfully acquired. False otherwise. + /// \return True if resources were acquired without oversubscription. If this + /// returns false, then the resources were still acquired, but we are now at + /// negative resources. 
bool Acquire(const ResourceSet &resources); private: diff --git a/src/ray/raylet/worker.cc b/src/ray/raylet/worker.cc index cec388346..328cd1d7a 100644 --- a/src/ray/raylet/worker.cc +++ b/src/ray/raylet/worker.cc @@ -15,7 +15,14 @@ Worker::Worker(pid_t pid, std::shared_ptr<LocalClientConnection> connection) : pid_(pid), connection_(connection), assigned_task_id_(TaskID::nil()), - actor_id_(ActorID::nil()) {} + actor_id_(ActorID::nil()), + blocked_(false) {} + +void Worker::MarkBlocked() { blocked_ = true; } + +void Worker::MarkUnblocked() { blocked_ = false; } + +bool Worker::IsBlocked() const { return blocked_; } pid_t Worker::Pid() const { return pid_; } diff --git a/src/ray/raylet/worker.h b/src/ray/raylet/worker.h index 3017521ff..4690df88d 100644 --- a/src/ray/raylet/worker.h +++ b/src/ray/raylet/worker.h @@ -19,6 +19,9 @@ class Worker { Worker(pid_t pid, std::shared_ptr<LocalClientConnection> connection); /// A destructor responsible for freeing all worker state. ~Worker() {} + void MarkBlocked(); + void MarkUnblocked(); + bool IsBlocked() const; /// Return the worker's PID. pid_t Pid() const; void AssignTaskId(const TaskID &task_id); @@ -37,6 +40,9 @@ class Worker { TaskID assigned_task_id_; /// The worker's actor ID. If this is nil, then the worker is not an actor. ActorID actor_id_; + /// Whether the worker is blocked. Workers become blocked in a `ray.get`, if + /// they require a data dependency while executing a task. + bool blocked_; }; } // namespace raylet