mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 20:22:39 +08:00
Fix a crash problem caused by GetActorHandle in ActorManager (#13164)
This commit is contained in:
@@ -17,7 +17,12 @@ from ray.includes.common cimport (
|
||||
CBuffer,
|
||||
CRayObject
|
||||
)
|
||||
from ray.includes.libcoreworker cimport CActorHandle, CFiberEvent
|
||||
from ray.includes.libcoreworker cimport (
|
||||
ActorHandleSharedPtr,
|
||||
CActorHandle,
|
||||
CFiberEvent,
|
||||
)
|
||||
|
||||
from ray.includes.unique_ids cimport (
|
||||
CObjectID,
|
||||
CActorID
|
||||
@@ -101,7 +106,7 @@ cdef class CoreWorker:
|
||||
self, worker, outputs, const c_vector[CObjectID] return_ids,
|
||||
c_vector[shared_ptr[CRayObject]] *returns)
|
||||
cdef yield_current_fiber(self, CFiberEvent &fiber_event)
|
||||
cdef make_actor_handle(self, const CActorHandle *c_actor_handle)
|
||||
cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle)
|
||||
|
||||
cdef class FunctionDescriptor:
|
||||
cdef:
|
||||
|
||||
+13
-18
@@ -79,6 +79,7 @@ from ray.includes.unique_ids cimport (
|
||||
CPlacementGroupID,
|
||||
)
|
||||
from ray.includes.libcoreworker cimport (
|
||||
ActorHandleSharedPtr,
|
||||
CActorCreationOptions,
|
||||
CPlacementGroupCreationOptions,
|
||||
CCoreWorkerOptions,
|
||||
@@ -1321,22 +1322,22 @@ cdef class CoreWorker:
|
||||
CCoreWorkerProcess.GetCoreWorker().RemoveActorHandleReference(
|
||||
c_actor_id)
|
||||
|
||||
cdef make_actor_handle(self, const CActorHandle *c_actor_handle):
|
||||
cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle):
|
||||
worker = ray.worker.global_worker
|
||||
worker.check_connected()
|
||||
manager = worker.function_actor_manager
|
||||
|
||||
actor_id = ActorID(c_actor_handle.GetActorID().Binary())
|
||||
job_id = JobID(c_actor_handle.CreationJobID().Binary())
|
||||
language = Language.from_native(c_actor_handle.ActorLanguage())
|
||||
actor_creation_function_descriptor = \
|
||||
CFunctionDescriptorToPython(
|
||||
c_actor_handle.ActorCreationTaskFunctionDescriptor())
|
||||
actor_id = ActorID(dereference(c_actor_handle).GetActorID().Binary())
|
||||
job_id = JobID(dereference(c_actor_handle).CreationJobID().Binary())
|
||||
language = Language.from_native(
|
||||
dereference(c_actor_handle).ActorLanguage())
|
||||
actor_creation_function_descriptor = CFunctionDescriptorToPython(
|
||||
dereference(c_actor_handle).ActorCreationTaskFunctionDescriptor())
|
||||
if language == Language.PYTHON:
|
||||
assert isinstance(actor_creation_function_descriptor,
|
||||
PythonFunctionDescriptor)
|
||||
# Load actor_method_cpu from actor handle's extension data.
|
||||
extension_data = <str>c_actor_handle.ExtensionData()
|
||||
extension_data = <str>dereference(c_actor_handle).ExtensionData()
|
||||
if extension_data:
|
||||
actor_method_cpu = int(extension_data)
|
||||
else:
|
||||
@@ -1372,27 +1373,21 @@ cdef class CoreWorker:
|
||||
.GetCoreWorker()
|
||||
.DeserializeAndRegisterActorHandle(
|
||||
bytes, c_outer_object_id))
|
||||
cdef:
|
||||
# NOTE: This handle should not be stored anywhere.
|
||||
const CActorHandle* c_actor_handle = (
|
||||
CCoreWorkerProcess.GetCoreWorker().GetActorHandle(c_actor_id))
|
||||
return self.make_actor_handle(c_actor_handle)
|
||||
return self.make_actor_handle(
|
||||
CCoreWorkerProcess.GetCoreWorker().GetActorHandle(c_actor_id))
|
||||
|
||||
def get_named_actor_handle(self, const c_string &name):
|
||||
cdef:
|
||||
pair[const CActorHandle*, CRayStatus] named_actor_handle_pair
|
||||
# NOTE: This handle should not be stored anywhere.
|
||||
const CActorHandle* c_actor_handle
|
||||
pair[ActorHandleSharedPtr, CRayStatus] named_actor_handle_pair
|
||||
|
||||
# We need it because GetNamedActorHandle needs
|
||||
# to call a method that holds the gil.
|
||||
with nogil:
|
||||
named_actor_handle_pair = (
|
||||
CCoreWorkerProcess.GetCoreWorker().GetNamedActorHandle(name))
|
||||
c_actor_handle = named_actor_handle_pair.first
|
||||
check_status(named_actor_handle_pair.second)
|
||||
|
||||
return self.make_actor_handle(c_actor_handle)
|
||||
return self.make_actor_handle(named_actor_handle_pair.first)
|
||||
|
||||
def serialize_actor_handle(self, ActorID actor_id):
|
||||
cdef:
|
||||
|
||||
@@ -49,6 +49,11 @@ ctypedef void (*ray_callback_function) \
|
||||
ctypedef void (*plasma_callback_function) \
|
||||
(CObjectID object_id, int64_t data_size, int64_t metadata_size)
|
||||
|
||||
# NOTE: This ctypedef is needed, because Cython doesn't compile
|
||||
# "pair[shared_ptr[const CActorHandle], CRayStatus]".
|
||||
# This is a bug of cython: https://github.com/cython/cython/issues/3967.
|
||||
ctypedef shared_ptr[const CActorHandle] ActorHandleSharedPtr
|
||||
|
||||
cdef extern from "ray/core_worker/profiling.h" nogil:
|
||||
cdef cppclass CProfiler "ray::worker::Profiler":
|
||||
void Start()
|
||||
@@ -140,8 +145,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
|
||||
CRayStatus SerializeActorHandle(const CActorID &actor_id, c_string
|
||||
*bytes,
|
||||
CObjectID *c_actor_handle_id)
|
||||
const CActorHandle* GetActorHandle(const CActorID &actor_id) const
|
||||
pair[const CActorHandle*, CRayStatus] GetNamedActorHandle(
|
||||
ActorHandleSharedPtr GetActorHandle(const CActorID &actor_id) const
|
||||
pair[ActorHandleSharedPtr, CRayStatus] GetNamedActorHandle(
|
||||
const c_string &name)
|
||||
void AddLocalReference(const CObjectID &object_id)
|
||||
void RemoveLocalReference(const CObjectID &object_id)
|
||||
|
||||
Reference in New Issue
Block a user