Actor fault tolerance using object lineage reconstruction (#902)

* Revert Python actor reconstruction

* Actor reconstruction using object lineage

* Add dummy arguments and return values for actor tasks

* Pin dummy outputs for actor tasks

* Skip checkpointing test for now

* TODOs

* minor edits

* Generate dummy object dependencies in Python, not C

* Fix linting.

* Move actor counter and dummy objects inside of the actor handle

* Refactor Worker._process_task, suppress exception propagation for
sequential actor tasks
This commit is contained in:
Stephanie Wang
2017-09-10 19:29:28 -07:00
committed by Robert Nishihara
parent d8aa826e63
commit 99c8b1f38c
7 changed files with 207 additions and 258 deletions
+55 -155
View File
@@ -6,13 +6,12 @@ import cloudpickle as pickle
import hashlib
import inspect
import json
import time
import traceback
import ray.local_scheduler
import ray.signature as signature
import ray.worker
from ray.utils import (FunctionProperties, hex_to_binary, random_string,
from ray.utils import (FunctionProperties, random_string,
select_local_scheduler)
@@ -103,7 +102,7 @@ def fetch_and_register_actor(actor_class_key, worker):
worker.functions[driver_id][function_id] = (actor_method_name,
temporary_actor_method)
worker.function_properties[driver_id][function_id] = (
FunctionProperties(num_return_vals=1,
FunctionProperties(num_return_vals=2,
num_cpus=1,
num_gpus=0,
num_custom_resource=0,
@@ -174,7 +173,7 @@ def export_actor(actor_id, class_id, actor_method_names, num_cpus, num_gpus,
# problem.
function_id = get_actor_method_function_id(actor_method_name).id()
worker.function_properties[driver_id][function_id] = (
FunctionProperties(num_return_vals=1,
FunctionProperties(num_return_vals=2,
num_cpus=1,
num_gpus=0,
num_custom_resource=0,
@@ -204,127 +203,6 @@ def export_actor(actor_id, class_id, actor_method_names, num_cpus, num_gpus,
worker.redis_client)
def reconstruct_actor_state(actor_id, worker):
"""Reconstruct the state of an actor that is being reconstructed.
Args:
actor_id: The ID of the actor being reconstructed.
worker: The worker object that is running the actor.
"""
# Get the most recent actor checkpoint.
checkpoint_index, checkpoint = get_actor_checkpoint(actor_id, worker)
if checkpoint is not None:
print("Loading actor state from checkpoint {}"
.format(checkpoint_index))
# Wait for the actor to have been defined.
while not hasattr(worker, "actor_class"):
time.sleep(0.001)
# TODO(rkn): Restoring from the checkpoint may fail, so this should be
# in a try-except block and we should give a good error message.
worker.actors[actor_id] = (
worker.actor_class.__ray_restore_from_checkpoint__(checkpoint))
# TODO(rkn): This call is expensive. It'd be nice to find a way to get only
# the tasks that are relevant to this actor.
tasks = ray.global_state.task_table()
def hex_to_object_id(hex_id):
return ray.local_scheduler.ObjectID(hex_to_binary(hex_id))
relevant_tasks = []
# Loop over the task table and keep the tasks that are relevant to this
# actor.
for _, task_info in tasks.items():
task_spec_info = task_info["TaskSpec"]
if hex_to_binary(task_spec_info["ActorID"]) == actor_id:
relevant_tasks.append(task_spec_info)
# Sort the tasks by actor ID.
relevant_tasks.sort(key=lambda task: task["ActorCounter"])
for i in range(len(relevant_tasks)):
assert relevant_tasks[i]["ActorCounter"] == i
# This is a mini replica of the worker's main_loop. This will loop over all
# of the tasks that this actor is supposed to rerun. For each task, the
# worker will submit the task to the local scheduler, retrieve the task
# from the local scheduler, and execute the task.
for task_spec_info in relevant_tasks:
# Create a task spec out of the dictionary of info. This isn't
# necessary. It is strictly for the purposes of checking that the task
# we get back from the local scheduler is identical to the one we
# submit.
task_spec = ray.local_scheduler.Task(
hex_to_object_id(task_spec_info["DriverID"]),
hex_to_object_id(task_spec_info["FunctionID"]),
task_spec_info["Args"],
len(task_spec_info["ReturnObjectIDs"]),
hex_to_object_id(task_spec_info["ParentTaskID"]),
task_spec_info["ParentCounter"],
hex_to_object_id(task_spec_info["ActorID"]),
task_spec_info["ActorCounter"],
[task_spec_info["RequiredResources"]["CPUs"],
task_spec_info["RequiredResources"]["GPUs"],
task_spec_info["RequiredResources"]["CustomResource"]])
# Verify that the return object IDs are the same as they were the
# first time.
assert task_spec_info["ReturnObjectIDs"] == task_spec.returns()
# We need to wait for the actor to be imported and for the functions to
# be defined before we can submit the task.
worker._wait_for_function(hex_to_binary(task_spec_info["FunctionID"]),
hex_to_binary(task_spec_info["DriverID"]))
# Set some additional state. During normal operation
# (non-reconstruction) this state would already be set because tasks
# are only submitted from drivers or from workers that are in the
# middle of executing other tasks.
worker.task_driver_id = ray.local_scheduler.ObjectID(
hex_to_binary(task_spec_info["DriverID"]))
worker.current_task_id = ray.local_scheduler.ObjectID(
hex_to_binary(task_spec_info["ParentTaskID"]))
worker.task_index = task_spec_info["ParentCounter"]
# Submit the task to the local scheduler. This is important so that the
# local scheduler does bookkeeping about this actor's resource
# utilization and things like that. It's also important for updating
# some state on the worker.
if task_spec_info["ActorCounter"] > checkpoint_index:
worker.submit_task(
hex_to_object_id(task_spec_info["FunctionID"]),
task_spec_info["Args"],
actor_id=hex_to_object_id(task_spec_info["ActorID"]))
else:
# Pass in a dummy task with no arguments to avoid having to
# unnecessarily reconstruct past arguments.
worker.submit_task(
hex_to_object_id(task_spec_info["FunctionID"]),
[],
actor_id=hex_to_object_id(task_spec_info["ActorID"]))
# Clear the extra state that we set.
del worker.task_driver_id
del worker.current_task_id
del worker.task_index
# Get the task from the local scheduler.
retrieved_task = worker._get_next_task_from_local_scheduler()
# If the task happened before the most recent checkpoint, ignore it.
# Otherwise, execute it.
if retrieved_task.actor_counter() > checkpoint_index:
# Assert that the retrieved task is the same as the constructed
# task.
assert (ray.local_scheduler.task_to_string(task_spec) ==
ray.local_scheduler.task_to_string(retrieved_task))
# Wait for the task to be ready and then execute it.
worker._wait_for_and_process_task(retrieved_task)
# Enter the main loop to receive and process tasks.
worker.main_loop()
def make_actor(cls, num_cpus, num_gpus, checkpoint_interval):
# Modify the class to have an additional method that will be used for
# terminating the worker.
@@ -368,29 +246,14 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval):
class_id = random_actor_class_id()
# The list exported will have length 0 if the class has not been exported
# yet, and length one if it has. This is just implementing a bool, but we
# don't use a bool because we need to modify it inside of the NewClass
# don't use a bool because we need to modify it inside of the ActorHandle
# constructor.
exported = []
# The function actor_method_call gets called if somebody tries to call a
# method on their local actor stub object.
def actor_method_call(actor_id, attr, function_signature, *args, **kwargs):
ray.worker.check_connected()
ray.worker.check_main_thread()
args = signature.extend_args(function_signature, args, kwargs)
function_id = get_actor_method_function_id(attr)
object_ids = ray.worker.global_worker.submit_task(function_id, args,
actor_id=actor_id)
if len(object_ids) == 1:
return object_ids[0]
elif len(object_ids) > 1:
return object_ids
class ActorMethod(object):
def __init__(self, method_name, actor_id, method_signature):
def __init__(self, actor, method_name, method_signature):
self.actor = actor
self.method_name = method_name
self.actor_id = actor_id
self.method_signature = method_signature
def __call__(self, *args, **kwargs):
@@ -400,10 +263,11 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval):
.format(self.method_name, self.method_name))
def remote(self, *args, **kwargs):
return actor_method_call(self.actor_id, self.method_name,
self.method_signature, *args, **kwargs)
return self.actor._actor_method_call(self.method_name,
self.method_signature, *args,
**kwargs)
class NewClass(object):
class ActorHandle(object):
def __init__(self, *args, **kwargs):
raise Exception("Actor classes cannot be instantiated directly. "
"Instead of running '{}()', try '{}.remote()'."
@@ -417,6 +281,13 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval):
def _manual_init(self, *args, **kwargs):
self._ray_actor_id = random_actor_id()
# The number of actor method invocations that we've called so far.
self._ray_actor_counter = 0
# The actor cursor is a dummy object representing the most recent
# actor method invocation. For each subsequent method invocation,
# the current cursor should be added as a dependency, and then
# updated to reflect the new invocation.
self._ray_actor_cursor = None
self._ray_actor_methods = {
k: v for (k, v) in inspect.getmembers(
Class, predicate=(lambda x: (inspect.isfunction(x) or
@@ -441,7 +312,7 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval):
self._actor_method_invokers = dict()
for k, v in self._ray_actor_methods.items():
self._actor_method_invokers[k] = ActorMethod(
k, self._ray_actor_id, self._ray_method_signatures[k])
self, k, self._ray_method_signatures[k])
# Export the actor class if it has not been exported yet.
if len(exported) == 0:
@@ -456,12 +327,39 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval):
ray.worker.global_worker)
# Call __init__ as a remote function.
if "__init__" in self._ray_actor_methods.keys():
actor_method_call(self._ray_actor_id, "__init__",
self._ray_method_signatures["__init__"],
*args, **kwargs)
self._actor_method_call(
"__init__", self._ray_method_signatures["__init__"], *args,
**kwargs)
else:
print("WARNING: this object has no __init__ method.")
# The function actor_method_call gets called if somebody tries to call
# a method on their local actor stub object.
def _actor_method_call(self, attr, function_signature, *args,
**kwargs):
ray.worker.check_connected()
ray.worker.check_main_thread()
args = signature.extend_args(function_signature, args, kwargs)
# Add the current actor cursor, a dummy object returned by the most
# recent method invocation, as a dependency for the next method
# invocation.
if self._ray_actor_cursor is not None:
args.append(self._ray_actor_cursor)
function_id = get_actor_method_function_id(attr)
object_ids = ray.worker.global_worker.submit_task(
function_id, args, actor_id=self._ray_actor_id,
actor_counter=self._ray_actor_counter)
# Update the actor counter and cursor to reflect the most recent
# invocation.
self._ray_actor_counter += 1
self._ray_actor_cursor = object_ids.pop()
if len(object_ids) == 1:
return object_ids[0]
elif len(object_ids) > 1:
return object_ids
# Make tab completion work.
def __dir__(self):
return self._ray_actor_methods
@@ -469,8 +367,10 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval):
def __getattribute__(self, attr):
# The following is needed so we can still access
# self.actor_methods.
if attr in ["_manual_init", "_ray_actor_id", "_ray_actor_methods",
"_actor_method_invokers", "_ray_method_signatures"]:
if attr in ["_manual_init", "_ray_actor_id", "_ray_actor_counter",
"_ray_actor_cursor", "_ray_actor_methods",
"_actor_method_invokers", "_ray_method_signatures",
"_actor_method_call"]:
return object.__getattribute__(self, attr)
if attr in self._ray_actor_methods.keys():
return self._actor_method_invokers[attr]
@@ -487,12 +387,12 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval):
def __del__(self):
"""Kill the worker that is running this actor."""
if ray.worker.global_worker.connected:
actor_method_call(
self._ray_actor_id, "__ray_terminate__",
self._actor_method_call(
"__ray_terminate__",
self._ray_method_signatures["__ray_terminate__"],
self._ray_actor_id.id())
return NewClass
return ActorHandle
ray.worker.global_worker.fetch_and_register_actor = fetch_and_register_actor
+40 -11
View File
@@ -10,9 +10,10 @@ import redis
import time
import ray
from ray.services import get_ip_address, get_port
import ray.utils
from ray.services import get_ip_address, get_port
from ray.utils import binary_to_object_id, binary_to_hex, hex_to_binary
from ray.worker import NIL_ACTOR_ID
# Import flatbuffer bindings.
from ray.core.generated.SubscribeToDBClientTableReply \
@@ -142,16 +143,44 @@ class Monitor(object):
num_tasks_updated = 0
for task_id, task in tasks.items():
# See if the corresponding local scheduler is alive.
if task["LocalSchedulerID"] in self.dead_local_schedulers:
# If the task is scheduled on a dead local scheduler, mark the
# task as lost.
key = binary_to_object_id(hex_to_binary(task_id))
ok = self.state._execute_command(
key, "RAY.TASK_TABLE_UPDATE", hex_to_binary(task_id),
ray.experimental.state.TASK_STATUS_LOST, NIL_ID)
if ok != b"OK":
log.warn("Failed to update lost task for dead scheduler.")
num_tasks_updated += 1
if task["LocalSchedulerID"] not in self.dead_local_schedulers:
continue
# Remove dummy objects returned by actor tasks from any plasma
# manager. Although the objects may still exist in that object
# store, this deletion makes them effectively unreachable by any
# local scheduler connected to a different store.
# TODO(swang): Actually remove the objects from the object store,
# so that the reconstructed actor can reuse the same object store.
if hex_to_binary(task["TaskSpec"]["ActorID"]) != NIL_ACTOR_ID:
dummy_object_id = task["TaskSpec"]["ReturnObjectIDs"][-1]
obj = self.state.object_table(dummy_object_id)
manager_ids = obj["ManagerIDs"]
if manager_ids is not None:
# The dummy object should exist on at most one plasma
# manager, the manager associated with the local scheduler
# that died.
assert(len(manager_ids) <= 1)
# Remove the dummy object from the plasma manager
# associated with the dead local scheduler, if any.
for manager in manager_ids:
ok = self.state._execute_command(
dummy_object_id, "RAY.OBJECT_TABLE_REMOVE",
dummy_object_id.id(), hex_to_binary(manager))
if ok != b"OK":
log.warn("Failed to remove object location for "
"dead plasma manager.")
# If the task is scheduled on a dead local scheduler, mark the
# task as lost.
key = binary_to_object_id(hex_to_binary(task_id))
ok = self.state._execute_command(
key, "RAY.TASK_TABLE_UPDATE", hex_to_binary(task_id),
ray.experimental.state.TASK_STATUS_LOST, NIL_ID)
if ok != b"OK":
log.warn("Failed to update lost task for dead scheduler.")
num_tasks_updated += 1
if num_tasks_updated > 0:
log.warn("Marked {} tasks as lost.".format(num_tasks_updated))
+92 -68
View File
@@ -226,10 +226,10 @@ class Worker(object):
self.fetch_and_register_actor = None
self.make_actor = None
self.actors = {}
# Use a defaultdict for the actor counts. If this is accessed with a
# missing key, the default value of 0 is returned, and that key value
# pair is added to the dict.
self.actor_counters = collections.defaultdict(lambda: 0)
# TODO(swang): This is a hack to prevent the object store from evicting
# dummy objects. Once we allow object pinning in the store, we may
# remove this variable.
self.actor_pinned_objects = None
def set_mode(self, mode):
"""Set the mode of the worker.
@@ -444,7 +444,7 @@ class Worker(object):
assert len(final_results) == len(object_ids)
return final_results
def submit_task(self, function_id, args, actor_id=None):
def submit_task(self, function_id, args, actor_id=None, actor_counter=0):
"""Submit a remote task to the scheduler.
Tell the scheduler to schedule the execution of the function with ID
@@ -484,13 +484,12 @@ class Worker(object):
self.current_task_id,
self.task_index,
actor_id,
self.actor_counters[actor_id],
actor_counter,
[function_properties.num_cpus, function_properties.num_gpus,
function_properties.num_custom_resource])
# Increment the worker's task index to track how many tasks have
# been submitted by the current task so far.
self.task_index += 1
self.actor_counters[actor_id] += 1
self.local_scheduler_client.submit(task)
return task.returns()
@@ -554,13 +553,6 @@ class Worker(object):
"data": data})
self.redis_client.rpush("ErrorKeys", error_key)
def _wait_for_actor(self):
"""Wait until the actor has been imported."""
assert self.actor_id != NIL_ACTOR_ID
# Wait until the actor has been imported.
while self.actor_id not in self.actors:
time.sleep(0.001)
def _wait_for_function(self, function_id, driver_id, timeout=10):
"""Wait until the function to be executed is present on this worker.
@@ -674,75 +666,90 @@ class Worker(object):
(these will be retrieved by calls to get or by subsequent tasks that
use the outputs of this task).
"""
try:
# The ID of the driver that this task belongs to. This is needed so
# that if the task throws an exception, we propagate the error
# message to the correct driver.
self.task_driver_id = task.driver_id()
self.current_task_id = task.task_id()
self.current_function_id = task.function_id().id()
self.task_index = 0
self.put_index = 0
function_id = task.function_id()
args = task.arguments()
return_object_ids = task.returns()
function_name, function_executor = (self.functions
[self.task_driver_id.id()]
[function_id.id()])
# The ID of the driver that this task belongs to. This is needed so
# that if the task throws an exception, we propagate the error
# message to the correct driver.
self.task_driver_id = task.driver_id()
self.current_task_id = task.task_id()
self.current_function_id = task.function_id().id()
self.task_index = 0
self.put_index = 0
function_id = task.function_id()
args = task.arguments()
return_object_ids = task.returns()
if task.actor_id().id() != NIL_ACTOR_ID:
return_object_ids.pop()
function_name, function_executor = (self.functions
[self.task_driver_id.id()]
[function_id.id()])
# Get task arguments from the object store.
# Get task arguments from the object store.
try:
with log_span("ray:task:get_arguments", worker=self):
arguments = self._get_arguments_for_execution(function_name,
args)
except (RayGetError, RayGetArgumentError) as e:
self._handle_process_task_failure(function_id, return_object_ids,
e, None)
return
except Exception as e:
self._handle_process_task_failure(
function_id, return_object_ids, e,
format_error_message(traceback.format_exc()))
return
# Execute the task.
# Execute the task.
try:
with log_span("ray:task:execute", worker=self):
if task.actor_id().id() == NIL_ACTOR_ID:
outputs = function_executor.executor(arguments)
else:
# If this is any actor task other than the first, which has
# no dependencies, the last argument is a dummy argument
# that represents the dependency on the previous actor
# task. Remove this argument for invocation.
if task.actor_counter() > 0:
arguments = arguments[:-1]
outputs = function_executor(
self.actors[task.actor_id().id()], *arguments)
except Exception as e:
# Determine whether the exception occured during a task, not an
# actor method.
task_exception = task.actor_id().id() == NIL_ACTOR_ID
traceback_str = format_error_message(traceback.format_exc(),
task_exception=task_exception)
self._handle_process_task_failure(function_id, return_object_ids,
e, traceback_str)
return
# Store the outputs in the local object store.
# Store the outputs in the local object store.
try:
with log_span("ray:task:store_outputs", worker=self):
if len(return_object_ids) == 1:
# If this is an actor task, then the last object ID returned by
# the task is a dummy output, not returned by the function
# itself. Decrement to get the correct number of return values.
num_returns = len(return_object_ids)
if num_returns == 1:
outputs = (outputs,)
self._store_outputs_in_objstore(return_object_ids, outputs)
except Exception as e:
# We determine whether the exception was caused by the call to
# _get_arguments_for_execution or by the execution of the remote
# function or by the call to _store_outputs_in_objstore. Depending
# on which case occurred, we format the error message differently.
# whether the variables "arguments" and "outputs" are defined.
if "arguments" in locals() and "outputs" not in locals():
if task.actor_id().id() == NIL_ACTOR_ID:
# The error occurred during the task execution.
traceback_str = format_error_message(
traceback.format_exc(), task_exception=True)
else:
# The error occurred during the execution of an actor task.
traceback_str = format_error_message(
traceback.format_exc())
elif "arguments" in locals() and "outputs" in locals():
# The error occurred after the task executed.
traceback_str = format_error_message(traceback.format_exc())
else:
# The error occurred before the task execution.
if (isinstance(e, RayGetError) or
isinstance(e, RayGetArgumentError)):
# In this case, getting the task arguments failed.
traceback_str = None
else:
traceback_str = traceback.format_exc()
failure_object = RayTaskError(function_name, e, traceback_str)
failure_objects = [failure_object for _
in range(len(return_object_ids))]
self._store_outputs_in_objstore(return_object_ids, failure_objects)
# Log the error message.
self.push_error_to_driver(self.task_driver_id.id(), "task",
str(failure_object),
data={"function_id": function_id.id(),
"function_name": function_name})
self._handle_process_task_failure(
function_id, return_object_ids, e,
format_error_message(traceback.format_exc()))
def _handle_process_task_failure(self, function_id, return_object_ids,
error, backtrace):
function_name, _ = self.functions[
self.task_driver_id.id()][function_id.id()]
failure_object = RayTaskError(function_name, error, backtrace)
failure_objects = [failure_object for _ in
range(len(return_object_ids))]
self._store_outputs_in_objstore(return_object_ids, failure_objects)
# Log the error message.
self.push_error_to_driver(self.task_driver_id.id(), "task",
str(failure_object),
data={"function_id": function_id.id(),
"function_name": function_name})
def _checkpoint_actor_state(self, actor_counter):
"""Checkpoint the actor state.
@@ -804,6 +811,19 @@ class Worker(object):
with log_span("ray:task", contents=contents, worker=self):
self._process_task(task)
# Add the dummy output for actor tasks. TODO(swang): We use a
# numpy array as a hack to pin the object in the object store.
# Once we allow object pinning in the store, we may use `None`.
if task.actor_id().id() != NIL_ACTOR_ID:
dummy_object_id = task.returns().pop()
dummy_object = np.zeros(1)
self.put_object(dummy_object_id, dummy_object)
# Keep the dummy output in scope for the lifetime of the actor,
# to prevent eviction from the object store.
dummy_object = self.get_object([dummy_object_id])
self.actor_pinned_objects.append(dummy_object[0])
# Push all of the log events to the global state store.
flush_log()
@@ -1742,6 +1762,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
# driver creates an object that is later evicted, we should notify the
# user that we're unable to reconstruct the object, since we cannot
# rerun the driver.
nil_actor_counter = 0
driver_task = ray.local_scheduler.Task(
worker.task_driver_id,
ray.local_scheduler.ObjectID(NIL_FUNCTION_ID),
@@ -1750,7 +1771,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
worker.current_task_id,
worker.task_index,
ray.local_scheduler.ObjectID(NIL_ACTOR_ID),
worker.actor_counters[actor_id],
nil_actor_counter,
[0, 0, 0])
global_state._execute_command(
driver_task.task_id(),
@@ -1768,6 +1789,9 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
actor_key = b"Actor:" + worker.actor_id
class_id = worker.redis_client.hget(actor_key, "class_id")
worker.class_id = class_id
# Store a list of the dummy outputs produced by actor tasks, to pin the
# dummy outputs in the object store.
worker.actor_pinned_objects = []
# Initialize the serialization library. This registers some classes, and so
# it must be run before we export all of the cached remote functions.
-11
View File
@@ -79,17 +79,6 @@ if __name__ == "__main__":
ray.worker.connect(info, mode=ray.WORKER_MODE, actor_id=actor_id)
# If this is an actor started in reconstruct mode, rerun tasks to
# reconstruct its state.
if args.reconstruct:
try:
ray.actor.reconstruct_actor_state(actor_id,
ray.worker.global_worker)
except Exception as e:
redis_client = create_redis_client(args.redis_address)
push_error_to_all_drivers(redis_client, traceback.format_exc())
raise e
error_explanation = """
This error is unexpected and should not have happened. Somehow a worker
crashed in an unanticipated way causing the main_loop to throw an exception,