From 99c8b1f38cdb02390fafd8242196ce5d4e118488 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Sun, 10 Sep 2017 19:29:28 -0700 Subject: [PATCH] Actor fault tolerance using object lineage reconstruction (#902) * Revert Python actor reconstruction * Actor reconstruction using object lineage * Add dummy arguments and return values for actor tasks * Pin dummy outputs for actor tasks * Skip checkpointing test for now * TODOs * minor edits * Generate dummy object dependencies in Python, not C * Fix linting. * Move actor counter and dummy objects inside of the actor handle * Refactor Worker._process_task, suppress exception propagation for sequential actor tasks --- python/ray/actor.py | 210 +++++------------- python/ray/monitor.py | 51 ++++- python/ray/worker.py | 160 +++++++------ python/ray/workers/default_worker.py | 11 - src/local_scheduler/local_scheduler.cc | 28 +-- .../local_scheduler_algorithm.cc | 3 + test/actor_test.py | 2 + 7 files changed, 207 insertions(+), 258 deletions(-) diff --git a/python/ray/actor.py b/python/ray/actor.py index e5848d330..1785621c3 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -6,13 +6,12 @@ import cloudpickle as pickle import hashlib import inspect import json -import time import traceback import ray.local_scheduler import ray.signature as signature import ray.worker -from ray.utils import (FunctionProperties, hex_to_binary, random_string, +from ray.utils import (FunctionProperties, random_string, select_local_scheduler) @@ -103,7 +102,7 @@ def fetch_and_register_actor(actor_class_key, worker): worker.functions[driver_id][function_id] = (actor_method_name, temporary_actor_method) worker.function_properties[driver_id][function_id] = ( - FunctionProperties(num_return_vals=1, + FunctionProperties(num_return_vals=2, num_cpus=1, num_gpus=0, num_custom_resource=0, @@ -174,7 +173,7 @@ def export_actor(actor_id, class_id, actor_method_names, num_cpus, num_gpus, # problem. 
function_id = get_actor_method_function_id(actor_method_name).id() worker.function_properties[driver_id][function_id] = ( - FunctionProperties(num_return_vals=1, + FunctionProperties(num_return_vals=2, num_cpus=1, num_gpus=0, num_custom_resource=0, @@ -204,127 +203,6 @@ def export_actor(actor_id, class_id, actor_method_names, num_cpus, num_gpus, worker.redis_client) -def reconstruct_actor_state(actor_id, worker): - """Reconstruct the state of an actor that is being reconstructed. - - Args: - actor_id: The ID of the actor being reconstructed. - worker: The worker object that is running the actor. - """ - # Get the most recent actor checkpoint. - checkpoint_index, checkpoint = get_actor_checkpoint(actor_id, worker) - if checkpoint is not None: - print("Loading actor state from checkpoint {}" - .format(checkpoint_index)) - # Wait for the actor to have been defined. - while not hasattr(worker, "actor_class"): - time.sleep(0.001) - # TODO(rkn): Restoring from the checkpoint may fail, so this should be - # in a try-except block and we should give a good error message. - worker.actors[actor_id] = ( - worker.actor_class.__ray_restore_from_checkpoint__(checkpoint)) - - # TODO(rkn): This call is expensive. It'd be nice to find a way to get only - # the tasks that are relevant to this actor. - tasks = ray.global_state.task_table() - - def hex_to_object_id(hex_id): - return ray.local_scheduler.ObjectID(hex_to_binary(hex_id)) - - relevant_tasks = [] - - # Loop over the task table and keep the tasks that are relevant to this - # actor. - for _, task_info in tasks.items(): - task_spec_info = task_info["TaskSpec"] - if hex_to_binary(task_spec_info["ActorID"]) == actor_id: - relevant_tasks.append(task_spec_info) - - # Sort the tasks by actor ID. - relevant_tasks.sort(key=lambda task: task["ActorCounter"]) - for i in range(len(relevant_tasks)): - assert relevant_tasks[i]["ActorCounter"] == i - - # This is a mini replica of the worker's main_loop. 
This will loop over all - # of the tasks that this actor is supposed to rerun. For each task, the - # worker will submit the task to the local scheduler, retrieve the task - # from the local scheduler, and execute the task. - for task_spec_info in relevant_tasks: - # Create a task spec out of the dictionary of info. This isn't - # necessary. It is strictly for the purposes of checking that the task - # we get back from the local scheduler is identical to the one we - # submit. - task_spec = ray.local_scheduler.Task( - hex_to_object_id(task_spec_info["DriverID"]), - hex_to_object_id(task_spec_info["FunctionID"]), - task_spec_info["Args"], - len(task_spec_info["ReturnObjectIDs"]), - hex_to_object_id(task_spec_info["ParentTaskID"]), - task_spec_info["ParentCounter"], - hex_to_object_id(task_spec_info["ActorID"]), - task_spec_info["ActorCounter"], - [task_spec_info["RequiredResources"]["CPUs"], - task_spec_info["RequiredResources"]["GPUs"], - task_spec_info["RequiredResources"]["CustomResource"]]) - - # Verify that the return object IDs are the same as they were the - # first time. - assert task_spec_info["ReturnObjectIDs"] == task_spec.returns() - - # We need to wait for the actor to be imported and for the functions to - # be defined before we can submit the task. - worker._wait_for_function(hex_to_binary(task_spec_info["FunctionID"]), - hex_to_binary(task_spec_info["DriverID"])) - - # Set some additional state. During normal operation - # (non-reconstruction) this state would already be set because tasks - # are only submitted from drivers or from workers that are in the - # middle of executing other tasks. - worker.task_driver_id = ray.local_scheduler.ObjectID( - hex_to_binary(task_spec_info["DriverID"])) - worker.current_task_id = ray.local_scheduler.ObjectID( - hex_to_binary(task_spec_info["ParentTaskID"])) - worker.task_index = task_spec_info["ParentCounter"] - - # Submit the task to the local scheduler. 
This is important so that the - # local scheduler does bookkeeping about this actor's resource - # utilization and things like that. It's also important for updating - # some state on the worker. - if task_spec_info["ActorCounter"] > checkpoint_index: - worker.submit_task( - hex_to_object_id(task_spec_info["FunctionID"]), - task_spec_info["Args"], - actor_id=hex_to_object_id(task_spec_info["ActorID"])) - else: - # Pass in a dummy task with no arguments to avoid having to - # unnecessarily reconstruct past arguments. - worker.submit_task( - hex_to_object_id(task_spec_info["FunctionID"]), - [], - actor_id=hex_to_object_id(task_spec_info["ActorID"])) - - # Clear the extra state that we set. - del worker.task_driver_id - del worker.current_task_id - del worker.task_index - - # Get the task from the local scheduler. - retrieved_task = worker._get_next_task_from_local_scheduler() - - # If the task happened before the most recent checkpoint, ignore it. - # Otherwise, execute it. - if retrieved_task.actor_counter() > checkpoint_index: - # Assert that the retrieved task is the same as the constructed - # task. - assert (ray.local_scheduler.task_to_string(task_spec) == - ray.local_scheduler.task_to_string(retrieved_task)) - # Wait for the task to be ready and then execute it. - worker._wait_for_and_process_task(retrieved_task) - - # Enter the main loop to receive and process tasks. - worker.main_loop() - - def make_actor(cls, num_cpus, num_gpus, checkpoint_interval): # Modify the class to have an additional method that will be used for # terminating the worker. @@ -368,29 +246,14 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval): class_id = random_actor_class_id() # The list exported will have length 0 if the class has not been exported # yet, and length one if it has. 
This is just implementing a bool, but we - # don't use a bool because we need to modify it inside of the NewClass + # don't use a bool because we need to modify it inside of the ActorHandle # constructor. exported = [] - # The function actor_method_call gets called if somebody tries to call a - # method on their local actor stub object. - def actor_method_call(actor_id, attr, function_signature, *args, **kwargs): - ray.worker.check_connected() - ray.worker.check_main_thread() - args = signature.extend_args(function_signature, args, kwargs) - - function_id = get_actor_method_function_id(attr) - object_ids = ray.worker.global_worker.submit_task(function_id, args, - actor_id=actor_id) - if len(object_ids) == 1: - return object_ids[0] - elif len(object_ids) > 1: - return object_ids - class ActorMethod(object): - def __init__(self, method_name, actor_id, method_signature): + def __init__(self, actor, method_name, method_signature): + self.actor = actor self.method_name = method_name - self.actor_id = actor_id self.method_signature = method_signature def __call__(self, *args, **kwargs): @@ -400,10 +263,11 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval): .format(self.method_name, self.method_name)) def remote(self, *args, **kwargs): - return actor_method_call(self.actor_id, self.method_name, - self.method_signature, *args, **kwargs) + return self.actor._actor_method_call(self.method_name, + self.method_signature, *args, + **kwargs) - class NewClass(object): + class ActorHandle(object): def __init__(self, *args, **kwargs): raise Exception("Actor classes cannot be instantiated directly. " "Instead of running '{}()', try '{}.remote()'." @@ -417,6 +281,13 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval): def _manual_init(self, *args, **kwargs): self._ray_actor_id = random_actor_id() + # The number of actor method invocations that we've called so far. 
+ self._ray_actor_counter = 0 + # The actor cursor is a dummy object representing the most recent + # actor method invocation. For each subsequent method invocation, + # the current cursor should be added as a dependency, and then + # updated to reflect the new invocation. + self._ray_actor_cursor = None self._ray_actor_methods = { k: v for (k, v) in inspect.getmembers( Class, predicate=(lambda x: (inspect.isfunction(x) or @@ -441,7 +312,7 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval): self._actor_method_invokers = dict() for k, v in self._ray_actor_methods.items(): self._actor_method_invokers[k] = ActorMethod( - k, self._ray_actor_id, self._ray_method_signatures[k]) + self, k, self._ray_method_signatures[k]) # Export the actor class if it has not been exported yet. if len(exported) == 0: @@ -456,12 +327,39 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval): ray.worker.global_worker) # Call __init__ as a remote function. if "__init__" in self._ray_actor_methods.keys(): - actor_method_call(self._ray_actor_id, "__init__", - self._ray_method_signatures["__init__"], - *args, **kwargs) + self._actor_method_call( + "__init__", self._ray_method_signatures["__init__"], *args, + **kwargs) else: print("WARNING: this object has no __init__ method.") + # The function actor_method_call gets called if somebody tries to call + # a method on their local actor stub object. + def _actor_method_call(self, attr, function_signature, *args, + **kwargs): + ray.worker.check_connected() + ray.worker.check_main_thread() + args = signature.extend_args(function_signature, args, kwargs) + # Add the current actor cursor, a dummy object returned by the most + # recent method invocation, as a dependency for the next method + # invocation. 
+ if self._ray_actor_cursor is not None: + args.append(self._ray_actor_cursor) + + function_id = get_actor_method_function_id(attr) + object_ids = ray.worker.global_worker.submit_task( + function_id, args, actor_id=self._ray_actor_id, + actor_counter=self._ray_actor_counter) + # Update the actor counter and cursor to reflect the most recent + # invocation. + self._ray_actor_counter += 1 + self._ray_actor_cursor = object_ids.pop() + + if len(object_ids) == 1: + return object_ids[0] + elif len(object_ids) > 1: + return object_ids + # Make tab completion work. def __dir__(self): return self._ray_actor_methods @@ -469,8 +367,10 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval): def __getattribute__(self, attr): # The following is needed so we can still access # self.actor_methods. - if attr in ["_manual_init", "_ray_actor_id", "_ray_actor_methods", - "_actor_method_invokers", "_ray_method_signatures"]: + if attr in ["_manual_init", "_ray_actor_id", "_ray_actor_counter", + "_ray_actor_cursor", "_ray_actor_methods", + "_actor_method_invokers", "_ray_method_signatures", + "_actor_method_call"]: return object.__getattribute__(self, attr) if attr in self._ray_actor_methods.keys(): return self._actor_method_invokers[attr] @@ -487,12 +387,12 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval): def __del__(self): """Kill the worker that is running this actor.""" if ray.worker.global_worker.connected: - actor_method_call( - self._ray_actor_id, "__ray_terminate__", + self._actor_method_call( + "__ray_terminate__", self._ray_method_signatures["__ray_terminate__"], self._ray_actor_id.id()) - return NewClass + return ActorHandle ray.worker.global_worker.fetch_and_register_actor = fetch_and_register_actor diff --git a/python/ray/monitor.py b/python/ray/monitor.py index 9be416fe2..2616b2d46 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -10,9 +10,10 @@ import redis import time import ray -from ray.services import get_ip_address, get_port 
import ray.utils +from ray.services import get_ip_address, get_port from ray.utils import binary_to_object_id, binary_to_hex, hex_to_binary +from ray.worker import NIL_ACTOR_ID # Import flatbuffer bindings. from ray.core.generated.SubscribeToDBClientTableReply \ @@ -142,16 +143,44 @@ class Monitor(object): num_tasks_updated = 0 for task_id, task in tasks.items(): # See if the corresponding local scheduler is alive. - if task["LocalSchedulerID"] in self.dead_local_schedulers: - # If the task is scheduled on a dead local scheduler, mark the - # task as lost. - key = binary_to_object_id(hex_to_binary(task_id)) - ok = self.state._execute_command( - key, "RAY.TASK_TABLE_UPDATE", hex_to_binary(task_id), - ray.experimental.state.TASK_STATUS_LOST, NIL_ID) - if ok != b"OK": - log.warn("Failed to update lost task for dead scheduler.") - num_tasks_updated += 1 + if task["LocalSchedulerID"] not in self.dead_local_schedulers: + continue + + # Remove dummy objects returned by actor tasks from any plasma + # manager. Although the objects may still exist in that object + # store, this deletion makes them effectively unreachable by any + # local scheduler connected to a different store. + # TODO(swang): Actually remove the objects from the object store, + # so that the reconstructed actor can reuse the same object store. + if hex_to_binary(task["TaskSpec"]["ActorID"]) != NIL_ACTOR_ID: + dummy_object_id = task["TaskSpec"]["ReturnObjectIDs"][-1] + obj = self.state.object_table(dummy_object_id) + manager_ids = obj["ManagerIDs"] + if manager_ids is not None: + # The dummy object should exist on at most one plasma + # manager, the manager associated with the local scheduler + # that died. + assert(len(manager_ids) <= 1) + # Remove the dummy object from the plasma manager + # associated with the dead local scheduler, if any. 
+ for manager in manager_ids: + ok = self.state._execute_command( + dummy_object_id, "RAY.OBJECT_TABLE_REMOVE", + dummy_object_id.id(), hex_to_binary(manager)) + if ok != b"OK": + log.warn("Failed to remove object location for " + "dead plasma manager.") + + # If the task is scheduled on a dead local scheduler, mark the + # task as lost. + key = binary_to_object_id(hex_to_binary(task_id)) + ok = self.state._execute_command( + key, "RAY.TASK_TABLE_UPDATE", hex_to_binary(task_id), + ray.experimental.state.TASK_STATUS_LOST, NIL_ID) + if ok != b"OK": + log.warn("Failed to update lost task for dead scheduler.") + num_tasks_updated += 1 + if num_tasks_updated > 0: log.warn("Marked {} tasks as lost.".format(num_tasks_updated)) diff --git a/python/ray/worker.py b/python/ray/worker.py index 03d7fd6e8..81c355fc7 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -226,10 +226,10 @@ class Worker(object): self.fetch_and_register_actor = None self.make_actor = None self.actors = {} - # Use a defaultdict for the actor counts. If this is accessed with a - # missing key, the default value of 0 is returned, and that key value - # pair is added to the dict. - self.actor_counters = collections.defaultdict(lambda: 0) + # TODO(swang): This is a hack to prevent the object store from evicting + # dummy objects. Once we allow object pinning in the store, we may + # remove this variable. + self.actor_pinned_objects = None def set_mode(self, mode): """Set the mode of the worker. @@ -444,7 +444,7 @@ class Worker(object): assert len(final_results) == len(object_ids) return final_results - def submit_task(self, function_id, args, actor_id=None): + def submit_task(self, function_id, args, actor_id=None, actor_counter=0): """Submit a remote task to the scheduler. 
Tell the scheduler to schedule the execution of the function with ID @@ -484,13 +484,12 @@ class Worker(object): self.current_task_id, self.task_index, actor_id, - self.actor_counters[actor_id], + actor_counter, [function_properties.num_cpus, function_properties.num_gpus, function_properties.num_custom_resource]) # Increment the worker's task index to track how many tasks have # been submitted by the current task so far. self.task_index += 1 - self.actor_counters[actor_id] += 1 self.local_scheduler_client.submit(task) return task.returns() @@ -554,13 +553,6 @@ class Worker(object): "data": data}) self.redis_client.rpush("ErrorKeys", error_key) - def _wait_for_actor(self): - """Wait until the actor has been imported.""" - assert self.actor_id != NIL_ACTOR_ID - # Wait until the actor has been imported. - while self.actor_id not in self.actors: - time.sleep(0.001) - def _wait_for_function(self, function_id, driver_id, timeout=10): """Wait until the function to be executed is present on this worker. @@ -674,75 +666,90 @@ class Worker(object): (these will be retrieved by calls to get or by subsequent tasks that use the outputs of this task). """ - try: - # The ID of the driver that this task belongs to. This is needed so - # that if the task throws an exception, we propagate the error - # message to the correct driver. - self.task_driver_id = task.driver_id() - self.current_task_id = task.task_id() - self.current_function_id = task.function_id().id() - self.task_index = 0 - self.put_index = 0 - function_id = task.function_id() - args = task.arguments() - return_object_ids = task.returns() - function_name, function_executor = (self.functions - [self.task_driver_id.id()] - [function_id.id()]) + # The ID of the driver that this task belongs to. This is needed so + # that if the task throws an exception, we propagate the error + # message to the correct driver. 
+ self.task_driver_id = task.driver_id() + self.current_task_id = task.task_id() + self.current_function_id = task.function_id().id() + self.task_index = 0 + self.put_index = 0 + function_id = task.function_id() + args = task.arguments() + return_object_ids = task.returns() + if task.actor_id().id() != NIL_ACTOR_ID: + return_object_ids.pop() + function_name, function_executor = (self.functions + [self.task_driver_id.id()] + [function_id.id()]) - # Get task arguments from the object store. + # Get task arguments from the object store. + try: with log_span("ray:task:get_arguments", worker=self): arguments = self._get_arguments_for_execution(function_name, args) + except (RayGetError, RayGetArgumentError) as e: + self._handle_process_task_failure(function_id, return_object_ids, + e, None) + return + except Exception as e: + self._handle_process_task_failure( + function_id, return_object_ids, e, + format_error_message(traceback.format_exc())) + return - # Execute the task. + # Execute the task. + try: with log_span("ray:task:execute", worker=self): if task.actor_id().id() == NIL_ACTOR_ID: outputs = function_executor.executor(arguments) else: + # If this is any actor task other than the first, which has + # no dependencies, the last argument is a dummy argument + # that represents the dependency on the previous actor + # task. Remove this argument for invocation. + if task.actor_counter() > 0: + arguments = arguments[:-1] outputs = function_executor( self.actors[task.actor_id().id()], *arguments) + except Exception as e: + # Determine whether the exception occurred during a task, not an + # actor method. + task_exception = task.actor_id().id() == NIL_ACTOR_ID + traceback_str = format_error_message(traceback.format_exc(), + task_exception=task_exception) + self._handle_process_task_failure(function_id, return_object_ids, + e, traceback_str) + return - # Store the outputs in the local object store. + # Store the outputs in the local object store.
+ try: with log_span("ray:task:store_outputs", worker=self): - if len(return_object_ids) == 1: + # If this is an actor task, then the last object ID returned by + # the task is a dummy output, not returned by the function + # itself. Decrement to get the correct number of return values. + num_returns = len(return_object_ids) + if num_returns == 1: outputs = (outputs,) self._store_outputs_in_objstore(return_object_ids, outputs) except Exception as e: - # We determine whether the exception was caused by the call to - # _get_arguments_for_execution or by the execution of the remote - # function or by the call to _store_outputs_in_objstore. Depending - # on which case occurred, we format the error message differently. - # whether the variables "arguments" and "outputs" are defined. - if "arguments" in locals() and "outputs" not in locals(): - if task.actor_id().id() == NIL_ACTOR_ID: - # The error occurred during the task execution. - traceback_str = format_error_message( - traceback.format_exc(), task_exception=True) - else: - # The error occurred during the execution of an actor task. - traceback_str = format_error_message( - traceback.format_exc()) - elif "arguments" in locals() and "outputs" in locals(): - # The error occurred after the task executed. - traceback_str = format_error_message(traceback.format_exc()) - else: - # The error occurred before the task execution. - if (isinstance(e, RayGetError) or - isinstance(e, RayGetArgumentError)): - # In this case, getting the task arguments failed. - traceback_str = None - else: - traceback_str = traceback.format_exc() - failure_object = RayTaskError(function_name, e, traceback_str) - failure_objects = [failure_object for _ - in range(len(return_object_ids))] - self._store_outputs_in_objstore(return_object_ids, failure_objects) - # Log the error message. 
- self.push_error_to_driver(self.task_driver_id.id(), "task", - str(failure_object), - data={"function_id": function_id.id(), - "function_name": function_name}) + self._handle_process_task_failure( + function_id, return_object_ids, e, + format_error_message(traceback.format_exc())) + + def _handle_process_task_failure(self, function_id, return_object_ids, + error, backtrace): + function_name, _ = self.functions[ + self.task_driver_id.id()][function_id.id()] + failure_object = RayTaskError(function_name, error, backtrace) + failure_objects = [failure_object for _ in + range(len(return_object_ids))] + self._store_outputs_in_objstore(return_object_ids, failure_objects) + # Log the error message. + self.push_error_to_driver(self.task_driver_id.id(), "task", + str(failure_object), + data={"function_id": function_id.id(), + "function_name": function_name}) def _checkpoint_actor_state(self, actor_counter): """Checkpoint the actor state. @@ -804,6 +811,19 @@ class Worker(object): with log_span("ray:task", contents=contents, worker=self): self._process_task(task) + # Add the dummy output for actor tasks. TODO(swang): We use a + # numpy array as a hack to pin the object in the object store. + # Once we allow object pinning in the store, we may use `None`. + if task.actor_id().id() != NIL_ACTOR_ID: + dummy_object_id = task.returns().pop() + dummy_object = np.zeros(1) + self.put_object(dummy_object_id, dummy_object) + + # Keep the dummy output in scope for the lifetime of the actor, + # to prevent eviction from the object store. + dummy_object = self.get_object([dummy_object_id]) + self.actor_pinned_objects.append(dummy_object[0]) + # Push all of the log events to the global state store. flush_log() @@ -1742,6 +1762,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, # driver creates an object that is later evicted, we should notify the # user that we're unable to reconstruct the object, since we cannot # rerun the driver. 
+ nil_actor_counter = 0 driver_task = ray.local_scheduler.Task( worker.task_driver_id, ray.local_scheduler.ObjectID(NIL_FUNCTION_ID), @@ -1750,7 +1771,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, worker.current_task_id, worker.task_index, ray.local_scheduler.ObjectID(NIL_ACTOR_ID), - worker.actor_counters[actor_id], + nil_actor_counter, [0, 0, 0]) global_state._execute_command( driver_task.task_id(), @@ -1768,6 +1789,9 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, actor_key = b"Actor:" + worker.actor_id class_id = worker.redis_client.hget(actor_key, "class_id") worker.class_id = class_id + # Store a list of the dummy outputs produced by actor tasks, to pin the + # dummy outputs in the object store. + worker.actor_pinned_objects = [] # Initialize the serialization library. This registers some classes, and so # it must be run before we export all of the cached remote functions. diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py index d4660327f..7a9257e67 100644 --- a/python/ray/workers/default_worker.py +++ b/python/ray/workers/default_worker.py @@ -79,17 +79,6 @@ if __name__ == "__main__": ray.worker.connect(info, mode=ray.WORKER_MODE, actor_id=actor_id) - # If this is an actor started in reconstruct mode, rerun tasks to - # reconstruct its state. - if args.reconstruct: - try: - ray.actor.reconstruct_actor_state(actor_id, - ray.worker.global_worker) - except Exception as e: - redis_client = create_redis_client(args.redis_address) - push_error_to_all_drivers(redis_client, traceback.format_exc()) - raise e - error_explanation = """ This error is unexpected and should not have happened. 
Somehow a worker crashed in an unanticipated way causing the main_loop to throw an exception, diff --git a/src/local_scheduler/local_scheduler.cc b/src/local_scheduler/local_scheduler.cc index 578259b38..d4fa06c67 100644 --- a/src/local_scheduler/local_scheduler.cc +++ b/src/local_scheduler/local_scheduler.cc @@ -633,24 +633,25 @@ void reconstruct_task_update_callback(Task *task, * Suppress the reconstruction request. */ return; } + /* Otherwise, the test-and-set succeeded, so resubmit the task for execution * to ensure that reconstruction will happen. */ LocalSchedulerState *state = (LocalSchedulerState *) user_context; TaskSpec *spec = Task_task_spec(task); - /* If the task is an actor task, then we currently do not reconstruct it. - * TODO(rkn): Handle this better. */ - if (!ActorID_equal(TaskSpec_actor_id(spec), NIL_ACTOR_ID)) { - LOG_WARN("We are not resubmitting this task because it is an actor task."); - } else { - /* Resubmit the task. */ - handle_task_submitted(state, state->algorithm_state, spec, + if (ActorID_equal(TaskSpec_actor_id(spec), NIL_ACTOR_ID)) { + handle_task_submitted(state, state->algorithm_state, Task_task_spec(task), Task_task_spec_size(task)); - /* Recursively reconstruct the task's inputs, if necessary. */ - for (int64_t i = 0; i < TaskSpec_num_args(spec); ++i) { - if (TaskSpec_arg_by_ref(spec, i)) { - ObjectID arg_id = TaskSpec_arg_id(spec, i); - reconstruct_object(state, arg_id); - } + } else { + handle_actor_task_submitted(state, state->algorithm_state, + Task_task_spec(task), + Task_task_spec_size(task)); + } + + /* Recursively reconstruct the task's inputs, if necessary. */ + for (int64_t i = 0; i < TaskSpec_num_args(spec); ++i) { + if (TaskSpec_arg_by_ref(spec, i)) { + ObjectID arg_id = TaskSpec_arg_id(spec, i); + reconstruct_object(state, arg_id); } } } @@ -1178,6 +1179,7 @@ void handle_actor_creation_callback(ActorID actor_id, /* TODO(rkn): We should kill the actor here if it is still around. 
Also, * if it hasn't registered yet, we should keep track of its PID so we can * kill it anyway. */ + /* TODO(swang): Evict actor dummy objects as part of actor cleanup. */ } } diff --git a/src/local_scheduler/local_scheduler_algorithm.cc b/src/local_scheduler/local_scheduler_algorithm.cc index 3f1d08260..1c58a15a2 100644 --- a/src/local_scheduler/local_scheduler_algorithm.cc +++ b/src/local_scheduler/local_scheduler_algorithm.cc @@ -498,6 +498,9 @@ void fetch_missing_dependency(LocalSchedulerState *state, * Fetch a queued task's missing object dependencies. The fetch requests will * be retried every kLocalSchedulerFetchTimeoutMilliseconds until all * objects are available locally. + * TODO(swang): For actor task dummy objects, we should still request + * reconstruction for missing dependencies, but we should not request transfer + * from other nodes. * * @param state The scheduler state. * @param algorithm_state The scheduling algorithm state. diff --git a/test/actor_test.py b/test/actor_test.py index f9075a0e4..612c92157 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -1214,6 +1214,8 @@ class ActorReconstruction(unittest.TestCase): ray.worker.cleanup() + @unittest.skip("Skipping until checkpointing is integrated with object " + "lineage.") def testCheckpointing(self): ray.worker._init(start_ray_local=True, num_local_schedulers=2, num_workers=0, redirect_output=True)