From 68d7fa21378a97dc8fb3ce81d3b60b2ac9d04a0d Mon Sep 17 00:00:00 2001 From: Simon Mo Date: Fri, 11 Dec 2020 09:35:17 -0800 Subject: [PATCH] Fix exit_actor in asyncio mode (#12693) --- python/ray/_raylet.pyx | 14 ++++++++++++++ python/ray/actor.py | 9 +++++++++ python/ray/tests/test_asyncio.py | 26 ++++++++++++++++++++++++++ 3 files changed, 49 insertions(+) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 18ecb7c19..462ba3ade 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -351,6 +351,18 @@ cdef execute_task( # Automatically restrict the GPUs available to this task. ray.utils.set_cuda_visible_devices(ray.get_gpu_ids()) + # Helper method used to exit current asyncio actor. + # This is called when a KeyboardInterrupt is received by the main thread. + # Upon receiving a KeyboardInterrupt signal, Ray will exit the current + # worker. If the worker is processing normal tasks, Ray treats it as task + # cancellation from ray.cancel(object_ref). If the worker is an asyncio + # actor, Ray will exit the actor. + def exit_current_actor_if_asyncio(): + if core_worker.current_actor_is_asyncio(): + error = SystemExit(0) + error.is_ray_terminate = True + raise error + function_descriptor = CFunctionDescriptorToPython( ray_function.GetFunctionDescriptor()) @@ -476,6 +488,7 @@ cdef execute_task( ray.worker.global_worker.debugger_breakpoint = b"" task_exception = False except KeyboardInterrupt as e: + exit_current_actor_if_asyncio() raise TaskCancelledError( core_worker.get_current_task_id()) if c_return_ids.size() == 1: @@ -483,6 +496,7 @@ cdef execute_task( # Check for a cancellation that was called when the function # was exiting and was raised after the except block. 
if not check_signals().ok(): + exit_current_actor_if_asyncio() task_exception = True raise TaskCancelledError( core_worker.get_current_task_id()) diff --git a/python/ray/actor.py b/python/ray/actor.py index b8981ca3d..5baaf6a9a 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -1,6 +1,7 @@ import inspect import logging import weakref +import _thread import ray.ray_constants as ray_constants import ray._raylet @@ -1018,6 +1019,14 @@ def exit_actor(): ray.disconnect() # Disconnect global state from GCS. ray.state.state.disconnect() + + # In asyncio actor mode, we can't raise SystemExit because it will just + # quit the asyncio event loop thread, not the main thread. Instead, we + # raise an interrupt signal to the main thread to tell it to exit. + if worker.core_worker.current_actor_is_asyncio(): + _thread.interrupt_main() + return + # Set a flag to indicate this is an intentional actor exit. This # reduces log verbosity. exit = SystemExit(0) diff --git a/python/ray/tests/test_asyncio.py b/python/ray/tests/test_asyncio.py index 9bf3d86b1..18dd63a22 100644 --- a/python/ray/tests/test_asyncio.py +++ b/python/ray/tests/test_asyncio.py @@ -198,6 +198,32 @@ async def test_asyncio_double_await(ray_start_regular_shared): await waiting +@pytest.mark.asyncio +async def test_asyncio_exit_actor(ray_start_regular_shared): + # https://github.com/ray-project/ray/issues/12649 + # The test should just hang without the fix. + + @ray.remote + class Actor: + async def exit(self): + ray.actor.exit_actor() + + async def ping(self): + return "pong" + + async def loop_forever(self): + while True: + await asyncio.sleep(5) + + a = Actor.options(max_task_retries=0).remote() + a.loop_forever.remote() + # Make sure exit_actor exits immediately, not once all tasks completed. + ray.get(a.exit.remote()) + + with pytest.raises(ray.exceptions.RayActorError): + ray.get(a.ping.remote()) + + if __name__ == "__main__": import pytest sys.exit(pytest.main(["-v", __file__]))