mirror of
https://github.com/wassname/ray.git
synced 2026-06-30 20:18:33 +08:00
Add pending task dependencies to ObjectID ref counting (#6054)
This commit is contained in:
@@ -1000,17 +1000,17 @@ cdef class CoreWorker:
|
||||
c_actor_id, &output))
|
||||
return output
|
||||
|
||||
def add_active_object_id(self, ObjectID object_id):
|
||||
def add_object_id_reference(self, ObjectID object_id):
|
||||
cdef:
|
||||
CObjectID c_object_id = object_id.native()
|
||||
# Note: faster to not release GIL for short-running op.
|
||||
self.core_worker.get().AddActiveObjectID(c_object_id)
|
||||
self.core_worker.get().AddObjectIDReference(c_object_id)
|
||||
|
||||
def remove_active_object_id(self, ObjectID object_id):
|
||||
def remove_object_id_reference(self, ObjectID object_id):
|
||||
cdef:
|
||||
CObjectID c_object_id = object_id.native()
|
||||
# Note: faster to not release GIL for short-running op.
|
||||
self.core_worker.get().RemoveActiveObjectID(c_object_id)
|
||||
self.core_worker.get().RemoveObjectIDReference(c_object_id)
|
||||
|
||||
# TODO: handle noreturn better
|
||||
cdef store_task_outputs(
|
||||
|
||||
@@ -101,8 +101,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
|
||||
CActorID DeserializeAndRegisterActorHandle(const c_string &bytes)
|
||||
CRayStatus SerializeActorHandle(const CActorID &actor_id, c_string
|
||||
*bytes)
|
||||
void AddActiveObjectID(const CObjectID &object_id)
|
||||
void RemoveActiveObjectID(const CObjectID &object_id)
|
||||
void AddObjectIDReference(const CObjectID &object_id)
|
||||
void RemoveObjectIDReference(const CObjectID &object_id)
|
||||
|
||||
CRayStatus SetClientOptions(c_string client_name, int64_t limit)
|
||||
CRayStatus Put(const CRayObject &object, CObjectID *object_id)
|
||||
|
||||
@@ -146,14 +146,14 @@ cdef class ObjectID(BaseID):
|
||||
# TODO(edoakes): there are dummy object IDs being created in
|
||||
# includes/task.pxi before the core worker is initialized.
|
||||
if hasattr(worker, "core_worker"):
|
||||
worker.core_worker.add_active_object_id(self)
|
||||
worker.core_worker.add_object_id_reference(self)
|
||||
self.in_core_worker = True
|
||||
|
||||
def __dealloc__(self):
|
||||
if self.in_core_worker:
|
||||
try:
|
||||
worker = ray.worker.global_worker
|
||||
worker.core_worker.remove_active_object_id(self)
|
||||
worker.core_worker.remove_object_id_reference(self)
|
||||
except Exception as e:
|
||||
# There is a strange error in rllib that causes the above to
|
||||
# fail. Somehow the global 'ray' variable corresponding to the
|
||||
|
||||
Reference in New Issue
Block a user