mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 15:22:56 +08:00
Actor dummy object garbage collection (#3593)
* Convert UniqueID::nil() to a constructor
* Cleanup actor handle pickling code
* Add new actor handles to the task spec
* Pass in new actor handles
* Add new handles to the actor registration
* Regression test for actor handle forking and GC
* lint and doc
* Handle pickled actor handles in the backend and some refactoring
* Add regression test for dummy object GC and pickled actor handles
* Check for duplicate actor tasks on submission
* Regression test for forking twice, fix failed named actor leak
* Fix bug for forking twice
* lint
* Revert "Fix bug for forking twice"
  This reverts commit 3da85e59d401e53606c2e37ffbebcc8653ff27ac.
* Add new actor handles when task is assigned, not finished
* Remove comment
* remove UniqueID()
* Updates
* update
* fix
* fix java
* fixes
* fix
This commit is contained in:
@@ -485,6 +485,10 @@ class ActorHandle(object):
|
||||
_ray_actor_driver_id: The driver ID of the job that created the actor
|
||||
(it is possible that this ActorHandle exists on a driver with a
|
||||
different driver ID).
|
||||
_ray_new_actor_handles: The new actor handles that were created from
|
||||
this handle since the last task on this handle was submitted. This
|
||||
is used to garbage-collect dummy objects that are no longer
|
||||
necessary in the backend.
|
||||
"""
|
||||
|
||||
def __init__(self,
|
||||
@@ -520,6 +524,7 @@ class ActorHandle(object):
|
||||
actor_creation_dummy_object_id)
|
||||
self._ray_actor_method_cpus = actor_method_cpus
|
||||
self._ray_actor_driver_id = actor_driver_id
|
||||
self._ray_new_actor_handles = []
|
||||
|
||||
def _actor_method_call(self,
|
||||
method_name,
|
||||
@@ -585,6 +590,7 @@ class ActorHandle(object):
|
||||
actor_creation_dummy_object_id=(
|
||||
self._ray_actor_creation_dummy_object_id),
|
||||
execution_dependencies=execution_dependencies,
|
||||
new_actor_handles=self._ray_new_actor_handles,
|
||||
# We add one for the dummy return ID.
|
||||
num_return_vals=num_return_vals + 1,
|
||||
resources={"CPU": self._ray_actor_method_cpus},
|
||||
@@ -596,6 +602,9 @@ class ActorHandle(object):
|
||||
# The last object returned is the dummy object that should be
|
||||
# passed in to the next actor method. Do not return it to the user.
|
||||
self._ray_actor_cursor = object_ids.pop()
|
||||
# We have notified the backend of the new actor handles to expect since
|
||||
# the last task was submitted, so clear the list.
|
||||
self._ray_new_actor_handles = []
|
||||
|
||||
if len(object_ids) == 1:
|
||||
object_ids = object_ids[0]
|
||||
@@ -702,6 +711,19 @@ class ActorHandle(object):
|
||||
|
||||
if ray_forking:
|
||||
self._ray_actor_forks += 1
|
||||
new_actor_handle_id = actor_handle_id
|
||||
else:
|
||||
# The execution dependency for a pickled actor handle is never safe
|
||||
# to release, since it could be unpickled and submit another
|
||||
# dependent task at any time. Therefore, we notify the backend of a
|
||||
# random handle ID that will never actually be used.
|
||||
new_actor_handle_id = ray.ObjectID(_random_string())
|
||||
# Notify the backend to expect this new actor handle. The backend will
|
||||
# not release the cursor for any new handles until the first task for
|
||||
# each of the new handles is submitted.
|
||||
# NOTE(swang): There is currently no garbage collection for actor
|
||||
# handles until the actor itself is removed.
|
||||
self._ray_new_actor_handles.append(new_actor_handle_id)
|
||||
|
||||
return state
|
||||
|
||||
|
||||
@@ -56,5 +56,8 @@ def register_actor(name, actor_handle):
|
||||
# Add the actor to Redis if it does not already exist.
|
||||
already_exists = _internal_kv_put(actor_name, pickled_state)
|
||||
if already_exists:
|
||||
# If the registration fails, then erase the new actor handle that
|
||||
# was added when pickling the actor handle.
|
||||
actor_handle._ray_new_actor_handles.pop()
|
||||
raise ValueError(
|
||||
"Error: the actor with name={} already exists".format(name))
|
||||
|
||||
@@ -524,6 +524,7 @@ class Worker(object):
|
||||
actor_creation_dummy_object_id=None,
|
||||
max_actor_reconstructions=0,
|
||||
execution_dependencies=None,
|
||||
new_actor_handles=None,
|
||||
num_return_vals=None,
|
||||
resources=None,
|
||||
placement_resources=None,
|
||||
@@ -594,6 +595,9 @@ class Worker(object):
|
||||
if execution_dependencies is None:
|
||||
execution_dependencies = []
|
||||
|
||||
if new_actor_handles is None:
|
||||
new_actor_handles = []
|
||||
|
||||
if driver_id is None:
|
||||
driver_id = self.task_driver_id
|
||||
|
||||
@@ -628,8 +632,8 @@ class Worker(object):
|
||||
num_return_vals, self.current_task_id, task_index,
|
||||
actor_creation_id, actor_creation_dummy_object_id,
|
||||
max_actor_reconstructions, actor_id, actor_handle_id,
|
||||
actor_counter, execution_dependencies, resources,
|
||||
placement_resources)
|
||||
actor_counter, new_actor_handles, execution_dependencies,
|
||||
resources, placement_resources)
|
||||
self.raylet_client.submit_task(task)
|
||||
|
||||
return task.returns()
|
||||
@@ -1944,7 +1948,7 @@ def connect(ray_params,
|
||||
worker.current_task_id, worker.task_index,
|
||||
ray.ObjectID(NIL_ACTOR_ID), ray.ObjectID(NIL_ACTOR_ID), 0,
|
||||
ray.ObjectID(NIL_ACTOR_ID), ray.ObjectID(NIL_ACTOR_ID),
|
||||
nil_actor_counter, [], {"CPU": 0}, {})
|
||||
nil_actor_counter, [], [], {"CPU": 0}, {})
|
||||
|
||||
# Add the driver task to the task table.
|
||||
global_state._execute_command(driver_task.task_id(), "RAY.TABLE_ADD",
|
||||
|
||||
Reference in New Issue
Block a user