From f2705e2c73605618dfa0b359447b4005bb63ac80 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Thu, 23 Jul 2020 21:15:12 -0700 Subject: [PATCH] [core] Enable object reconstruction for retryable actor tasks (#9557) * Test actor plasma reconstruction * Allow resubmission of actor tasks * doc * Test for actor constructor * Kill PID before removing node * Kill pid before node --- doc/source/fault-tolerance.rst | 40 +++-- python/ray/tests/test_reconstruction.py | 163 +++++++++++++++++- src/ray/core_worker/actor_handle.cc | 9 + src/ray/core_worker/actor_handle.h | 14 ++ src/ray/core_worker/core_worker.cc | 10 +- src/ray/core_worker/task_manager.cc | 8 +- src/ray/core_worker/task_manager.h | 2 +- src/ray/core_worker/test/task_manager_test.cc | 2 +- 8 files changed, 224 insertions(+), 24 deletions(-) diff --git a/doc/source/fault-tolerance.rst b/doc/source/fault-tolerance.rst index 39e569b2e..9991e56c0 100644 --- a/doc/source/fault-tolerance.rst +++ b/doc/source/fault-tolerance.rst @@ -41,20 +41,6 @@ You can experiment with this behavior by running the following code. except ray.exceptions.RayWorkerError: print('FAILURE') -Task outputs over a configurable threshold (default 100KB) may be stored in -Ray's distributed object store. Thus, a node failure can cause the loss of a -task output. If this occurs, Ray will automatically attempt to recover the -value by looking for copies of the same object on other nodes. If there are no -other copies left, an ``UnreconstructableError`` will be raised. - -When there are no copies of an object left, Ray also provides an option to -automatically recover the value by re-executing the task that created the -value. Arguments to the task are recursively reconstructed with the same -method. This option can be enabled with -``ray.init(enable_object_reconstruction=True)`` in standalone mode or ``ray -start --enable-object-reconstruction`` in cluster mode. 
- - Actors ------ @@ -164,8 +150,8 @@ You can experiment with this behavior by running the following code. For at-least-once actors, the system will still guarantee execution ordering according to the initial submission order. For example, any tasks submitted after a failed actor task will not execute on the actor until the failed actor -task has been successfully retried. The system also will not attempt to -re-execute any tasks that executed successfully before the failure. +task has been successfully retried. The system will not attempt to re-execute +any tasks that executed successfully before the failure (unless :ref:`object reconstruction <object-reconstruction>` is enabled). At-least-once execution is best suited for read-only actors or actors with ephemeral state that does not need to be rebuilt after a failure. For actors @@ -174,3 +160,25 @@ manually restart the actor or automatically restart the actor with at-most-once semantics. If the actor’s exact state at the time of failure is needed, the application is responsible for resubmitting all tasks since the last checkpoint. + +.. _object-reconstruction: + +Objects +------- + +Task outputs over a configurable threshold (default 100KB) may be stored in +Ray's distributed object store. Thus, a node failure can cause the loss of a +task output. If this occurs, Ray will automatically attempt to recover the +value by looking for copies of the same object on other nodes. If there are no +other copies left, an ``UnreconstructableError`` will be raised. + +When there are no copies of an object left, Ray also provides an option to +automatically recover the value by re-executing the task that created the +value. Arguments to the task are recursively reconstructed with the same +method. This option can be enabled with +``ray.init(enable_object_reconstruction=True)`` in standalone mode or ``ray +start --enable-object-reconstruction`` in cluster mode. 
+During reconstruction, each task will only be re-executed up to the specified +number of times, using ``max_retries`` for normal tasks and +``max_task_retries`` for actor tasks. Both limits can be set to infinity with +the value -1. diff --git a/python/ray/tests/test_reconstruction.py b/python/ray/tests/test_reconstruction.py index 0c19e81e7..8b65f2d21 100644 --- a/python/ray/tests/test_reconstruction.py +++ b/python/ray/tests/test_reconstruction.py @@ -1,4 +1,6 @@ import json +import os +import signal import sys import numpy as np @@ -6,7 +8,11 @@ import pytest import ray from ray.test_utils import ( - wait_for_condition, ) + wait_for_condition, + wait_for_pid_to_exit, +) + +SIGKILL = signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM def test_cached_object(ray_start_cluster): @@ -217,6 +223,161 @@ def test_basic_reconstruction_put(ray_start_cluster, reconstruction_enabled): pass +@pytest.mark.parametrize("reconstruction_enabled", [False, True]) +def test_basic_reconstruction_actor_task(ray_start_cluster, + reconstruction_enabled): + config = { + "num_heartbeats_timeout": 10, + "raylet_heartbeat_timeout_milliseconds": 100, + "initial_reconstruction_timeout_milliseconds": 200, + } + # Workaround to reset the config to the default value. + if not reconstruction_enabled: + config["lineage_pinning_enabled"] = 0 + config = json.dumps(config) + + cluster = ray_start_cluster + # Head node with no resources. + cluster.add_node( + num_cpus=0, + _internal_config=config, + enable_object_reconstruction=reconstruction_enabled) + ray.init(address=cluster.address) + # Node to place the initial object. 
+ node_to_kill = cluster.add_node( + num_cpus=1, resources={"node1": 2}, object_store_memory=10**8) + cluster.add_node( + num_cpus=1, resources={"node2": 1}, object_store_memory=10**8) + cluster.wait_for_nodes() + + @ray.remote( + max_restarts=-1, + max_task_retries=-1 if reconstruction_enabled else 0, + resources={"node1": 1}, + num_cpus=0) + class Actor: + def __init__(self): + pass + + def large_object(self): + return np.zeros(10**7, dtype=np.uint8) + + def pid(self): + return os.getpid() + + @ray.remote + def dependent_task(x): + return + + a = Actor.remote() + pid = ray.get(a.pid.remote()) + obj = a.large_object.remote() + ray.get(dependent_task.options(resources={"node1": 1}).remote(obj)) + + # Workaround to kill the actor process too since there is a bug where the + # actor's plasma client hangs after the plasma store has exited. + os.kill(pid, SIGKILL) + + cluster.remove_node(node_to_kill, allow_graceful=False) + cluster.add_node( + num_cpus=1, resources={"node1": 2}, object_store_memory=10**8) + + wait_for_pid_to_exit(pid) + + if reconstruction_enabled: + ray.get(dependent_task.remote(obj)) + else: + with pytest.raises(ray.exceptions.RayTaskError) as e: + ray.get(dependent_task.remote(obj)) + with pytest.raises(ray.exceptions.UnreconstructableError): + raise e.as_instanceof_cause() + + # Make sure the actor handle is still usable. + pid = ray.get(a.pid.remote()) + + +@pytest.mark.parametrize("reconstruction_enabled", [False, True]) +def test_basic_reconstruction_actor_constructor(ray_start_cluster, + reconstruction_enabled): + config = { + "num_heartbeats_timeout": 10, + "raylet_heartbeat_timeout_milliseconds": 100, + "initial_reconstruction_timeout_milliseconds": 200, + } + # Workaround to reset the config to the default value. + if not reconstruction_enabled: + config["lineage_pinning_enabled"] = 0 + config = json.dumps(config) + + cluster = ray_start_cluster + # Head node with no resources. 
+ cluster.add_node( + num_cpus=0, + _internal_config=config, + enable_object_reconstruction=reconstruction_enabled) + ray.init(address=cluster.address) + # Node to place the initial object. + node_to_kill = cluster.add_node( + num_cpus=1, resources={"node1": 1}, object_store_memory=10**8) + cluster.add_node( + num_cpus=1, resources={"node2": 1}, object_store_memory=10**8) + cluster.wait_for_nodes() + + @ray.remote(max_retries=1 if reconstruction_enabled else 0) + def large_object(): + return np.zeros(10**7, dtype=np.uint8) + + # Both the constructor and a method depend on the large object. + @ray.remote(max_restarts=-1) + class Actor: + def __init__(self, x): + pass + + def dependent_task(self, x): + return + + def pid(self): + return os.getpid() + + obj = large_object.options(resources={"node1": 1}).remote() + a = Actor.options(resources={"node1": 1}).remote(obj) + ray.get(a.dependent_task.remote(obj)) + pid = ray.get(a.pid.remote()) + + # Workaround to kill the actor process too since there is a bug where the + # actor's plasma client hangs after the plasma store has exited. + os.kill(pid, SIGKILL) + + cluster.remove_node(node_to_kill, allow_graceful=False) + cluster.add_node( + num_cpus=1, resources={"node1": 1}, object_store_memory=10**8) + + wait_for_pid_to_exit(pid) + + # Wait for the actor to restart. 
+ def probe(): + try: + ray.get(a.dependent_task.remote(obj)) + return True + except ray.exceptions.RayActorError: + return False + except (ray.exceptions.RayTaskError, + ray.exceptions.UnreconstructableError): + return True + + wait_for_condition(probe) + + if reconstruction_enabled: + ray.get(a.dependent_task.remote(obj)) + else: + with pytest.raises(ray.exceptions.RayTaskError) as e: + x = a.dependent_task.remote(obj) + print(x) + ray.get(x) + with pytest.raises(ray.exceptions.UnreconstructableError): + raise e.as_instanceof_cause() + + @pytest.mark.parametrize("reconstruction_enabled", [False, True]) def test_multiple_downstream_tasks(ray_start_cluster, reconstruction_enabled): config = { diff --git a/src/ray/core_worker/actor_handle.cc b/src/ray/core_worker/actor_handle.cc index ed65dcaf8..d419a5da2 100644 --- a/src/ray/core_worker/actor_handle.cc +++ b/src/ray/core_worker/actor_handle.cc @@ -93,6 +93,15 @@ void ActorHandle::SetActorTaskSpec(TaskSpecBuilder &builder, const ObjectID new_ actor_cursor_ = new_cursor; } +void ActorHandle::SetResubmittedActorTaskSpec(TaskSpecification &spec, + const ObjectID new_cursor) { + absl::MutexLock guard(&mutex_); + auto mutable_spec = spec.GetMutableMessage().mutable_actor_task_spec(); + mutable_spec->set_previous_actor_task_dummy_object_id(actor_cursor_.Binary()); + mutable_spec->set_actor_counter(task_counter_++); + actor_cursor_ = new_cursor; +} + void ActorHandle::Serialize(std::string *output) { inner_.SerializeToString(output); } } // namespace ray diff --git a/src/ray/core_worker/actor_handle.h b/src/ray/core_worker/actor_handle.h index 892de4d19..12b47cb53 100644 --- a/src/ray/core_worker/actor_handle.h +++ b/src/ray/core_worker/actor_handle.h @@ -63,8 +63,22 @@ class ActorHandle { std::string ExtensionData() const { return inner_.extension_data(); } + /// Set the actor task spec fields. + /// + /// \param[in] builder Task spec builder. + /// \param[in] new_cursor Actor dummy object. 
This is legacy code needed for + /// raylet-based actor restart. void SetActorTaskSpec(TaskSpecBuilder &builder, const ObjectID new_cursor); + /// Reset the actor task spec fields of an existing task so that the task can + /// be re-executed. + /// + /// \param[in] spec An existing task spec that has executed on the actor + /// before. + /// \param[in] new_cursor Actor dummy object. This is legacy code needed for + /// raylet-based actor restart. + void SetResubmittedActorTaskSpec(TaskSpecification &spec, const ObjectID new_cursor); + void Serialize(std::string *output); int64_t MaxTaskRetries() const { return inner_.max_task_retries(); } diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 8bdcb20f5..b36912ded 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -380,7 +380,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ }; task_manager_.reset(new TaskManager( memory_store_, reference_counter_, actor_reporter_, - [this](const TaskSpecification &spec, bool delay) { + [this](TaskSpecification &spec, bool delay) { if (delay) { // Retry after a delay to emulate the existing Raylet reconstruction // behaviour. TODO(ekl) backoff exponentially. 
@@ -392,7 +392,13 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ } else { RAY_LOG(ERROR) << "Resubmitting task that produced lost plasma object: " << spec.DebugString(); - RAY_CHECK_OK(direct_task_submitter_->SubmitTask(spec)); + if (spec.IsActorTask()) { + const auto &actor_handle = actor_manager_->GetActorHandle(spec.ActorId()); + actor_handle->SetResubmittedActorTaskSpec(spec, spec.ActorDummyObject()); + RAY_CHECK_OK(direct_actor_submitter_->SubmitTask(spec)); + } else { + RAY_CHECK_OK(direct_task_submitter_->SubmitTask(spec)); + } } }, check_node_alive_fn, reconstruct_object_callback)); diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 323327653..96a378932 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -87,9 +87,6 @@ Status TaskManager::ResubmitTask(const TaskID &task_id, if (it == submissible_tasks_.end()) { return Status::Invalid("Task spec missing"); } - if (it->second.spec.IsActorTask()) { - return Status::Invalid("Cannot reconstruct objects returned by actors"); - } if (!it->second.pending) { resubmit = true; @@ -118,6 +115,11 @@ Status TaskManager::ResubmitTask(const TaskID &task_id, reference_counter_->UpdateResubmittedTaskReferences(*task_deps); } + if (spec.IsActorTask()) { + const auto actor_creation_return_id = spec.ActorCreationDummyObjectId(); + reference_counter_->UpdateResubmittedTaskReferences({actor_creation_return_id}); + } + if (resubmit) { retry_task_callback_(spec, /*delay=*/false); } diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index dbd56ea17..977d8e57c 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -51,7 +51,7 @@ class TaskResubmissionInterface { virtual ~TaskResubmissionInterface() {} }; -using RetryTaskCallback = std::function; +using RetryTaskCallback = std::function; using ReconstructObjectCallback = std::function; class 
TaskManager : public TaskFinisherInterface, public TaskResubmissionInterface { diff --git a/src/ray/core_worker/test/task_manager_test.cc b/src/ray/core_worker/test/task_manager_test.cc index 219c59fa6..fc800716a 100644 --- a/src/ray/core_worker/test/task_manager_test.cc +++ b/src/ray/core_worker/test/task_manager_test.cc @@ -52,7 +52,7 @@ class TaskManagerTest : public ::testing::Test { /*distributed_ref_counting_enabled=*/true, lineage_pinning_enabled))), actor_reporter_(std::shared_ptr(new MockActorManager())), manager_(store_, reference_counter_, actor_reporter_, - [this](const TaskSpecification &spec, bool delay) { + [this](TaskSpecification &spec, bool delay) { num_retries_++; return Status::OK(); },