diff --git a/python/ray/actor.py b/python/ray/actor.py index 99def23e8..a29bb765a 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -44,8 +44,7 @@ def compute_actor_handle_id(actor_handle_id, num_forks): return ray.ObjectID(handle_id) -def compute_actor_handle_id_non_forked(actor_id, actor_handle_id, - current_task_id): +def compute_actor_handle_id_non_forked(actor_handle_id, current_task_id): """Deterministically compute an actor handle ID in the non-forked case. This code path is used whenever an actor handle is pickled and unpickled @@ -59,16 +58,13 @@ def compute_actor_handle_id_non_forked(actor_id, actor_handle_id, to the same actor handle IDs. Args: - actor_id: The actor ID. actor_handle_id: The original actor handle ID. - num_forks: The number of times the original actor handle has been - forked so far. + current_task_id: The ID of the task that is unpickling the handle. Returns: An ID for the new actor handle. """ handle_id_hash = hashlib.sha1() - handle_id_hash.update(actor_id.id()) handle_id_hash.update(actor_handle_id.id()) handle_id_hash.update(current_task_id.id()) handle_id = handle_id_hash.digest() @@ -429,14 +425,14 @@ class ActorClass(object): resources=resources, placement_resources=actor_placement_resources) - # We initialize the actor counter at 1 to account for the actor - # creation task. - actor_counter = 1 actor_handle = ActorHandle( actor_id, self._modified_class.__module__, self._class_name, - actor_cursor, actor_counter, self._actor_method_names, - self._method_signatures, self._actor_method_num_return_vals, - actor_cursor, self._actor_method_cpus, worker.task_driver_id) + actor_cursor, self._actor_method_names, self._method_signatures, + self._actor_method_num_return_vals, actor_cursor, + self._actor_method_cpus, worker.task_driver_id) + # We increment the actor counter by 1 to account for the actor creation + # task. + actor_handle._ray_actor_counter += 1 return actor_handle @@ -489,10 +485,6 @@ class ActorHandle(object): _ray_actor_driver_id: The driver ID of the job that created the actor (it is possible that this ActorHandle exists on a driver with a different driver ID). - _ray_previous_actor_handle_id: If this actor handle is not an original - handle, (e.g., it was created by forking or pickling), then - this is the ID of the handle that this handle was created from. - Otherwise, this is None. """ def __init__(self, @@ -500,28 +492,25 @@ class ActorHandle(object): module_name, class_name, actor_cursor, - actor_counter, actor_method_names, method_signatures, method_num_return_vals, actor_creation_dummy_object_id, actor_method_cpus, actor_driver_id, - actor_handle_id=None, - previous_actor_handle_id=None): + actor_handle_id=None): + self._ray_actor_id = actor_id + self._ray_module_name = module_name # False if this actor handle was created by forking or pickling. True # if it was created by the _serialization_helper function. - self._ray_original_handle = previous_actor_handle_id is None - self._ray_module_name = module_name - - self._ray_actor_id = actor_id + self._ray_original_handle = actor_handle_id is None if self._ray_original_handle: self._ray_actor_handle_id = ray.ObjectID( ray.worker.NIL_ACTOR_HANDLE_ID) else: self._ray_actor_handle_id = actor_handle_id self._ray_actor_cursor = actor_cursor - self._ray_actor_counter = actor_counter + self._ray_actor_counter = 0 self._ray_actor_method_names = actor_method_names self._ray_method_signatures = method_signatures self._ray_method_num_return_vals = method_num_return_vals @@ -531,8 +520,6 @@ class ActorHandle(object): actor_creation_dummy_object_id) self._ray_actor_method_cpus = actor_method_cpus self._ray_actor_driver_id = actor_driver_id - self._ray_previous_actor_handle_id = previous_actor_handle_id - self._ray_previously_generated_actor_handle_id = None def _actor_method_call(self, method_name, @@ -586,32 +573,13 @@ class ActorHandle(object): is_actor_checkpoint_method = (method_name == "__ray_checkpoint__") - # Right now, if the actor handle has been pickled, we create a - # temporary actor handle id for invocations. - # TODO(pcm): This still leads to a lot of actor handles being - # created, there should be a better way to handle pickled - # actor handles. - if self._ray_actor_handle_id is None: - actor_handle_id = compute_actor_handle_id_non_forked( - self._ray_actor_id, self._ray_previous_actor_handle_id, - worker.current_task_id) - # Each new task creates a new actor handle id, so we need to - # reset the actor counter to 0 - if (actor_handle_id != - self._ray_previously_generated_actor_handle_id): - self._ray_actor_counter = 0 - self._ray_previously_generated_actor_handle_id = ( - actor_handle_id) - else: - actor_handle_id = self._ray_actor_handle_id - function_descriptor = FunctionDescriptor( self._ray_module_name, method_name, self._ray_class_name) object_ids = worker.submit_task( function_descriptor, args, actor_id=self._ray_actor_id, - actor_handle_id=actor_handle_id, + actor_handle_id=self._ray_actor_handle_id, actor_counter=self._ray_actor_counter, is_actor_checkpoint_method=is_actor_checkpoint_method, actor_creation_dummy_object_id=( @@ -707,24 +675,28 @@ class ActorHandle(object): Returns: A dictionary of the information needed to reconstruct the object. """ + if ray_forking: + actor_handle_id = compute_actor_handle_id( + self._ray_actor_handle_id, self._ray_actor_forks) + else: + actor_handle_id = self._ray_actor_handle_id + state = { "actor_id": self._ray_actor_id.id(), + "actor_handle_id": actor_handle_id.id(), "module_name": self._ray_module_name, "class_name": self._ray_class_name, - "actor_forks": self._ray_actor_forks, "actor_cursor": self._ray_actor_cursor.id() if self._ray_actor_cursor is not None else None, - "actor_counter": 0, # Reset the actor counter. "actor_method_names": self._ray_actor_method_names, "method_signatures": self._ray_method_signatures, "method_num_return_vals": self._ray_method_num_return_vals, + # Actors in local mode don't have dummy objects. "actor_creation_dummy_object_id": self. _ray_actor_creation_dummy_object_id.id() if self._ray_actor_creation_dummy_object_id is not None else None, "actor_method_cpus": self._ray_actor_method_cpus, "actor_driver_id": self._ray_actor_driver_id.id(), - "previous_actor_handle_id": self._ray_actor_handle_id.id() - if self._ray_actor_handle_id else None, "ray_forking": ray_forking } @@ -745,11 +717,21 @@ class ActorHandle(object): worker.check_connected() if state["ray_forking"]: - actor_handle_id = compute_actor_handle_id( - ray.ObjectID(state["previous_actor_handle_id"]), - state["actor_forks"]) + actor_handle_id = ray.ObjectID(state["actor_handle_id"]) else: - actor_handle_id = None + # Right now, if the actor handle has been pickled, we create a + # temporary actor handle id for invocations. + # TODO(pcm): This still leads to a lot of actor handles being + # created, there should be a better way to handle pickled + # actor handles. + # TODO(swang): Accessing the worker's current task ID is not + # thread-safe. + # TODO(swang): Unpickling the same actor handle twice in the same + # task will break the application, and unpickling it twice in the + # same actor is likely a performance bug. We should consider + # logging a warning in these cases. + actor_handle_id = compute_actor_handle_id_non_forked( + ray.ObjectID(state["actor_handle_id"]), worker.current_task_id) # This is the driver ID of the driver that owns the actor, not # necessarily the driver that owns this actor handle. @@ -761,7 +743,6 @@ class ActorHandle(object): state["class_name"], ray.ObjectID(state["actor_cursor"]) if state["actor_cursor"] is not None else None, - state["actor_counter"], state["actor_method_names"], state["method_signatures"], state["method_num_return_vals"], @@ -769,9 +750,7 @@ class ActorHandle(object): if state["actor_creation_dummy_object_id"] is not None else None, state["actor_method_cpus"], actor_driver_id, - actor_handle_id=actor_handle_id, - previous_actor_handle_id=ray.ObjectID( - state["previous_actor_handle_id"])) + actor_handle_id=actor_handle_id) def __getstate__(self): """This code path is used by pickling but not by Ray forking."""