[Asyncio] Remove async init legacy code (#8177)

* [Asyncio] Remove async init legacy code

* Fix places that call async_init
This commit is contained in:
Simon Mo
2020-04-25 09:32:38 -07:00
committed by GitHub
parent 9dc625318f
commit 13c14eac07
3 changed files with 8 additions and 33 deletions
+2 -2
View File
@@ -1142,8 +1142,8 @@ cdef class CoreWorker:
asyncio.set_event_loop(self.async_event_loop)
# Initialize the async plasma connection.
# Delayed import due to async_api depends on _raylet.
from ray.experimental.async_api import _async_init
self.async_event_loop.run_until_complete(_async_init())
from ray.experimental.async_api import init as plasma_async_init
plasma_async_init()
# Create and attach the monitor object
monitor_state = AsyncMonitorState(self.async_event_loop)
+4 -29
View File
@@ -1,7 +1,4 @@
# Note: asyncio is only compatible with Python 3
import asyncio
import threading
import ray
from ray.experimental.async_plasma import PlasmaEventHandler
@@ -10,7 +7,10 @@ from ray.services import logger
handler = None
async def _async_init():
def init():
"""Initialize plasma event handlers for asyncio support."""
assert ray.is_initialized(), "Please call ray.init before async_api.init"
global handler
if handler is None:
worker = ray.worker.global_worker
@@ -20,31 +20,6 @@ async def _async_init():
logger.debug("AsyncPlasma Connection Created!")
def init():
"""
Initialize synchronously.
"""
assert ray.is_initialized(), "Please call ray.init before async_api.init"
# Noop when handler is set.
if handler is not None:
return
loop = asyncio.get_event_loop()
if loop.is_running():
if loop._thread_id != threading.get_ident():
# If the loop is runing outside current thread, we actually need
# to do this to make sure the context is initialized.
asyncio.run_coroutine_threadsafe(_async_init(), loop=loop)
else:
async_init_done = asyncio.get_event_loop().create_task(
_async_init())
# Block until the async init finishes.
async_init_done.done()
else:
asyncio.get_event_loop().run_until_complete(_async_init())
def as_future(object_id):
"""Turn an object_id into a Future object.
+2 -2
View File
@@ -113,8 +113,8 @@ async def test_asyncio_get(ray_start_regular_shared, event_loop):
loop.set_debug(True)
# This is needed for async plasma
from ray.experimental.async_api import _async_init
await _async_init()
from ray.experimental.async_api import init
init()
# Test Async Plasma
@ray.remote