From ef1b0c13c35391434fdc0ccaa750bd9d30ce24dc Mon Sep 17 00:00:00 2001 From: Simon Mo Date: Tue, 1 Dec 2020 13:07:43 -0800 Subject: [PATCH] Async Future Throws RayError as well (#12419) --- python/ray/_raylet.pyx | 3 +++ python/ray/serve/long_poll.py | 27 ++++++++++++++++++--------- python/ray/tests/test_asyncio.py | 4 ++++ 3 files changed, 25 insertions(+), 9 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 757e9794d..ce21a88ec 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -1505,6 +1505,9 @@ cdef void async_set_result(shared_ptr[CRayObject] obj, if isinstance(result, RayTaskError): ray.worker.last_task_error_raise_time = time.time() py_future.set_exception(result.as_instanceof_cause()) + elif isinstance(result, RayError): + # Directly raise exception for RayActorError + py_future.set_exception(result) else: py_future.set_result(result) diff --git a/python/ray/serve/long_poll.py b/python/ray/serve/long_poll.py index 91c1d8c68..2d747e94e 100644 --- a/python/ray/serve/long_poll.py +++ b/python/ray/serve/long_poll.py @@ -63,15 +63,24 @@ class LongPollerAsyncClient: async def _do_long_poll(self): while True: - updates: Dict[str, UpdatedObject] = await self._poll_once() - self._update(updates) - logger.debug(f"LongPollerClient received updates: {updates}") - for key, updated_object in updates.items(): - # NOTE(simon): This blocks the loop from doing another poll. - # Consider use loop.create_task here or poll first then call - # the callbacks. - callback = self.key_listeners[key] - await callback(updated_object.object_snapshot) + try: + updates: Dict[str, UpdatedObject] = await self._poll_once() + self._update(updates) + logger.debug(f"LongPollerClient received udpates: {updates}") + for key, updated_object in updates.items(): + # NOTE(simon): + # This blocks the loop from doing another poll. Consider + # use loop.create_task here or poll first then call the + # callbacks. + callback = self.key_listeners[key] + await callback(updated_object.object_snapshot) + except ray.exceptions.RayActorError: + # This can happen during shutdown where the controller is + # intentionally killed, the client should just gracefully + # exit. + logger.debug("LongPollerClient failed to connect to host. " + "Shutting down.") + break class LongPollerHost: diff --git a/python/ray/tests/test_asyncio.py b/python/ray/tests/test_asyncio.py index 5bcb55d57..9bf3d86b1 100644 --- a/python/ray/tests/test_asyncio.py +++ b/python/ray/tests/test_asyncio.py @@ -154,6 +154,10 @@ async def test_asyncio_get(ray_start_regular_shared, event_loop): with pytest.raises(ray.exceptions.RayTaskError): await actor.throw_error.remote().as_future() + ray.kill(actor) + with pytest.raises(ray.exceptions.RayActorError): + await actor.echo.remote(1) + def test_asyncio_actor_async_get(ray_start_regular_shared): @ray.remote