mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 18:45:03 +08:00
Cleanup actor handle pickling code (#3560)
* Cleanup actor handle pickling code * remove unused * fix * lint
This commit is contained in:
+37
-58
@@ -44,8 +44,7 @@ def compute_actor_handle_id(actor_handle_id, num_forks):
|
||||
return ray.ObjectID(handle_id)
|
||||
|
||||
|
||||
def compute_actor_handle_id_non_forked(actor_id, actor_handle_id,
|
||||
current_task_id):
|
||||
def compute_actor_handle_id_non_forked(actor_handle_id, current_task_id):
|
||||
"""Deterministically compute an actor handle ID in the non-forked case.
|
||||
|
||||
This code path is used whenever an actor handle is pickled and unpickled
|
||||
@@ -59,16 +58,13 @@ def compute_actor_handle_id_non_forked(actor_id, actor_handle_id,
|
||||
to the same actor handle IDs.
|
||||
|
||||
Args:
|
||||
actor_id: The actor ID.
|
||||
actor_handle_id: The original actor handle ID.
|
||||
num_forks: The number of times the original actor handle has been
|
||||
forked so far.
|
||||
current_task_id: The ID of the task that is unpickling the handle.
|
||||
|
||||
Returns:
|
||||
An ID for the new actor handle.
|
||||
"""
|
||||
handle_id_hash = hashlib.sha1()
|
||||
handle_id_hash.update(actor_id.id())
|
||||
handle_id_hash.update(actor_handle_id.id())
|
||||
handle_id_hash.update(current_task_id.id())
|
||||
handle_id = handle_id_hash.digest()
|
||||
@@ -429,14 +425,14 @@ class ActorClass(object):
|
||||
resources=resources,
|
||||
placement_resources=actor_placement_resources)
|
||||
|
||||
# We initialize the actor counter at 1 to account for the actor
|
||||
# creation task.
|
||||
actor_counter = 1
|
||||
actor_handle = ActorHandle(
|
||||
actor_id, self._modified_class.__module__, self._class_name,
|
||||
actor_cursor, actor_counter, self._actor_method_names,
|
||||
self._method_signatures, self._actor_method_num_return_vals,
|
||||
actor_cursor, self._actor_method_cpus, worker.task_driver_id)
|
||||
actor_cursor, self._actor_method_names, self._method_signatures,
|
||||
self._actor_method_num_return_vals, actor_cursor,
|
||||
self._actor_method_cpus, worker.task_driver_id)
|
||||
# We increment the actor counter by 1 to account for the actor creation
|
||||
# task.
|
||||
actor_handle._ray_actor_counter += 1
|
||||
|
||||
return actor_handle
|
||||
|
||||
@@ -489,10 +485,6 @@ class ActorHandle(object):
|
||||
_ray_actor_driver_id: The driver ID of the job that created the actor
|
||||
(it is possible that this ActorHandle exists on a driver with a
|
||||
different driver ID).
|
||||
_ray_previous_actor_handle_id: If this actor handle is not an original
|
||||
handle, (e.g., it was created by forking or pickling), then
|
||||
this is the ID of the handle that this handle was created from.
|
||||
Otherwise, this is None.
|
||||
"""
|
||||
|
||||
def __init__(self,
|
||||
@@ -500,28 +492,25 @@ class ActorHandle(object):
|
||||
module_name,
|
||||
class_name,
|
||||
actor_cursor,
|
||||
actor_counter,
|
||||
actor_method_names,
|
||||
method_signatures,
|
||||
method_num_return_vals,
|
||||
actor_creation_dummy_object_id,
|
||||
actor_method_cpus,
|
||||
actor_driver_id,
|
||||
actor_handle_id=None,
|
||||
previous_actor_handle_id=None):
|
||||
actor_handle_id=None):
|
||||
self._ray_actor_id = actor_id
|
||||
self._ray_module_name = module_name
|
||||
# False if this actor handle was created by forking or pickling. True
|
||||
# if it was created by the _serialization_helper function.
|
||||
self._ray_original_handle = previous_actor_handle_id is None
|
||||
self._ray_module_name = module_name
|
||||
|
||||
self._ray_actor_id = actor_id
|
||||
self._ray_original_handle = actor_handle_id is None
|
||||
if self._ray_original_handle:
|
||||
self._ray_actor_handle_id = ray.ObjectID(
|
||||
ray.worker.NIL_ACTOR_HANDLE_ID)
|
||||
else:
|
||||
self._ray_actor_handle_id = actor_handle_id
|
||||
self._ray_actor_cursor = actor_cursor
|
||||
self._ray_actor_counter = actor_counter
|
||||
self._ray_actor_counter = 0
|
||||
self._ray_actor_method_names = actor_method_names
|
||||
self._ray_method_signatures = method_signatures
|
||||
self._ray_method_num_return_vals = method_num_return_vals
|
||||
@@ -531,8 +520,6 @@ class ActorHandle(object):
|
||||
actor_creation_dummy_object_id)
|
||||
self._ray_actor_method_cpus = actor_method_cpus
|
||||
self._ray_actor_driver_id = actor_driver_id
|
||||
self._ray_previous_actor_handle_id = previous_actor_handle_id
|
||||
self._ray_previously_generated_actor_handle_id = None
|
||||
|
||||
def _actor_method_call(self,
|
||||
method_name,
|
||||
@@ -586,32 +573,13 @@ class ActorHandle(object):
|
||||
|
||||
is_actor_checkpoint_method = (method_name == "__ray_checkpoint__")
|
||||
|
||||
# Right now, if the actor handle has been pickled, we create a
|
||||
# temporary actor handle id for invocations.
|
||||
# TODO(pcm): This still leads to a lot of actor handles being
|
||||
# created, there should be a better way to handle pickled
|
||||
# actor handles.
|
||||
if self._ray_actor_handle_id is None:
|
||||
actor_handle_id = compute_actor_handle_id_non_forked(
|
||||
self._ray_actor_id, self._ray_previous_actor_handle_id,
|
||||
worker.current_task_id)
|
||||
# Each new task creates a new actor handle id, so we need to
|
||||
# reset the actor counter to 0
|
||||
if (actor_handle_id !=
|
||||
self._ray_previously_generated_actor_handle_id):
|
||||
self._ray_actor_counter = 0
|
||||
self._ray_previously_generated_actor_handle_id = (
|
||||
actor_handle_id)
|
||||
else:
|
||||
actor_handle_id = self._ray_actor_handle_id
|
||||
|
||||
function_descriptor = FunctionDescriptor(
|
||||
self._ray_module_name, method_name, self._ray_class_name)
|
||||
object_ids = worker.submit_task(
|
||||
function_descriptor,
|
||||
args,
|
||||
actor_id=self._ray_actor_id,
|
||||
actor_handle_id=actor_handle_id,
|
||||
actor_handle_id=self._ray_actor_handle_id,
|
||||
actor_counter=self._ray_actor_counter,
|
||||
is_actor_checkpoint_method=is_actor_checkpoint_method,
|
||||
actor_creation_dummy_object_id=(
|
||||
@@ -707,24 +675,28 @@ class ActorHandle(object):
|
||||
Returns:
|
||||
A dictionary of the information needed to reconstruct the object.
|
||||
"""
|
||||
if ray_forking:
|
||||
actor_handle_id = compute_actor_handle_id(
|
||||
self._ray_actor_handle_id, self._ray_actor_forks)
|
||||
else:
|
||||
actor_handle_id = self._ray_actor_handle_id
|
||||
|
||||
state = {
|
||||
"actor_id": self._ray_actor_id.id(),
|
||||
"actor_handle_id": actor_handle_id.id(),
|
||||
"module_name": self._ray_module_name,
|
||||
"class_name": self._ray_class_name,
|
||||
"actor_forks": self._ray_actor_forks,
|
||||
"actor_cursor": self._ray_actor_cursor.id()
|
||||
if self._ray_actor_cursor is not None else None,
|
||||
"actor_counter": 0, # Reset the actor counter.
|
||||
"actor_method_names": self._ray_actor_method_names,
|
||||
"method_signatures": self._ray_method_signatures,
|
||||
"method_num_return_vals": self._ray_method_num_return_vals,
|
||||
# Actors in local mode don't have dummy objects.
|
||||
"actor_creation_dummy_object_id": self.
|
||||
_ray_actor_creation_dummy_object_id.id()
|
||||
if self._ray_actor_creation_dummy_object_id is not None else None,
|
||||
"actor_method_cpus": self._ray_actor_method_cpus,
|
||||
"actor_driver_id": self._ray_actor_driver_id.id(),
|
||||
"previous_actor_handle_id": self._ray_actor_handle_id.id()
|
||||
if self._ray_actor_handle_id else None,
|
||||
"ray_forking": ray_forking
|
||||
}
|
||||
|
||||
@@ -745,11 +717,21 @@ class ActorHandle(object):
|
||||
worker.check_connected()
|
||||
|
||||
if state["ray_forking"]:
|
||||
actor_handle_id = compute_actor_handle_id(
|
||||
ray.ObjectID(state["previous_actor_handle_id"]),
|
||||
state["actor_forks"])
|
||||
actor_handle_id = ray.ObjectID(state["actor_handle_id"])
|
||||
else:
|
||||
actor_handle_id = None
|
||||
# Right now, if the actor handle has been pickled, we create a
|
||||
# temporary actor handle id for invocations.
|
||||
# TODO(pcm): This still leads to a lot of actor handles being
|
||||
# created, there should be a better way to handle pickled
|
||||
# actor handles.
|
||||
# TODO(swang): Accessing the worker's current task ID is not
|
||||
# thread-safe.
|
||||
# TODO(swang): Unpickling the same actor handle twice in the same
|
||||
# task will break the application, and unpickling it twice in the
|
||||
# same actor is likely a performance bug. We should consider
|
||||
# logging a warning in these cases.
|
||||
actor_handle_id = compute_actor_handle_id_non_forked(
|
||||
ray.ObjectID(state["actor_handle_id"]), worker.current_task_id)
|
||||
|
||||
# This is the driver ID of the driver that owns the actor, not
|
||||
# necessarily the driver that owns this actor handle.
|
||||
@@ -761,7 +743,6 @@ class ActorHandle(object):
|
||||
state["class_name"],
|
||||
ray.ObjectID(state["actor_cursor"])
|
||||
if state["actor_cursor"] is not None else None,
|
||||
state["actor_counter"],
|
||||
state["actor_method_names"],
|
||||
state["method_signatures"],
|
||||
state["method_num_return_vals"],
|
||||
@@ -769,9 +750,7 @@ class ActorHandle(object):
|
||||
if state["actor_creation_dummy_object_id"] is not None else None,
|
||||
state["actor_method_cpus"],
|
||||
actor_driver_id,
|
||||
actor_handle_id=actor_handle_id,
|
||||
previous_actor_handle_id=ray.ObjectID(
|
||||
state["previous_actor_handle_id"]))
|
||||
actor_handle_id=actor_handle_id)
|
||||
|
||||
def __getstate__(self):
|
||||
"""This code path is used by pickling but not by Ray forking."""
|
||||
|
||||
Reference in New Issue
Block a user