diff --git a/doc/source/async_api.rst b/doc/source/async_api.rst index a305c2dd1..644699d88 100644 --- a/doc/source/async_api.rst +++ b/doc/source/async_api.rst @@ -162,3 +162,28 @@ Instead, you can use the ``max_concurrency`` Actor options without any async met Each invocation of the threaded actor will be running in a thread pool. The size of the threadpool is limited by the ``max_concurrency`` value. + +AsyncIO for Remote Tasks +------------------------ + +We don't support asyncio for remote tasks. The following snippet will fail: + +.. code-block:: python + + @ray.remote + async def f(): + pass + +Instead, you can wrap the ``async`` function with a wrapper to run the task synchronously: + +.. code-block:: python + + async def f(): + pass + + @ray.remote + def wrapper(): + import asyncio + asyncio.get_event_loop().run_until_complete(f()) + + \ No newline at end of file diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 3d2b9ea73..47b6aa4f8 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -477,6 +477,12 @@ cdef execute_task( if debugger_breakpoint != b"": ray.util.pdb.set_trace( breakpoint_uuid=debugger_breakpoint) + if inspect.iscoroutinefunction(function_executor): + raise ValueError( + "'async def' should not be used for remote " + "tasks. You can wrap the async function with " + "`asyncio.get_event_loop.run_until(f())`. " + "See more at docs.ray.io/async_api.html") outputs = function_executor(*args, **kwargs) next_breakpoint = ( ray.worker.global_worker.debugger_breakpoint) diff --git a/python/ray/tests/test_asyncio.py b/python/ray/tests/test_asyncio.py index 31f03aefa..fd9934325 100644 --- a/python/ray/tests/test_asyncio.py +++ b/python/ray/tests/test_asyncio.py @@ -244,6 +244,17 @@ def test_async_callback(ray_start_regular_shared): wait_for_condition(lambda: "completed-2" in global_set) +def test_async_function_errored(ray_start_regular_shared): + @ray.remote + async def f(): + pass + + ref = f.remote() + + with pytest.raises(ValueError): + ray.get(ref) + + if __name__ == "__main__": import pytest sys.exit(pytest.main(["-v", __file__]))