mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 16:13:54 +08:00
[Async] Remove Monitor + Cleanup Code (#8691)
This commit is contained in:
+11
-30
@@ -82,8 +82,7 @@ from ray.includes.ray_config cimport RayConfig
|
||||
from ray.includes.global_state_accessor cimport CGlobalStateAccessor
|
||||
|
||||
import ray
|
||||
from ray.async_compat import (sync_to_async,
|
||||
AsyncGetResponse, AsyncMonitorState)
|
||||
from ray.async_compat import (sync_to_async, AsyncGetResponse)
|
||||
import ray.memory_monitor as memory_monitor
|
||||
import ray.ray_constants as ray_constants
|
||||
from ray import profiling
|
||||
@@ -395,21 +394,8 @@ cdef execute_task(
|
||||
return function(actor, *arguments, **kwarguments)
|
||||
async_function = sync_to_async(function)
|
||||
|
||||
coroutine = async_function(actor, *arguments, **kwarguments)
|
||||
loop = core_worker.create_or_get_event_loop()
|
||||
monitor_state = loop.monitor_state
|
||||
monitor_state.register_coroutine(coroutine,
|
||||
str(function.method))
|
||||
future = asyncio.run_coroutine_threadsafe(coroutine, loop)
|
||||
|
||||
def callback(future):
|
||||
task_done_event.Notify()
|
||||
monitor_state.unregister_coroutine(coroutine)
|
||||
|
||||
future.add_done_callback(callback)
|
||||
core_worker.yield_current_fiber(task_done_event)
|
||||
|
||||
return future.result()
|
||||
return core_worker.run_async_func_in_event_loop(
|
||||
async_function, actor, *arguments, **kwarguments)
|
||||
|
||||
return function(actor, *arguments, **kwarguments)
|
||||
|
||||
@@ -431,11 +417,11 @@ cdef execute_task(
|
||||
if core_worker.current_actor_is_asyncio():
|
||||
# We deserialize objects in event loop thread to
|
||||
# prevent segfaults. See #7799
|
||||
def deserialize_args():
|
||||
async def deserialize_args():
|
||||
return (ray.worker.global_worker
|
||||
.deserialize_objects(
|
||||
metadata_pairs, object_ids))
|
||||
args = core_worker.run_function_in_event_loop(
|
||||
args = core_worker.run_async_func_in_event_loop(
|
||||
deserialize_args)
|
||||
else:
|
||||
args = ray.worker.global_worker.deserialize_objects(
|
||||
@@ -1198,10 +1184,6 @@ cdef class CoreWorker:
|
||||
from ray.experimental.async_api import init as plasma_async_init
|
||||
plasma_async_init()
|
||||
|
||||
# Create and attach the monitor object
|
||||
monitor_state = AsyncMonitorState(self.async_event_loop)
|
||||
self.async_event_loop.monitor_state = monitor_state
|
||||
|
||||
if self.async_thread is None:
|
||||
self.async_thread = threading.Thread(
|
||||
target=lambda: self.async_event_loop.run_forever(),
|
||||
@@ -1214,12 +1196,15 @@ cdef class CoreWorker:
|
||||
|
||||
return self.async_event_loop
|
||||
|
||||
def run_function_in_event_loop(self, func):
|
||||
def run_async_func_in_event_loop(self, func, *args, **kwargs):
|
||||
cdef:
|
||||
CFiberEvent event
|
||||
loop = self.create_or_get_event_loop()
|
||||
coroutine = sync_to_async(func)()
|
||||
future = asyncio.run_coroutine_threadsafe(coroutine, loop)
|
||||
coroutine = func(*args, **kwargs)
|
||||
if threading.get_ident() == self.async_thread.ident:
|
||||
future = asyncio.ensure_future(coroutine, loop)
|
||||
else:
|
||||
future = asyncio.run_coroutine_threadsafe(coroutine, loop)
|
||||
future.add_done_callback(lambda _: event.Notify())
|
||||
with nogil:
|
||||
(CCoreWorkerProcess.GetCoreWorker()
|
||||
@@ -1228,10 +1213,6 @@ cdef class CoreWorker:
|
||||
|
||||
def destroy_event_loop_if_exists(self):
|
||||
if self.async_event_loop is not None:
|
||||
# We should stop the monitor first because otherwise,
|
||||
# loop.stop() will continue forever as monitor
|
||||
# main loop will not be terminated.
|
||||
self.async_event_loop.monitor_state.kill()
|
||||
self.async_event_loop.stop()
|
||||
if self.async_thread is not None:
|
||||
self.async_thread.join()
|
||||
|
||||
@@ -3,9 +3,9 @@ This file should only be imported from Python 3.
|
||||
It will raise SyntaxError when importing from Python 2.
|
||||
"""
|
||||
import asyncio
|
||||
from collections import namedtuple, Counter
|
||||
from collections import namedtuple
|
||||
import time
|
||||
import threading
|
||||
import inspect
|
||||
|
||||
import ray
|
||||
|
||||
@@ -13,6 +13,9 @@ import ray
|
||||
def sync_to_async(func):
|
||||
"""Convert a blocking function to async function"""
|
||||
|
||||
if inspect.iscoroutinefunction(func):
|
||||
return func
|
||||
|
||||
async def wrapper(*args, **kwargs):
|
||||
return func(*args, **kwargs)
|
||||
|
||||
@@ -95,44 +98,3 @@ def get_async(object_id):
|
||||
user_future.object_id = object_id
|
||||
|
||||
return user_future
|
||||
|
||||
|
||||
class AsyncMonitorState:
|
||||
def __init__(self, loop):
|
||||
self.names = dict()
|
||||
self.names_lock = threading.Lock()
|
||||
|
||||
self.sleep_time = 1.0
|
||||
self.monitor_loop_future = asyncio.ensure_future(
|
||||
self.monitor(), loop=loop)
|
||||
|
||||
async def monitor(self):
|
||||
while True:
|
||||
await asyncio.sleep(self.sleep_time)
|
||||
all_tasks = self.get_all_task_names()
|
||||
ray.show_in_webui(
|
||||
str(len(all_tasks)), key="Number of concurrent task runing")
|
||||
ray.show_in_webui(
|
||||
str(dict(Counter(all_tasks))), key="Concurrent tasks")
|
||||
|
||||
def register_coroutine(self, coro, name):
|
||||
with self.names_lock:
|
||||
self.names[coro] = name
|
||||
|
||||
def unregister_coroutine(self, coro):
|
||||
with self.names_lock:
|
||||
self.names.pop(coro)
|
||||
|
||||
def get_all_task_names(self):
|
||||
names = []
|
||||
with self.names_lock:
|
||||
names = list(self.names.values())
|
||||
return names
|
||||
|
||||
def kill(self):
|
||||
"""Kill the monitor's loop
|
||||
|
||||
This should be called in order to clean an event loop
|
||||
that this monitor is running.
|
||||
"""
|
||||
self.monitor_loop_future.cancel()
|
||||
|
||||
Reference in New Issue
Block a user