diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index c602fda9a..1f6c043a7 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -1173,6 +1173,10 @@ cdef class CoreWorker: if self.async_event_loop is None: self.async_event_loop = asyncio.new_event_loop() asyncio.set_event_loop(self.async_event_loop) + # Initialize the async plasma connection. + # Delayed import due to async_api depends on _raylet. + from ray.experimental.async_api import _async_init + self.async_event_loop.run_until_complete(_async_init()) if self.async_thread is None: self.async_thread = threading.Thread( target=lambda: self.async_event_loop.run_forever() diff --git a/python/ray/tests/py3_test.py b/python/ray/tests/py3_test.py index 31addb6e2..184521398 100644 --- a/python/ray/tests/py3_test.py +++ b/python/ray/tests/py3_test.py @@ -247,3 +247,17 @@ async def test_asyncio_get(ray_start_regular_shared, event_loop): with pytest.raises(ray.exceptions.RayTaskError): await ray.async_compat.get_async(direct.throw_error.remote()) + + +def test_asyncio_actor_async_get(ray_start_regular_shared): + @ray.remote + def remote_task(): + return 1 + + @ray.remote + class AsyncGetter: + async def get(self): + return await remote_task.remote() + + getter = AsyncGetter.options(is_asyncio=True).remote() + assert ray.get(getter.get.remote()) == 1