From af47737bd5b901fcb93aa6c14cda13f934f846af Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Thu, 19 Oct 2017 23:49:59 -0700 Subject: [PATCH] Prototype distributed actor handles (#1137) * Add actor handle ID to the task spec * Local scheduler dispatches actor tasks according to a task counter per handle * Fix python test * Allow passing actor handles into tasks. Not completely working yet. Also this is very messy. * Fixes, should be roughly working now. * Refactor actor handle wrapper * Fix __init__ tests * Terminate actor when the original handle goes out of scope * TODO and a couple test cases * Make tests for unsupported cases * Fix Python mode tests * Linting. * Cache actor definitions that occur before ray.init() is called. * Fix export actor class * Deterministically compute actor handle ID * Fix __getattribute__ * Fix string encoding for python3 * doc * Add comment and assertion. --- python/ray/actor.py | 697 +++++++++++------- python/ray/global_scheduler/test/test.py | 1 + python/ray/worker.py | 73 +- src/common/format/common.fbs | 3 + src/common/lib/python/common_extension.cc | 16 +- src/common/task.cc | 17 +- src/common/task.h | 12 + src/common/test/example_task.h | 3 +- src/common/test/task_tests.cc | 34 +- .../local_scheduler_algorithm.cc | 77 +- test/actor_test.py | 272 +++++-- 11 files changed, 799 insertions(+), 406 deletions(-) diff --git a/python/ray/actor.py b/python/ray/actor.py index f1675f0a2..a0c8c0330 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -26,16 +26,40 @@ def random_actor_class_id(): return random_string() -def get_actor_method_function_id(attr): +def compute_actor_handle_id(actor_handle_id, num_forks): + """Deterministically comopute an actor handle ID. + + A new actor handle ID is generated when it is forked from another actor + handle. The new handle ID is computed as hash(old_handle_id || num_forks). + + Args: + actor_handle_id (common.ObjectID): The original actor handle ID. + num_forks: The number of times the original actor handle has been + forked so far. + + Returns: + An object ID for the new actor handle. + """ + handle_id_hash = hashlib.sha1() + handle_id_hash.update(actor_handle_id.id()) + handle_id_hash.update(str(num_forks).encode("ascii")) + handle_id = handle_id_hash.digest() + assert len(handle_id) == 20 + return ray.local_scheduler.ObjectID(handle_id) + + +def compute_actor_method_function_id(class_name, attr): """Get the function ID corresponding to an actor method. Args: + class_name (str): The class name of the actor. attr (str): The attribute name of the method. Returns: Function ID corresponding to the method. """ function_id_hash = hashlib.sha1() + function_id_hash.update(class_name) function_id_hash.update(attr.encode("ascii")) function_id = function_id_hash.digest() assert len(function_id) == 20 @@ -203,19 +227,18 @@ def fetch_and_register_actor(actor_class_key, worker): def temporary_actor_method(*xs): raise Exception("The actor with name {} failed to be imported, and so " "cannot execute this method".format(actor_name)) + # Register the actor method signatures. + register_actor_signatures(worker, driver_id, class_name, + actor_method_names) + # Register the actor method executors. for actor_method_name in actor_method_names: - function_id = get_actor_method_function_id(actor_method_name).id() + function_id = compute_actor_method_function_id(class_name, + actor_method_name).id() temporary_executor = make_actor_method_executor(worker, actor_method_name, temporary_actor_method) worker.functions[driver_id][function_id] = (actor_method_name, temporary_executor) - worker.function_properties[driver_id][function_id] = ( - FunctionProperties(num_return_vals=2, - num_cpus=1, - num_gpus=0, - num_custom_resource=0, - max_calls=0)) worker.num_task_executions[driver_id][function_id] = 0 try: @@ -226,7 +249,8 @@ def fetch_and_register_actor(actor_class_key, worker): # traceback and notify the scheduler of the failure. traceback_str = ray.worker.format_error_message(traceback.format_exc()) # Log the error message. - worker.push_error_to_driver(driver_id, "register_actor", traceback_str, + worker.push_error_to_driver(driver_id, "register_actor_signatures", + traceback_str, data={"actor_id": actor_id_str}) # TODO(rkn): In the future, it might make sense to have the worker exit # here. However, currently that would lead to hanging if someone calls @@ -239,7 +263,8 @@ def fetch_and_register_actor(actor_class_key, worker): unpickled_class, predicate=(lambda x: (inspect.isfunction(x) or inspect.ismethod(x)))) for actor_method_name, actor_method in actor_methods: - function_id = get_actor_method_function_id(actor_method_name).id() + function_id = compute_actor_method_function_id( + class_name, actor_method_name).id() executor = make_actor_method_executor(worker, actor_method_name, actor_method) worker.functions[driver_id][function_id] = (actor_method_name, @@ -256,28 +281,86 @@ def fetch_and_register_actor(actor_class_key, worker): worker.local_scheduler_id = binary_to_hex(local_scheduler_id) -def export_actor_class(class_id, Class, actor_method_names, - checkpoint_interval, worker): - if worker.mode is None: - raise Exception("Actors cannot be created before Ray has been " - "started. You can start Ray with 'ray.init()'.") - key = b"ActorClass:" + class_id - d = {"driver_id": worker.task_driver_id.id(), - "class_name": Class.__name__, - "module": Class.__module__, - "class": pickle.dumps(Class), - "checkpoint_interval": checkpoint_interval, - "actor_method_names": json.dumps(list(actor_method_names))} - worker.redis_client.hmset(key, d) +def register_actor_signatures(worker, driver_id, class_name, + actor_method_names): + """Register an actor's method signatures in the worker. + + Args: + worker: The worker to register the signatures on. + driver_id: The ID of the driver that this actor is associated with. + actor_id: The ID of the actor. + actor_method_names: The names of the methods to register. + """ + for actor_method_name in actor_method_names: + # TODO(rkn): When we create a second actor, we are probably overwriting + # the values from the first actor here. This may or may not be a + # problem. + function_id = compute_actor_method_function_id(class_name, + actor_method_name).id() + # For now, all actor methods have 1 return value. + worker.function_properties[driver_id][function_id] = ( + FunctionProperties(num_return_vals=2, + num_cpus=1, + num_gpus=0, + num_custom_resource=0, + max_calls=0)) + + +def publish_actor_class_to_key(key, actor_class_info, worker): + """Push an actor class definition to Redis. + + The is factored out as a separate function because it is also called + on cached actor class definitions when a worker connects for the first + time. + + Args: + key: The key to store the actor class info at. + actor_class_info: Information about the actor class. + worker: The worker to use to connect to Redis. + """ + # We set the driver ID here because it may not have been available when the + # actor class was defined. + actor_class_info["driver_id"] = worker.task_driver_id.id() + worker.redis_client.hmset(key, actor_class_info) worker.redis_client.rpush("Exports", key) -def export_actor(actor_id, class_id, actor_method_names, num_cpus, num_gpus, - worker): +def export_actor_class(class_id, Class, actor_method_names, + checkpoint_interval, worker): + key = b"ActorClass:" + class_id + actor_class_info = { + "class_name": Class.__name__, + "module": Class.__module__, + "class": pickle.dumps(Class), + "checkpoint_interval": checkpoint_interval, + "actor_method_names": json.dumps(list(actor_method_names))} + + if worker.mode is None: + # This means that 'ray.init()' has not been called yet and so we must + # cache the actor class definition and export it when 'ray.init()' is + # called. + assert worker.cached_remote_functions_and_actors is not None + worker.cached_remote_functions_and_actors.append( + ("actor", (key, actor_class_info))) + # This caching code path is currently not used because we only export + # actor class definitions lazily when we instantiate the actor for the + # first time. + assert False, "This should be unreachable." + else: + publish_actor_class_to_key(key, actor_class_info, worker) + # TODO(rkn): Currently we allow actor classes to be defined within tasks. + # I tried to disable this, but it may be necessary because of + # https://github.com/ray-project/ray/issues/1146. + + +def export_actor(actor_id, class_id, class_name, actor_method_names, num_cpus, + num_gpus, worker): """Export an actor to redis. Args: - actor_id: The ID of the actor. + actor_id (common.ObjectID): The ID of the actor. + class_id (str): A random ID for the actor class. + class_name (str): The actor class name. actor_method_names (list): A list of the names of this actor's methods. num_cpus (int): The number of CPUs that this actor requires. num_gpus (int): The number of GPUs that this actor requires. @@ -286,23 +369,13 @@ def export_actor(actor_id, class_id, actor_method_names, num_cpus, num_gpus, if worker.mode is None: raise Exception("Actors cannot be created before Ray has been " "started. You can start Ray with 'ray.init()'.") - key = b"Actor:" + actor_id.id() - # For now, all actor methods have 1 return value. driver_id = worker.task_driver_id.id() - for actor_method_name in actor_method_names: - # TODO(rkn): When we create a second actor, we are probably overwriting - # the values from the first actor here. This may or may not be a - # problem. - function_id = get_actor_method_function_id(actor_method_name).id() - worker.function_properties[driver_id][function_id] = ( - FunctionProperties(num_return_vals=2, - num_cpus=1, - num_gpus=0, - num_custom_resource=0, - max_calls=0)) + register_actor_signatures(worker, driver_id, class_name, + actor_method_names) # Select a local scheduler for the actor. + key = b"Actor:" + actor_id.id() local_scheduler_id = select_local_scheduler( worker.task_driver_id.id(), ray.global_state.local_schedulers(), num_gpus, worker.redis_client) @@ -311,6 +384,7 @@ def export_actor(actor_id, class_id, actor_method_names, num_cpus, num_gpus, # We must put the actor information in Redis before publishing the actor # notification so that when the newly created actor attempts to fetch the # information from Redis, it is already there. + driver_id = worker.task_driver_id.id() worker.redis_client.hmset(key, {"class_id": class_id, "driver_id": driver_id, "local_scheduler_id": local_scheduler_id, @@ -326,6 +400,340 @@ def export_actor(actor_id, class_id, actor_method_names, num_cpus, num_gpus, worker.redis_client) +# Create objects to wrap method invocations. This is done so that we can +# invoke methods with actor.method.remote() instead of actor.method(). +class ActorMethod(object): + def __init__(self, actor, method_name): + self.actor = actor + self.method_name = method_name + + def __call__(self, *args, **kwargs): + raise Exception("Actor methods cannot be called directly. Instead " + "of running 'object.{}()', try " + "'object.{}.remote()'." + .format(self.method_name, self.method_name)) + + def remote(self, *args, **kwargs): + return self.actor._actor_method_call( + self.method_name, args=args, kwargs=kwargs, + dependency=self.actor._ray_actor_cursor) + + +# Checkpoint methods do not take in the state of the previous actor method +# as an explicit data dependency. +class CheckpointMethod(ActorMethod): + def remote(self): + # A checkpoint's arguments are the current task counter and the + # object ID of the preceding task. The latter is an implicit data + # dependency, since the checkpoint method can run at any time. + args = [self.actor._ray_actor_counter, + [self.actor._ray_actor_cursor]] + return self.actor._actor_method_call(self.method_name, args=args) + + +class ActorHandleWrapper(object): + """A wrapper for the contents of an ActorHandle. + + This is essentially just a dictionary, but it is used so that the recipient + can tell that an argument is an ActorHandle. + """ + def __init__(self, actor_id, actor_handle_id, actor_cursor, actor_counter, + actor_method_names, method_signatures, checkpoint_interval, + class_name): + self.actor_id = actor_id + self.actor_handle_id = actor_handle_id + self.actor_cursor = actor_cursor + self.actor_counter = actor_counter + self.actor_method_names = actor_method_names + # TODO(swang): Fetch this information from Redis so that we don't have + # to fall back to pickle. + self.method_signatures = method_signatures + self.checkpoint_interval = checkpoint_interval + self.class_name = class_name + + +def wrap_actor_handle(actor_handle): + """Wrap the ActorHandle to store the fields. + + Args: + actor_handle: The ActorHandle instance to wrap. + + Returns: + An ActorHandleWrapper instance that stores the ActorHandle's fields. + """ + if actor_handle._ray_checkpoint_interval > 0: + raise Exception("Checkpointing not yet supported for distributed " + "actor handles.") + wrapper = ActorHandleWrapper( + actor_handle._ray_actor_id, + compute_actor_handle_id(actor_handle._ray_actor_handle_id, + actor_handle._ray_actor_forks), + actor_handle._ray_actor_cursor, + 0, # Reset the actor counter. + actor_handle._ray_actor_method_names, + actor_handle._ray_method_signatures, + actor_handle._ray_checkpoint_interval, + actor_handle._ray_class_name) + actor_handle._ray_actor_forks += 1 + return wrapper + + +def unwrap_actor_handle(worker, wrapper): + """Make an ActorHandle from the stored fields. + + Args: + worker: The worker that is unwrapping the actor handle. + wrapper: An ActorHandleWrapper instance to unwrap. + + Returns: + The unwrapped ActorHandle instance. + """ + driver_id = worker.task_driver_id.id() + register_actor_signatures(worker, driver_id, wrapper.class_name, + wrapper.actor_method_names) + + actor_handle_class = make_actor_handle_class(wrapper.class_name) + actor_object = actor_handle_class.__new__(actor_handle_class) + actor_object._manual_init( + wrapper.actor_id, + wrapper.actor_handle_id, + wrapper.actor_cursor, + wrapper.actor_counter, + wrapper.actor_method_names, + wrapper.method_signatures, + wrapper.checkpoint_interval) + return actor_object + + +class ActorHandleParent(object): + """This is the parent class of all ActorHandle classes. + + This enables us to identify actor handles by checking if an object obj + satisfies isinstance(obj, ActorHandleParent). + """ + pass + + +def make_actor_handle_class(class_name): + class ActorHandle(ActorHandleParent): + def __init__(self, *args, **kwargs): + raise Exception("Actor classes cannot be instantiated directly. " + "Instead of running '{}()', try '{}.remote()'." + .format(class_name, class_name)) + + @classmethod + def remote(cls, *args, **kwargs): + raise NotImplementedError("The classmethod remote() can only be " + "called on the original Class.") + + def _manual_init(self, actor_id, actor_handle_id, actor_cursor, + actor_counter, actor_method_names, method_signatures, + checkpoint_interval): + self._ray_actor_id = actor_id + self._ray_actor_handle_id = actor_handle_id + self._ray_actor_cursor = actor_cursor + self._ray_actor_counter = actor_counter + self._ray_actor_method_names = actor_method_names + self._ray_method_signatures = method_signatures + self._ray_checkpoint_interval = checkpoint_interval + self._ray_class_name = class_name + self._ray_actor_forks = 0 + + def _actor_method_call(self, method_name, args=None, kwargs=None, + dependency=None): + """Method execution stub for an actor handle. + + This is the function that executes when + `actor.method_name.remote(*args, **kwargs)` is called. Instead of + executing locally, the method is packaged as a task and scheduled + to the remote actor instance. + + Args: + self: The local actor handle. + method_name: The name of the actor method to execute. + args: A list of arguments for the actor method. + kwargs: A dictionary of keyword arguments for the actor method. + dependency: The object ID that this method is dependent on. + Defaults to None, for no dependencies. Most tasks should + pass in the dummy object returned by the preceding task. + Some tasks, such as checkpoint and terminate methods, have + no dependencies. + + Returns: + object_ids: A list of object IDs returned by the remote actor + method. + """ + ray.worker.check_connected() + ray.worker.check_main_thread() + function_signature = self._ray_method_signatures[method_name] + if args is None: + args = [] + if kwargs is None: + kwargs = {} + args = signature.extend_args(function_signature, args, kwargs) + + # Execute functions locally if Ray is run in PYTHON_MODE + # Copy args to prevent the function from mutating them. + if ray.worker.global_worker.mode == ray.PYTHON_MODE: + return getattr( + ray.worker.global_worker.actors[self._ray_actor_id], + method_name)(*copy.deepcopy(args)) + + # Add the dummy argument that represents dependency on a preceding + # task. + args.append(dependency) + + is_actor_checkpoint_method = (method_name == "__ray_checkpoint__") + + function_id = compute_actor_method_function_id( + self._ray_class_name, method_name) + object_ids = ray.worker.global_worker.submit_task( + function_id, args, actor_id=self._ray_actor_id, + actor_handle_id=self._ray_actor_handle_id, + actor_counter=self._ray_actor_counter, + is_actor_checkpoint_method=is_actor_checkpoint_method) + # Update the actor counter and cursor to reflect the most recent + # invocation. + self._ray_actor_counter += 1 + self._ray_actor_cursor = object_ids.pop() + + # Submit a checkpoint task if it is time to do so. + if (self._ray_checkpoint_interval > 1 and + self._ray_actor_counter % self._ray_checkpoint_interval == + 0): + self.__ray_checkpoint__.remote() + + # The last object returned is the dummy object that should be + # passed in to the next actor method. Do not return it to the user. + if len(object_ids) == 1: + return object_ids[0] + elif len(object_ids) > 1: + return object_ids + + # Make tab completion work. + def __dir__(self): + return self._ray_actor_method_names + + def __getattribute__(self, attr): + try: + # Check whether this is an actor method. + actor_method_names = object.__getattribute__( + self, "_ray_actor_method_names") + if attr in actor_method_names: + # We create the ActorMethod on the fly here so that the + # ActorHandle doesn't need a reference to the ActorMethod. + # The ActorMethod has a reference to the ActorHandle and + # this was causing cyclic references which were prevent + # object deallocation from behaving in a predictable + # manner. + if attr == "__ray_checkpoint__": + actor_method_cls = CheckpointMethod + else: + actor_method_cls = ActorMethod + return actor_method_cls(self, attr) + except AttributeError: + pass + + # If the requested attribute is not a registered method, fall back + # to default __getattribute__. + return object.__getattribute__(self, attr) + + def __repr__(self): + return "Actor(" + self._ray_actor_id.hex() + ")" + + def __reduce__(self): + raise Exception("Actor objects cannot be pickled.") + + def __del__(self): + """Kill the worker that is running this actor.""" + # TODO(swang): Also clean up forked actor handles. + # Kill the worker if this is the original actor handle, created + # with Class.remote(). + if (ray.worker.global_worker.connected and + self._ray_actor_handle_id.id() == ray.worker.NIL_ACTOR_ID): + self._actor_method_call("__ray_terminate__", + args=[self._ray_actor_id.id()]) + + return ActorHandle + + +def actor_handle_from_class(Class, class_id, num_cpus, num_gpus, + checkpoint_interval): + class_name = Class.__name__.encode("ascii") + actor_handle_class = make_actor_handle_class(class_name) + exported = [] + + class ActorHandle(actor_handle_class): + + @classmethod + def remote(cls, *args, **kwargs): + actor_id = random_actor_id() + # The ID for this instance of ActorHandle. These should be unique + # across instances with the same _ray_actor_id. + actor_handle_id = ray.local_scheduler.ObjectID( + ray.worker.NIL_ACTOR_ID) + # The actor cursor is a dummy object representing the most recent + # actor method invocation. For each subsequent method invocation, + # the current cursor should be added as a dependency, and then + # updated to reflect the new invocation. + actor_cursor = None + # The number of actor method invocations that we've called so far. + actor_counter = 0 + # Get the actor methods of the given class. + actor_methods = inspect.getmembers( + Class, predicate=(lambda x: (inspect.isfunction(x) or + inspect.ismethod(x)))) + # Extract the signatures of each of the methods. This will be used + # to catch some errors if the methods are called with inappropriate + # arguments. + method_signatures = dict() + for k, v in actor_methods: + # Print a warning message if the method signature is not + # supported. We don't raise an exception because if the actor + # inherits from a class that has a method whose signature we + # don't support, we there may not be much the user can do about + # it. + signature.check_signature_supported(v, warn=True) + method_signatures[k] = signature.extract_signature( + v, ignore_first=True) + + actor_method_names = [method_name for method_name, _ in + actor_methods] + # Do not export the actor class or the actor if run in PYTHON_MODE + # Instead, instantiate the actor locally and add it to + # global_worker's dictionary + if ray.worker.global_worker.mode == ray.PYTHON_MODE: + ray.worker.global_worker.actors[actor_id] = ( + Class.__new__(Class)) + else: + # Export the actor. + if not exported: + export_actor_class(class_id, Class, actor_method_names, + checkpoint_interval, + ray.worker.global_worker) + exported.append(0) + export_actor(actor_id, class_id, class_name, + actor_method_names, num_cpus, num_gpus, + ray.worker.global_worker) + + # Instantiate the actor handle. + actor_object = cls.__new__(cls) + actor_object._manual_init(actor_id, actor_handle_id, actor_cursor, + actor_counter, actor_method_names, + method_signatures, checkpoint_interval) + + # Call __init__ as a remote function. + if "__init__" in actor_object._ray_actor_method_names: + actor_object._actor_method_call("__init__", args=args, + kwargs=kwargs) + else: + print("WARNING: this object has no __init__ method.") + + return actor_object + + return ActorHandle + + def make_actor(cls, num_cpus, num_gpus, checkpoint_interval): if checkpoint_interval == 0: raise Exception("checkpoint_interval must be greater than 0.") @@ -472,216 +880,9 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval): Class.__name__ = cls.__name__ class_id = random_actor_class_id() - # The list exported will have length 0 if the class has not been exported - # yet, and length one if it has. This is just implementing a bool, but we - # don't use a bool because we need to modify it inside of the ActorHandle - # constructor. - exported = [] - # Create objects to wrap method invocations. This is done so that we can - # invoke methods with actor.method.remote() instead of actor.method(). - class ActorMethod(object): - def __init__(self, actor, method_name): - self.actor = actor - self.method_name = method_name - - def __call__(self, *args, **kwargs): - raise Exception("Actor methods cannot be called directly. Instead " - "of running 'object.{}()', try " - "'object.{}.remote()'." - .format(self.method_name, self.method_name)) - - def remote(self, *args, **kwargs): - return self.actor._actor_method_call( - self.method_name, args=args, kwargs=kwargs, - dependency=self.actor._ray_actor_cursor) - - # Checkpoint methods do not take in the state of the previous actor method - # as an explicit data dependency. - class CheckpointMethod(ActorMethod): - def remote(self): - # A checkpoint's arguments are the current task counter and the - # object ID of the preceding task. The latter is an implicit data - # dependency, since the checkpoint method can run at any time. - args = [self.actor._ray_actor_counter, - [self.actor._ray_actor_cursor]] - return self.actor._actor_method_call(self.method_name, args=args) - - class ActorHandle(object): - def __init__(self, *args, **kwargs): - raise Exception("Actor classes cannot be instantiated directly. " - "Instead of running '{}()', try '{}.remote()'." - .format(Class.__name__, Class.__name__)) - - @classmethod - def remote(cls, *args, **kwargs): - actor_object = cls.__new__(cls) - actor_object._manual_init(*args, **kwargs) - return actor_object - - def _manual_init(self, *args, **kwargs): - self._ray_actor_id = random_actor_id() - # The number of actor method invocations that we've called so far. - self._ray_actor_counter = 0 - # The actor cursor is a dummy object representing the most recent - # actor method invocation. For each subsequent method invocation, - # the current cursor should be added as a dependency, and then - # updated to reflect the new invocation. - self._ray_actor_cursor = None - ray_actor_methods = inspect.getmembers( - Class, predicate=(lambda x: (inspect.isfunction(x) or - inspect.ismethod(x)))) - self._ray_actor_methods = {} - for actor_method_name, actor_method in ray_actor_methods: - self._ray_actor_methods[actor_method_name] = actor_method - # Extract the signatures of each of the methods. This will be used - # to catch some errors if the methods are called with inappropriate - # arguments. - self._ray_method_signatures = dict() - for k, v in self._ray_actor_methods.items(): - # Print a warning message if the method signature is not - # supported. We don't raise an exception because if the actor - # inherits from a class that has a method whose signature we - # don't support, we there may not be much the user can do about - # it. - signature.check_signature_supported(v, warn=True) - self._ray_method_signatures[k] = signature.extract_signature( - v, ignore_first=True) - - # Do not export the actor class or the actor if run in PYTHON_MODE - # Instead, instantiate the actor locally and add it to - # global_worker's dictionary - if ray.worker.global_worker.mode == ray.PYTHON_MODE: - ray.worker.global_worker.actors[self._ray_actor_id] = ( - Class.__new__(Class)) - else: - # Export the actor class if it has not been exported yet. - if len(exported) == 0: - export_actor_class(class_id, Class, - self._ray_actor_methods.keys(), - checkpoint_interval, - ray.worker.global_worker) - exported.append(0) - # Export the actor. - export_actor(self._ray_actor_id, class_id, - self._ray_actor_methods.keys(), num_cpus, - num_gpus, ray.worker.global_worker) - - # Call __init__ as a remote function. - if "__init__" in self._ray_actor_methods.keys(): - self._actor_method_call("__init__", args=args, kwargs=kwargs) - else: - print("WARNING: this object has no __init__ method.") - - def _actor_method_call(self, method_name, args=None, kwargs=None, - dependency=None): - """Method execution stub for an actor handle. - - This is the function that executes when - `actor.method_name.remote(*args, **kwargs)` is called. Instead of - executing locally, the method is packaged as a task and scheduled - to the remote actor instance. - - Args: - self: The local actor handle. - method_name: The name of the actor method to execute. - args: A list of arguments for the actor method. - kwargs: A dictionary of keyword arguments for the actor method. - dependency: The object ID that this method is dependent on. - Defaults to None, for no dependencies. Most tasks should - pass in the dummy object returned by the preceding task. - Some tasks, such as checkpoint and terminate methods, have - no dependencies. - - Returns: - object_ids: A list of object IDs returned by the remote actor - method. - """ - ray.worker.check_connected() - ray.worker.check_main_thread() - function_signature = self._ray_method_signatures[method_name] - if args is None: - args = [] - if kwargs is None: - kwargs = {} - args = signature.extend_args(function_signature, args, kwargs) - - # Execute functions locally if Ray is run in PYTHON_MODE - # Copy args to prevent the function from mutating them. - if ray.worker.global_worker.mode == ray.PYTHON_MODE: - return getattr( - ray.worker.global_worker.actors[self._ray_actor_id], - method_name)(*copy.deepcopy(args)) - - # Add the dummy argument that represents dependency on a preceding - # task. - args.append(dependency) - - is_actor_checkpoint_method = (method_name == "__ray_checkpoint__") - - function_id = get_actor_method_function_id(method_name) - object_ids = ray.worker.global_worker.submit_task( - function_id, args, actor_id=self._ray_actor_id, - actor_counter=self._ray_actor_counter, - is_actor_checkpoint_method=is_actor_checkpoint_method) - # Update the actor counter and cursor to reflect the most recent - # invocation. - self._ray_actor_counter += 1 - self._ray_actor_cursor = object_ids.pop() - - # Submit a checkpoint task if it is time to do so. - if (checkpoint_interval > 1 and - self._ray_actor_counter % checkpoint_interval == 0): - self.__ray_checkpoint__.remote() - - # The last object returned is the dummy object that should be - # passed in to the next actor method. Do not return it to the user. - if len(object_ids) == 1: - return object_ids[0] - elif len(object_ids) > 1: - return object_ids - - # Make tab completion work. - def __dir__(self): - return self._ray_actor_methods - - def __getattribute__(self, attr): - # The following is needed so we can still access - # self.actor_methods. - if attr in ["_manual_init", "_ray_actor_id", "_ray_actor_counter", - "_ray_actor_cursor", "_ray_actor_methods", - "_actor_method_invokers", "_ray_method_signatures", - "_actor_method_call"]: - return object.__getattribute__(self, attr) - if attr in self._ray_actor_methods.keys(): - # We create the ActorMethod on the fly here so that the - # ActorHandle doesn't need a reference to the ActorMethod. The - # ActorMethod has a reference to the ActorHandle and this was - # causing cyclic references which were prevent object - # deallocation from behaving in a predictable manner. - if attr == "__ray_checkpoint__": - actor_method_cls = CheckpointMethod - else: - actor_method_cls = ActorMethod - return actor_method_cls(self, attr) - else: - # There is no method with this name, so raise an exception. - raise AttributeError("'{}' Actor object has no attribute '{}'" - .format(Class, attr)) - - def __repr__(self): - return "Actor(" + self._ray_actor_id.hex() + ")" - - def __reduce__(self): - raise Exception("Actor objects cannot be pickled.") - - def __del__(self): - """Kill the worker that is running this actor.""" - if ray.worker.global_worker.connected: - self._actor_method_call("__ray_terminate__", - args=[self._ray_actor_id.id()]) - - return ActorHandle + return actor_handle_from_class(Class, class_id, num_cpus, num_gpus, + checkpoint_interval) ray.worker.global_worker.fetch_and_register_actor = fetch_and_register_actor diff --git a/python/ray/global_scheduler/test/test.py b/python/ray/global_scheduler/test/test.py index f1a738060..9daf5736b 100644 --- a/python/ray/global_scheduler/test/test.py +++ b/python/ray/global_scheduler/test/test.py @@ -170,6 +170,7 @@ class TestGlobalScheduler(unittest.TestCase): task2 = local_scheduler.Task(random_driver_id(), random_function_id(), [random_object_id()], 0, random_task_id(), 0, local_scheduler.ObjectID(NIL_ACTOR_ID), + local_scheduler.ObjectID(NIL_ACTOR_ID), 0, 0, [1.0, 2.0, 0.0]) self.assertEqual(task2.required_resources(), [1.0, 2.0, 0.0]) diff --git a/python/ray/worker.py b/python/ray/worker.py index 08ad0bb08..cf2ad083d 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -184,14 +184,12 @@ class Worker(object): connected (bool): True if Ray has been started and False otherwise. mode: The mode of the worker. One of SCRIPT_MODE, PYTHON_MODE, SILENT_MODE, and WORKER_MODE. - cached_remote_functions (List[Tuple[str, str]]): A list of pairs - representing the remote functions that were defined before the - worker called connect. The first element is the name of the remote - function, and the second element is the serialized remote function. - When the worker eventually does call connect, if it is a driver, it - will export these functions to the scheduler. If - cached_remote_functions is None, that means that connect has been - called already. + cached_remote_functions_and_actors: A list of information for exporting + remote functions and actor classes definitions that were defined + before the worker called connect. When the worker eventually does + call connect, if it is a driver, it will export these functions and + actors. If cached_remote_functions_and_actors is None, that means + that connect has been called already. cached_functions_to_run (List): A list of functions to run on all of the workers that should be exported as soon as connect is called. """ @@ -221,7 +219,7 @@ class Worker(object): self.num_task_executions = collections.defaultdict(lambda: {}) self.connected = False self.mode = None - self.cached_remote_functions = [] + self.cached_remote_functions_and_actors = [] self.cached_functions_to_run = [] self.fetch_and_register_actor = None self.make_actor = None @@ -454,7 +452,8 @@ class Worker(object): assert len(final_results) == len(object_ids) return final_results - def submit_task(self, function_id, args, actor_id=None, actor_counter=0, + def submit_task(self, function_id, args, actor_id=None, + actor_handle_id=None, actor_counter=0, is_actor_checkpoint_method=False): """Submit a remote task to the scheduler. @@ -474,14 +473,21 @@ class Worker(object): """ with log_span("ray:submit_task", worker=self): check_main_thread() - actor_id = (ray.local_scheduler.ObjectID(NIL_ACTOR_ID) - if actor_id is None else actor_id) + if actor_id is None: + assert actor_handle_id is None + actor_id = ray.local_scheduler.ObjectID(NIL_ACTOR_ID) + actor_handle_id = ray.local_scheduler.ObjectID(NIL_ACTOR_ID) + else: + assert actor_handle_id is not None # Put large or complex arguments that are passed by value in the # object store first. args_for_local_scheduler = [] for arg in args: if isinstance(arg, ray.local_scheduler.ObjectID): args_for_local_scheduler.append(arg) + elif isinstance(arg, ray.actor.ActorHandleParent): + args_for_local_scheduler.append(put( + ray.actor.wrap_actor_handle(arg))) elif ray.local_scheduler.check_simple_value(arg): args_for_local_scheduler.append(arg) else: @@ -500,6 +506,7 @@ class Worker(object): self.current_task_id, self.task_index, actor_id, + actor_handle_id, actor_counter, is_actor_checkpoint_method, [function_properties.num_cpus, function_properties.num_gpus, @@ -655,6 +662,8 @@ class Worker(object): # created this object failed, and we should propagate the # error message here. raise RayGetArgumentError(function_name, i, arg, argument) + elif isinstance(argument, ray.actor.ActorHandleWrapper): + argument = ray.actor.unwrap_actor_handle(self, argument) else: # pass the argument by value argument = arg @@ -1098,6 +1107,8 @@ def _initialize_serialization(worker=global_worker): _register_class(type(lambda: 0), use_pickle=True) # Tell Ray to serialize types with pickle. _register_class(type(int), use_pickle=True) + # Ray can serialize actor handles that have been wrapped. + _register_class(ray.actor.ActorHandleWrapper) def get_address_info_from_redis_helper(redis_address, node_ip_address): @@ -1704,7 +1715,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, error_message = "Perhaps you called ray.init twice by accident?" assert not worker.connected, error_message assert worker.cached_functions_to_run is not None, error_message - assert worker.cached_remote_functions is not None, error_message + assert worker.cached_remote_functions_and_actors is not None, error_message # Initialize some fields. worker.worker_id = random_string() worker.actor_id = actor_id @@ -1840,6 +1851,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, worker.current_task_id, worker.task_index, ray.local_scheduler.ObjectID(NIL_ACTOR_ID), + ray.local_scheduler.ObjectID(NIL_ACTOR_ID), nil_actor_counter, False, [0, 0, 0]) @@ -1913,23 +1925,32 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, for function in worker.cached_functions_to_run: worker.run_function_on_all_workers(function) # Export cached remote functions to the workers. - for info in worker.cached_remote_functions: - (function_id, func_name, func, - func_invoker, function_properties) = info - export_remote_function(function_id, func_name, func, func_invoker, - function_properties, worker) + for cached_type, info in worker.cached_remote_functions_and_actors: + if cached_type == "remote_function": + (function_id, func_name, func, + func_invoker, function_properties) = info + export_remote_function(function_id, func_name, func, + func_invoker, function_properties, + worker) + elif cached_type == "actor": + (key, actor_class_info) = info + ray.actor.publish_actor_class_to_key(key, actor_class_info, + worker) + else: + assert False, "This code should be unreachable." worker.cached_functions_to_run = None - worker.cached_remote_functions = None + worker.cached_remote_functions_and_actors = None def disconnect(worker=global_worker): """Disconnect this worker from the scheduler and object store.""" - # Reset the list of cached remote functions so that if more remote - # functions are defined and then connect is called again, the remote - # functions will be exported. This is mostly relevant for the tests. + # Reset the list of cached remote functions and actors so that if more + # remote functions or actors are defined and then connect is called again, + # the remote functions will be exported. This is mostly relevant for the + # tests. worker.connected = False worker.cached_functions_to_run = [] - worker.cached_remote_functions = [] + worker.cached_remote_functions_and_actors = [] worker.serialization_context = pyarrow.SerializationContext() @@ -2381,9 +2402,9 @@ def remote(*args, **kwargs): export_remote_function(function_id, func_name, func, func_invoker, function_properties) elif worker.mode is None: - worker.cached_remote_functions.append((function_id, func_name, - func, func_invoker, - function_properties)) + worker.cached_remote_functions_and_actors.append( + ("remote_function", (function_id, func_name, func, + func_invoker, function_properties))) return func_invoker return remote_decorator diff --git a/src/common/format/common.fbs b/src/common/format/common.fbs index 81b126231..7d74c9ab7 100644 --- a/src/common/format/common.fbs +++ b/src/common/format/common.fbs @@ -35,6 +35,9 @@ table TaskInfo { // Actor ID of the task. This is the actor that this task is executed on // or NIL_ACTOR_ID if the task is just a normal task. actor_id: string; + // The ID of the handle that was used to submit the task. This should be + // unique across handles with the same actor_id. + actor_handle_id: string; // Number of tasks that have been submitted to this actor so far. actor_counter: int; // True if this task is an actor checkpoint task and false otherwise. diff --git a/src/common/lib/python/common_extension.cc b/src/common/lib/python/common_extension.cc index 7d7c4b4fc..bdb14dfb2 100644 --- a/src/common/lib/python/common_extension.cc +++ b/src/common/lib/python/common_extension.cc @@ -271,6 +271,8 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { UniqueID driver_id; /* ID of the actor this task should run on. */ UniqueID actor_id = NIL_ACTOR_ID; + /* ID of the actor handle used to submit this task. */ + UniqueID actor_handle_id = NIL_ACTOR_ID; /* How many tasks have been launched on the actor so far? */ int actor_counter = 0; /* True if this is an actor checkpoint task and false otherwise. */ @@ -287,12 +289,13 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { int parent_counter; /* Resource vector of the required resources to execute this task. */ PyObject *resource_vector = NULL; - if (!PyArg_ParseTuple(args, "O&O&OiO&i|O&iOO", &PyObjectToUniqueID, + if (!PyArg_ParseTuple(args, "O&O&OiO&i|O&O&iOO", &PyObjectToUniqueID, &driver_id, &PyObjectToUniqueID, &function_id, &arguments, &num_returns, &PyObjectToUniqueID, &parent_task_id, &parent_counter, &PyObjectToUniqueID, - &actor_id, &actor_counter, - &is_actor_checkpoint_method_object, &resource_vector)) { + &actor_id, &PyObjectToUniqueID, &actor_handle_id, + &actor_counter, &is_actor_checkpoint_method_object, + &resource_vector)) { return -1; } @@ -304,9 +307,10 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { Py_ssize_t size = PyList_Size(arguments); /* Construct the task specification. */ - TaskSpec_start_construct( - g_task_builder, driver_id, parent_task_id, parent_counter, actor_id, - actor_counter, is_actor_checkpoint_method, function_id, num_returns); + TaskSpec_start_construct(g_task_builder, driver_id, parent_task_id, + parent_counter, actor_id, actor_handle_id, + actor_counter, is_actor_checkpoint_method, + function_id, num_returns); /* Add the task arguments. */ for (Py_ssize_t i = 0; i < size; ++i) { PyObject *arg = PyList_GetItem(arguments, i); diff --git a/src/common/task.cc b/src/common/task.cc index d8074bbe8..044875fba 100644 --- a/src/common/task.cc +++ b/src/common/task.cc @@ -38,6 +38,7 @@ class TaskBuilder { TaskID parent_task_id, int64_t parent_counter, ActorID actor_id, + ActorID actor_handle_id, int64_t actor_counter, bool is_actor_checkpoint_method, FunctionID function_id, @@ -46,6 +47,7 @@ class TaskBuilder { parent_task_id_ = parent_task_id; parent_counter_ = parent_counter; actor_id_ = actor_id; + actor_handle_id_ = actor_handle_id; actor_counter_ = actor_counter; is_actor_checkpoint_method_ = is_actor_checkpoint_method; function_id_ = function_id; @@ -107,7 +109,8 @@ class TaskBuilder { auto message = CreateTaskInfo( fbb, to_flatbuf(fbb, driver_id_), to_flatbuf(fbb, task_id), to_flatbuf(fbb, parent_task_id_), parent_counter_, - to_flatbuf(fbb, actor_id_), actor_counter_, is_actor_checkpoint_method_, + to_flatbuf(fbb, actor_id_), to_flatbuf(fbb, actor_handle_id_), + actor_counter_, is_actor_checkpoint_method_, to_flatbuf(fbb, function_id_), arguments, fbb.CreateVector(returns), fbb.CreateVector(resource_vector_)); /* Finish the TaskInfo. */ @@ -130,6 +133,7 @@ class TaskBuilder { TaskID parent_task_id_; int64_t parent_counter_; ActorID actor_id_; + ActorID actor_handle_id_; int64_t actor_counter_; bool is_actor_checkpoint_method_; FunctionID function_id_; @@ -172,13 +176,14 @@ void TaskSpec_start_construct(TaskBuilder *builder, TaskID parent_task_id, int64_t parent_counter, ActorID actor_id, + ActorID actor_handle_id, int64_t actor_counter, bool is_actor_checkpoint_method, FunctionID function_id, int64_t num_returns) { builder->Start(driver_id, parent_task_id, parent_counter, actor_id, - actor_counter, is_actor_checkpoint_method, function_id, - num_returns); + actor_handle_id, actor_counter, is_actor_checkpoint_method, + function_id, num_returns); } uint8_t *TaskSpec_finish_construct(TaskBuilder *builder, int64_t *size) { @@ -221,6 +226,12 @@ ActorID TaskSpec_actor_id(TaskSpec *spec) { return from_flatbuf(message->actor_id()); } +ActorID TaskSpec_actor_handle_id(TaskSpec *spec) { + CHECK(spec); + auto message = flatbuffers::GetRoot(spec); + return from_flatbuf(message->actor_handle_id()); +} + bool TaskSpec_is_actor_task(TaskSpec *spec) { return !ActorID_equal(TaskSpec_actor_id(spec), NIL_ACTOR_ID); } diff --git a/src/common/task.h b/src/common/task.h index eee474ed4..588f5c92e 100644 --- a/src/common/task.h +++ b/src/common/task.h @@ -86,6 +86,9 @@ void free_task_builder(TaskBuilder *builder); * the parent task prior to this one. * @param actor_id The ID of the actor that this task is for. If it is not an * actor task, then this if NIL_ACTOR_ID. + * @param actor_handle_id The ID of the actor handle that this task was + * submitted through. If it is not an actor task, or if this is the + * original handle, then this is NIL_ACTOR_ID. * @param actor_counter A counter indicating how many tasks have been submitted * to the same actor before this one. * @param is_actor_checkpoint_method True if this is an actor checkpoint method @@ -102,6 +105,7 @@ void TaskSpec_start_construct(TaskBuilder *B, TaskID parent_task_id, int64_t parent_counter, UniqueID actor_id, + UniqueID actor_handle_id, int64_t actor_counter, bool is_actor_checkpoint_method, FunctionID function_id, @@ -133,6 +137,14 @@ FunctionID TaskSpec_function(TaskSpec *spec); */ UniqueID TaskSpec_actor_id(TaskSpec *spec); +/** + * Return the actor handle ID of the task. + * + * @param spec The task_spec in question. + * @return The ID of the actor handle that the task was submitted through. + */ +UniqueID TaskSpec_actor_handle_id(TaskSpec *spec); + /** * Return whether this task is for an actor. * diff --git a/src/common/test/example_task.h b/src/common/test/example_task.h index ab0e40cfe..fce0697b2 100644 --- a/src/common/test/example_task.h +++ b/src/common/test/example_task.h @@ -14,7 +14,8 @@ static inline TaskSpec *example_task_spec_with_args(int64_t num_args, TaskID parent_task_id = globally_unique_id(); FunctionID func_id = globally_unique_id(); TaskSpec_start_construct(g_task_builder, NIL_ID, parent_task_id, 0, - NIL_ACTOR_ID, 0, false, func_id, num_returns); + NIL_ACTOR_ID, NIL_ACTOR_ID, 0, false, func_id, + num_returns); for (int64_t i = 0; i < num_args; ++i) { ObjectID arg_id; if (arg_ids == NULL) { diff --git a/src/common/test/task_tests.cc b/src/common/test/task_tests.cc index 1cc806d1f..f22bf28d0 100644 --- a/src/common/test/task_tests.cc +++ b/src/common/test/task_tests.cc @@ -15,8 +15,8 @@ TEST task_test(void) { TaskID parent_task_id = globally_unique_id(); FunctionID func_id = globally_unique_id(); TaskBuilder *builder = make_task_builder(); - TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0, - false, func_id, 2); + TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, + NIL_ACTOR_ID, 0, false, func_id, 2); UniqueID arg1 = globally_unique_id(); TaskSpec_args_add_ref(builder, arg1); @@ -54,16 +54,16 @@ TEST deterministic_ids_test(void) { uint8_t *arg2 = (uint8_t *) "hello world"; /* Construct a first task. */ - TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0, - false, func_id, 3); + TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, + NIL_ACTOR_ID, 0, false, func_id, 3); TaskSpec_args_add_ref(builder, arg1); TaskSpec_args_add_val(builder, arg2, 11); int64_t size1; TaskSpec *spec1 = TaskSpec_finish_construct(builder, &size1); /* Construct a second identical task. */ - TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0, - false, func_id, 3); + TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, + NIL_ACTOR_ID, 0, false, func_id, 3); TaskSpec_args_add_ref(builder, arg1); TaskSpec_args_add_val(builder, arg2, 11); int64_t size2; @@ -83,39 +83,39 @@ TEST deterministic_ids_test(void) { /* Construct a task with a different parent task ID. */ TaskSpec_start_construct(builder, NIL_ID, globally_unique_id(), 0, - NIL_ACTOR_ID, 0, false, func_id, 3); + NIL_ACTOR_ID, NIL_ACTOR_ID, 0, false, func_id, 3); TaskSpec_args_add_ref(builder, arg1); TaskSpec_args_add_val(builder, arg2, 11); int64_t size3; TaskSpec *spec3 = TaskSpec_finish_construct(builder, &size3); /* Construct a task with a different parent counter. */ - TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 1, NIL_ACTOR_ID, 0, - false, func_id, 3); + TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 1, NIL_ACTOR_ID, + NIL_ACTOR_ID, 0, false, func_id, 3); TaskSpec_args_add_ref(builder, arg1); TaskSpec_args_add_val(builder, arg2, 11); int64_t size4; TaskSpec *spec4 = TaskSpec_finish_construct(builder, &size4); /* Construct a task with a different function ID. */ - TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0, - false, globally_unique_id(), 3); + TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, + NIL_ACTOR_ID, 0, false, globally_unique_id(), 3); TaskSpec_args_add_ref(builder, arg1); TaskSpec_args_add_val(builder, arg2, 11); int64_t size5; TaskSpec *spec5 = TaskSpec_finish_construct(builder, &size5); /* Construct a task with a different object ID argument. */ - TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0, - false, func_id, 3); + TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, + NIL_ACTOR_ID, 0, false, func_id, 3); TaskSpec_args_add_ref(builder, globally_unique_id()); TaskSpec_args_add_val(builder, arg2, 11); int64_t size6; TaskSpec *spec6 = TaskSpec_finish_construct(builder, &size6); /* Construct a task with a different value argument. */ - TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0, - false, func_id, 3); + TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, + NIL_ACTOR_ID, 0, false, func_id, 3); TaskSpec_args_add_ref(builder, arg1); TaskSpec_args_add_val(builder, (uint8_t *) "hello_world", 11); int64_t size7; @@ -159,8 +159,8 @@ TEST send_task(void) { TaskBuilder *builder = make_task_builder(); TaskID parent_task_id = globally_unique_id(); FunctionID func_id = globally_unique_id(); - TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, 0, - false, func_id, 2); + TaskSpec_start_construct(builder, NIL_ID, parent_task_id, 0, NIL_ACTOR_ID, + NIL_ACTOR_ID, 0, false, func_id, 2); TaskSpec_args_add_ref(builder, globally_unique_id()); TaskSpec_args_add_val(builder, (uint8_t *) "Hello", 5); TaskSpec_args_add_val(builder, (uint8_t *) "World", 5); diff --git a/src/local_scheduler/local_scheduler_algorithm.cc b/src/local_scheduler/local_scheduler_algorithm.cc index d6fed501a..19f3ddce9 100644 --- a/src/local_scheduler/local_scheduler_algorithm.cc +++ b/src/local_scheduler/local_scheduler_algorithm.cc @@ -51,16 +51,21 @@ struct ObjectEntry { /** This struct contains information about a specific actor. This struct will be * used inside of a hash table. */ typedef struct { - /** The number of tasks that have been executed on this actor so far. This is - * used to guarantee the in-order execution of tasks on actors (in the order - * that the tasks were submitted). This is currently meaningful because we - * restrict the submission of tasks on actors to the process that created the - * actor. */ - int64_t task_counter; + /** The number of tasks that have been executed on this actor so far, per + * handle. This is used to guarantee execution of tasks on actors in the + * order that the tasks were submitted, per handle. Tasks from different + * handles to the same actor may be interleaved. */ + std::unordered_map task_counters; /** The index of the task assigned to this actor. Set to -1 if no task is * currently assigned. If the actor process reports back success for the - * assigned task execution, task_counter should be set to this value. */ + * assigned task execution, then the corresponding task_counter should be + * updated to this value. */ int64_t assigned_task_counter; + /** The handle that the currently assigned task was submitted by. This field + * is only valid if assigned_task_counter is set. If the actor process + * reports back success for the assigned task execution, then the + * task_counter corresponding to this handle should be updated. */ + ActorID assigned_task_handle_id; /** Whether the actor process has loaded yet. The actor counts as loaded once * it has either executed its first task or successfully resumed from a * checkpoint. Before the actor has loaded, we may dispatch the first task @@ -247,8 +252,9 @@ void create_actor(SchedulingAlgorithmState *algorithm_state, ActorID actor_id, LocalSchedulerClient *worker) { LocalActorInfo entry; - entry.task_counter = 0; + entry.task_counters[NIL_ACTOR_ID] = 0; entry.assigned_task_counter = -1; + entry.assigned_task_handle_id = NIL_ACTOR_ID; entry.task_queue = new std::list(); entry.worker = worker; entry.worker_available = false; @@ -333,16 +339,17 @@ bool dispatch_actor_task(LocalSchedulerState *state, /* Check whether we can execute the first task in the queue. */ auto task = entry.task_queue->begin(); int64_t next_task_counter = TaskSpec_actor_counter(task->spec); + ActorID next_task_handle_id = TaskSpec_actor_handle_id(task->spec); if (entry.loaded) { /* Once the actor has loaded, we can only execute tasks in order of * task_counter. */ - if (next_task_counter != entry.task_counter) { + if (next_task_counter != entry.task_counters[next_task_handle_id]) { return false; } } else { /* If the actor has not yet loaded, we can only execute the task that * matches task_counter (the first task), or a checkpoint task. */ - if (next_task_counter != entry.task_counter) { + if (next_task_counter != entry.task_counters[next_task_handle_id]) { /* No other task should be first in the queue. */ CHECK(TaskSpec_is_actor_checkpoint_method(task->spec)); } @@ -361,6 +368,7 @@ bool dispatch_actor_task(LocalSchedulerState *state, * as unavailable. */ assign_task_to_worker(state, task->spec, task->task_spec_size, entry.worker); entry.assigned_task_counter = next_task_counter; + entry.assigned_task_handle_id = next_task_handle_id; entry.worker_available = false; /* Free the task queue entry. */ TaskQueueEntry_free(&(*task)); @@ -407,6 +415,8 @@ void insert_actor_task_queue(LocalSchedulerState *state, TaskQueueEntry task_entry) { /* Get the local actor entry for this actor. */ ActorID actor_id = TaskSpec_actor_id(task_entry.spec); + ActorID task_handle_id = TaskSpec_actor_handle_id(task_entry.spec); + int64_t task_counter = TaskSpec_actor_counter(task_entry.spec); /* Handle the case in which there is no LocalActorInfo struct yet. */ if (algorithm_state->local_actor_infos.count(actor_id) == 0) { @@ -418,40 +428,43 @@ void insert_actor_task_queue(LocalSchedulerState *state, } LocalActorInfo &entry = algorithm_state->local_actor_infos.find(actor_id)->second; + if (entry.task_counters.count(task_handle_id) == 0) { + entry.task_counters[task_handle_id] = 0; + } - int64_t task_counter = TaskSpec_actor_counter(task_entry.spec); /* As a sanity check, the counter of the new task should be greater than the * number of tasks that have executed on this actor so far (since we are * guaranteeing in-order execution of the tasks on the actor). TODO(rkn): This * check will fail if the fault-tolerance mechanism resubmits a task on an * actor. */ - if (task_counter < entry.task_counter) { + if (task_counter < entry.task_counters[task_handle_id]) { LOG_INFO( "A task that has already been executed has been resubmitted, so we " "are ignoring it. This should only happen during reconstruction."); return; } - /* Add the task spec to the actor's task queue in a manner that preserves the - * order of the actor task counters. Iterate from the beginning of the queue - * to find the right place to insert the task queue entry. TODO(pcm): This - * makes submitting multiple actor tasks take quadratic time, which needs to - * be optimized. */ + /* Insert the task spec to the actor's task queue in sorted order, per actor + * handle ID. Find the first task in the queue with a counter greater than + * the submitted task's and the same handle ID. */ auto it = entry.task_queue->begin(); - while (it != entry.task_queue->end() && - (task_counter > TaskSpec_actor_counter(it->spec))) { - ++it; + for (; it != entry.task_queue->end(); it++) { + /* Skip tasks submitted by a different handle. */ + if (!ActorID_equal(task_handle_id, TaskSpec_actor_handle_id(it->spec))) { + continue; + } + /* A duplicate task submitted by the same handle. */ + if (task_counter == TaskSpec_actor_counter(it->spec)) { + LOG_INFO( + "A task was resubmitted, so we are ignoring it. This should only " + "happen during reconstruction."); + return; + } + /* We found a task with the same handle ID and a greater task counter. */ + if (task_counter < TaskSpec_actor_counter(it->spec)) { + break; + } } - if (it != entry.task_queue->end() && - task_counter == TaskSpec_actor_counter(it->spec)) { - LOG_INFO( - "A task was resubmitted, so we are ignoring it. This should only " - "happen during reconstruction."); - return; - } - - /* The task has a counter that has not been executed or submitted before. Add - * it to the actor queue. */ entry.task_queue->insert(it, task_entry); /* Record the fact that this actor has a task waiting to execute. */ @@ -1266,7 +1279,8 @@ void handle_actor_worker_available(LocalSchedulerState *state, * loaded the checkpoint successfully, then we update the actor's counter * to the assigned counter. */ if (!actor_checkpoint_failed) { - entry.task_counter = entry.assigned_task_counter + 1; + entry.task_counters[entry.assigned_task_handle_id] = + entry.assigned_task_counter + 1; /* If a task was assigned to this actor and there was no checkpoint * failure, then it is now loaded. */ if (entry.assigned_task_counter > -1) { @@ -1274,6 +1288,7 @@ void handle_actor_worker_available(LocalSchedulerState *state, } } entry.assigned_task_counter = -1; + entry.assigned_task_handle_id = NIL_ACTOR_ID; entry.worker_available = true; /* Assign new tasks if possible. */ dispatch_all_tasks(state, algorithm_state); diff --git a/test/actor_test.py b/test/actor_test.py index cef079b45..eb286c13d 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -16,6 +16,9 @@ import ray.test.test_utils class ActorAPI(unittest.TestCase): + def tearDown(self): + ray.worker.cleanup() + def testKeywordArgs(self): ray.init(num_workers=0, driver_mode=ray.SILENT_MODE) @@ -64,8 +67,6 @@ class ActorAPI(unittest.TestCase): with self.assertRaises(Exception): ray.get(actor.get_values.remote()) - ray.worker.cleanup() - def testVariableNumberOfArgs(self): ray.init(num_workers=0) @@ -109,8 +110,6 @@ class ActorAPI(unittest.TestCase): a = Actor.remote(1, 2) self.assertEqual(ray.get(a.get_values.remote(3, 4)), ((1, 2), (3, 4))) - ray.worker.cleanup() - def testNoArgs(self): ray.init(num_workers=0) @@ -125,8 +124,6 @@ class ActorAPI(unittest.TestCase): actor = Actor.remote() self.assertEqual(ray.get(actor.get_values.remote()), None) - ray.worker.cleanup() - def testNoConstructor(self): # If no __init__ method is provided, that should not be a problem. ray.init(num_workers=0) @@ -139,8 +136,6 @@ class ActorAPI(unittest.TestCase): actor = Actor.remote() self.assertEqual(ray.get(actor.get_values.remote()), None) - ray.worker.cleanup() - def testCustomClasses(self): ray.init(num_workers=0) @@ -169,11 +164,27 @@ class ActorAPI(unittest.TestCase): self.assertEqual(results2[1].x, 2) self.assertEqual(results2[2].x, 3) - ray.worker.cleanup() + def testCachingActors(self): + # Test defining actors before ray.init() has been called. - # def testCachingActors(self): - # # TODO(rkn): Implement this. - # pass + @ray.remote + class Foo(object): + def __init__(self): + pass + + def get_val(self): + return 3 + + # Check that we can't actually create actors before ray.init() has been + # called. + with self.assertRaises(Exception): + f = Foo.remote() + + ray.init(num_workers=0) + + f = Foo.remote() + + self.assertEqual(ray.get(f.get_val.remote()), 3) def testDecoratorArgs(self): ray.init(num_workers=0, driver_mode=ray.SILENT_MODE) @@ -217,8 +228,6 @@ class ActorAPI(unittest.TestCase): def __init__(self): pass - ray.worker.cleanup() - def testRandomIDGeneration(self): ray.init(num_workers=0) @@ -238,8 +247,6 @@ class ActorAPI(unittest.TestCase): self.assertNotEqual(f1._ray_actor_id.id(), f2._ray_actor_id.id()) - ray.worker.cleanup() - def testActorClassName(self): ray.init(num_workers=0) @@ -257,11 +264,12 @@ class ActorAPI(unittest.TestCase): self.assertEqual(actor_class_info[b"class_name"], b"Foo") self.assertEqual(actor_class_info[b"module"], b"__main__") - ray.worker.cleanup() - class ActorMethods(unittest.TestCase): + def tearDown(self): + ray.worker.cleanup() + def testDefineActor(self): ray.init() @@ -280,8 +288,6 @@ class ActorMethods(unittest.TestCase): with self.assertRaises(Exception): t.f(1) - ray.worker.cleanup() - def testActorDeletion(self): ray.init(num_workers=0) @@ -314,8 +320,6 @@ class ActorMethods(unittest.TestCase): # called. self.assertEqual(ray.get(Actor.remote().method.remote()), 1) - ray.worker.cleanup() - def testActorDeletionWithGPUs(self): ray.init(num_workers=0, num_gpus=1) @@ -341,8 +345,6 @@ class ActorMethods(unittest.TestCase): a = None ray.test.test_utils.wait_for_pid_to_exit(pid) - ray.worker.cleanup() - def testActorState(self): ray.init() @@ -366,8 +368,6 @@ class ActorMethods(unittest.TestCase): c2.increase.remote() self.assertEqual(ray.get(c2.value.remote()), 2) - ray.worker.cleanup() - def testMultipleActors(self): # Create a bunch of actors and call a bunch of methods on all of them. ray.init(num_workers=0) @@ -412,11 +412,12 @@ class ActorMethods(unittest.TestCase): result_values[(num_actors * j):(num_actors * (j + 1))], num_actors * [j + 1]) - ray.worker.cleanup() - class ActorNesting(unittest.TestCase): + def tearDown(self): + ray.worker.cleanup() + def testRemoteFunctionWithinActor(self): # Make sure we can use remote funtions within actors. ray.init(num_cpus=100) @@ -466,8 +467,6 @@ class ActorNesting(unittest.TestCase): ray.get(actor.h.remote([f.remote(i) for i in range(5)])), list(range(1, 6))) - ray.worker.cleanup() - def testDefineActorWithinActor(self): # Make sure we can use remote funtions within actors. ray.init(num_cpus=10) @@ -494,8 +493,6 @@ class ActorNesting(unittest.TestCase): actor1 = Actor1.remote(3) self.assertEqual(ray.get(actor1.get_values.remote(5)), (3, 5)) - ray.worker.cleanup() - def testUseActorWithinActor(self): # Make sure we can use actors within actors. ray.init(num_cpus=10) @@ -520,8 +517,6 @@ class ActorNesting(unittest.TestCase): actor2 = Actor2.remote(3, 4) self.assertEqual(ray.get(actor2.get_values.remote(5)), (3, 4)) - ray.worker.cleanup() - def testDefineActorWithinRemoteFunction(self): # Make sure we can define and actors within remote funtions. ray.init(num_cpus=10) @@ -542,8 +537,6 @@ class ActorNesting(unittest.TestCase): self.assertEqual(ray.get([f.remote(i, 20) for i in range(10)]), [20 * [i] for i in range(10)]) - ray.worker.cleanup() - def testUseActorWithinRemoteFunction(self): # Make sure we can create and use actors within remote funtions. ray.init(num_cpus=10) @@ -563,8 +556,6 @@ class ActorNesting(unittest.TestCase): self.assertEqual(ray.get(f.remote(3)), 3) - ray.worker.cleanup() - def testActorImportCounter(self): # This is mostly a test of the export counters to make sure that when # an actor is imported, all of the necessary remote functions have been @@ -594,11 +585,12 @@ class ActorNesting(unittest.TestCase): self.assertEqual(ray.get(g.remote()), num_remote_functions - 1) - ray.worker.cleanup() - class ActorInheritance(unittest.TestCase): + def tearDown(self): + ray.worker.cleanup() + def testInheritActorFromClass(self): # Make sure we can define an actor by inheriting from a regular class. # Note that actors cannot inherit from other actors. @@ -626,11 +618,12 @@ class ActorInheritance(unittest.TestCase): self.assertEqual(ray.get(actor.get_value.remote()), 1) self.assertEqual(ray.get(actor.g.remote(5)), 6) - ray.worker.cleanup() - class ActorSchedulingProperties(unittest.TestCase): + def tearDown(self): + ray.worker.cleanup() + def testRemoteFunctionsNotScheduledOnActors(self): # Make sure that regular remote functions are not scheduled on actors. ray.init(num_workers=0) @@ -653,11 +646,12 @@ class ActorSchedulingProperties(unittest.TestCase): resulting_ids = ray.get([f.remote() for _ in range(100)]) self.assertNotIn(actor_id, resulting_ids) - ray.worker.cleanup() - class ActorsOnMultipleNodes(unittest.TestCase): + def tearDown(self): + ray.worker.cleanup() + def testActorsOnNodesWithNoCPUs(self): ray.init(num_cpus=0) @@ -669,8 +663,6 @@ class ActorsOnMultipleNodes(unittest.TestCase): with self.assertRaises(Exception): Foo.remote() - ray.worker.cleanup() - def testActorLoadBalancing(self): num_local_schedulers = 3 ray.worker._init(start_ray_local=True, num_workers=0, @@ -711,11 +703,12 @@ class ActorsOnMultipleNodes(unittest.TestCase): results.append(actors[index].get_location.remote()) ray.get(results) - ray.worker.cleanup() - class ActorsWithGPUs(unittest.TestCase): + def tearDown(self): + ray.worker.cleanup() + def testActorGPUs(self): num_local_schedulers = 3 num_gpus_per_scheduler = 4 @@ -755,8 +748,6 @@ class ActorsWithGPUs(unittest.TestCase): with self.assertRaises(Exception): Actor1.remote() - ray.worker.cleanup() - def testActorMultipleGPUs(self): num_local_schedulers = 3 num_gpus_per_scheduler = 5 @@ -825,8 +816,6 @@ class ActorsWithGPUs(unittest.TestCase): with self.assertRaises(Exception): Actor2.remote() - ray.worker.cleanup() - def testActorDifferentNumbersOfGPUs(self): # Test that we can create actors on two nodes that have different # numbers of GPUs. @@ -862,8 +851,6 @@ class ActorsWithGPUs(unittest.TestCase): with self.assertRaises(Exception): Actor1.remote() - ray.worker.cleanup() - def testActorMultipleGPUsFromMultipleTasks(self): num_local_schedulers = 10 num_gpus_per_scheduler = 10 @@ -904,8 +891,6 @@ class ActorsWithGPUs(unittest.TestCase): with self.assertRaises(Exception): Actor.remote() - ray.worker.cleanup() - @unittest.skipIf(sys.version_info < (3, 0), "This test requires Python 3.") def testActorsAndTasksWithGPUs(self): num_local_schedulers = 3 @@ -1045,8 +1030,6 @@ class ActorsWithGPUs(unittest.TestCase): ready_ids, remaining_ids = ray.wait(results, timeout=1000) self.assertEqual(len(ready_ids), 0) - ray.worker.cleanup() - def testActorsAndTasksWithGPUsVersionTwo(self): # Create tasks and actors that both use GPUs and make sure that they # are given different GPUs @@ -1082,8 +1065,6 @@ class ActorsWithGPUs(unittest.TestCase): gpu_ids = ray.get(results) self.assertEqual(set(gpu_ids), set(range(10))) - ray.worker.cleanup() - @unittest.skipIf(sys.version_info < (3, 0), "This test requires Python 3.") def testActorsAndTaskResourceBookkeeping(self): ray.init(num_cpus=1) @@ -1121,8 +1102,6 @@ class ActorsWithGPUs(unittest.TestCase): self.assertLess(interval1[1], interval2[0]) self.assertLess(interval2[0], interval2[1]) - ray.worker.cleanup() - def testBlockingActorTask(self): ray.init(num_cpus=1, num_gpus=1) @@ -1158,11 +1137,12 @@ class ActorsWithGPUs(unittest.TestCase): self.assertEqual(ready_ids, []) self.assertEqual(remaining_ids, [x_id]) - ray.worker.cleanup() - class ActorReconstruction(unittest.TestCase): + def tearDown(self): + ray.worker.cleanup() + def testLocalSchedulerDying(self): ray.worker._init(start_ray_local=True, num_local_schedulers=2, num_workers=0, redirect_output=True) @@ -1203,8 +1183,6 @@ class ActorReconstruction(unittest.TestCase): self.assertEqual(results, list(range(1, 1 + len(results)))) - ray.worker.cleanup() - def testManyLocalSchedulersDying(self): # This test can be made more stressful by increasing the numbers below. # The total number of actors created will be @@ -1270,8 +1248,6 @@ class ActorReconstruction(unittest.TestCase): self.assertEqual(ray.get(result_id_list), list(range(1, len(result_id_list) + 1))) - ray.worker.cleanup() - def setup_test_checkpointing(self, save_exception=False, resume_exception=False): ray.worker._init(start_ray_local=True, num_local_schedulers=2, @@ -1350,8 +1326,6 @@ class ActorReconstruction(unittest.TestCase): # the one method call since the most recent checkpoint). self.assertEqual(ray.get(actor.get_num_inc_calls.remote()), 1) - ray.worker.cleanup() - def testLostCheckpoint(self): actor, ids = self.setup_test_checkpointing() # Wait for the first fraction of tasks to finish running. @@ -1378,8 +1352,6 @@ class ActorReconstruction(unittest.TestCase): results = ray.get(ids) self.assertEqual(results, list(range(1, 1 + len(results)))) - ray.worker.cleanup() - def testCheckpointException(self): actor, ids = self.setup_test_checkpointing(save_exception=True) # Wait for the last task to finish running. @@ -1408,8 +1380,6 @@ class ActorReconstruction(unittest.TestCase): self.assertEqual(len([error for error in errors if error[b"type"] == b"task"]), num_checkpoints * 2) - ray.worker.cleanup() - def testCheckpointResumeException(self): actor, ids = self.setup_test_checkpointing(resume_exception=True) # Wait for the last task to finish running. @@ -1437,8 +1407,162 @@ class ActorReconstruction(unittest.TestCase): self.assertTrue(len([error for error in errors if error[b"type"] == b"task"]) > 0) + +class DistributedActorHandles(unittest.TestCase): + + def tearDown(self): ray.worker.cleanup() + def make_counter_actor(self, checkpoint_interval=-1): + ray.init() + + @ray.remote(checkpoint_interval=checkpoint_interval) + class Counter(object): + def __init__(self): + self.value = 0 + + def increase(self): + self.value += 1 + return self.value + + return Counter.remote() + + def testFork(self): + counter = self.make_counter_actor() + num_calls = 1 + self.assertEqual(ray.get(counter.increase.remote()), num_calls) + + @ray.remote + def fork(counter): + return ray.get(counter.increase.remote()) + + # Fork once. + num_calls += 1 + self.assertEqual(ray.get(fork.remote(counter)), num_calls) + num_calls += 1 + self.assertEqual(ray.get(counter.increase.remote()), num_calls) + + # Fork num_iters times. + num_iters = 100 + num_calls += num_iters + ray.get([fork.remote(counter) for _ in range(num_iters)]) + num_calls += 1 + self.assertEqual(ray.get(counter.increase.remote()), num_calls) + + def testForkConsistency(self): + counter = self.make_counter_actor() + + @ray.remote + def fork_many_incs(counter, num_incs): + x = None + for _ in range(num_incs): + x = counter.increase.remote() + # Only call ray.get() on the last task submitted. + return ray.get(x) + + num_incs = 100 + + # Fork once. + num_calls = num_incs + self.assertEqual(ray.get(fork_many_incs.remote(counter, num_incs)), + num_calls) + num_calls += 1 + self.assertEqual(ray.get(counter.increase.remote()), num_calls) + + # Fork num_iters times. + num_iters = 10 + num_calls += num_iters * num_incs + ray.get([fork_many_incs.remote(counter, num_incs) for _ in + range(num_iters)]) + # Check that we ensured per-handle serialization. + num_calls += 1 + self.assertEqual(ray.get(counter.increase.remote()), num_calls) + + @unittest.skip("Garbage collection for distributed actor handles not " + "implemented.") + def testGarbageCollection(self): + counter = self.make_counter_actor() + + @ray.remote + def fork(counter): + for _ in range(10): + x = counter.increase.remote() + time.sleep(0.1) + return ray.get(x) + + x = fork.remote(counter) + ray.get(counter.increase.remote()) + del counter + + print(ray.get(x)) + + def testCheckpoint(self): + counter = self.make_counter_actor(checkpoint_interval=1) + num_calls = 1 + self.assertEqual(ray.get(counter.increase.remote()), num_calls) + + @ray.remote + def fork(counter): + return ray.get(counter.increase.remote()) + + # Passing an actor handle with checkpointing enabled shouldn't be + # allowed yet. + with self.assertRaises(Exception): + fork.remote(counter) + + num_calls += 1 + self.assertEqual(ray.get(counter.increase.remote()), num_calls) + + @unittest.skip("Fork/join consistency not yet implemented.") + def testLocalSchedulerDying(self): + ray.worker._init(start_ray_local=True, num_local_schedulers=2, + num_workers=0, redirect_output=False) + + @ray.remote + class Counter(object): + def __init__(self): + self.x = 0 + + def local_plasma(self): + return ray.worker.global_worker.plasma_client.store_socket_name + + def inc(self): + self.x += 1 + return self.x + + @ray.remote + def foo(counter): + for _ in range(100): + x = counter.inc.remote() + return ray.get(x) + + local_plasma = ray.worker.global_worker.plasma_client.store_socket_name + + # Create an actor that is not on the local scheduler. + actor = Counter.remote() + while ray.get(actor.local_plasma.remote()) == local_plasma: + actor = Counter.remote() + + # Concurrently, submit many tasks to the actor through the original + # handle and the forked handle. + x = foo.remote(actor) + ids = [actor.inc.remote() for _ in range(100)] + + # Wait for the last task to finish running. + ray.get(ids[-1]) + y = ray.get(x) + + # Kill the second plasma store to get rid of the cached objects and + # trigger the corresponding local scheduler to exit. + process = ray.services.all_processes[ + ray.services.PROCESS_TYPE_PLASMA_STORE][1] + process.kill() + process.wait() + + # Submit a new task. Its results should reflect the tasks submitted + # through both the original handle and the forked handle. + self.assertEqual(ray.get(actor.inc.remote()), y + 1) + if __name__ == "__main__": unittest.main(verbosity=2)