From c5544eb07080156bca03d061e83edab0efc48b7a Mon Sep 17 00:00:00 2001 From: Simon Mo Date: Tue, 2 Jun 2020 14:11:16 -0700 Subject: [PATCH] [Async] Remove Monitor + Cleanup Code (#8691) --- python/ray/_raylet.pyx | 41 +++++++++----------------------- python/ray/async_compat.py | 48 ++++---------------------------------- 2 files changed, 16 insertions(+), 73 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 6e9055979..48df281cc 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -82,8 +82,7 @@ from ray.includes.ray_config cimport RayConfig from ray.includes.global_state_accessor cimport CGlobalStateAccessor import ray -from ray.async_compat import (sync_to_async, - AsyncGetResponse, AsyncMonitorState) +from ray.async_compat import (sync_to_async, AsyncGetResponse) import ray.memory_monitor as memory_monitor import ray.ray_constants as ray_constants from ray import profiling @@ -395,21 +394,8 @@ cdef execute_task( return function(actor, *arguments, **kwarguments) async_function = sync_to_async(function) - coroutine = async_function(actor, *arguments, **kwarguments) - loop = core_worker.create_or_get_event_loop() - monitor_state = loop.monitor_state - monitor_state.register_coroutine(coroutine, - str(function.method)) - future = asyncio.run_coroutine_threadsafe(coroutine, loop) - - def callback(future): - task_done_event.Notify() - monitor_state.unregister_coroutine(coroutine) - - future.add_done_callback(callback) - core_worker.yield_current_fiber(task_done_event) - - return future.result() + return core_worker.run_async_func_in_event_loop( + async_function, actor, *arguments, **kwarguments) return function(actor, *arguments, **kwarguments) @@ -431,11 +417,11 @@ cdef execute_task( if core_worker.current_actor_is_asyncio(): # We deserialize objects in event loop thread to # prevent segfaults. See #7799 - def deserialize_args(): + async def deserialize_args(): return (ray.worker.global_worker .deserialize_objects( metadata_pairs, object_ids)) - args = core_worker.run_function_in_event_loop( + args = core_worker.run_async_func_in_event_loop( deserialize_args) else: args = ray.worker.global_worker.deserialize_objects( @@ -1198,10 +1184,6 @@ cdef class CoreWorker: from ray.experimental.async_api import init as plasma_async_init plasma_async_init() - # Create and attach the monitor object - monitor_state = AsyncMonitorState(self.async_event_loop) - self.async_event_loop.monitor_state = monitor_state - if self.async_thread is None: self.async_thread = threading.Thread( target=lambda: self.async_event_loop.run_forever(), @@ -1214,12 +1196,15 @@ cdef class CoreWorker: return self.async_event_loop - def run_function_in_event_loop(self, func): + def run_async_func_in_event_loop(self, func, *args, **kwargs): cdef: CFiberEvent event loop = self.create_or_get_event_loop() - coroutine = sync_to_async(func)() - future = asyncio.run_coroutine_threadsafe(coroutine, loop) + coroutine = func(*args, **kwargs) + if threading.get_ident() == self.async_thread.ident: + future = asyncio.ensure_future(coroutine, loop) + else: + future = asyncio.run_coroutine_threadsafe(coroutine, loop) future.add_done_callback(lambda _: event.Notify()) with nogil: (CCoreWorkerProcess.GetCoreWorker() @@ -1228,10 +1213,6 @@ cdef class CoreWorker: def destroy_event_loop_if_exists(self): if self.async_event_loop is not None: - # We should stop the monitor first because otherwise, - # loop.stop() will continue forever as monitor - # main loop will not be terminated. - self.async_event_loop.monitor_state.kill() self.async_event_loop.stop() if self.async_thread is not None: self.async_thread.join() diff --git a/python/ray/async_compat.py b/python/ray/async_compat.py index 836206369..ff1cd595a 100644 --- a/python/ray/async_compat.py +++ b/python/ray/async_compat.py @@ -3,9 +3,9 @@ This file should only be imported from Python 3. It will raise SyntaxError when importing from Python 2. """ import asyncio -from collections import namedtuple, Counter +from collections import namedtuple import time -import threading +import inspect import ray @@ -13,6 +13,9 @@ import ray def sync_to_async(func): """Convert a blocking function to async function""" + if inspect.iscoroutinefunction(func): + return func + async def wrapper(*args, **kwargs): return func(*args, **kwargs) @@ -95,44 +98,3 @@ def get_async(object_id): user_future.object_id = object_id return user_future - - -class AsyncMonitorState: - def __init__(self, loop): - self.names = dict() - self.names_lock = threading.Lock() - - self.sleep_time = 1.0 - self.monitor_loop_future = asyncio.ensure_future( - self.monitor(), loop=loop) - - async def monitor(self): - while True: - await asyncio.sleep(self.sleep_time) - all_tasks = self.get_all_task_names() - ray.show_in_webui( - str(len(all_tasks)), key="Number of concurrent task runing") - ray.show_in_webui( - str(dict(Counter(all_tasks))), key="Concurrent tasks") - - def register_coroutine(self, coro, name): - with self.names_lock: - self.names[coro] = name - - def unregister_coroutine(self, coro): - with self.names_lock: - self.names.pop(coro) - - def get_all_task_names(self): - names = [] - with self.names_lock: - names = list(self.names.values()) - return names - - def kill(self): - """Kill the monitor's loop - - This should be called in order to clean an event loop - that this monitor is running. - """ - self.monitor_loop_future.cancel()