From f596113fc78483cfd9bd30d781df7bb324e6f024 Mon Sep 17 00:00:00 2001 From: Simon Mo Date: Tue, 1 Dec 2020 09:35:54 -0800 Subject: [PATCH] [Core] Actor Retries Out of Order Tasks on Restart (#12338) --- doc/source/fault-tolerance.rst | 6 ++ python/ray/serve/tests/test_long_poll.py | 28 +++-- python/ray/tests/test_failure.py | 70 ++++++++++++ src/ray/core_worker/core_worker.cc | 1 + .../test/direct_actor_transport_test.cc | 58 +++++++++- .../transport/direct_actor_transport.cc | 101 ++++++++++++++---- .../transport/direct_actor_transport.h | 33 ++++-- src/ray/protobuf/common.proto | 4 + 8 files changed, 257 insertions(+), 44 deletions(-) diff --git a/doc/source/fault-tolerance.rst b/doc/source/fault-tolerance.rst index 8a3ae991e..2dd0560fd 100644 --- a/doc/source/fault-tolerance.rst +++ b/doc/source/fault-tolerance.rst @@ -163,6 +163,12 @@ semantics. If the actor’s exact state at the time of failure is needed, the application is responsible for resubmitting all tasks since the last checkpoint. +.. note:: + For :ref:`async or threaded actors `, the tasks might + be completed out of order. Upon actor restart, the system will only retry + *incomplete* task, in their initial submission order. Previously completed + tasks will not be re-executed. + .. _object-reconstruction: Objects diff --git a/python/ray/serve/tests/test_long_poll.py b/python/ray/serve/tests/test_long_poll.py index 7a33ee58d..040219527 100644 --- a/python/ray/serve/tests/test_long_poll.py +++ b/python/ray/serve/tests/test_long_poll.py @@ -2,6 +2,7 @@ import sys import functools import time import asyncio +import os from typing import Dict import pytest @@ -38,34 +39,41 @@ def test_host_standalone(serve_instance): assert "key_2" in result -@pytest.mark.skip( - "Skip until https://github.com/ray-project/ray/issues/11683 fixed " - "since async actor retries is broken.") -def test_long_pull_restarts(serve_instance): +def test_long_poll_restarts(serve_instance): @ray.remote( max_restarts=-1, - # max_task_retries=-1, + max_task_retries=-1, ) class RestartableLongPollerHost: def __init__(self) -> None: print("actor started") self.host = LongPollerHost() self.host.notify_changed("timer", time.time()) + self.should_exit = False async def listen_for_change(self, key_to_ids): - await asyncio.sleep(0.5) + print("listening for change ", key_to_ids) return await self.host.listen_for_change(key_to_ids) - async def exit(self): - sys.exit(1) + async def set_exit(self): + self.should_exit = True + + async def exit_if_set(self): + if self.should_exit: + print("actor exit") + os._exit(1) host = RestartableLongPollerHost.remote() updated_values = ray.get(host.listen_for_change.remote({"timer": -1})) timer: UpdatedObject = updated_values["timer"] on_going_ref = host.listen_for_change.remote({"timer": timer.snapshot_id}) - host.exit.remote() - on_going_ref = host.listen_for_change.remote({"timer": timer.snapshot_id}) + ray.get(host.set_exit.remote()) + # This task should trigger the actor to exit. + # But the retried task will not because self.should_exit is false. + host.exit_if_set.remote() + + # on_going_ref should return succesfully with a differnt value. new_timer: UpdatedObject = ray.get(on_going_ref)["timer"] assert new_timer.snapshot_id != timer.snapshot_id + 1 assert new_timer.object_snapshot != timer.object_snapshot diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index 1c01977e4..89167f475 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -1289,6 +1289,76 @@ def test_gcs_server_failiure_report(ray_start_regular, log_pubsub): assert data["pid"] == "gcs_server" +@pytest.mark.parametrize( + "ray_start_regular", [{ + "_system_config": { + "task_retry_delay_ms": 500 + } + }], + indirect=True) +def test_async_actor_task_retries(ray_start_regular): + # https://github.com/ray-project/ray/issues/11683 + + signal = SignalActor.remote() + + @ray.remote + class DyingActor: + def __init__(self): + print("DyingActor init called") + self.should_exit = False + + def set_should_exit(self): + print("DyingActor.set_should_exit called") + self.should_exit = True + + async def get(self, x, wait=False): + print(f"DyingActor.get called with x={x}, wait={wait}") + if self.should_exit: + os._exit(0) + if wait: + await signal.wait.remote() + return x + + # Normal in order actor task retries should work + dying = DyingActor.options( + max_restarts=-1, + max_task_retries=-1, + ).remote() + + assert ray.get(dying.get.remote(1)) == 1 + ray.get(dying.set_should_exit.remote()) + assert ray.get(dying.get.remote(42)) == 42 + + # Now let's try out of order retries: + # Task seqno 0 will return + # Task seqno 1 will be pending and retried later + # Task seqno 2 will return + # Task seqno 3 will crash the actor and retried later + dying = DyingActor.options( + max_restarts=-1, + max_task_retries=-1, + ).remote() + + # seqno 0 + ref_0 = dying.get.remote(0) + assert ray.get(ref_0) == 0 + # seqno 1 + ref_1 = dying.get.remote(1, wait=True) + # seqno 2 + ref_2 = dying.set_should_exit.remote() + assert ray.get(ref_2) is None + # seqno 3, this will crash the actor because previous task set should exit + # to true. + ref_3 = dying.get.remote(3) + + # At this point the actor should be restarted. The two pending tasks + # [ref_1, ref_3] should be retried, but not the completed tasks [ref_0, + # ref_2]. Critically, if ref_2 was retried, ref_3 can never return. + ray.get(signal.send.remote()) + assert ray.get(ref_1) == 1 + assert ray.get(ref_3) == 3 + + if __name__ == "__main__": import pytest sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 97eb0dc83..109440a5c 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -436,6 +436,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ }; task_manager_.reset(new TaskManager( memory_store_, reference_counter_, + /* retry_task_callback= */ [this](TaskSpecification &spec, bool delay) { if (delay) { // Retry after a delay to emulate the existing Raylet reconstruction diff --git a/src/ray/core_worker/test/direct_actor_transport_test.cc b/src/ray/core_worker/test/direct_actor_transport_test.cc index 3a35e62a4..dffb8c4b5 100644 --- a/src/ray/core_worker/test/direct_actor_transport_test.cc +++ b/src/ray/core_worker/test/direct_actor_transport_test.cc @@ -65,18 +65,18 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { callbacks.push_back(callback); } - bool ReplyPushTask(Status status = Status::OK()) { + bool ReplyPushTask(Status status = Status::OK(), size_t index = 0) { if (callbacks.size() == 0) { return false; } - auto callback = callbacks.front(); + auto callback = callbacks.at(index); callback(status, rpc::PushTaskReply()); - callbacks.pop_front(); + callbacks.erase(callbacks.begin() + index); return true; } rpc::Address addr; - std::list> callbacks; + std::vector> callbacks; std::vector received_seq_nos; }; @@ -345,6 +345,56 @@ TEST_F(DirectActorSubmitterTest, TestActorRestartRetry) { ASSERT_THAT(worker_client_->received_seq_nos, ElementsAre(0, 1, 2, 2, 0, 1)); } +TEST_F(DirectActorSubmitterTest, TestActorRestartOutOfOrderRetry) { + rpc::Address addr; + auto worker_id = WorkerID::FromRandom(); + addr.set_worker_id(worker_id.Binary()); + ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0); + submitter_.AddActorQueueIfNotExists(actor_id); + gcs::ActorTableData actor_data; + addr.set_port(0); + submitter_.ConnectActor(actor_id, addr, 0); + ASSERT_EQ(worker_client_->callbacks.size(), 0); + + // Create four tasks for the actor. + auto task1 = CreateActorTaskHelper(actor_id, worker_id, 0); + auto task2 = CreateActorTaskHelper(actor_id, worker_id, 1); + auto task3 = CreateActorTaskHelper(actor_id, worker_id, 2); + // Submit three tasks. + ASSERT_TRUE(submitter_.SubmitTask(task1).ok()); + ASSERT_TRUE(submitter_.SubmitTask(task2).ok()); + ASSERT_TRUE(submitter_.SubmitTask(task3).ok()); + // All tasks will eventually finish. + EXPECT_CALL(*task_finisher_, CompletePendingTask(task1.TaskId(), _, _)).Times(3); + + // Tasks 2 will be retried + EXPECT_CALL(*task_finisher_, PendingTaskFailed(task2.TaskId(), _, _)) + .Times(1) + .WillRepeatedly(Return(true)); + // First task finishes. Second task hang. Third task finishes. + ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK(), /*index=*/0)); + ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK(), /*index=*/1)); + // Simulate the actor failing. + ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError(""), /*index=*/0)); + submitter_.DisconnectActor(actor_id, 0, /*dead=*/false); + + // Actor gets restarted. + addr.set_port(1); + submitter_.ConnectActor(actor_id, addr, 1); + // Upon re-connect, task 2 (failed) and 3 (completed) should be both retried. + // Retry task 2 manually (simulating task_finisher and SendPendingTask's behavior) + // Retry task 3 should happen via event loop + ASSERT_TRUE(submitter_.SubmitTask(task2).ok()); + + // Both task2 and task3 should be submitted. + ASSERT_EQ(worker_client_->callbacks.size(), 2); + + // Finishes all task + while (!worker_client_->callbacks.empty()) { + ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK())); + } +} + TEST_F(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) { rpc::Address addr; auto worker_id = WorkerID::FromRandom(); diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index b447a082c..19d911de1 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -164,8 +164,10 @@ void CoreWorkerDirectActorTaskSubmitter::ConnectActor(const ActorID &actor_id, // actor task, so we can ignore completed tasks from old epochs. RAY_LOG(INFO) << "Resetting caller starts at for actor " << actor_id << " from " << queue->second.caller_starts_at << " to " - << queue->second.num_completed_tasks; - queue->second.caller_starts_at = queue->second.num_completed_tasks; + << queue->second.next_task_reply_position; + queue->second.caller_starts_at = queue->second.next_task_reply_position; + + ResendOutOfOrderTasks(actor_id); SendPendingTasks(actor_id); } @@ -223,33 +225,54 @@ void CoreWorkerDirectActorTaskSubmitter::SendPendingTasks(const ActorID &actor_i if (!it->second.rpc_client) { return; } + auto &client_queue = it->second; // Check if there is a pending force kill. If there is, send it and disconnect the // client. - if (it->second.pending_force_kill) { + if (client_queue.pending_force_kill) { RAY_LOG(INFO) << "Sending KillActor request to actor " << actor_id; // It's okay if this fails because this means the worker is already dead. - it->second.rpc_client->KillActor(*it->second.pending_force_kill, nullptr); - it->second.pending_force_kill.reset(); + client_queue.rpc_client->KillActor(*client_queue.pending_force_kill, nullptr); + client_queue.pending_force_kill.reset(); } // Submit all pending requests. - auto &requests = it->second.requests; + auto &requests = client_queue.requests; auto head = requests.begin(); - while (head != requests.end() && head->first <= it->second.next_send_position && - head->second.second) { + while (head != requests.end() && + (/*seqno*/ head->first <= client_queue.next_send_position) && + (/*dependencies_resolved*/ head->second.second)) { // If the task has been sent before, skip the other tasks in the send // queue. - bool skip_queue = head->first < it->second.next_send_position; + bool skip_queue = head->first < client_queue.next_send_position; auto task_spec = std::move(head->second.first); head = requests.erase(head); - RAY_CHECK(!it->second.worker_id.empty()); - PushActorTask(it->second, task_spec, skip_queue); - it->second.next_send_position++; + RAY_CHECK(!client_queue.worker_id.empty()); + PushActorTask(client_queue, task_spec, skip_queue); + client_queue.next_send_position++; } } +void CoreWorkerDirectActorTaskSubmitter::ResendOutOfOrderTasks(const ActorID &actor_id) { + auto it = client_queues_.find(actor_id); + RAY_CHECK(it != client_queues_.end()); + if (!it->second.rpc_client) { + return; + } + auto &client_queue = it->second; + RAY_CHECK(!client_queue.worker_id.empty()); + + for (const auto &completed_task : client_queue.out_of_order_completed_tasks) { + // Making a copy here because we are flipping a flag and the original value is + // const. + auto task_spec = completed_task.second; + task_spec.GetMutableMessage().set_skip_execution(true); + PushActorTask(client_queue, task_spec, /*skip_queue=*/true); + } + client_queue.out_of_order_completed_tasks.clear(); +} + void CoreWorkerDirectActorTaskSubmitter::PushActorTask(const ClientQueue &queue, const TaskSpecification &task_spec, bool skip_queue) { @@ -266,30 +289,61 @@ void CoreWorkerDirectActorTaskSubmitter::PushActorTask(const ClientQueue &queue, const auto task_id = task_spec.TaskId(); const auto actor_id = task_spec.ActorId(); - const auto counter = task_spec.ActorCounter(); + const auto actor_counter = task_spec.ActorCounter(); + const auto task_skipped = task_spec.GetMessage().skip_execution(); RAY_LOG(DEBUG) << "Pushing task " << task_id << " to actor " << actor_id - << " actor counter " << counter << " seq no " + << " actor counter " << actor_counter << " seq no " << request->sequence_number(); rpc::Address addr(queue.rpc_client->Addr()); queue.rpc_client->PushActorTask( std::move(request), skip_queue, - [this, addr, task_id, actor_id](Status status, const rpc::PushTaskReply &reply) { + [this, addr, task_id, actor_id, actor_counter, task_spec, task_skipped]( + Status status, const rpc::PushTaskReply &reply) { bool increment_completed_tasks = true; - if (!status.ok()) { + + if (task_skipped) { + // NOTE(simon):Increment the task counter regardless of the status because the + // reply for a previously completed task. We are not calling CompletePendingTask + // because the tasks are pushed directly to the actor, not placed on any queues + // in task_finisher_. + } else if (status.ok()) { + task_finisher_->CompletePendingTask(task_id, reply, addr); + } else { bool will_retry = task_finisher_->PendingTaskFailed( task_id, rpc::ErrorType::ACTOR_DIED, &status); if (will_retry) { increment_completed_tasks = false; } - } else { - task_finisher_->CompletePendingTask(task_id, reply, addr); } if (increment_completed_tasks) { absl::MutexLock lock(&mu_); - auto queue = client_queues_.find(actor_id); - RAY_CHECK(queue != client_queues_.end()); - queue->second.num_completed_tasks++; + auto queue_pair = client_queues_.find(actor_id); + RAY_CHECK(queue_pair != client_queues_.end()); + auto &queue = queue_pair->second; + + // Try to increment queue.next_task_reply_position consecutively until we + // cannot. In the case of tasks not received in order, the following block + // ensure queue.next_task_reply_position are incremented to the max possible + // value. + queue.out_of_order_completed_tasks.insert({actor_counter, task_spec}); + auto min_completed_task = queue.out_of_order_completed_tasks.begin(); + while (min_completed_task != queue.out_of_order_completed_tasks.end()) { + if (min_completed_task->first == queue.next_task_reply_position) { + queue.next_task_reply_position++; + // increment the iterator and erase the old value + queue.out_of_order_completed_tasks.erase(min_completed_task++); + } else { + break; + } + } + + RAY_LOG(DEBUG) << "Got PushTaskReply for actor " << actor_id + << " with actor_counter " << actor_counter + << " new queue.next_task_reply_position is " + << queue.next_task_reply_position + << " and size of out_of_order_tasks set is " + << queue.out_of_order_completed_tasks.size(); } }); } @@ -342,6 +396,11 @@ void CoreWorkerDirectTaskReceiver::HandleTask( } auto accept_callback = [this, reply, send_reply_callback, task_spec, resource_ids]() { + if (task_spec.GetMessage().skip_execution()) { + send_reply_callback(Status::OK(), nullptr, nullptr); + return; + } + auto num_returns = task_spec.NumReturns(); if (task_spec.IsActorCreationTask() || task_spec.IsActorTask()) { // Decrease to account for the dummy object id. diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index c9cdcd142..cb7637c9f 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ -147,17 +147,17 @@ class CoreWorkerDirectActorTaskSubmitter /// (0-5) so far, and have received a successful reply for 4 tasks (0-3). /// 0 1 2 3 4 5 6 7 8 9 /// ^ next_send_position - /// ^ num_completed_tasks + /// ^ next_task_reply_position /// ^ caller_starts_at /// /// Suppose the actor crashes and recovers. Then, caller_starts_at is reset - /// to the current num_completed_tasks. caller_starts_at is then subtracted + /// to the current next_task_reply_position. caller_starts_at is then subtracted /// from each task's counter, so the recovered actor will receive the /// sequence numbers 0, 1, 2 (and so on) for tasks 4, 5, 6, respectively. /// Therefore, the recovered actor will restart execution from task 4. /// 0 1 2 3 4 5 6 7 8 9 /// ^ next_send_position - /// ^ num_completed_tasks + /// ^ next_task_reply_position /// ^ caller_starts_at /// /// New actor tasks will continue to be sent even while tasks are being @@ -167,7 +167,7 @@ class CoreWorkerDirectActorTaskSubmitter /// received a successful reply for task 4. /// 0 1 2 3 4 5 6 7 8 9 /// ^ next_send_position - /// ^ num_completed_tasks + /// ^ next_task_reply_position /// ^ caller_starts_at /// /// The send position of the next task to send to this actor. This sequence @@ -181,8 +181,18 @@ class CoreWorkerDirectActorTaskSubmitter /// that we will never send to the actor again. This is used to reset /// caller_starts_at if the actor dies and is restarted. We only include /// tasks that will not be sent again, to support automatic task retry on - /// actor failure. - uint64_t num_completed_tasks = 0; + /// actor failure. This value only tracks consecutive tasks that are completed. + /// Tasks completed out of order will be cached in out_of_completed_tasks first. + uint64_t next_task_reply_position = 0; + + /// The temporary container for tasks completed out of order. It can happen in + /// async or threaded actor mode. This map is used to store the seqno and task + /// spec for (1) increment next_task_reply_position later when the in order tasks are + /// returned (2) resend the tasks to restarted actor so retried tasks can maintain + /// ordering. + // NOTE(simon): consider absl::btree_set for performance, but it requires updating + // abseil. + std::map out_of_order_completed_tasks; /// A force-kill request that should be sent to the actor once an RPC /// client to the actor is available. @@ -202,13 +212,18 @@ class CoreWorkerDirectActorTaskSubmitter bool skip_queue) EXCLUSIVE_LOCKS_REQUIRED(mu_); /// Send all pending tasks for an actor. - /// Note that this function doesn't take lock, the caller is expected to hold - /// `mutex_` before calling this function. /// /// \param[in] actor_id Actor ID. /// \return Void. void SendPendingTasks(const ActorID &actor_id) EXCLUSIVE_LOCKS_REQUIRED(mu_); + /// Resend all previously-received, out-of-order, received tasks for an actor. + /// When sending these tasks, the tasks will have the flag skip_execution=true. + /// + /// \param[in] actor_id Actor ID. + /// \return Void. + void ResendOutOfOrderTasks(const ActorID &actor_id) EXCLUSIVE_LOCKS_REQUIRED(mu_); + /// Disconnect the RPC client for an actor. void DisconnectRpcClient(ClientQueue &queue) EXCLUSIVE_LOCKS_REQUIRED(mu_); @@ -226,7 +241,7 @@ class CoreWorkerDirectActorTaskSubmitter absl::flat_hash_map client_queues_ GUARDED_BY(mu_); - /// Resolve direct call object dependencies; + /// Resolve direct call object dependencies. LocalDependencyResolver resolver_; /// Used to complete tasks. diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index dde765d22..b894f92aa 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -196,6 +196,10 @@ message TaskSpec { bool placement_group_capture_child_tasks = 20; // Environment variables to override for this task map override_environment_variables = 21; + // Whether or not to skip the execution of this task. When it's true, + // the receiver will not execute the task. This field is used by async actors + // to guarantee task submission order after restart. + bool skip_execution = 22; } message Bundle {