mirror of
https://github.com/wassname/ray.git
synced 2026-07-01 05:41:51 +08:00
UI improvement for asyncio (#6905)
This commit is contained in:
+18
-6
@@ -138,7 +138,8 @@ else:
|
||||
import cPickle as pickle
|
||||
|
||||
if PY3:
|
||||
from ray.async_compat import sync_to_async, AsyncGetResponse
|
||||
from ray.async_compat import (sync_to_async,
|
||||
AsyncGetResponse, AsyncMonitorState)
|
||||
|
||||
|
||||
def set_internal_config(dict options):
|
||||
@@ -609,10 +610,16 @@ cdef execute_task(
|
||||
|
||||
coroutine = async_function(actor, *arguments, **kwarguments)
|
||||
loop = core_worker.create_or_get_event_loop()
|
||||
|
||||
monitor_state = loop.monitor_state
|
||||
monitor_state.register_coroutine(coroutine,
|
||||
str(function.method))
|
||||
future = asyncio.run_coroutine_threadsafe(coroutine, loop)
|
||||
future.add_done_callback(
|
||||
lambda future: fiber_event.Notify())
|
||||
|
||||
def callback(future):
|
||||
fiber_event.Notify()
|
||||
monitor_state.unregister_coroutine(coroutine)
|
||||
|
||||
future.add_done_callback(callback)
|
||||
|
||||
with nogil:
|
||||
(core_worker.core_worker.get()
|
||||
@@ -814,8 +821,8 @@ cdef class CoreWorker:
|
||||
def get_actor_id(self):
|
||||
return ActorID(self.core_worker.get().GetActorId().Binary())
|
||||
|
||||
def set_webui_display(self, message):
|
||||
self.core_worker.get().SetWebuiDisplay(message)
|
||||
def set_webui_display(self, key, message):
|
||||
self.core_worker.get().SetWebuiDisplay(key, message)
|
||||
|
||||
def set_actor_title(self, title):
|
||||
self.core_worker.get().SetActorTitle(title)
|
||||
@@ -1187,6 +1194,11 @@ cdef class CoreWorker:
|
||||
# Delayed import due to async_api depends on _raylet.
|
||||
from ray.experimental.async_api import _async_init
|
||||
self.async_event_loop.run_until_complete(_async_init())
|
||||
|
||||
# Create and attach the monitor object
|
||||
monitor_state = AsyncMonitorState(self.async_event_loop)
|
||||
self.async_event_loop.monitor_state = monitor_state
|
||||
|
||||
if self.async_thread is None:
|
||||
self.async_thread = threading.Thread(
|
||||
target=lambda: self.async_event_loop.run_forever()
|
||||
|
||||
+9
-6
@@ -359,8 +359,7 @@ class ActorClass:
|
||||
is_direct_call=None,
|
||||
max_concurrency=None,
|
||||
name=None,
|
||||
detached=False,
|
||||
is_asyncio=False):
|
||||
detached=False):
|
||||
"""Create an actor.
|
||||
|
||||
This method allows more flexibility than the remote method because
|
||||
@@ -386,8 +385,6 @@ class ActorClass:
|
||||
name: The globally unique name for the actor.
|
||||
detached: Whether the actor should be kept alive after driver
|
||||
exits.
|
||||
is_asyncio: Turn on async actor calls. This only works with direct
|
||||
actor calls.
|
||||
|
||||
Returns:
|
||||
A handle to the newly created actor.
|
||||
@@ -398,6 +395,14 @@ class ActorClass:
|
||||
kwargs = {}
|
||||
if is_direct_call is None:
|
||||
is_direct_call = ray_constants.direct_call_enabled()
|
||||
|
||||
meta = self.__ray_metadata__
|
||||
actor_has_async_methods = len(
|
||||
inspect.getmembers(
|
||||
meta.modified_class,
|
||||
predicate=inspect.iscoroutinefunction)) > 0
|
||||
is_asyncio = actor_has_async_methods
|
||||
|
||||
if max_concurrency is None:
|
||||
if is_asyncio:
|
||||
max_concurrency = 1000
|
||||
@@ -419,8 +424,6 @@ class ActorClass:
|
||||
raise Exception("Actors cannot be created before ray.init() "
|
||||
"has been called.")
|
||||
|
||||
meta = self.__ray_metadata__
|
||||
|
||||
if detached and name is None:
|
||||
raise Exception("Detached actors must be named. "
|
||||
"Please use Actor._remote(name='some_name') "
|
||||
|
||||
@@ -3,8 +3,9 @@ This file should only be imported from Python 3.
|
||||
It will raise SyntaxError when importing from Python 2.
|
||||
"""
|
||||
import asyncio
|
||||
from collections import namedtuple
|
||||
from collections import namedtuple, Counter
|
||||
import time
|
||||
import threading
|
||||
|
||||
import ray
|
||||
|
||||
@@ -96,3 +97,35 @@ def get_async(object_id):
|
||||
user_future.object_id = object_id
|
||||
|
||||
return user_future
|
||||
|
||||
|
||||
class AsyncMonitorState:
    """Registry of in-flight coroutines, periodically reported to the web UI.

    Coroutines are registered under a display name and removed again when
    they finish. A background ``monitor`` task scheduled on ``loop`` reads
    the registry every ``sleep_time`` seconds and publishes the number of
    registered coroutines and a per-name count via ``ray.show_in_webui``.
    """

    def __init__(self, loop):
        """Create the registry and schedule the monitor task on ``loop``.

        Args:
            loop: The asyncio event loop the monitor task is scheduled on.
        """
        # Maps coroutine object -> display name; guarded by names_lock
        # because it is accessed through a lock everywhere in this class.
        self.names = dict()
        self.names_lock = threading.Lock()

        # Seconds between two consecutive reports.
        self.sleep_time = 1.0
        # Keep a strong reference to the task: the event loop only holds
        # weak references, so an unreferenced task may be garbage-collected
        # before it finishes. The handle also allows callers to cancel it.
        self._monitor_task = asyncio.ensure_future(self.monitor(), loop=loop)

    async def monitor(self):
        """Publish the registry contents to the web UI forever."""
        while True:
            await asyncio.sleep(self.sleep_time)
            all_tasks = self.get_all_task_names()
            # The key strings are emitted at runtime and kept verbatim.
            ray.show_in_webui(
                str(len(all_tasks)), key="Number of concurrent task runing")
            ray.show_in_webui(
                str(dict(Counter(all_tasks))), key="Concurrent tasks")

    def register_coroutine(self, coro, name):
        """Record ``coro`` under the display name ``name``.

        Registering the same coroutine again overwrites its name.
        """
        with self.names_lock:
            self.names[coro] = name

    def unregister_coroutine(self, coro):
        """Remove ``coro`` from the registry.

        Removing a coroutine that is not registered is a no-op, so a
        completion callback cannot fail with ``KeyError``.
        """
        with self.names_lock:
            self.names.pop(coro, None)

    def get_all_task_names(self):
        """Return a list snapshot of the names of all registered coroutines."""
        with self.names_lock:
            return list(self.names.values())
|
||||
|
||||
@@ -120,7 +120,7 @@ export interface RayletInfoResponse {
|
||||
usedResources: { [key: string]: number };
|
||||
currentTaskDesc?: string;
|
||||
numPendingTasks?: number;
|
||||
webuiDisplay?: string;
|
||||
webuiDisplay?: Record<string, string>;
|
||||
}
|
||||
| {
|
||||
actorId: string;
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import Collapse from "@material-ui/core/Collapse";
|
||||
import { Theme } from "@material-ui/core/styles/createMuiTheme";
|
||||
import createStyles from "@material-ui/core/styles/createStyles";
|
||||
import withStyles, { WithStyles } from "@material-ui/core/styles/withStyles";
|
||||
@@ -12,6 +11,7 @@ import {
|
||||
RayletInfoResponse
|
||||
} from "../../../api";
|
||||
import Actors from "./Actors";
|
||||
import Collapse from "@material-ui/core/Collapse";
|
||||
|
||||
const styles = (theme: Theme) =>
|
||||
createStyles({
|
||||
@@ -171,6 +171,39 @@ class Actor extends React.Component<Props & WithStyles<typeof styles>, State> {
|
||||
}
|
||||
];
|
||||
|
||||
// Construct the custom message from the actor.
|
||||
let actorCustomDisplay: JSX.Element[] = [];
|
||||
if (actor.state !== -1 && actor.webuiDisplay) {
|
||||
actorCustomDisplay = Object.keys(actor.webuiDisplay)
|
||||
.sort()
|
||||
.map((key, _, __) => {
|
||||
// Construct the value from actor.
|
||||
// Please refer to worker.py::show_in_webui for schema.
|
||||
const valueEncoded = actor.webuiDisplay![key];
|
||||
const valueParsed = JSON.parse(valueEncoded);
|
||||
let valueRendered = valueParsed["message"];
|
||||
if (valueParsed["dtype"] === "html") {
|
||||
valueRendered = (
|
||||
<div dangerouslySetInnerHTML={{ __html: valueRendered }}></div>
|
||||
);
|
||||
}
|
||||
|
||||
if (key === "") {
|
||||
return (
|
||||
<Typography className={classes.webuiDisplay}>
|
||||
{valueRendered}
|
||||
</Typography>
|
||||
);
|
||||
} else {
|
||||
return (
|
||||
<Typography className={classes.webuiDisplay}>
|
||||
{key}: {valueRendered}
|
||||
</Typography>
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return (
|
||||
<div className={classes.root}>
|
||||
<Typography className={classes.title}>
|
||||
@@ -249,11 +282,10 @@ class Actor extends React.Component<Props & WithStyles<typeof styles>, State> {
|
||||
</Typography>
|
||||
{actor.state !== -1 && (
|
||||
<React.Fragment>
|
||||
{actor.webuiDisplay && (
|
||||
<Typography className={classes.webuiDisplay}>
|
||||
{actor.webuiDisplay}
|
||||
</Typography>
|
||||
{actorCustomDisplay.length > 0 && (
|
||||
<React.Fragment>{actorCustomDisplay}</React.Fragment>
|
||||
)}
|
||||
|
||||
<Collapse in={expanded}>
|
||||
<Actors actors={actor.children} />
|
||||
</Collapse>
|
||||
|
||||
@@ -94,13 +94,15 @@ def init():
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
if loop.is_running():
|
||||
assert loop._thread_id != threading.get_ident(), (
|
||||
"You are using async_api inside a running event loop. "
|
||||
"Please call `await _async_init()` to initialize inside "
|
||||
"asynchrounous context.")
|
||||
# If the loop is runing outside current thread, we actually need
|
||||
# to do this to make sure the context is initialized.
|
||||
asyncio.run_coroutine_threadsafe(_async_init(), loop=loop)
|
||||
if loop._thread_id != threading.get_ident():
|
||||
# If the loop is running outside the current thread, we actually need
|
||||
# to do this to make sure the context is initialized.
|
||||
asyncio.run_coroutine_threadsafe(_async_init(), loop=loop)
|
||||
else:
|
||||
async_init_done = asyncio.get_event_loop().create_task(
|
||||
_async_init())
|
||||
# Block until the async init finishes.
|
||||
async_init_done.done()
|
||||
else:
|
||||
asyncio.get_event_loop().run_until_complete(_async_init())
|
||||
|
||||
|
||||
@@ -113,8 +113,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
|
||||
CJobID GetCurrentJobId()
|
||||
CTaskID GetCurrentTaskId()
|
||||
const CActorID &GetActorId()
|
||||
void SetWebuiDisplay(const c_string &message)
|
||||
void SetActorTitle(const c_string &title)
|
||||
void SetWebuiDisplay(const c_string &key, const c_string &message)
|
||||
CTaskID GetCallerId()
|
||||
const ResourceMappingType &GetResourceIDs() const
|
||||
CActorID DeserializeAndRegisterActorHandle(const c_string &bytes)
|
||||
|
||||
@@ -110,7 +110,7 @@ def test_asyncio_actor(ray_start_regular_shared):
|
||||
await self.event.wait()
|
||||
return sorted(self.batch)
|
||||
|
||||
a = AsyncBatcher.options(is_direct_call=True, is_asyncio=True).remote()
|
||||
a = AsyncBatcher.options(is_direct_call=True).remote()
|
||||
x1 = a.add.remote(1)
|
||||
x2 = a.add.remote(2)
|
||||
x3 = a.add.remote(3)
|
||||
@@ -130,7 +130,7 @@ def test_asyncio_actor_same_thread(ray_start_regular_shared):
|
||||
async def async_thread_id(self):
|
||||
return threading.current_thread().ident
|
||||
|
||||
a = Actor.options(is_direct_call=True, is_asyncio=True).remote()
|
||||
a = Actor.options(is_direct_call=True).remote()
|
||||
sync_id, async_id = ray.get(
|
||||
[a.sync_thread_id.remote(),
|
||||
a.async_thread_id.remote()])
|
||||
@@ -154,8 +154,7 @@ def test_asyncio_actor_concurrency(ray_start_regular_shared):
|
||||
|
||||
num_calls = 10
|
||||
|
||||
a = RecordOrder.options(
|
||||
is_direct_call=True, max_concurrency=1, is_asyncio=True).remote()
|
||||
a = RecordOrder.options(is_direct_call=True, max_concurrency=1).remote()
|
||||
ray.get([a.do_work.remote() for _ in range(num_calls)])
|
||||
history = ray.get(a.get_history.remote())
|
||||
|
||||
@@ -189,8 +188,7 @@ def test_asyncio_actor_high_concurrency(ray_start_regular_shared):
|
||||
|
||||
batch_size = sys.getrecursionlimit() * 4
|
||||
actor = AsyncConcurrencyBatcher.options(
|
||||
is_asyncio=True, max_concurrency=batch_size * 2,
|
||||
is_direct_call=True).remote(batch_size)
|
||||
max_concurrency=batch_size * 2, is_direct_call=True).remote(batch_size)
|
||||
result = ray.get([actor.add.remote(i) for i in range(batch_size)])
|
||||
assert result[0] == list(range(batch_size))
|
||||
assert result[-1] == list(range(batch_size))
|
||||
@@ -264,6 +262,6 @@ def test_asyncio_actor_async_get(ray_start_regular_shared):
|
||||
async def plasma_get(self):
|
||||
return await plasma_object
|
||||
|
||||
getter = AsyncGetter.options(is_asyncio=True).remote()
|
||||
getter = AsyncGetter.options().remote()
|
||||
assert ray.get(getter.get.remote()) == 1
|
||||
assert ray.get(getter.plasma_get.remote()) == 2
|
||||
|
||||
@@ -61,11 +61,11 @@ def test_worker_stats(shutdown_only):
|
||||
target_worker_present = False
|
||||
for worker in reply.workers_stats:
|
||||
stats = worker.core_worker_stats
|
||||
if stats.webui_display == "test":
|
||||
if stats.webui_display[""] == '{"message": "test", "dtype": "text"}':
|
||||
target_worker_present = True
|
||||
assert worker.pid == worker_pid
|
||||
else:
|
||||
assert stats.webui_display == ""
|
||||
assert stats.webui_display[""] == "" # Empty proto
|
||||
assert target_worker_present
|
||||
|
||||
# Test show_in_webui for remote actors.
|
||||
@@ -75,11 +75,11 @@ def test_worker_stats(shutdown_only):
|
||||
target_worker_present = False
|
||||
for worker in reply.workers_stats:
|
||||
stats = worker.core_worker_stats
|
||||
if stats.webui_display == "test":
|
||||
if stats.webui_display[""] == '{"message": "test", "dtype": "text"}':
|
||||
target_worker_present = True
|
||||
assert worker.pid == worker_pid
|
||||
else:
|
||||
assert stats.webui_display == ""
|
||||
assert stats.webui_display[""] == "" # Empty proto
|
||||
assert target_worker_present
|
||||
|
||||
timeout_seconds = 20
|
||||
|
||||
+15
-2
@@ -1422,7 +1422,7 @@ def register_custom_serializer(cls,
|
||||
class_id=class_id)
|
||||
|
||||
|
||||
def show_in_webui(message):
|
||||
def show_in_webui(message, key="", dtype="text"):
|
||||
"""Display message in dashboard.
|
||||
|
||||
Display message for the current task or actor in the dashboard.
|
||||
@@ -1431,10 +1431,23 @@ def show_in_webui(message):
|
||||
|
||||
Args:
|
||||
message (str): Message to be displayed.
|
||||
key (str): The key name for the message. Multiple message under
|
||||
different keys will be displayed at the same time. Messages
|
||||
under the same key will be overridden.
|
||||
dtype (str): The type of message for rendering. One of the
|
||||
following: text, html.
|
||||
"""
|
||||
worker = global_worker
|
||||
worker.check_connected()
|
||||
worker.core_worker.set_webui_display(message.encode())
|
||||
|
||||
acceptable_dtypes = {"text", "html"}
|
||||
assert dtype in acceptable_dtypes, "dtype accepts only: {}".format(
|
||||
acceptable_dtypes)
|
||||
|
||||
message_wrapped = {"message": message, "dtype": dtype}
|
||||
message_encoded = json.dumps(message_wrapped).encode()
|
||||
|
||||
worker.core_worker.set_webui_display(key.encode(), message_encoded)
|
||||
|
||||
|
||||
def get(object_ids, timeout=None):
|
||||
|
||||
Reference in New Issue
Block a user