mirror of
https://github.com/wassname/ray.git
synced 2026-07-03 20:05:38 +08:00
Cleanup Plasma Async Callback (#7452)
This commit is contained in:
@@ -528,11 +528,14 @@ cdef CRayStatus task_execution_handler(
|
||||
cdef void async_plasma_callback(CObjectID object_id,
|
||||
int64_t data_size,
|
||||
int64_t metadata_size) with gil:
|
||||
message = [tuple([ObjectID(object_id.Binary()), data_size, metadata_size])]
|
||||
core_worker = ray.worker.global_worker.core_worker
|
||||
event_handler = core_worker.get_plasma_event_handler()
|
||||
if event_handler is not None:
|
||||
event_handler.process_notifications(message)
|
||||
obj_id = ObjectID(object_id.Binary())
|
||||
if data_size > 0 and obj_id:
|
||||
# This must be asynchronous to allow objects to avoid blocking the IO thread.
|
||||
event_handler._loop.call_soon_threadsafe(
|
||||
event_handler._complete_future, obj_id)
|
||||
|
||||
cdef CRayStatus check_signals() nogil:
|
||||
with gil:
|
||||
|
||||
@@ -10,23 +10,6 @@ class PlasmaObjectFuture(asyncio.Future):
|
||||
pass
|
||||
|
||||
|
||||
def _complete_future(event_handler, ray_object_id):
|
||||
# TODO(ilr): Consider race condition between popping from the
|
||||
# waiting_dict and as_future appending to the waiting_dict's list.
|
||||
logger.debug(
|
||||
"Completing plasma futures for object id {}".format(ray_object_id))
|
||||
obj = event_handler._worker.get_objects([ray_object_id], timeout=0)[0]
|
||||
futures = event_handler._waiting_dict.pop(ray_object_id)
|
||||
for fut in futures:
|
||||
try:
|
||||
fut.set_result(obj)
|
||||
except asyncio.InvalidStateError:
|
||||
# Avoid issues where process_notifications
|
||||
# and check_immediately both get executed
|
||||
logger.debug("Failed to set result for future {}."
|
||||
"Most likely already set.".format(fut))
|
||||
|
||||
|
||||
class PlasmaEventHandler:
|
||||
"""This class is an event handler for Plasma."""
|
||||
|
||||
@@ -36,15 +19,23 @@ class PlasmaEventHandler:
|
||||
self._worker = worker
|
||||
self._waiting_dict = defaultdict(list)
|
||||
|
||||
def process_notifications(self, messages):
|
||||
"""Process notifications."""
|
||||
for object_id, object_size, metadata_size in messages:
|
||||
if object_size > 0 and object_id in self._waiting_dict:
|
||||
# This must be asynchronous because it runs on the main IO
|
||||
# thread in the worker. If this is blocked, other messages
|
||||
# won't be received.
|
||||
self._loop.call_soon_threadsafe(_complete_future, self,
|
||||
object_id)
|
||||
def _complete_future(self, ray_object_id):
|
||||
# TODO(ilr): Consider race condition between popping from the
|
||||
# waiting_dict and as_future appending to the waiting_dict's list.
|
||||
logger.debug(
|
||||
"Completing plasma futures for object id {}".format(ray_object_id))
|
||||
if ray_object_id not in self._waiting_dict:
|
||||
return
|
||||
obj = self._worker.get_objects([ray_object_id], timeout=0)[0]
|
||||
futures = self._waiting_dict.pop(ray_object_id)
|
||||
for fut in futures:
|
||||
try:
|
||||
fut.set_result(obj)
|
||||
except asyncio.InvalidStateError:
|
||||
# Avoid issues where process_notifications
|
||||
# and check_immediately both get executed
|
||||
logger.debug("Failed to set result for future {}."
|
||||
"Most likely already set.".format(fut))
|
||||
|
||||
def close(self):
|
||||
"""Clean up this handler."""
|
||||
@@ -55,7 +46,7 @@ class PlasmaEventHandler:
|
||||
def check_immediately(self, object_id):
|
||||
ready, _ = ray.wait([object_id], timeout=0)
|
||||
if ready:
|
||||
_complete_future(self, object_id)
|
||||
self._complete_future(object_id)
|
||||
|
||||
def as_future(self, object_id, check_ready=True):
|
||||
"""Turn an object_id into a Future object.
|
||||
|
||||
Reference in New Issue
Block a user