From 4744ed01f79820e5a2d71ade4931e145b7d82a67 Mon Sep 17 00:00:00 2001 From: Gabriele Oliaro Date: Thu, 12 Nov 2020 12:44:13 -0500 Subject: [PATCH] Queueing non-actor tasks at the workers (#11051) * separated adding tasks to queue and executing them (worker side) * linting * first review * second rev * rev3, all tests passing locally * linting * rev4 * linting * finished rev4, all tests passing locally (mac) * rev4, all tests passing locally * linting * rev5 * bug fix * hopefully fixed build * nvm * ptr cast * linting * no special treatment for actor creation tasks --- src/ray/core_worker/core_worker.cc | 28 +++- .../test/direct_actor_transport_test.cc | 10 +- .../core_worker/test/scheduling_queue_test.cc | 14 +- .../transport/direct_actor_transport.cc | 47 ++++--- .../transport/direct_actor_transport.h | 129 +++++++++++++----- 5 files changed, 161 insertions(+), 67 deletions(-) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index af0e64c6e..10834429a 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1958,13 +1958,29 @@ void CoreWorker::HandlePushTask(const rpc::PushTaskRequest &request, return; } + // Increment the task_queue_length task_queue_length_ += 1; - task_execution_service_.post([=] { - // We have posted an exit task onto the main event loop, - // so shouldn't bother executing any further work. - if (exiting_) return; - direct_task_receiver_->HandlePushTask(request, reply, send_reply_callback); - }); + + // For actor tasks, we just need to post a HandleActorTask instance to the task + // execution service. + if (request.task_spec().type() == TaskType::ACTOR_TASK) { + task_execution_service_.post([=] { + // We have posted an exit task onto the main event loop, + // so shouldn't bother executing any further work. + if (exiting_) return; + direct_task_receiver_->HandleTask(request, reply, send_reply_callback); + }); + } else { + // Normal tasks are enqueued here, and we post a RunNormalTasksFromQueue instance to + // the task execution service. + direct_task_receiver_->HandleTask(request, reply, send_reply_callback); + task_execution_service_.post([=] { + // We have posted an exit task onto the main event loop, + // so shouldn't bother executing any further work. + if (exiting_) return; + direct_task_receiver_->RunNormalTasksFromQueue(); + }); + } } void CoreWorker::HandleDirectActorCallArgWaitComplete( diff --git a/src/ray/core_worker/test/direct_actor_transport_test.cc b/src/ray/core_worker/test/direct_actor_transport_test.cc index 35101dbbf..3a35e62a4 100644 --- a/src/ray/core_worker/test/direct_actor_transport_test.cc +++ b/src/ray/core_worker/test/direct_actor_transport_test.cc @@ -497,7 +497,7 @@ TEST_F(DirectActorReceiverTest, TestNewTaskFromDifferentWorker) { ++callback_count; ASSERT_TRUE(status.ok()); }; - receiver_->HandlePushTask(request, &reply, reply_callback); + receiver_->HandleTask(request, &reply, reply_callback); } // Push a task request with actor counter 1. This should scucceed @@ -511,7 +511,7 @@ TEST_F(DirectActorReceiverTest, TestNewTaskFromDifferentWorker) { ++callback_count; ASSERT_TRUE(status.ok()); }; - receiver_->HandlePushTask(request, &reply, reply_callback); + receiver_->HandleTask(request, &reply, reply_callback); } // Create another request with the same caller id, but a different worker id, @@ -529,7 +529,7 @@ TEST_F(DirectActorReceiverTest, TestNewTaskFromDifferentWorker) { ++callback_count; ASSERT_TRUE(status.ok()); }; - receiver_->HandlePushTask(request, &reply, reply_callback); + receiver_->HandleTask(request, &reply, reply_callback); } // Push a task request with actor counter 1, but with a different worker id, @@ -544,7 +544,7 @@ TEST_F(DirectActorReceiverTest, TestNewTaskFromDifferentWorker) { ++callback_count; ASSERT_TRUE(!status.ok()); }; - receiver_->HandlePushTask(request, &reply, reply_callback); + receiver_->HandleTask(request, &reply, reply_callback); } StartIOService(); @@ -568,4 +568,4 @@ int main(int argc, char **argv) { /*log_dir=*/""); ray::RayLog::InstallFailureSignalHandler(); return RUN_ALL_TESTS(); -} +} \ No newline at end of file diff --git a/src/ray/core_worker/test/scheduling_queue_test.cc b/src/ray/core_worker/test/scheduling_queue_test.cc index 0c5da0dea..8c8e60fd5 100644 --- a/src/ray/core_worker/test/scheduling_queue_test.cc +++ b/src/ray/core_worker/test/scheduling_queue_test.cc @@ -39,7 +39,7 @@ TEST(SchedulingQueueTest, TestInOrder) { boost::asio::io_service io_service; MockWaiter waiter; WorkerContext context(WorkerType::WORKER, WorkerID::FromRandom(), JobID::Nil()); - SchedulingQueue queue(io_service, waiter, context); + ActorSchedulingQueue queue(io_service, waiter, context); int n_ok = 0; int n_rej = 0; auto fn_ok = [&n_ok]() { n_ok++; }; @@ -60,7 +60,7 @@ TEST(SchedulingQueueTest, TestWaitForObjects) { boost::asio::io_service io_service; MockWaiter waiter; WorkerContext context(WorkerType::WORKER, WorkerID::FromRandom(), JobID::Nil()); - SchedulingQueue queue(io_service, waiter, context); + ActorSchedulingQueue queue(io_service, waiter, context); int n_ok = 0; int n_rej = 0; auto fn_ok = [&n_ok]() { n_ok++; }; @@ -86,7 +86,7 @@ TEST(SchedulingQueueTest, TestWaitForObjectsNotSubjectToSeqTimeout) { boost::asio::io_service io_service; MockWaiter waiter; WorkerContext context(WorkerType::WORKER, WorkerID::FromRandom(), JobID::Nil()); - SchedulingQueue queue(io_service, waiter, context); + ActorSchedulingQueue queue(io_service, waiter, context); int n_ok = 0; int n_rej = 0; auto fn_ok = [&n_ok]() { n_ok++; }; @@ -104,7 +104,7 @@ TEST(SchedulingQueueTest, TestOutOfOrder) { boost::asio::io_service io_service; MockWaiter waiter; WorkerContext context(WorkerType::WORKER, WorkerID::FromRandom(), JobID::Nil()); - SchedulingQueue queue(io_service, waiter, context); + ActorSchedulingQueue queue(io_service, waiter, context); int n_ok = 0; int n_rej = 0; auto fn_ok = [&n_ok]() { n_ok++; }; @@ -122,7 +122,7 @@ TEST(SchedulingQueueTest, TestSeqWaitTimeout) { boost::asio::io_service io_service; MockWaiter waiter; WorkerContext context(WorkerType::WORKER, WorkerID::FromRandom(), JobID::Nil()); - SchedulingQueue queue(io_service, waiter, context); + ActorSchedulingQueue queue(io_service, waiter, context); int n_ok = 0; int n_rej = 0; auto fn_ok = [&n_ok]() { n_ok++; }; @@ -145,7 +145,7 @@ TEST(SchedulingQueueTest, TestSkipAlreadyProcessedByClient) { boost::asio::io_service io_service; MockWaiter waiter; WorkerContext context(WorkerType::WORKER, WorkerID::FromRandom(), JobID::Nil()); - SchedulingQueue queue(io_service, waiter, context); + ActorSchedulingQueue queue(io_service, waiter, context); int n_ok = 0; int n_rej = 0; auto fn_ok = [&n_ok]() { n_ok++; }; @@ -163,4 +163,4 @@ TEST(SchedulingQueueTest, TestSkipAlreadyProcessedByClient) { int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); -} +} \ No newline at end of file diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index ac256febc..b447a082c 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -309,7 +309,7 @@ void CoreWorkerDirectTaskReceiver::Init( client_pool_ = client_pool; } -void CoreWorkerDirectTaskReceiver::HandlePushTask( +void CoreWorkerDirectTaskReceiver::HandleTask( const rpc::PushTaskRequest &request, rpc::PushTaskReply *reply, rpc::SendReplyCallback send_reply_callback) { RAY_CHECK(waiter_ != nullptr) << "Must call init() prior to use"; @@ -403,32 +403,43 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask( } }; - // Run actor creation task immediately on the main thread, without going - // through a scheduling queue. - if (task_spec.IsActorCreationTask()) { - accept_callback(); - return; - } - auto reject_callback = [send_reply_callback]() { send_reply_callback(Status::Invalid("client cancelled stale rpc"), nullptr, nullptr); }; - auto it = scheduling_queue_.find(task_spec.CallerWorkerId()); - if (it == scheduling_queue_.end()) { - auto result = scheduling_queue_.emplace( - task_spec.CallerWorkerId(), - SchedulingQueue(task_main_io_service_, *waiter_, worker_context_)); - it = result.first; - } auto dependencies = task_spec.GetDependencies(); - // Pop the dummy actor dependency. + if (task_spec.IsActorTask()) { + auto it = actor_scheduling_queues_.find(task_spec.CallerWorkerId()); + if (it == actor_scheduling_queues_.end()) { + auto result = actor_scheduling_queues_.emplace( + task_spec.CallerWorkerId(), + std::unique_ptr(new ActorSchedulingQueue( + task_main_io_service_, *waiter_, worker_context_))); + it = result.first; + } + + // Pop the dummy actor dependency. // TODO(swang): Remove this with legacy raylet code. dependencies.pop_back(); + it->second->Add(request.sequence_number(), request.client_processed_up_to(), + accept_callback, reject_callback, dependencies); + } else { + // Add the normal task's callbacks to the non-actor scheduling queue. + normal_scheduling_queue_->Add(request.sequence_number(), + request.client_processed_up_to(), accept_callback, + reject_callback, dependencies); } - it->second.Add(request.sequence_number(), request.client_processed_up_to(), - accept_callback, reject_callback, dependencies); +} + +void CoreWorkerDirectTaskReceiver::RunNormalTasksFromQueue() { + // If the scheduling queue is empty, return. + if (normal_scheduling_queue_->TaskQueueEmpty()) { + return; + } + + // Execute as many tasks as there are in the queue, in sequential order. + normal_scheduling_queue_->ScheduleRequests(); } } // namespace ray diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index afaaa93f9..c9cdcd142 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ -326,26 +326,41 @@ class BoundedExecutor { boost::asio::thread_pool pool_; }; -/// Used to ensure serial order of task execution per actor handle. -/// See direct_actor.proto for a description of the ordering protocol. +/// Used to implement task queueing at the worker. Abstraction to provide a common +/// interface for actor tasks as well as normal ones. class SchedulingQueue { public: - SchedulingQueue(boost::asio::io_service &main_io_service, DependencyWaiter &waiter, - WorkerContext &worker_context, - int64_t reorder_wait_seconds = kMaxReorderWaitSeconds) + virtual void Add(int64_t seq_no, int64_t client_processed_up_to, + std::function accept_request, + std::function reject_request, + const std::vector &dependencies = {}) = 0; + virtual void ScheduleRequests() = 0; + virtual bool TaskQueueEmpty() const = 0; + virtual ~SchedulingQueue(){}; +}; + +/// Used to ensure serial order of task execution per actor handle. +/// See direct_actor.proto for a description of the ordering protocol. +class ActorSchedulingQueue : public SchedulingQueue { + public: + ActorSchedulingQueue(boost::asio::io_service &main_io_service, DependencyWaiter &waiter, + WorkerContext &worker_context, + int64_t reorder_wait_seconds = kMaxReorderWaitSeconds) : worker_context_(worker_context), reorder_wait_seconds_(reorder_wait_seconds), wait_timer_(main_io_service), main_thread_id_(boost::this_thread::get_id()), waiter_(waiter) {} + bool TaskQueueEmpty() const { return pending_actor_tasks_.empty(); } + + /// Add a new actor task's callbacks to the worker queue. void Add(int64_t seq_no, int64_t client_processed_up_to, std::function accept_request, std::function reject_request, const std::vector &dependencies = {}) { - if (seq_no == -1) { - accept_request(); // A seq_no of -1 means no ordering constraint. - return; - } + // A seq_no of -1 means no ordering constraint. Actor tasks must be executed in order. + RAY_CHECK(seq_no != -1); + RAY_CHECK(boost::this_thread::get_id() == main_thread_id_); if (client_processed_up_to >= next_seq_no_) { RAY_LOG(ERROR) << "client skipping requests " << next_seq_no_ << " to " @@ -353,13 +368,13 @@ class SchedulingQueue { next_seq_no_ = client_processed_up_to + 1; } RAY_LOG(DEBUG) << "Enqueue " << seq_no << " cur seqno " << next_seq_no_; - pending_tasks_[seq_no] = + pending_actor_tasks_[seq_no] = InboundRequest(accept_request, reject_request, dependencies.size() > 0); if (dependencies.size() > 0) { waiter_.Wait(dependencies, [seq_no, this]() { RAY_CHECK(boost::this_thread::get_id() == main_thread_id_); - auto it = pending_tasks_.find(seq_no); - if (it != pending_tasks_.end()) { + auto it = pending_actor_tasks_.find(seq_no); + if (it != pending_actor_tasks_.end()) { it->second.MarkDependenciesSatisfied(); ScheduleRequests(); } @@ -368,7 +383,6 @@ class SchedulingQueue { ScheduleRequests(); } - private: /// Schedules as many requests as possible in sequence. void ScheduleRequests() { // Only call SetMaxActorConcurrency to configure threadpool size when the @@ -390,18 +404,20 @@ class SchedulingQueue { } // Cancel any stale requests that the client doesn't need any longer. - while (!pending_tasks_.empty() && pending_tasks_.begin()->first < next_seq_no_) { - auto head = pending_tasks_.begin(); + while (!pending_actor_tasks_.empty() && + pending_actor_tasks_.begin()->first < next_seq_no_) { + auto head = pending_actor_tasks_.begin(); RAY_LOG(ERROR) << "Cancelling stale RPC with seqno " - << pending_tasks_.begin()->first << " < " << next_seq_no_; + << pending_actor_tasks_.begin()->first << " < " << next_seq_no_; head->second.Cancel(); - pending_tasks_.erase(head); + pending_actor_tasks_.erase(head); } // Process as many in-order requests as we can. - while (!pending_tasks_.empty() && pending_tasks_.begin()->first == next_seq_no_ && - pending_tasks_.begin()->second.CanExecute()) { - auto head = pending_tasks_.begin(); + while (!pending_actor_tasks_.empty() && + pending_actor_tasks_.begin()->first == next_seq_no_ && + pending_actor_tasks_.begin()->second.CanExecute()) { + auto head = pending_actor_tasks_.begin(); auto request = head->second; if (is_asyncio_) { @@ -414,18 +430,19 @@ class SchedulingQueue { // Process normal actor task. request.Accept(); } - pending_tasks_.erase(head); + pending_actor_tasks_.erase(head); next_seq_no_++; } - if (pending_tasks_.empty() || !pending_tasks_.begin()->second.CanExecute()) { + if (pending_actor_tasks_.empty() || + !pending_actor_tasks_.begin()->second.CanExecute()) { // No timeout for object dependency waits. wait_timer_.cancel(); } else { // Set a timeout on the queued tasks to avoid an infinite wait on failure. wait_timer_.expires_from_now(boost::posix_time::seconds(reorder_wait_seconds_)); RAY_LOG(DEBUG) << "waiting for " << next_seq_no_ << " queue size " - << pending_tasks_.size(); + << pending_actor_tasks_.size(); wait_timer_.async_wait([this](const boost::system::error_code &error) { if (error == boost::asio::error::operation_aborted) { return; // time deadline was adjusted @@ -435,16 +452,17 @@ class SchedulingQueue { } } + private: /// Called when we time out waiting for an earlier task to show up. void OnSequencingWaitTimeout() { RAY_CHECK(boost::this_thread::get_id() == main_thread_id_); RAY_LOG(ERROR) << "timed out waiting for " << next_seq_no_ << ", cancelling all queued tasks"; - while (!pending_tasks_.empty()) { - auto head = pending_tasks_.begin(); + while (!pending_actor_tasks_.empty()) { + auto head = pending_actor_tasks_.begin(); head->second.Cancel(); next_seq_no_ = std::max(next_seq_no_, head->first + 1); - pending_tasks_.erase(head); + pending_actor_tasks_.erase(head); } } @@ -453,7 +471,7 @@ class SchedulingQueue { /// Max time in seconds to wait for dependencies to show up. const int64_t reorder_wait_seconds_ = 0; /// Sorted map of (accept, rej) task callbacks keyed by their sequence number. - std::map pending_tasks_; + std::map pending_actor_tasks_; /// The next sequence number we are waiting for to arrive. int64_t next_seq_no_ = 0; /// Timer for waiting on dependencies. Note that this is set on the task main @@ -474,6 +492,47 @@ class SchedulingQueue { friend class SchedulingQueueTest; }; +/// Used to implement the non-actor task queue. These tasks do not have ordering +/// constraints. +class NormalSchedulingQueue : public SchedulingQueue { + public: + NormalSchedulingQueue(){}; + + bool TaskQueueEmpty() const { + absl::MutexLock lock(&mu_); + return pending_normal_tasks_.empty(); + } + + /// Add a new task's callbacks to the worker queue. + void Add(int64_t seq_no, int64_t client_processed_up_to, + std::function accept_request, std::function reject_request, + const std::vector &dependencies = {}) { + absl::MutexLock lock(&mu_); + // Normal tasks should not have ordering constraints. + RAY_CHECK(seq_no == -1); + // Create a InboundRequest object for the new task, and add it to the queue. + pending_normal_tasks_.push_back( + InboundRequest(accept_request, reject_request, dependencies.size() > 0)); + } + + /// Schedules as many requests as possible in sequence. + void ScheduleRequests() { + absl::MutexLock lock(&mu_); + while (!pending_normal_tasks_.empty()) { + auto &head = pending_normal_tasks_.front(); + head.Accept(); + pending_normal_tasks_.pop_front(); + } + } + + private: + /// Protects access to the dequeue below. + mutable absl::Mutex mu_; + /// Queue with (accept, rej) callbacks for non-actor tasks + std::deque pending_normal_tasks_ GUARDED_BY(mu_); + friend class SchedulingQueueTest; +}; + class CoreWorkerDirectTaskReceiver { public: using TaskHandler = @@ -497,13 +556,18 @@ class CoreWorkerDirectTaskReceiver { void Init(std::shared_ptr, rpc::Address rpc_address, std::shared_ptr dependency_waiter); - /// Handle a `PushTask` request. + /// Handle a `PushTask` request. If it's an actor request, this function will enqueue + /// the task and then start scheduling the requests to begin the execution. If it's a + /// non-actor request, this function will just enqueue the task. /// /// \param[in] request The request message. /// \param[out] reply The reply message. /// \param[in] send_reply_callback The callback to be called when the request is done. - void HandlePushTask(const rpc::PushTaskRequest &request, rpc::PushTaskReply *reply, - rpc::SendReplyCallback send_reply_callback); + void HandleTask(const rpc::PushTaskRequest &request, rpc::PushTaskReply *reply, + rpc::SendReplyCallback send_reply_callback); + + /// Pop tasks from the queue and execute them sequentially + void RunNormalTasksFromQueue(); private: // Worker context. @@ -522,7 +586,10 @@ class CoreWorkerDirectTaskReceiver { std::shared_ptr waiter_; /// Queue of pending requests per actor handle. /// TODO(ekl) GC these queues once the handle is no longer active. - std::unordered_map scheduling_queue_; + std::unordered_map> actor_scheduling_queues_; + // Queue of pending normal (non-actor) tasks. + std::unique_ptr normal_scheduling_queue_ = + std::unique_ptr(new NormalSchedulingQueue()); }; } // namespace ray