[Core] Actor Retries Out of Order Tasks on Restart (#12338)

This commit is contained in:
Simon Mo
2020-12-01 09:35:54 -08:00
committed by GitHub
parent f6f3cc9af1
commit f596113fc7
8 changed files with 257 additions and 44 deletions
+6
View File
@@ -163,6 +163,12 @@ semantics. If the actors exact state at the time of failure is needed, the
application is responsible for resubmitting all tasks since the last
checkpoint.
.. note::
For :ref:`async or threaded actors <async-actors>`, the tasks might
be completed out of order. Upon actor restart, the system will only retry
*incomplete* task, in their initial submission order. Previously completed
tasks will not be re-executed.
.. _object-reconstruction:
Objects
+18 -10
View File
@@ -2,6 +2,7 @@ import sys
import functools
import time
import asyncio
import os
from typing import Dict
import pytest
@@ -38,34 +39,41 @@ def test_host_standalone(serve_instance):
assert "key_2" in result
@pytest.mark.skip(
"Skip until https://github.com/ray-project/ray/issues/11683 fixed "
"since async actor retries is broken.")
def test_long_pull_restarts(serve_instance):
def test_long_poll_restarts(serve_instance):
@ray.remote(
max_restarts=-1,
# max_task_retries=-1,
max_task_retries=-1,
)
class RestartableLongPollerHost:
def __init__(self) -> None:
print("actor started")
self.host = LongPollerHost()
self.host.notify_changed("timer", time.time())
self.should_exit = False
async def listen_for_change(self, key_to_ids):
await asyncio.sleep(0.5)
print("listening for change ", key_to_ids)
return await self.host.listen_for_change(key_to_ids)
async def exit(self):
sys.exit(1)
async def set_exit(self):
self.should_exit = True
async def exit_if_set(self):
if self.should_exit:
print("actor exit")
os._exit(1)
host = RestartableLongPollerHost.remote()
updated_values = ray.get(host.listen_for_change.remote({"timer": -1}))
timer: UpdatedObject = updated_values["timer"]
on_going_ref = host.listen_for_change.remote({"timer": timer.snapshot_id})
host.exit.remote()
on_going_ref = host.listen_for_change.remote({"timer": timer.snapshot_id})
ray.get(host.set_exit.remote())
# This task should trigger the actor to exit.
# But the retried task will not because self.should_exit is false.
host.exit_if_set.remote()
# on_going_ref should return succesfully with a differnt value.
new_timer: UpdatedObject = ray.get(on_going_ref)["timer"]
assert new_timer.snapshot_id != timer.snapshot_id + 1
assert new_timer.object_snapshot != timer.object_snapshot
+70
View File
@@ -1289,6 +1289,76 @@ def test_gcs_server_failiure_report(ray_start_regular, log_pubsub):
assert data["pid"] == "gcs_server"
@pytest.mark.parametrize(
"ray_start_regular", [{
"_system_config": {
"task_retry_delay_ms": 500
}
}],
indirect=True)
def test_async_actor_task_retries(ray_start_regular):
# https://github.com/ray-project/ray/issues/11683
signal = SignalActor.remote()
@ray.remote
class DyingActor:
def __init__(self):
print("DyingActor init called")
self.should_exit = False
def set_should_exit(self):
print("DyingActor.set_should_exit called")
self.should_exit = True
async def get(self, x, wait=False):
print(f"DyingActor.get called with x={x}, wait={wait}")
if self.should_exit:
os._exit(0)
if wait:
await signal.wait.remote()
return x
# Normal in order actor task retries should work
dying = DyingActor.options(
max_restarts=-1,
max_task_retries=-1,
).remote()
assert ray.get(dying.get.remote(1)) == 1
ray.get(dying.set_should_exit.remote())
assert ray.get(dying.get.remote(42)) == 42
# Now let's try out of order retries:
# Task seqno 0 will return
# Task seqno 1 will be pending and retried later
# Task seqno 2 will return
# Task seqno 3 will crash the actor and retried later
dying = DyingActor.options(
max_restarts=-1,
max_task_retries=-1,
).remote()
# seqno 0
ref_0 = dying.get.remote(0)
assert ray.get(ref_0) == 0
# seqno 1
ref_1 = dying.get.remote(1, wait=True)
# seqno 2
ref_2 = dying.set_should_exit.remote()
assert ray.get(ref_2) is None
# seqno 3, this will crash the actor because previous task set should exit
# to true.
ref_3 = dying.get.remote(3)
# At this point the actor should be restarted. The two pending tasks
# [ref_1, ref_3] should be retried, but not the completed tasks [ref_0,
# ref_2]. Critically, if ref_2 was retried, ref_3 can never return.
ray.get(signal.send.remote())
assert ray.get(ref_1) == 1
assert ray.get(ref_3) == 3
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))
+1
View File
@@ -436,6 +436,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
};
task_manager_.reset(new TaskManager(
memory_store_, reference_counter_,
/* retry_task_callback= */
[this](TaskSpecification &spec, bool delay) {
if (delay) {
// Retry after a delay to emulate the existing Raylet reconstruction
@@ -65,18 +65,18 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface {
callbacks.push_back(callback);
}
bool ReplyPushTask(Status status = Status::OK()) {
bool ReplyPushTask(Status status = Status::OK(), size_t index = 0) {
if (callbacks.size() == 0) {
return false;
}
auto callback = callbacks.front();
auto callback = callbacks.at(index);
callback(status, rpc::PushTaskReply());
callbacks.pop_front();
callbacks.erase(callbacks.begin() + index);
return true;
}
rpc::Address addr;
std::list<rpc::ClientCallback<rpc::PushTaskReply>> callbacks;
std::vector<rpc::ClientCallback<rpc::PushTaskReply>> callbacks;
std::vector<uint64_t> received_seq_nos;
};
@@ -345,6 +345,56 @@ TEST_F(DirectActorSubmitterTest, TestActorRestartRetry) {
ASSERT_THAT(worker_client_->received_seq_nos, ElementsAre(0, 1, 2, 2, 0, 1));
}
TEST_F(DirectActorSubmitterTest, TestActorRestartOutOfOrderRetry) {
rpc::Address addr;
auto worker_id = WorkerID::FromRandom();
addr.set_worker_id(worker_id.Binary());
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
submitter_.AddActorQueueIfNotExists(actor_id);
gcs::ActorTableData actor_data;
addr.set_port(0);
submitter_.ConnectActor(actor_id, addr, 0);
ASSERT_EQ(worker_client_->callbacks.size(), 0);
// Create four tasks for the actor.
auto task1 = CreateActorTaskHelper(actor_id, worker_id, 0);
auto task2 = CreateActorTaskHelper(actor_id, worker_id, 1);
auto task3 = CreateActorTaskHelper(actor_id, worker_id, 2);
// Submit three tasks.
ASSERT_TRUE(submitter_.SubmitTask(task1).ok());
ASSERT_TRUE(submitter_.SubmitTask(task2).ok());
ASSERT_TRUE(submitter_.SubmitTask(task3).ok());
// All tasks will eventually finish.
EXPECT_CALL(*task_finisher_, CompletePendingTask(task1.TaskId(), _, _)).Times(3);
// Tasks 2 will be retried
EXPECT_CALL(*task_finisher_, PendingTaskFailed(task2.TaskId(), _, _))
.Times(1)
.WillRepeatedly(Return(true));
// First task finishes. Second task hang. Third task finishes.
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK(), /*index=*/0));
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK(), /*index=*/1));
// Simulate the actor failing.
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError(""), /*index=*/0));
submitter_.DisconnectActor(actor_id, 0, /*dead=*/false);
// Actor gets restarted.
addr.set_port(1);
submitter_.ConnectActor(actor_id, addr, 1);
// Upon re-connect, task 2 (failed) and 3 (completed) should be both retried.
// Retry task 2 manually (simulating task_finisher and SendPendingTask's behavior)
// Retry task 3 should happen via event loop
ASSERT_TRUE(submitter_.SubmitTask(task2).ok());
// Both task2 and task3 should be submitted.
ASSERT_EQ(worker_client_->callbacks.size(), 2);
// Finishes all task
while (!worker_client_->callbacks.empty()) {
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::OK()));
}
}
TEST_F(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) {
rpc::Address addr;
auto worker_id = WorkerID::FromRandom();
@@ -164,8 +164,10 @@ void CoreWorkerDirectActorTaskSubmitter::ConnectActor(const ActorID &actor_id,
// actor task, so we can ignore completed tasks from old epochs.
RAY_LOG(INFO) << "Resetting caller starts at for actor " << actor_id << " from "
<< queue->second.caller_starts_at << " to "
<< queue->second.num_completed_tasks;
queue->second.caller_starts_at = queue->second.num_completed_tasks;
<< queue->second.next_task_reply_position;
queue->second.caller_starts_at = queue->second.next_task_reply_position;
ResendOutOfOrderTasks(actor_id);
SendPendingTasks(actor_id);
}
@@ -223,33 +225,54 @@ void CoreWorkerDirectActorTaskSubmitter::SendPendingTasks(const ActorID &actor_i
if (!it->second.rpc_client) {
return;
}
auto &client_queue = it->second;
// Check if there is a pending force kill. If there is, send it and disconnect the
// client.
if (it->second.pending_force_kill) {
if (client_queue.pending_force_kill) {
RAY_LOG(INFO) << "Sending KillActor request to actor " << actor_id;
// It's okay if this fails because this means the worker is already dead.
it->second.rpc_client->KillActor(*it->second.pending_force_kill, nullptr);
it->second.pending_force_kill.reset();
client_queue.rpc_client->KillActor(*client_queue.pending_force_kill, nullptr);
client_queue.pending_force_kill.reset();
}
// Submit all pending requests.
auto &requests = it->second.requests;
auto &requests = client_queue.requests;
auto head = requests.begin();
while (head != requests.end() && head->first <= it->second.next_send_position &&
head->second.second) {
while (head != requests.end() &&
(/*seqno*/ head->first <= client_queue.next_send_position) &&
(/*dependencies_resolved*/ head->second.second)) {
// If the task has been sent before, skip the other tasks in the send
// queue.
bool skip_queue = head->first < it->second.next_send_position;
bool skip_queue = head->first < client_queue.next_send_position;
auto task_spec = std::move(head->second.first);
head = requests.erase(head);
RAY_CHECK(!it->second.worker_id.empty());
PushActorTask(it->second, task_spec, skip_queue);
it->second.next_send_position++;
RAY_CHECK(!client_queue.worker_id.empty());
PushActorTask(client_queue, task_spec, skip_queue);
client_queue.next_send_position++;
}
}
void CoreWorkerDirectActorTaskSubmitter::ResendOutOfOrderTasks(const ActorID &actor_id) {
auto it = client_queues_.find(actor_id);
RAY_CHECK(it != client_queues_.end());
if (!it->second.rpc_client) {
return;
}
auto &client_queue = it->second;
RAY_CHECK(!client_queue.worker_id.empty());
for (const auto &completed_task : client_queue.out_of_order_completed_tasks) {
// Making a copy here because we are flipping a flag and the original value is
// const.
auto task_spec = completed_task.second;
task_spec.GetMutableMessage().set_skip_execution(true);
PushActorTask(client_queue, task_spec, /*skip_queue=*/true);
}
client_queue.out_of_order_completed_tasks.clear();
}
void CoreWorkerDirectActorTaskSubmitter::PushActorTask(const ClientQueue &queue,
const TaskSpecification &task_spec,
bool skip_queue) {
@@ -266,30 +289,61 @@ void CoreWorkerDirectActorTaskSubmitter::PushActorTask(const ClientQueue &queue,
const auto task_id = task_spec.TaskId();
const auto actor_id = task_spec.ActorId();
const auto counter = task_spec.ActorCounter();
const auto actor_counter = task_spec.ActorCounter();
const auto task_skipped = task_spec.GetMessage().skip_execution();
RAY_LOG(DEBUG) << "Pushing task " << task_id << " to actor " << actor_id
<< " actor counter " << counter << " seq no "
<< " actor counter " << actor_counter << " seq no "
<< request->sequence_number();
rpc::Address addr(queue.rpc_client->Addr());
queue.rpc_client->PushActorTask(
std::move(request), skip_queue,
[this, addr, task_id, actor_id](Status status, const rpc::PushTaskReply &reply) {
[this, addr, task_id, actor_id, actor_counter, task_spec, task_skipped](
Status status, const rpc::PushTaskReply &reply) {
bool increment_completed_tasks = true;
if (!status.ok()) {
if (task_skipped) {
// NOTE(simon):Increment the task counter regardless of the status because the
// reply for a previously completed task. We are not calling CompletePendingTask
// because the tasks are pushed directly to the actor, not placed on any queues
// in task_finisher_.
} else if (status.ok()) {
task_finisher_->CompletePendingTask(task_id, reply, addr);
} else {
bool will_retry = task_finisher_->PendingTaskFailed(
task_id, rpc::ErrorType::ACTOR_DIED, &status);
if (will_retry) {
increment_completed_tasks = false;
}
} else {
task_finisher_->CompletePendingTask(task_id, reply, addr);
}
if (increment_completed_tasks) {
absl::MutexLock lock(&mu_);
auto queue = client_queues_.find(actor_id);
RAY_CHECK(queue != client_queues_.end());
queue->second.num_completed_tasks++;
auto queue_pair = client_queues_.find(actor_id);
RAY_CHECK(queue_pair != client_queues_.end());
auto &queue = queue_pair->second;
// Try to increment queue.next_task_reply_position consecutively until we
// cannot. In the case of tasks not received in order, the following block
// ensure queue.next_task_reply_position are incremented to the max possible
// value.
queue.out_of_order_completed_tasks.insert({actor_counter, task_spec});
auto min_completed_task = queue.out_of_order_completed_tasks.begin();
while (min_completed_task != queue.out_of_order_completed_tasks.end()) {
if (min_completed_task->first == queue.next_task_reply_position) {
queue.next_task_reply_position++;
// increment the iterator and erase the old value
queue.out_of_order_completed_tasks.erase(min_completed_task++);
} else {
break;
}
}
RAY_LOG(DEBUG) << "Got PushTaskReply for actor " << actor_id
<< " with actor_counter " << actor_counter
<< " new queue.next_task_reply_position is "
<< queue.next_task_reply_position
<< " and size of out_of_order_tasks set is "
<< queue.out_of_order_completed_tasks.size();
}
});
}
@@ -342,6 +396,11 @@ void CoreWorkerDirectTaskReceiver::HandleTask(
}
auto accept_callback = [this, reply, send_reply_callback, task_spec, resource_ids]() {
if (task_spec.GetMessage().skip_execution()) {
send_reply_callback(Status::OK(), nullptr, nullptr);
return;
}
auto num_returns = task_spec.NumReturns();
if (task_spec.IsActorCreationTask() || task_spec.IsActorTask()) {
// Decrease to account for the dummy object id.
@@ -147,17 +147,17 @@ class CoreWorkerDirectActorTaskSubmitter
/// (0-5) so far, and have received a successful reply for 4 tasks (0-3).
/// 0 1 2 3 4 5 6 7 8 9
/// ^ next_send_position
/// ^ num_completed_tasks
/// ^ next_task_reply_position
/// ^ caller_starts_at
///
/// Suppose the actor crashes and recovers. Then, caller_starts_at is reset
/// to the current num_completed_tasks. caller_starts_at is then subtracted
/// to the current next_task_reply_position. caller_starts_at is then subtracted
/// from each task's counter, so the recovered actor will receive the
/// sequence numbers 0, 1, 2 (and so on) for tasks 4, 5, 6, respectively.
/// Therefore, the recovered actor will restart execution from task 4.
/// 0 1 2 3 4 5 6 7 8 9
/// ^ next_send_position
/// ^ num_completed_tasks
/// ^ next_task_reply_position
/// ^ caller_starts_at
///
/// New actor tasks will continue to be sent even while tasks are being
@@ -167,7 +167,7 @@ class CoreWorkerDirectActorTaskSubmitter
/// received a successful reply for task 4.
/// 0 1 2 3 4 5 6 7 8 9
/// ^ next_send_position
/// ^ num_completed_tasks
/// ^ next_task_reply_position
/// ^ caller_starts_at
///
/// The send position of the next task to send to this actor. This sequence
@@ -181,8 +181,18 @@ class CoreWorkerDirectActorTaskSubmitter
/// that we will never send to the actor again. This is used to reset
/// caller_starts_at if the actor dies and is restarted. We only include
/// tasks that will not be sent again, to support automatic task retry on
/// actor failure.
uint64_t num_completed_tasks = 0;
/// actor failure. This value only tracks consecutive tasks that are completed.
/// Tasks completed out of order will be cached in out_of_completed_tasks first.
uint64_t next_task_reply_position = 0;
/// The temporary container for tasks completed out of order. It can happen in
/// async or threaded actor mode. This map is used to store the seqno and task
/// spec for (1) increment next_task_reply_position later when the in order tasks are
/// returned (2) resend the tasks to restarted actor so retried tasks can maintain
/// ordering.
// NOTE(simon): consider absl::btree_set for performance, but it requires updating
// abseil.
std::map<uint64_t, TaskSpecification> out_of_order_completed_tasks;
/// A force-kill request that should be sent to the actor once an RPC
/// client to the actor is available.
@@ -202,13 +212,18 @@ class CoreWorkerDirectActorTaskSubmitter
bool skip_queue) EXCLUSIVE_LOCKS_REQUIRED(mu_);
/// Send all pending tasks for an actor.
/// Note that this function doesn't take lock, the caller is expected to hold
/// `mutex_` before calling this function.
///
/// \param[in] actor_id Actor ID.
/// \return Void.
void SendPendingTasks(const ActorID &actor_id) EXCLUSIVE_LOCKS_REQUIRED(mu_);
/// Resend all previously-received, out-of-order, received tasks for an actor.
/// When sending these tasks, the tasks will have the flag skip_execution=true.
///
/// \param[in] actor_id Actor ID.
/// \return Void.
void ResendOutOfOrderTasks(const ActorID &actor_id) EXCLUSIVE_LOCKS_REQUIRED(mu_);
/// Disconnect the RPC client for an actor.
void DisconnectRpcClient(ClientQueue &queue) EXCLUSIVE_LOCKS_REQUIRED(mu_);
@@ -226,7 +241,7 @@ class CoreWorkerDirectActorTaskSubmitter
absl::flat_hash_map<ActorID, ClientQueue> client_queues_ GUARDED_BY(mu_);
/// Resolve direct call object dependencies;
/// Resolve direct call object dependencies.
LocalDependencyResolver resolver_;
/// Used to complete tasks.
+4
View File
@@ -196,6 +196,10 @@ message TaskSpec {
bool placement_group_capture_child_tasks = 20;
// Environment variables to override for this task
map<string, string> override_environment_variables = 21;
// Whether or not to skip the execution of this task. When it's true,
// the receiver will not execute the task. This field is used by async actors
// to guarantee task submission order after restart.
bool skip_execution = 22;
}
message Bundle {