diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 973f778d8..aad636723 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -138,7 +138,8 @@ else: import cPickle as pickle if PY3: - from ray.async_compat import sync_to_async, AsyncGetResponse + from ray.async_compat import (sync_to_async, + AsyncGetResponse, AsyncMonitorState) def set_internal_config(dict options): @@ -609,10 +610,16 @@ cdef execute_task( coroutine = async_function(actor, *arguments, **kwarguments) loop = core_worker.create_or_get_event_loop() - + monitor_state = loop.monitor_state + monitor_state.register_coroutine(coroutine, + str(function.method)) future = asyncio.run_coroutine_threadsafe(coroutine, loop) - future.add_done_callback( - lambda future: fiber_event.Notify()) + + def callback(future): + fiber_event.Notify() + monitor_state.unregister_coroutine(coroutine) + + future.add_done_callback(callback) with nogil: (core_worker.core_worker.get() @@ -814,8 +821,8 @@ cdef class CoreWorker: def get_actor_id(self): return ActorID(self.core_worker.get().GetActorId().Binary()) - def set_webui_display(self, message): - self.core_worker.get().SetWebuiDisplay(message) + def set_webui_display(self, key, message): + self.core_worker.get().SetWebuiDisplay(key, message) def set_actor_title(self, title): self.core_worker.get().SetActorTitle(title) @@ -1187,6 +1194,11 @@ cdef class CoreWorker: # Delayed import due to async_api depends on _raylet. from ray.experimental.async_api import _async_init self.async_event_loop.run_until_complete(_async_init()) + + # Create and attach the monitor object + monitor_state = AsyncMonitorState(self.async_event_loop) + self.async_event_loop.monitor_state = monitor_state + if self.async_thread is None: self.async_thread = threading.Thread( target=lambda: self.async_event_loop.run_forever() diff --git a/python/ray/actor.py b/python/ray/actor.py index 0223c2ef7..1d9ef73d7 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -359,8 +359,7 @@ class ActorClass: is_direct_call=None, max_concurrency=None, name=None, - detached=False, - is_asyncio=False): + detached=False): """Create an actor. This method allows more flexibility than the remote method because @@ -386,8 +385,6 @@ class ActorClass: name: The globally unique name for the actor. detached: Whether the actor should be kept alive after driver exits. - is_asyncio: Turn on async actor calls. This only works with direct - actor calls. Returns: A handle to the newly created actor. @@ -398,6 +395,14 @@ class ActorClass: kwargs = {} if is_direct_call is None: is_direct_call = ray_constants.direct_call_enabled() + + meta = self.__ray_metadata__ + actor_has_async_methods = len( + inspect.getmembers( + meta.modified_class, + predicate=inspect.iscoroutinefunction)) > 0 + is_asyncio = actor_has_async_methods + if max_concurrency is None: if is_asyncio: max_concurrency = 1000 @@ -419,8 +424,6 @@ class ActorClass: raise Exception("Actors cannot be created before ray.init() " "has been called.") - meta = self.__ray_metadata__ - if detached and name is None: raise Exception("Detached actors must be named. " "Please use Actor._remote(name='some_name') " diff --git a/python/ray/async_compat.py b/python/ray/async_compat.py index 143eb1c88..23540e23c 100644 --- a/python/ray/async_compat.py +++ b/python/ray/async_compat.py @@ -3,8 +3,9 @@ This file should only be imported from Python 3. It will raise SyntaxError when importing from Python 2. """ import asyncio -from collections import namedtuple +from collections import namedtuple, Counter import time +import threading import ray @@ -96,3 +97,35 @@ def get_async(object_id): user_future.object_id = object_id return user_future + + +class AsyncMonitorState: + def __init__(self, loop): + self.names = dict() + self.names_lock = threading.Lock() + + self.sleep_time = 1.0 + asyncio.ensure_future(self.monitor(), loop=loop) + + async def monitor(self): + while True: + await asyncio.sleep(self.sleep_time) + all_tasks = self.get_all_task_names() + ray.show_in_webui( + str(len(all_tasks)), key="Number of concurrent task runing") + ray.show_in_webui( + str(dict(Counter(all_tasks))), key="Concurrent tasks") + + def register_coroutine(self, coro, name): + with self.names_lock: + self.names[coro] = name + + def unregister_coroutine(self, coro): + with self.names_lock: + self.names.pop(coro) + + def get_all_task_names(self): + names = [] + with self.names_lock: + names = list(self.names.values()) + return names diff --git a/python/ray/dashboard/client/src/api.ts b/python/ray/dashboard/client/src/api.ts index 1719da764..05fa47b18 100644 --- a/python/ray/dashboard/client/src/api.ts +++ b/python/ray/dashboard/client/src/api.ts @@ -120,7 +120,7 @@ export interface RayletInfoResponse { usedResources: { [key: string]: number }; currentTaskDesc?: string; numPendingTasks?: number; - webuiDisplay?: string; + webuiDisplay?: Record; } | { actorId: string; diff --git a/python/ray/dashboard/client/src/pages/dashboard/logical-view/Actor.tsx b/python/ray/dashboard/client/src/pages/dashboard/logical-view/Actor.tsx index 2475aed14..160362e30 100644 --- a/python/ray/dashboard/client/src/pages/dashboard/logical-view/Actor.tsx +++ b/python/ray/dashboard/client/src/pages/dashboard/logical-view/Actor.tsx @@ -1,4 +1,3 @@ -import Collapse from "@material-ui/core/Collapse"; import { Theme } from "@material-ui/core/styles/createMuiTheme"; import createStyles from "@material-ui/core/styles/createStyles"; import withStyles, { WithStyles } from "@material-ui/core/styles/withStyles"; @@ -12,6 +11,7 @@ import { RayletInfoResponse } from "../../../api"; import Actors from "./Actors"; +import Collapse from "@material-ui/core/Collapse"; const styles = (theme: Theme) => createStyles({ @@ -171,6 +171,39 @@ class Actor extends React.Component, State> { } ]; + // Construct the custom message from the actor. + let actorCustomDisplay: JSX.Element[] = []; + if (actor.state !== -1 && actor.webuiDisplay) { + actorCustomDisplay = Object.keys(actor.webuiDisplay) + .sort() + .map((key, _, __) => { + // Construct the value from actor. + // Please refer to worker.py::show_in_webui for schema. + const valueEncoded = actor.webuiDisplay![key]; + const valueParsed = JSON.parse(valueEncoded); + let valueRendered = valueParsed["message"]; + if (valueParsed["dtype"] === "html") { + valueRendered = ( +
+ ); + } + + if (key === "") { + return ( + +     {valueRendered} + + ); + } else { + return ( + +     {key}: {valueRendered} + + ); + } + }); + } + return (
@@ -249,11 +282,10 @@ class Actor extends React.Component, State> { {actor.state !== -1 && ( - {actor.webuiDisplay && ( - - {actor.webuiDisplay} - + {actorCustomDisplay.length > 0 && ( + {actorCustomDisplay} )} + diff --git a/python/ray/experimental/async_api.py b/python/ray/experimental/async_api.py index 5cbbc929e..0d10cca1d 100644 --- a/python/ray/experimental/async_api.py +++ b/python/ray/experimental/async_api.py @@ -94,13 +94,15 @@ def init(): loop = asyncio.get_event_loop() if loop.is_running(): - assert loop._thread_id != threading.get_ident(), ( - "You are using async_api inside a running event loop. " - "Please call `await _async_init()` to initialize inside " - "asynchrounous context.") - # If the loop is runing outside current thread, we actually need - # to do this to make sure the context is initialized. - asyncio.run_coroutine_threadsafe(_async_init(), loop=loop) + if loop._thread_id != threading.get_ident(): + # If the loop is runing outside current thread, we actually need + # to do this to make sure the context is initialized. + asyncio.run_coroutine_threadsafe(_async_init(), loop=loop) + else: + async_init_done = asyncio.get_event_loop().create_task( + _async_init()) + # Block until the async init finishes. + async_init_done.done() else: asyncio.get_event_loop().run_until_complete(_async_init()) diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 09d826381..49aa706fb 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -113,8 +113,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CJobID GetCurrentJobId() CTaskID GetCurrentTaskId() const CActorID &GetActorId() - void SetWebuiDisplay(const c_string &message) void SetActorTitle(const c_string &title) + void SetWebuiDisplay(const c_string &key, const c_string &message) CTaskID GetCallerId() const ResourceMappingType &GetResourceIDs() const CActorID DeserializeAndRegisterActorHandle(const c_string &bytes) diff --git a/python/ray/tests/py3_test.py b/python/ray/tests/py3_test.py index 02700057c..4ebfad1f2 100644 --- a/python/ray/tests/py3_test.py +++ b/python/ray/tests/py3_test.py @@ -110,7 +110,7 @@ def test_asyncio_actor(ray_start_regular_shared): await self.event.wait() return sorted(self.batch) - a = AsyncBatcher.options(is_direct_call=True, is_asyncio=True).remote() + a = AsyncBatcher.options(is_direct_call=True).remote() x1 = a.add.remote(1) x2 = a.add.remote(2) x3 = a.add.remote(3) @@ -130,7 +130,7 @@ def test_asyncio_actor_same_thread(ray_start_regular_shared): async def async_thread_id(self): return threading.current_thread().ident - a = Actor.options(is_direct_call=True, is_asyncio=True).remote() + a = Actor.options(is_direct_call=True).remote() sync_id, async_id = ray.get( [a.sync_thread_id.remote(), a.async_thread_id.remote()]) @@ -154,8 +154,7 @@ def test_asyncio_actor_concurrency(ray_start_regular_shared): num_calls = 10 - a = RecordOrder.options( - is_direct_call=True, max_concurrency=1, is_asyncio=True).remote() + a = RecordOrder.options(is_direct_call=True, max_concurrency=1).remote() ray.get([a.do_work.remote() for _ in range(num_calls)]) history = ray.get(a.get_history.remote()) @@ -189,8 +188,7 @@ def test_asyncio_actor_high_concurrency(ray_start_regular_shared): batch_size = sys.getrecursionlimit() * 4 actor = AsyncConcurrencyBatcher.options( - is_asyncio=True, max_concurrency=batch_size * 2, - is_direct_call=True).remote(batch_size) + max_concurrency=batch_size * 2, is_direct_call=True).remote(batch_size) result = ray.get([actor.add.remote(i) for i in range(batch_size)]) assert result[0] == list(range(batch_size)) assert result[-1] == list(range(batch_size)) @@ -264,6 +262,6 @@ def test_asyncio_actor_async_get(ray_start_regular_shared): async def plasma_get(self): return await plasma_object - getter = AsyncGetter.options(is_asyncio=True).remote() + getter = AsyncGetter.options().remote() assert ray.get(getter.get.remote()) == 1 assert ray.get(getter.plasma_get.remote()) == 2 diff --git a/python/ray/tests/test_metrics.py b/python/ray/tests/test_metrics.py index d9ef78857..6e1bf1a23 100644 --- a/python/ray/tests/test_metrics.py +++ b/python/ray/tests/test_metrics.py @@ -61,11 +61,11 @@ def test_worker_stats(shutdown_only): target_worker_present = False for worker in reply.workers_stats: stats = worker.core_worker_stats - if stats.webui_display == "test": + if stats.webui_display[""] == '{"message": "test", "dtype": "text"}': target_worker_present = True assert worker.pid == worker_pid else: - assert stats.webui_display == "" + assert stats.webui_display[""] == "" # Empty proto assert target_worker_present # Test show_in_webui for remote actors. @@ -75,11 +75,11 @@ def test_worker_stats(shutdown_only): target_worker_present = False for worker in reply.workers_stats: stats = worker.core_worker_stats - if stats.webui_display == "test": + if stats.webui_display[""] == '{"message": "test", "dtype": "text"}': target_worker_present = True assert worker.pid == worker_pid else: - assert stats.webui_display == "" + assert stats.webui_display[""] == "" # Empty proto assert target_worker_present timeout_seconds = 20 diff --git a/python/ray/worker.py b/python/ray/worker.py index e292fb1f9..22e5a1e69 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1422,7 +1422,7 @@ def register_custom_serializer(cls, class_id=class_id) -def show_in_webui(message): +def show_in_webui(message, key="", dtype="text"): """Display message in dashboard. Display message for the current task or actor in the dashboard. @@ -1431,10 +1431,23 @@ def show_in_webui(message): Args: message (str): Message to be displayed. + key (str): The key name for the message. Multiple message under + different keys will be displayed at the same time. Messages + under the same key will be overriden. + data_type (str): The type of message for rendering. One of the + following: text, html. """ worker = global_worker worker.check_connected() - worker.core_worker.set_webui_display(message.encode()) + + acceptable_dtypes = {"text", "html"} + assert dtype in acceptable_dtypes, "dtype accepts only: {}".format( + acceptable_dtypes) + + message_wrapped = {"message": message, "dtype": dtype} + message_encoded = json.dumps(message_wrapped).encode() + + worker.core_worker.set_webui_display(key.encode(), message_encoded) def get(object_ids, timeout=None): diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index edc205a42..704528cbf 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1188,8 +1188,11 @@ void CoreWorker::HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest & } (*used_resources_map)[it.first] = quantity; } - stats->set_webui_display(webui_display_); stats->set_actor_title(actor_title_); + google::protobuf::Map webui_map(webui_display_.begin(), + webui_display_.end()); + (*stats->mutable_webui_display()) = webui_map; + MemoryStoreStats memory_store_stats = memory_store_->GetMemoryStoreStatisticalData(); stats->set_num_local_objects(memory_store_stats.num_local_objects); stats->set_used_object_store_memory(memory_store_stats.used_object_store_memory); @@ -1221,9 +1224,9 @@ void CoreWorker::SetActorId(const ActorID &actor_id) { actor_id_ = actor_id; } -void CoreWorker::SetWebuiDisplay(const std::string &message) { +void CoreWorker::SetWebuiDisplay(const std::string &key, const std::string &message) { absl::MutexLock lock(&mutex_); - webui_display_ = message; + webui_display_[key] = message; } void CoreWorker::SetActorTitle(const std::string &title) { diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 57b833759..88f518a1f 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -97,7 +97,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { void SetActorId(const ActorID &actor_id); - void SetWebuiDisplay(const std::string &message); + void SetWebuiDisplay(const std::string &key, const std::string &message); void SetActorTitle(const std::string &title); @@ -656,8 +656,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// we cannot access the thread-local worker contexts from GetCoreWorkerStats() TaskSpecification current_task_ GUARDED_BY(mutex_); - /// String to be displayed on Web UI. - std::string webui_display_ GUARDED_BY(mutex_); + /// Key value pairs to be displayed on Web UI. + std::unordered_map webui_display_ GUARDED_BY(mutex_); /// Actor title that consists of class name, args, kwargs for actor construction. std::string actor_title_ GUARDED_BY(mutex_); diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index b1d9db3c0..d6ed85bba 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -205,7 +205,7 @@ message CoreWorkerStats { // A map from the resource name (e.g. "CPU") to the amount of resource used. map used_resources = 9; // A string displayed on Dashboard. - string webui_display = 10; + map webui_display = 10; // Number of objects stored in local memory. int32 num_local_objects = 11; // Used local object store memory.