mirror of
https://github.com/wassname/ray.git
synced 2026-07-05 14:44:48 +08:00
Initialize async plasma for async actors (#6813)
* Initialize async plasma for async actors * Address comment
This commit is contained in:
@@ -1173,6 +1173,10 @@ cdef class CoreWorker:
|
||||
if self.async_event_loop is None:
|
||||
self.async_event_loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(self.async_event_loop)
|
||||
# Initialize the async plasma connection.
|
||||
# Delayed import due to async_api depends on _raylet.
|
||||
from ray.experimental.async_api import _async_init
|
||||
self.async_event_loop.run_until_complete(_async_init())
|
||||
if self.async_thread is None:
|
||||
self.async_thread = threading.Thread(
|
||||
target=lambda: self.async_event_loop.run_forever()
|
||||
|
||||
@@ -247,3 +247,17 @@ async def test_asyncio_get(ray_start_regular_shared, event_loop):
|
||||
|
||||
with pytest.raises(ray.exceptions.RayTaskError):
|
||||
await ray.async_compat.get_async(direct.throw_error.remote())
|
||||
|
||||
|
||||
def test_asyncio_actor_async_get(ray_start_regular_shared):
|
||||
@ray.remote
|
||||
def remote_task():
|
||||
return 1
|
||||
|
||||
@ray.remote
|
||||
class AsyncGetter:
|
||||
async def get(self):
|
||||
return await remote_task.remote()
|
||||
|
||||
getter = AsyncGetter.options(is_asyncio=True).remote()
|
||||
assert ray.get(getter.get.remote()) == 1
|
||||
|
||||
Reference in New Issue
Block a user