mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 19:00:36 +08:00
Code cleanup about python3 asyncio compat (#11134)
* cleanup python3 compat and others
This commit is contained in:
committed by
GitHub
parent
0dcfa9ed6c
commit
f0dba6bd2b
@@ -7,7 +7,6 @@
|
||||
from cpython.exc cimport PyErr_CheckSignals
|
||||
|
||||
import asyncio
|
||||
import numpy
|
||||
import gc
|
||||
import inspect
|
||||
import threading
|
||||
@@ -108,7 +107,6 @@ from ray.exceptions import (
|
||||
TaskCancelledError
|
||||
)
|
||||
from ray.utils import decode
|
||||
import gc
|
||||
import msgpack
|
||||
|
||||
cimport cpython
|
||||
|
||||
@@ -10,8 +10,6 @@ try:
|
||||
except ImportError:
|
||||
uvloop = None
|
||||
|
||||
import ray
|
||||
|
||||
|
||||
def get_new_event_loop():
|
||||
"""Construct a new event loop. Ray will use uvloop if it exists"""
|
||||
@@ -31,15 +29,3 @@ def sync_to_async(func):
|
||||
return func(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
def get_async(object_ref):
|
||||
"""C++ Asyncio version of ray.get"""
|
||||
loop = asyncio.get_event_loop()
|
||||
core_worker = ray.worker.global_worker.core_worker
|
||||
|
||||
future = loop.create_future()
|
||||
core_worker.get_async(object_ref, future)
|
||||
# A hack to keep a reference to the object ref for ref counting.
|
||||
future.object_ref = object_ref
|
||||
return future
|
||||
|
||||
@@ -552,7 +552,7 @@ class FunctionActorManager:
|
||||
else:
|
||||
return method(actor, *args, **kwargs)
|
||||
|
||||
# Set method_name and method as attributes to the executor clusore
|
||||
# Set method_name and method as attributes to the executor closure
|
||||
# so we can make decision based on these attributes in task executor.
|
||||
# Precisely, asyncio support requires to know whether:
|
||||
# - the method is a ray internal method: starts with __ray
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
from ray.includes.unique_ids cimport CObjectID
|
||||
|
||||
import asyncio
|
||||
|
||||
import ray
|
||||
|
||||
cdef class ObjectRef(BaseID):
|
||||
@@ -62,11 +64,14 @@ cdef class ObjectRef(BaseID):
|
||||
return cls(CObjectID.FromRandom().Binary())
|
||||
|
||||
def __await__(self):
|
||||
# Delayed import because this can only be imported in py3.
|
||||
from ray.async_compat import get_async
|
||||
return get_async(self).__await__()
|
||||
return self.as_future().__await__()
|
||||
|
||||
def as_future(self):
|
||||
# Delayed import because this can only be imported in py3.
|
||||
from ray.async_compat import get_async
|
||||
return get_async(self)
|
||||
loop = asyncio.get_event_loop()
|
||||
core_worker = ray.worker.global_worker.core_worker
|
||||
|
||||
future = loop.create_future()
|
||||
core_worker.get_async(self, future)
|
||||
# A hack to keep a reference to the object ref for ref counting.
|
||||
future.object_ref = self
|
||||
return future
|
||||
|
||||
@@ -136,10 +136,7 @@ def wrap_to_ray_error(exception: Exception) -> RayTaskError:
|
||||
|
||||
|
||||
def ensure_async(func: Callable) -> Callable:
|
||||
if inspect.iscoroutinefunction(func):
|
||||
return func
|
||||
else:
|
||||
return sync_to_async(func)
|
||||
return sync_to_async(func)
|
||||
|
||||
|
||||
class RayServeWorker:
|
||||
|
||||
@@ -119,14 +119,14 @@ async def test_asyncio_get(ray_start_regular_shared, event_loop):
|
||||
def task():
|
||||
return 1
|
||||
|
||||
assert await ray.async_compat.get_async(task.remote()) == 1
|
||||
assert await task.remote().as_future() == 1
|
||||
|
||||
@ray.remote
|
||||
def task_throws():
|
||||
1 / 0
|
||||
|
||||
with pytest.raises(ray.exceptions.RayTaskError):
|
||||
await ray.async_compat.get_async(task_throws.remote())
|
||||
await task_throws.remote().as_future()
|
||||
|
||||
# Test actor calls.
|
||||
str_len = 200 * 1024
|
||||
@@ -145,15 +145,14 @@ async def test_asyncio_get(ray_start_regular_shared, event_loop):
|
||||
|
||||
actor = Actor.remote()
|
||||
|
||||
actor_call_future = ray.async_compat.get_async(actor.echo.remote(2))
|
||||
actor_call_future = actor.echo.remote(2).as_future()
|
||||
assert await actor_call_future == 2
|
||||
|
||||
promoted_to_plasma_future = ray.async_compat.get_async(
|
||||
actor.big_object.remote())
|
||||
promoted_to_plasma_future = actor.big_object.remote().as_future()
|
||||
assert await promoted_to_plasma_future == "a" * str_len
|
||||
|
||||
with pytest.raises(ray.exceptions.RayTaskError):
|
||||
await ray.async_compat.get_async(actor.throw_error.remote())
|
||||
await actor.throw_error.remote().as_future()
|
||||
|
||||
|
||||
def test_asyncio_actor_async_get(ray_start_regular_shared):
|
||||
|
||||
Reference in New Issue
Block a user