diff --git a/python/ray/tests/test_global_state.py b/python/ray/tests/test_global_state.py index e02e372b2..2ad458cd3 100644 --- a/python/ray/tests/test_global_state.py +++ b/python/ray/tests/test_global_state.py @@ -295,6 +295,62 @@ def test_placement_group_load_report(ray_start_cluster): client.close() +def test_backlog_report(shutdown_only): + cluster = ray.init( + num_cpus=1, _system_config={ + "report_worker_backlog": True, + }) + redis = ray._private.services.create_redis_client( + cluster["redis_address"], + password=ray.ray_constants.REDIS_DEFAULT_PASSWORD) + client = redis.pubsub(ignore_subscribe_messages=True) + client.psubscribe(ray.gcs_utils.XRAY_HEARTBEAT_BATCH_PATTERN) + + @ray.remote(num_cpus=1) + def foo(x): + print(".") + time.sleep(x) + return None + + def backlog_size_set(): + try: + raw_message = client.get_message() + except Exception: + return False + if raw_message is None: + return False + + data = raw_message["data"] + pub_message = ray.gcs_utils.PubSubMessage.FromString(data) + heartbeat_data = pub_message.data + + message = ray.gcs_utils.HeartbeatBatchTableData.FromString( + heartbeat_data) + aggregate_resource_load = \ + message.resource_load_by_shape.resource_demands + if len(aggregate_resource_load) == 1: + backlog_size = aggregate_resource_load[0].backlog_size + print(backlog_size) + # Ideally we'd want to assert backlog_size == 8, but guaranteeing + # the order the order that submissions will occur is too + # hard/flaky. + return backlog_size > 0 + return False + + # We want this first task to finish + refs = [foo.remote(0.5)] + # These tasks should all start _before_ the first one finishes. + refs.extend([foo.remote(1000) for _ in range(9)]) + # Now there's 1 request running, 1 queued in the raylet, and 8 queued in + # the worker backlog. + + ray.get(refs[0]) + # First request finishes, second request is now running, third lease + # request is sent to the raylet with backlog=7 + + ray.test_utils.wait_for_condition(backlog_size_set, timeout=2) + + if __name__ == "__main__": import pytest import sys diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 7e3c367f5..6793c9e9d 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -283,6 +283,10 @@ RAY_CONFIG(uint32_t, agent_register_timeout_ms, 30 * 1000) /// load reported by each raylet. RAY_CONFIG(int64_t, max_resource_shapes_per_load_report, 100) +/// If true, the worker's queue backlog size will be propagated to the heartbeat batch +/// data. +RAY_CONFIG(bool, report_worker_backlog, true) + /// The timeout for synchronous GCS requests in seconds. RAY_CONFIG(int64_t, gcs_server_request_timeout_seconds, 5) diff --git a/src/ray/common/task/task.cc b/src/ray/common/task/task.cc index eae684550..c8ec17a17 100644 --- a/src/ray/common/task/task.cc +++ b/src/ray/common/task/task.cc @@ -4,6 +4,19 @@ namespace ray { +Task::Task(const rpc::Task &message, int64_t backlog_size) + : task_spec_(message.task_spec()), + task_execution_spec_(message.task_execution_spec()), + backlog_size_(backlog_size) { + ComputeDependencies(); +} + +Task::Task(TaskSpecification task_spec, TaskExecutionSpecification task_execution_spec) + : task_spec_(std::move(task_spec)), + task_execution_spec_(std::move(task_execution_spec)) { + ComputeDependencies(); +} + const TaskExecutionSpecification &Task::GetTaskExecutionSpec() const { return task_execution_spec_; } @@ -22,6 +35,8 @@ void Task::CopyTaskExecutionSpec(const Task &task) { task_execution_spec_ = task.task_execution_spec_; } +int64_t Task::BacklogSize() const { return backlog_size_; } + std::string Task::DebugString() const { std::ostringstream stream; stream << "task_spec={" << task_spec_.DebugString() << "}, task_execution_spec={" diff --git a/src/ray/common/task/task.h b/src/ray/common/task/task.h index 800dc1d31..1fe9e24fc 100644 --- a/src/ray/common/task/task.h +++ b/src/ray/common/task/task.h @@ -33,19 +33,13 @@ class Task { /// Construct a `Task` object from a protobuf message. /// /// \param message The protobuf message. - explicit Task(const rpc::Task &message) - : task_spec_(message.task_spec()), - task_execution_spec_(message.task_execution_spec()) { - ComputeDependencies(); - } + /// \param backlog_size The size of the task owner's backlog size for this + /// task's shape. + explicit Task(const rpc::Task &message, int64_t backlog_size = -1); /// Construct a `Task` object from a `TaskSpecification` and a /// `TaskExecutionSpecification`. - Task(TaskSpecification task_spec, TaskExecutionSpecification task_execution_spec) - : task_spec_(std::move(task_spec)), - task_execution_spec_(std::move(task_execution_spec)) { - ComputeDependencies(); - } + Task(TaskSpecification task_spec, TaskExecutionSpecification task_execution_spec); /// Override dispatch behaviour. void OnDispatchInstead(const DispatchTaskCallback &callback) { @@ -95,6 +89,8 @@ class Task { /// Returns the cancellation task callback, or nullptr. const CancelTaskCallback &OnCancellation() const { return on_cancellation_; } + int64_t BacklogSize() const; + std::string DebugString() const; private: @@ -121,6 +117,8 @@ class Task { /// For direct task calls, overrides the cancellation behaviour to send an /// RPC back to the submitting worker. mutable CancelTaskCallback on_cancellation_ = nullptr; + /// The size of the core worker's backlog when this task was submitted. + int64_t backlog_size_ = -1; }; } // namespace ray diff --git a/src/ray/core_worker/test/direct_task_transport_test.cc b/src/ray/core_worker/test/direct_task_transport_test.cc index 3f843414f..67ed7de02 100644 --- a/src/ray/core_worker/test/direct_task_transport_test.cc +++ b/src/ray/core_worker/test/direct_task_transport_test.cc @@ -102,7 +102,8 @@ class MockRayletClient : public WorkerLeaseInterface { void RequestWorkerLease( const ray::TaskSpecification &resource_spec, - const rpc::ClientCallback &callback) override { + const rpc::ClientCallback &callback, + const int64_t backlog_size) override { num_workers_requested += 1; callbacks.push_back(callback); } diff --git a/src/ray/core_worker/transport/direct_task_transport.cc b/src/ray/core_worker/transport/direct_task_transport.cc index 9db414299..7aa635f21 100644 --- a/src/ray/core_worker/transport/direct_task_transport.cc +++ b/src/ray/core_worker/transport/direct_task_transport.cc @@ -278,16 +278,18 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded( if (!scheduling_key_entry.AllPipelinesToWorkersFull(max_tasks_in_flight_per_worker_)) { // The pipelines to the current workers are not full yet, so we don't need more // workers. - return; } auto lease_client = GetOrConnectLeaseClient(raylet_address); TaskSpecification &resource_spec = task_queue.front(); TaskID task_id = resource_spec.TaskId(); + // Subtract 1 so we don't double count the task we are requesting for. + int64_t queue_size = task_queue.size() - 1; lease_client->RequestWorkerLease( - resource_spec, [this, scheduling_key](const Status &status, - const rpc::RequestWorkerLeaseReply &reply) { + resource_spec, + [this, scheduling_key](const Status &status, + const rpc::RequestWorkerLeaseReply &reply) { absl::MutexLock lock(&mu_); auto &scheduling_key_entry = scheduling_key_entries_[scheduling_key]; @@ -333,7 +335,8 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded( "likely because the local raylet has crahsed."; RAY_LOG(FATAL) << status.ToString(); } - }); + }, + queue_size); pending_lease_request = std::make_pair(lease_client, task_id); } diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index 53d38bec7..82a448fd6 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -107,6 +107,10 @@ void GcsNodeManager::NodeFailureDetector::SendBatchedHeartbeat() { aggregate_demand.set_num_infeasible_requests_queued( aggregate_demand.num_infeasible_requests_queued() + demand.num_infeasible_requests_queued()); + if (RayConfig::instance().report_worker_backlog()) { + aggregate_demand.set_backlog_size(aggregate_demand.backlog_size() + + demand.backlog_size()); + } } heartbeat.second.clear_resource_load_by_shape(); diff --git a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h index 9ba2f5527..67420ee03 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h +++ b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h @@ -69,7 +69,8 @@ struct GcsServerMocker { void RequestWorkerLease( const ray::TaskSpecification &resource_spec, - const rpc::ClientCallback &callback) override { + const rpc::ClientCallback &callback, + const int64_t backlog_size = -1) override { num_workers_requested += 1; callbacks.push_back(callback); } diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 6e34762b2..59b0767ba 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -272,6 +272,9 @@ message ResourceDemand { // The number of requests for which there is no node that is a superset of // the requested resource shape. uint64 num_infeasible_requests_queued = 3; + // The number of requests of this shape still queued in CoreWorkers that this + // raylet knows about. + int64 backlog_size = 4; } // Represents the demand sorted by resource shape. diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index 167212301..05f3ee754 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -22,6 +22,8 @@ import "src/ray/protobuf/common.proto"; message RequestWorkerLeaseRequest { // TaskSpec containing the requested resources. TaskSpec resource_spec = 1; + // Worker's backlog size for this spec's shape. + int64 backlog_size = 2; } message RequestWorkerLeaseReply { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index b0bf1a488..28573b4e7 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -162,7 +162,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self new DefaultAgentManagerServiceHandler(agent_manager_)), agent_manager_service_(io_service, *agent_manager_service_handler_), client_call_manager_(io_service), - new_scheduler_enabled_(RayConfig::instance().new_scheduler_enabled()) { + new_scheduler_enabled_(RayConfig::instance().new_scheduler_enabled()), + report_worker_backlog_(RayConfig::instance().report_worker_backlog()) { RAY_LOG(INFO) << "Initializing NodeManager with ID " << self_node_id_; RAY_CHECK(heartbeat_period_.count() > 0); // Initialize the resource map with own cluster resource configuration. @@ -1684,7 +1685,11 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest rpc::SendReplyCallback send_reply_callback) { rpc::Task task_message; task_message.mutable_task_spec()->CopyFrom(request.resource_spec()); - Task task(task_message); + auto backlog_size = -1; + if (report_worker_backlog_) { + backlog_size = request.backlog_size(); + } + Task task(task_message, backlog_size); bool is_actor_creation_task = task.GetTaskSpecification().IsActorCreationTask(); ActorID actor_id = ActorID::Nil(); diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index f7b82b556..49b0ae385 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -783,6 +783,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// Whether new schedule is enabled. const bool new_scheduler_enabled_; + /// Whether to report the worker's backlog size in the GCS heartbeat. + const bool report_worker_backlog_; + /// Whether to trigger global GC in the next heartbeat. This will broadcast /// a global GC message to all raylets except for this one. bool should_global_gc_ = false; diff --git a/src/ray/raylet/scheduling_queue.cc b/src/ray/raylet/scheduling_queue.cc index df4e15e85..25d3ff707 100644 --- a/src/ray/raylet/scheduling_queue.cc +++ b/src/ray/raylet/scheduling_queue.cc @@ -69,7 +69,14 @@ bool TaskQueue::AppendTask(const TaskID &task_id, const Task &task) { task_map_[task_id] = list_iterator; // Resource bookkeeping total_resource_load_.AddResources(task.GetTaskSpecification().GetRequiredResources()); - resource_load_by_shape_[task.GetTaskSpecification().GetSchedulingClass()]++; + const auto &scheduling_class = task.GetTaskSpecification().GetSchedulingClass(); + resource_load_by_shape_[scheduling_class]++; + + int64_t backlog_size = task.BacklogSize(); + if (backlog_size > + 0) { // Poor man's version of RayConfig::instance().report_worker_backlog() + request_backlog_by_shape_[scheduling_class] += task.BacklogSize(); + } return true; } @@ -88,6 +95,10 @@ bool TaskQueue::RemoveTask(const TaskID &task_id, std::vector *removed_tas if (resource_load_by_shape_[scheduling_class] == 0) { resource_load_by_shape_.erase(scheduling_class); } + request_backlog_by_shape_[scheduling_class] -= it->BacklogSize(); + if (request_backlog_by_shape_[scheduling_class] <= 0) { + request_backlog_by_shape_.erase(scheduling_class); + } if (removed_tasks) { removed_tasks->push_back(std::move(*it)); } @@ -117,6 +128,11 @@ const std::unordered_map &TaskQueue::GetResourceLoadB return resource_load_by_shape_; } +const std::unordered_map &TaskQueue::GetRequestBacklogByShape() + const { + return request_backlog_by_shape_; +} + bool ReadyQueue::AppendTask(const TaskID &task_id, const Task &task) { const auto &scheduling_class = task.GetTaskSpecification().GetSchedulingClass(); tasks_by_class_[scheduling_class].push_back(task_id); @@ -161,11 +177,13 @@ ResourceSet SchedulingQueue::GetTotalResourceLoad() const { return load; } -rpc::ResourceLoad SchedulingQueue::GetResourceLoadByShape(int64_t max_shapes) const { +rpc::ResourceLoad SchedulingQueue::GetResourceLoadByShape( + int64_t max_shapes, bool report_worker_backlog) const { std::unordered_map load; auto infeasible_queue_load = task_queues_[static_cast(TaskState::INFEASIBLE)]->GetResourceLoadByShape(); auto ready_queue_load = ready_queue_->GetResourceLoadByShape(); + auto backlog_size_load = ready_queue_->GetRequestBacklogByShape(); size_t max_shapes_to_add = ready_queue_load.size() + infeasible_queue_load.size(); if (max_shapes >= 0) { max_shapes_to_add = max_shapes; @@ -186,6 +204,12 @@ rpc::ResourceLoad SchedulingQueue::GetResourceLoadByShape(int64_t max_shapes) co load[one_cpu_scheduling_cls].set_num_ready_requests_queued( ready_queue_load.at(one_cpu_scheduling_cls)); } + if (report_worker_backlog) { + if (backlog_size_load.count(one_cpu_scheduling_cls) > 0) { + load[one_cpu_scheduling_cls].set_backlog_size( + backlog_size_load.at(one_cpu_scheduling_cls)); + } + } } // Collect the infeasible queue's load. @@ -203,6 +227,15 @@ rpc::ResourceLoad SchedulingQueue::GetResourceLoadByShape(int64_t max_shapes) co ready_it++; } + if (report_worker_backlog) { + // Collect the backlog size. + auto backlog_it = backlog_size_load.begin(); + while (backlog_it != backlog_size_load.end() && load.size() < max_shapes_to_add) { + load[backlog_it->first].set_backlog_size(backlog_it->second); + backlog_it++; + } + } + // Set the resource shapes. rpc::ResourceLoad load_proto; for (auto &demand : load) { diff --git a/src/ray/raylet/scheduling_queue.h b/src/ray/raylet/scheduling_queue.h index 35e49741e..062b411b0 100644 --- a/src/ray/raylet/scheduling_queue.h +++ b/src/ray/raylet/scheduling_queue.h @@ -108,6 +108,12 @@ class TaskQueue { /// require that shape. const std::unordered_map &GetResourceLoadByShape() const; + /// \brief Get the resources required by the tasks queued in CoreWorkers. + /// + /// \return A map from resource shape key to the number of tasks queued that + /// require that shape. + const std::unordered_map &GetRequestBacklogByShape() const; + protected: /// A list of tasks. std::list task_list_; @@ -119,6 +125,11 @@ class TaskQueue { /// map from resource shape key to number of tasks queued that require that /// shape. std::unordered_map resource_load_by_shape_; + /// Required resources for all the tasks that are queued in core workers + /// still.. This is a map from resource shape key to number of tasks queued + /// on any worker requesting a lease from this raylet that require that + /// shape. + std::unordered_map request_backlog_by_shape_; }; class ReadyQueue : public TaskQueue { @@ -216,7 +227,8 @@ class SchedulingQueue { /// /// \return A message summarizing the number of requests, sorted by shape, in /// the ready and infeasible queues. - rpc::ResourceLoad GetResourceLoadByShape(int64_t max_shapes = -1) const; + rpc::ResourceLoad GetResourceLoadByShape(int64_t max_shapes = -1, + bool report_worker_backlog = true) const; /// Get the tasks in the blocked state. /// diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc index e706d74af..22b1a88b4 100644 --- a/src/ray/raylet_client/raylet_client.cc +++ b/src/ray/raylet_client/raylet_client.cc @@ -316,9 +316,11 @@ Status raylet::RayletClient::SetResource(const std::string &resource_name, void raylet::RayletClient::RequestWorkerLease( const TaskSpecification &resource_spec, - const rpc::ClientCallback &callback) { + const rpc::ClientCallback &callback, + const int64_t backlog_size) { rpc::RequestWorkerLeaseRequest request; request.mutable_resource_spec()->CopyFrom(resource_spec.GetMessage()); + request.set_backlog_size(backlog_size); grpc_client_->RequestWorkerLease(request, callback); } diff --git a/src/ray/raylet_client/raylet_client.h b/src/ray/raylet_client/raylet_client.h index 6cbdd2e53..4ee749b33 100644 --- a/src/ray/raylet_client/raylet_client.h +++ b/src/ray/raylet_client/raylet_client.h @@ -60,10 +60,12 @@ class WorkerLeaseInterface { public: /// Requests a worker from the raylet. The callback will be sent via gRPC. /// \param resource_spec Resources that should be allocated for the worker. + /// \param backlog_size The queue length for the given shape on the CoreWorker. /// \return ray::Status virtual void RequestWorkerLease( const ray::TaskSpecification &resource_spec, - const ray::rpc::ClientCallback &callback) = 0; + const ray::rpc::ClientCallback &callback, + const int64_t backlog_size = -1) = 0; /// Returns a worker to the raylet. /// \param worker_port The local port of the worker on the raylet node. @@ -344,8 +346,8 @@ class RayletClient : public PinObjectsInterface, /// Implements WorkerLeaseInterface. void RequestWorkerLease( const ray::TaskSpecification &resource_spec, - const ray::rpc::ClientCallback &callback) - override; + const ray::rpc::ClientCallback &callback, + const int64_t backlog_size) override; /// Implements WorkerLeaseInterface. ray::Status ReturnWorker(int worker_port, const WorkerID &worker_id,