Queueing non-actor tasks at the workers (#11051)

* separated adding tasks to queue and executing them (worker side)

* linting

* first review

* second rev

* rev3, all tests passing locally

* linting

* rev4

* linting

* finished rev4, all tests passing locally (mac)

* rev4, all tests passing locally

* linting

* rev5

* bug fix

* hopefully fixed build

* nvm

* ptr cast

* linting

* no special treatment for actor creation tasks
This commit is contained in:
Gabriele Oliaro
2020-11-12 12:44:13 -05:00
committed by GitHub
parent 02c02369ca
commit 4744ed01f7
5 changed files with 161 additions and 67 deletions
+22 -6
View File
@@ -1958,13 +1958,29 @@ void CoreWorker::HandlePushTask(const rpc::PushTaskRequest &request,
return;
}
// Increment the task_queue_length
task_queue_length_ += 1;
task_execution_service_.post([=] {
// We have posted an exit task onto the main event loop,
// so shouldn't bother executing any further work.
if (exiting_) return;
direct_task_receiver_->HandlePushTask(request, reply, send_reply_callback);
});
// For actor tasks, we just need to post a HandleActorTask instance to the task
// execution service.
if (request.task_spec().type() == TaskType::ACTOR_TASK) {
task_execution_service_.post([=] {
// We have posted an exit task onto the main event loop,
// so shouldn't bother executing any further work.
if (exiting_) return;
direct_task_receiver_->HandleTask(request, reply, send_reply_callback);
});
} else {
// Normal tasks are enqueued here, and we post a RunNormalTasksFromQueue instance to
// the task execution service.
direct_task_receiver_->HandleTask(request, reply, send_reply_callback);
task_execution_service_.post([=] {
// We have posted an exit task onto the main event loop,
// so shouldn't bother executing any further work.
if (exiting_) return;
direct_task_receiver_->RunNormalTasksFromQueue();
});
}
}
void CoreWorker::HandleDirectActorCallArgWaitComplete(
@@ -497,7 +497,7 @@ TEST_F(DirectActorReceiverTest, TestNewTaskFromDifferentWorker) {
++callback_count;
ASSERT_TRUE(status.ok());
};
receiver_->HandlePushTask(request, &reply, reply_callback);
receiver_->HandleTask(request, &reply, reply_callback);
}
// Push a task request with actor counter 1. This should scucceed
@@ -511,7 +511,7 @@ TEST_F(DirectActorReceiverTest, TestNewTaskFromDifferentWorker) {
++callback_count;
ASSERT_TRUE(status.ok());
};
receiver_->HandlePushTask(request, &reply, reply_callback);
receiver_->HandleTask(request, &reply, reply_callback);
}
// Create another request with the same caller id, but a different worker id,
@@ -529,7 +529,7 @@ TEST_F(DirectActorReceiverTest, TestNewTaskFromDifferentWorker) {
++callback_count;
ASSERT_TRUE(status.ok());
};
receiver_->HandlePushTask(request, &reply, reply_callback);
receiver_->HandleTask(request, &reply, reply_callback);
}
// Push a task request with actor counter 1, but with a different worker id,
@@ -544,7 +544,7 @@ TEST_F(DirectActorReceiverTest, TestNewTaskFromDifferentWorker) {
++callback_count;
ASSERT_TRUE(!status.ok());
};
receiver_->HandlePushTask(request, &reply, reply_callback);
receiver_->HandleTask(request, &reply, reply_callback);
}
StartIOService();
@@ -568,4 +568,4 @@ int main(int argc, char **argv) {
/*log_dir=*/"");
ray::RayLog::InstallFailureSignalHandler();
return RUN_ALL_TESTS();
}
}
@@ -39,7 +39,7 @@ TEST(SchedulingQueueTest, TestInOrder) {
boost::asio::io_service io_service;
MockWaiter waiter;
WorkerContext context(WorkerType::WORKER, WorkerID::FromRandom(), JobID::Nil());
SchedulingQueue queue(io_service, waiter, context);
ActorSchedulingQueue queue(io_service, waiter, context);
int n_ok = 0;
int n_rej = 0;
auto fn_ok = [&n_ok]() { n_ok++; };
@@ -60,7 +60,7 @@ TEST(SchedulingQueueTest, TestWaitForObjects) {
boost::asio::io_service io_service;
MockWaiter waiter;
WorkerContext context(WorkerType::WORKER, WorkerID::FromRandom(), JobID::Nil());
SchedulingQueue queue(io_service, waiter, context);
ActorSchedulingQueue queue(io_service, waiter, context);
int n_ok = 0;
int n_rej = 0;
auto fn_ok = [&n_ok]() { n_ok++; };
@@ -86,7 +86,7 @@ TEST(SchedulingQueueTest, TestWaitForObjectsNotSubjectToSeqTimeout) {
boost::asio::io_service io_service;
MockWaiter waiter;
WorkerContext context(WorkerType::WORKER, WorkerID::FromRandom(), JobID::Nil());
SchedulingQueue queue(io_service, waiter, context);
ActorSchedulingQueue queue(io_service, waiter, context);
int n_ok = 0;
int n_rej = 0;
auto fn_ok = [&n_ok]() { n_ok++; };
@@ -104,7 +104,7 @@ TEST(SchedulingQueueTest, TestOutOfOrder) {
boost::asio::io_service io_service;
MockWaiter waiter;
WorkerContext context(WorkerType::WORKER, WorkerID::FromRandom(), JobID::Nil());
SchedulingQueue queue(io_service, waiter, context);
ActorSchedulingQueue queue(io_service, waiter, context);
int n_ok = 0;
int n_rej = 0;
auto fn_ok = [&n_ok]() { n_ok++; };
@@ -122,7 +122,7 @@ TEST(SchedulingQueueTest, TestSeqWaitTimeout) {
boost::asio::io_service io_service;
MockWaiter waiter;
WorkerContext context(WorkerType::WORKER, WorkerID::FromRandom(), JobID::Nil());
SchedulingQueue queue(io_service, waiter, context);
ActorSchedulingQueue queue(io_service, waiter, context);
int n_ok = 0;
int n_rej = 0;
auto fn_ok = [&n_ok]() { n_ok++; };
@@ -145,7 +145,7 @@ TEST(SchedulingQueueTest, TestSkipAlreadyProcessedByClient) {
boost::asio::io_service io_service;
MockWaiter waiter;
WorkerContext context(WorkerType::WORKER, WorkerID::FromRandom(), JobID::Nil());
SchedulingQueue queue(io_service, waiter, context);
ActorSchedulingQueue queue(io_service, waiter, context);
int n_ok = 0;
int n_rej = 0;
auto fn_ok = [&n_ok]() { n_ok++; };
@@ -163,4 +163,4 @@ TEST(SchedulingQueueTest, TestSkipAlreadyProcessedByClient) {
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
}
@@ -309,7 +309,7 @@ void CoreWorkerDirectTaskReceiver::Init(
client_pool_ = client_pool;
}
void CoreWorkerDirectTaskReceiver::HandlePushTask(
void CoreWorkerDirectTaskReceiver::HandleTask(
const rpc::PushTaskRequest &request, rpc::PushTaskReply *reply,
rpc::SendReplyCallback send_reply_callback) {
RAY_CHECK(waiter_ != nullptr) << "Must call init() prior to use";
@@ -403,32 +403,43 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask(
}
};
// Run actor creation task immediately on the main thread, without going
// through a scheduling queue.
if (task_spec.IsActorCreationTask()) {
accept_callback();
return;
}
auto reject_callback = [send_reply_callback]() {
send_reply_callback(Status::Invalid("client cancelled stale rpc"), nullptr, nullptr);
};
auto it = scheduling_queue_.find(task_spec.CallerWorkerId());
if (it == scheduling_queue_.end()) {
auto result = scheduling_queue_.emplace(
task_spec.CallerWorkerId(),
SchedulingQueue(task_main_io_service_, *waiter_, worker_context_));
it = result.first;
}
auto dependencies = task_spec.GetDependencies();
// Pop the dummy actor dependency.
if (task_spec.IsActorTask()) {
auto it = actor_scheduling_queues_.find(task_spec.CallerWorkerId());
if (it == actor_scheduling_queues_.end()) {
auto result = actor_scheduling_queues_.emplace(
task_spec.CallerWorkerId(),
std::unique_ptr<SchedulingQueue>(new ActorSchedulingQueue(
task_main_io_service_, *waiter_, worker_context_)));
it = result.first;
}
// Pop the dummy actor dependency.
// TODO(swang): Remove this with legacy raylet code.
dependencies.pop_back();
it->second->Add(request.sequence_number(), request.client_processed_up_to(),
accept_callback, reject_callback, dependencies);
} else {
// Add the normal task's callbacks to the non-actor scheduling queue.
normal_scheduling_queue_->Add(request.sequence_number(),
request.client_processed_up_to(), accept_callback,
reject_callback, dependencies);
}
it->second.Add(request.sequence_number(), request.client_processed_up_to(),
accept_callback, reject_callback, dependencies);
}
void CoreWorkerDirectTaskReceiver::RunNormalTasksFromQueue() {
// If the scheduling queue is empty, return.
if (normal_scheduling_queue_->TaskQueueEmpty()) {
return;
}
// Execute as many tasks as there are in the queue, in sequential order.
normal_scheduling_queue_->ScheduleRequests();
}
} // namespace ray
@@ -326,26 +326,41 @@ class BoundedExecutor {
boost::asio::thread_pool pool_;
};
/// Used to ensure serial order of task execution per actor handle.
/// See direct_actor.proto for a description of the ordering protocol.
/// Used to implement task queueing at the worker. Abstraction to provide a common
/// interface for actor tasks as well as normal ones.
class SchedulingQueue {
public:
SchedulingQueue(boost::asio::io_service &main_io_service, DependencyWaiter &waiter,
WorkerContext &worker_context,
int64_t reorder_wait_seconds = kMaxReorderWaitSeconds)
virtual void Add(int64_t seq_no, int64_t client_processed_up_to,
std::function<void()> accept_request,
std::function<void()> reject_request,
const std::vector<rpc::ObjectReference> &dependencies = {}) = 0;
virtual void ScheduleRequests() = 0;
virtual bool TaskQueueEmpty() const = 0;
virtual ~SchedulingQueue(){};
};
/// Used to ensure serial order of task execution per actor handle.
/// See direct_actor.proto for a description of the ordering protocol.
class ActorSchedulingQueue : public SchedulingQueue {
public:
ActorSchedulingQueue(boost::asio::io_service &main_io_service, DependencyWaiter &waiter,
WorkerContext &worker_context,
int64_t reorder_wait_seconds = kMaxReorderWaitSeconds)
: worker_context_(worker_context),
reorder_wait_seconds_(reorder_wait_seconds),
wait_timer_(main_io_service),
main_thread_id_(boost::this_thread::get_id()),
waiter_(waiter) {}
bool TaskQueueEmpty() const { return pending_actor_tasks_.empty(); }
/// Add a new actor task's callbacks to the worker queue.
void Add(int64_t seq_no, int64_t client_processed_up_to,
std::function<void()> accept_request, std::function<void()> reject_request,
const std::vector<rpc::ObjectReference> &dependencies = {}) {
if (seq_no == -1) {
accept_request(); // A seq_no of -1 means no ordering constraint.
return;
}
// A seq_no of -1 means no ordering constraint. Actor tasks must be executed in order.
RAY_CHECK(seq_no != -1);
RAY_CHECK(boost::this_thread::get_id() == main_thread_id_);
if (client_processed_up_to >= next_seq_no_) {
RAY_LOG(ERROR) << "client skipping requests " << next_seq_no_ << " to "
@@ -353,13 +368,13 @@ class SchedulingQueue {
next_seq_no_ = client_processed_up_to + 1;
}
RAY_LOG(DEBUG) << "Enqueue " << seq_no << " cur seqno " << next_seq_no_;
pending_tasks_[seq_no] =
pending_actor_tasks_[seq_no] =
InboundRequest(accept_request, reject_request, dependencies.size() > 0);
if (dependencies.size() > 0) {
waiter_.Wait(dependencies, [seq_no, this]() {
RAY_CHECK(boost::this_thread::get_id() == main_thread_id_);
auto it = pending_tasks_.find(seq_no);
if (it != pending_tasks_.end()) {
auto it = pending_actor_tasks_.find(seq_no);
if (it != pending_actor_tasks_.end()) {
it->second.MarkDependenciesSatisfied();
ScheduleRequests();
}
@@ -368,7 +383,6 @@ class SchedulingQueue {
ScheduleRequests();
}
private:
/// Schedules as many requests as possible in sequence.
void ScheduleRequests() {
// Only call SetMaxActorConcurrency to configure threadpool size when the
@@ -390,18 +404,20 @@ class SchedulingQueue {
}
// Cancel any stale requests that the client doesn't need any longer.
while (!pending_tasks_.empty() && pending_tasks_.begin()->first < next_seq_no_) {
auto head = pending_tasks_.begin();
while (!pending_actor_tasks_.empty() &&
pending_actor_tasks_.begin()->first < next_seq_no_) {
auto head = pending_actor_tasks_.begin();
RAY_LOG(ERROR) << "Cancelling stale RPC with seqno "
<< pending_tasks_.begin()->first << " < " << next_seq_no_;
<< pending_actor_tasks_.begin()->first << " < " << next_seq_no_;
head->second.Cancel();
pending_tasks_.erase(head);
pending_actor_tasks_.erase(head);
}
// Process as many in-order requests as we can.
while (!pending_tasks_.empty() && pending_tasks_.begin()->first == next_seq_no_ &&
pending_tasks_.begin()->second.CanExecute()) {
auto head = pending_tasks_.begin();
while (!pending_actor_tasks_.empty() &&
pending_actor_tasks_.begin()->first == next_seq_no_ &&
pending_actor_tasks_.begin()->second.CanExecute()) {
auto head = pending_actor_tasks_.begin();
auto request = head->second;
if (is_asyncio_) {
@@ -414,18 +430,19 @@ class SchedulingQueue {
// Process normal actor task.
request.Accept();
}
pending_tasks_.erase(head);
pending_actor_tasks_.erase(head);
next_seq_no_++;
}
if (pending_tasks_.empty() || !pending_tasks_.begin()->second.CanExecute()) {
if (pending_actor_tasks_.empty() ||
!pending_actor_tasks_.begin()->second.CanExecute()) {
// No timeout for object dependency waits.
wait_timer_.cancel();
} else {
// Set a timeout on the queued tasks to avoid an infinite wait on failure.
wait_timer_.expires_from_now(boost::posix_time::seconds(reorder_wait_seconds_));
RAY_LOG(DEBUG) << "waiting for " << next_seq_no_ << " queue size "
<< pending_tasks_.size();
<< pending_actor_tasks_.size();
wait_timer_.async_wait([this](const boost::system::error_code &error) {
if (error == boost::asio::error::operation_aborted) {
return; // time deadline was adjusted
@@ -435,16 +452,17 @@ class SchedulingQueue {
}
}
private:
/// Called when we time out waiting for an earlier task to show up.
void OnSequencingWaitTimeout() {
RAY_CHECK(boost::this_thread::get_id() == main_thread_id_);
RAY_LOG(ERROR) << "timed out waiting for " << next_seq_no_
<< ", cancelling all queued tasks";
while (!pending_tasks_.empty()) {
auto head = pending_tasks_.begin();
while (!pending_actor_tasks_.empty()) {
auto head = pending_actor_tasks_.begin();
head->second.Cancel();
next_seq_no_ = std::max(next_seq_no_, head->first + 1);
pending_tasks_.erase(head);
pending_actor_tasks_.erase(head);
}
}
@@ -453,7 +471,7 @@ class SchedulingQueue {
/// Max time in seconds to wait for dependencies to show up.
const int64_t reorder_wait_seconds_ = 0;
/// Sorted map of (accept, rej) task callbacks keyed by their sequence number.
std::map<int64_t, InboundRequest> pending_tasks_;
std::map<int64_t, InboundRequest> pending_actor_tasks_;
/// The next sequence number we are waiting for to arrive.
int64_t next_seq_no_ = 0;
/// Timer for waiting on dependencies. Note that this is set on the task main
@@ -474,6 +492,47 @@ class SchedulingQueue {
friend class SchedulingQueueTest;
};
/// Used to implement the non-actor task queue. These tasks do not have ordering
/// constraints.
class NormalSchedulingQueue : public SchedulingQueue {
public:
NormalSchedulingQueue(){};
bool TaskQueueEmpty() const {
absl::MutexLock lock(&mu_);
return pending_normal_tasks_.empty();
}
/// Add a new task's callbacks to the worker queue.
void Add(int64_t seq_no, int64_t client_processed_up_to,
std::function<void()> accept_request, std::function<void()> reject_request,
const std::vector<rpc::ObjectReference> &dependencies = {}) {
absl::MutexLock lock(&mu_);
// Normal tasks should not have ordering constraints.
RAY_CHECK(seq_no == -1);
// Create a InboundRequest object for the new task, and add it to the queue.
pending_normal_tasks_.push_back(
InboundRequest(accept_request, reject_request, dependencies.size() > 0));
}
/// Schedules as many requests as possible in sequence.
void ScheduleRequests() {
absl::MutexLock lock(&mu_);
while (!pending_normal_tasks_.empty()) {
auto &head = pending_normal_tasks_.front();
head.Accept();
pending_normal_tasks_.pop_front();
}
}
private:
/// Protects access to the dequeue below.
mutable absl::Mutex mu_;
/// Queue with (accept, rej) callbacks for non-actor tasks
std::deque<InboundRequest> pending_normal_tasks_ GUARDED_BY(mu_);
friend class SchedulingQueueTest;
};
class CoreWorkerDirectTaskReceiver {
public:
using TaskHandler =
@@ -497,13 +556,18 @@ class CoreWorkerDirectTaskReceiver {
void Init(std::shared_ptr<rpc::CoreWorkerClientPool>, rpc::Address rpc_address,
std::shared_ptr<DependencyWaiter> dependency_waiter);
/// Handle a `PushTask` request.
/// Handle a `PushTask` request. If it's an actor request, this function will enqueue
/// the task and then start scheduling the requests to begin the execution. If it's a
/// non-actor request, this function will just enqueue the task.
///
/// \param[in] request The request message.
/// \param[out] reply The reply message.
/// \param[in] send_reply_callback The callback to be called when the request is done.
void HandlePushTask(const rpc::PushTaskRequest &request, rpc::PushTaskReply *reply,
rpc::SendReplyCallback send_reply_callback);
void HandleTask(const rpc::PushTaskRequest &request, rpc::PushTaskReply *reply,
rpc::SendReplyCallback send_reply_callback);
/// Pop tasks from the queue and execute them sequentially
void RunNormalTasksFromQueue();
private:
// Worker context.
@@ -522,7 +586,10 @@ class CoreWorkerDirectTaskReceiver {
std::shared_ptr<DependencyWaiter> waiter_;
/// Queue of pending requests per actor handle.
/// TODO(ekl) GC these queues once the handle is no longer active.
std::unordered_map<WorkerID, SchedulingQueue> scheduling_queue_;
std::unordered_map<WorkerID, std::unique_ptr<SchedulingQueue>> actor_scheduling_queues_;
// Queue of pending normal (non-actor) tasks.
std::unique_ptr<SchedulingQueue> normal_scheduling_queue_ =
std::unique_ptr<SchedulingQueue>(new NormalSchedulingQueue());
};
} // namespace ray