diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index a55c87144..3be251003 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -7,7 +7,6 @@ from cpython.exc cimport PyErr_CheckSignals import asyncio -import numpy import gc import inspect import threading @@ -108,7 +107,6 @@ from ray.exceptions import ( TaskCancelledError ) from ray.utils import decode -import gc import msgpack cimport cpython diff --git a/python/ray/async_compat.py b/python/ray/async_compat.py index 05b02cecb..b1ecccf25 100644 --- a/python/ray/async_compat.py +++ b/python/ray/async_compat.py @@ -10,8 +10,6 @@ try: except ImportError: uvloop = None -import ray - def get_new_event_loop(): """Construct a new event loop. Ray will use uvloop if it exists""" @@ -31,15 +29,3 @@ def sync_to_async(func): return func(*args, **kwargs) return wrapper - - -def get_async(object_ref): - """C++ Asyncio version of ray.get""" - loop = asyncio.get_event_loop() - core_worker = ray.worker.global_worker.core_worker - - future = loop.create_future() - core_worker.get_async(object_ref, future) - # A hack to keep a reference to the object ref for ref counting. - future.object_ref = object_ref - return future diff --git a/python/ray/function_manager.py b/python/ray/function_manager.py index 52a218c9e..ffe86a54b 100644 --- a/python/ray/function_manager.py +++ b/python/ray/function_manager.py @@ -552,7 +552,7 @@ class FunctionActorManager: else: return method(actor, *args, **kwargs) - # Set method_name and method as attributes to the executor clusore + # Set method_name and method as attributes to the executor closure # so we can make decision based on these attributes in task executor. # Precisely, asyncio support requires to know whether: # - the method is a ray internal method: starts with __ray diff --git a/python/ray/includes/object_ref.pxi b/python/ray/includes/object_ref.pxi index 1b4f73001..ae99c088c 100644 --- a/python/ray/includes/object_ref.pxi +++ b/python/ray/includes/object_ref.pxi @@ -1,5 +1,7 @@ from ray.includes.unique_ids cimport CObjectID +import asyncio + import ray cdef class ObjectRef(BaseID): @@ -62,11 +64,14 @@ cdef class ObjectRef(BaseID): return cls(CObjectID.FromRandom().Binary()) def __await__(self): - # Delayed import because this can only be imported in py3. - from ray.async_compat import get_async - return get_async(self).__await__() + return self.as_future().__await__() def as_future(self): - # Delayed import because this can only be imported in py3. - from ray.async_compat import get_async - return get_async(self) + loop = asyncio.get_event_loop() + core_worker = ray.worker.global_worker.core_worker + + future = loop.create_future() + core_worker.get_async(self, future) + # A hack to keep a reference to the object ref for ref counting. + future.object_ref = self + return future diff --git a/python/ray/serve/backend_worker.py b/python/ray/serve/backend_worker.py index d686c30f5..510663932 100644 --- a/python/ray/serve/backend_worker.py +++ b/python/ray/serve/backend_worker.py @@ -136,10 +136,7 @@ def wrap_to_ray_error(exception: Exception) -> RayTaskError: def ensure_async(func: Callable) -> Callable: - if inspect.iscoroutinefunction(func): - return func - else: - return sync_to_async(func) + return sync_to_async(func) class RayServeWorker: diff --git a/python/ray/tests/test_asyncio.py b/python/ray/tests/test_asyncio.py index acb1c126e..5bcb55d57 100644 --- a/python/ray/tests/test_asyncio.py +++ b/python/ray/tests/test_asyncio.py @@ -119,14 +119,14 @@ async def test_asyncio_get(ray_start_regular_shared, event_loop): def task(): return 1 - assert await ray.async_compat.get_async(task.remote()) == 1 + assert await task.remote().as_future() == 1 @ray.remote def task_throws(): 1 / 0 with pytest.raises(ray.exceptions.RayTaskError): - await ray.async_compat.get_async(task_throws.remote()) + await task_throws.remote().as_future() # Test actor calls. str_len = 200 * 1024 @@ -145,15 +145,14 @@ async def test_asyncio_get(ray_start_regular_shared, event_loop): actor = Actor.remote() - actor_call_future = ray.async_compat.get_async(actor.echo.remote(2)) + actor_call_future = actor.echo.remote(2).as_future() assert await actor_call_future == 2 - promoted_to_plasma_future = ray.async_compat.get_async( - actor.big_object.remote()) + promoted_to_plasma_future = actor.big_object.remote().as_future() assert await promoted_to_plasma_future == "a" * str_len with pytest.raises(ray.exceptions.RayTaskError): - await ray.async_compat.get_async(actor.throw_error.remote()) + await actor.throw_error.remote().as_future() def test_asyncio_actor_async_get(ray_start_regular_shared):