Fix exit_actor in asyncio mode (#12693)

This commit is contained in:
Simon Mo
2020-12-11 09:35:17 -08:00
committed by GitHub
parent 699ded5328
commit 68d7fa2137
3 changed files with 49 additions and 0 deletions
+14
View File
@@ -351,6 +351,18 @@ cdef execute_task(
# Automatically restrict the GPUs available to this task.
ray.utils.set_cuda_visible_devices(ray.get_gpu_ids())
# Helper method used to exit current asyncio actor.
# This is called when a KeyboardInterrupt is received by the main thread.
# Upon receiving a KeyboardInterrupt signal, Ray will exit the current
# worker. If the worker is processing normal tasks, Ray treat it as task
# cancellation from ray.cancel(object_ref). If the worker is an asyncio
# actor, Ray will exit the actor.
def exit_current_actor_if_asyncio():
if core_worker.current_actor_is_asyncio():
error = SystemExit(0)
error.is_ray_terminate = True
raise error
function_descriptor = CFunctionDescriptorToPython(
ray_function.GetFunctionDescriptor())
@@ -476,6 +488,7 @@ cdef execute_task(
ray.worker.global_worker.debugger_breakpoint = b""
task_exception = False
except KeyboardInterrupt as e:
exit_current_actor_if_asyncio()
raise TaskCancelledError(
core_worker.get_current_task_id())
if c_return_ids.size() == 1:
@@ -483,6 +496,7 @@ cdef execute_task(
# Check for a cancellation that was called when the function
# was exiting and was raised after the except block.
if not check_signals().ok():
exit_current_actor_if_asyncio()
task_exception = True
raise TaskCancelledError(
core_worker.get_current_task_id())
+9
View File
@@ -1,6 +1,7 @@
import inspect
import logging
import weakref
import _thread
import ray.ray_constants as ray_constants
import ray._raylet
@@ -1018,6 +1019,14 @@ def exit_actor():
ray.disconnect()
# Disconnect global state from GCS.
ray.state.state.disconnect()
# In asyncio actor mode, we can't raise SystemExit because it will just
# quit the asycnio event loop thread, not the main thread. Instead, we
# raise an interrupt signal to the main thread to tell it to exit.
if worker.core_worker.current_actor_is_asyncio():
_thread.interrupt_main()
return
# Set a flag to indicate this is an intentional actor exit. This
# reduces log verbosity.
exit = SystemExit(0)
+26
View File
@@ -198,6 +198,32 @@ async def test_asyncio_double_await(ray_start_regular_shared):
await waiting
@pytest.mark.asyncio
async def test_asyncio_exit_actor(ray_start_regular_shared):
# https://github.com/ray-project/ray/issues/12649
# The test should just hang without the fix.
@ray.remote
class Actor:
async def exit(self):
ray.actor.exit_actor()
async def ping(self):
return "pong"
async def loop_forever(self):
while True:
await asyncio.sleep(5)
a = Actor.options(max_task_retries=0).remote()
a.loop_forever.remote()
# Make sure exit_actor exits immediately, not once all tasks completed.
ray.get(a.exit.remote())
with pytest.raises(ray.exceptions.RayActorError):
ray.get(a.ping.remote())
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))