diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 86cdfebee..1caa49707 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -528,11 +528,14 @@ cdef CRayStatus task_execution_handler( cdef void async_plasma_callback(CObjectID object_id, int64_t data_size, int64_t metadata_size) with gil: - message = [tuple([ObjectID(object_id.Binary()), data_size, metadata_size])] core_worker = ray.worker.global_worker.core_worker event_handler = core_worker.get_plasma_event_handler() if event_handler is not None: - event_handler.process_notifications(message) + obj_id = ObjectID(object_id.Binary()) + if data_size > 0 and obj_id: + # This must be asynchronous to allow objects to avoid blocking the IO thread. + event_handler._loop.call_soon_threadsafe( + event_handler._complete_future, obj_id) cdef CRayStatus check_signals() nogil: with gil: diff --git a/python/ray/experimental/async_plasma.py b/python/ray/experimental/async_plasma.py index 5e127dc1b..a43670bd9 100644 --- a/python/ray/experimental/async_plasma.py +++ b/python/ray/experimental/async_plasma.py @@ -10,23 +10,6 @@ class PlasmaObjectFuture(asyncio.Future): pass -def _complete_future(event_handler, ray_object_id): - # TODO(ilr): Consider race condition between popping from the - # waiting_dict and as_future appending to the waiting_dict's list. - logger.debug( - "Completing plasma futures for object id {}".format(ray_object_id)) - obj = event_handler._worker.get_objects([ray_object_id], timeout=0)[0] - futures = event_handler._waiting_dict.pop(ray_object_id) - for fut in futures: - try: - fut.set_result(obj) - except asyncio.InvalidStateError: - # Avoid issues where process_notifications - # and check_immediately both get executed - logger.debug("Failed to set result for future {}." - "Most likely already set.".format(fut)) - - class PlasmaEventHandler: """This class is an event handler for Plasma.""" @@ -36,15 +19,23 @@ class PlasmaEventHandler: self._worker = worker self._waiting_dict = defaultdict(list) - def process_notifications(self, messages): - """Process notifications.""" - for object_id, object_size, metadata_size in messages: - if object_size > 0 and object_id in self._waiting_dict: - # This must be asynchronous because it runs on the main IO - # thread in the worker. If this is blocked, other messages - # won't be received. - self._loop.call_soon_threadsafe(_complete_future, self, - object_id) + def _complete_future(self, ray_object_id): + # TODO(ilr): Consider race condition between popping from the + # waiting_dict and as_future appending to the waiting_dict's list. + logger.debug( + "Completing plasma futures for object id {}".format(ray_object_id)) + if ray_object_id not in self._waiting_dict: + return + obj = self._worker.get_objects([ray_object_id], timeout=0)[0] + futures = self._waiting_dict.pop(ray_object_id) + for fut in futures: + try: + fut.set_result(obj) + except asyncio.InvalidStateError: + # Avoid issues where process_notifications + # and check_immediately both get executed + logger.debug("Failed to set result for future {}." + "Most likely already set.".format(fut)) def close(self): """Clean up this handler.""" @@ -55,7 +46,7 @@ class PlasmaEventHandler: def check_immediately(self, object_id): ready, _ = ray.wait([object_id], timeout=0) if ready: - _complete_future(self, object_id) + self._complete_future(object_id) def as_future(self, object_id, check_ready=True): """Turn an object_id into a Future object.