diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index f8e46d035..6b90cdf75 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -1142,8 +1142,8 @@ cdef class CoreWorker: asyncio.set_event_loop(self.async_event_loop) # Initialize the async plasma connection. # Delayed import due to async_api depends on _raylet. - from ray.experimental.async_api import _async_init - self.async_event_loop.run_until_complete(_async_init()) + from ray.experimental.async_api import init as plasma_async_init + plasma_async_init() # Create and attach the monitor object monitor_state = AsyncMonitorState(self.async_event_loop) diff --git a/python/ray/experimental/async_api.py b/python/ray/experimental/async_api.py index 5ae3323fc..0a00aafff 100644 --- a/python/ray/experimental/async_api.py +++ b/python/ray/experimental/async_api.py @@ -1,7 +1,4 @@ -# Note: asyncio is only compatible with Python 3 - import asyncio -import threading import ray from ray.experimental.async_plasma import PlasmaEventHandler @@ -10,7 +7,10 @@ from ray.services import logger handler = None -async def _async_init(): +def init(): + """Initialize plasma event handlers for asyncio support.""" + assert ray.is_initialized(), "Please call ray.init before async_api.init" + global handler if handler is None: worker = ray.worker.global_worker @@ -20,31 +20,6 @@ async def _async_init(): logger.debug("AsyncPlasma Connection Created!") -def init(): - """ - Initialize synchronously. - """ - assert ray.is_initialized(), "Please call ray.init before async_api.init" - - # Noop when handler is set. - if handler is not None: - return - - loop = asyncio.get_event_loop() - if loop.is_running(): - if loop._thread_id != threading.get_ident(): - # If the loop is runing outside current thread, we actually need - # to do this to make sure the context is initialized. - asyncio.run_coroutine_threadsafe(_async_init(), loop=loop) - else: - async_init_done = asyncio.get_event_loop().create_task( - _async_init()) - # Block until the async init finishes. - async_init_done.done() - else: - asyncio.get_event_loop().run_until_complete(_async_init()) - - def as_future(object_id): """Turn an object_id into a Future object. diff --git a/python/ray/tests/test_asyncio.py b/python/ray/tests/test_asyncio.py index eb6308df5..657a68664 100644 --- a/python/ray/tests/test_asyncio.py +++ b/python/ray/tests/test_asyncio.py @@ -113,8 +113,8 @@ async def test_asyncio_get(ray_start_regular_shared, event_loop): loop.set_debug(True) # This is needed for async plasma - from ray.experimental.async_api import _async_init - await _async_init() + from ray.experimental.async_api import init + init() # Test Async Plasma @ray.remote