diff --git a/python/ray/experimental/serve/__init__.py b/python/ray/experimental/serve/__init__.py index 9b14959b5..db1f5d9f1 100644 --- a/python/ray/experimental/serve/__init__.py +++ b/python/ray/experimental/serve/__init__.py @@ -2,11 +2,11 @@ import sys if sys.version_info < (3, 0): raise ImportError("serve is Python 3 only.") -from ray.experimental.serve.api import (init, create_backend, create_endpoint, - link, split, rollback, get_handle, - global_state, scale) # noqa: E402 +from ray.experimental.serve.api import ( + init, create_backend, create_endpoint, link, split, rollback, get_handle, + global_state, stat, scale) # noqa: E402 __all__ = [ "init", "create_backend", "create_endpoint", "link", "split", "rollback", - "get_handle", "global_state", "scale" + "get_handle", "global_state", "stat", "scale" ] diff --git a/python/ray/experimental/serve/api.py b/python/ray/experimental/serve/api.py index a1ae67b01..14ed208cf 100644 --- a/python/ray/experimental/serve/api.py +++ b/python/ray/experimental/serve/api.py @@ -10,7 +10,7 @@ from ray.experimental.serve.global_state import GlobalState global_state = GlobalState() -def init(blocking=False, object_store_memory=int(1e8)): +def init(blocking=False, object_store_memory=int(1e8), gc_window_seconds=3600): """Initialize a serve cluster. Calling `ray.init` before `serve.init` is optional. When there is not a ray @@ -19,22 +19,30 @@ def init(blocking=False, object_store_memory=int(1e8)): Args: blocking (bool): If true, the function will wait for the HTTP server to - be healthy before returns. + be healthy, and other components to be ready before returns. object_store_memory (int): Allocated shared memory size in bytes. The default is 100MiB. The default is kept low for latency stability reason. + gc_window_seconds(int): How long will we keep the metric data in + memory. Data older than the gc_window will be deleted. The default + is 3600 seconds, which is 1 hour. """ if not ray.is_initialized(): ray.init(object_store_memory=object_store_memory) # NOTE(simon): Currently the initialization order is fixed. # HTTP server depends on the API server. + # Metric monitor depends on the router. global_state.init_api_server() global_state.init_router() global_state.init_http_server() + global_state.init_metric_monitor() if blocking: global_state.wait_until_http_ready() + ray.get(global_state.router_actor_handle.is_ready.remote()) + ray.get(global_state.kv_store_actor_handle.is_ready.remote()) + ray.get(global_state.metric_monitor_handle.is_ready.remote()) def create_endpoint(endpoint_name, route_expression, blocking=True): @@ -103,6 +111,7 @@ def _start_replica(backend_tag): runner._ray_serve_main_loop.remote(runner) global_state.backend_replicas[backend_tag].append(runner) + global_state.metric_monitor_handle.add_target.remote(runner) def _remove_replica(backend_tag): @@ -114,6 +123,9 @@ def _remove_replica(backend_tag): replicas = global_state.backend_replicas[backend_tag] oldest_replica_handle = replicas.popleft() + + global_state.metric_monitor_handle.remove_target.remote( + oldest_replica_handle) # explicitly terminate that actor del oldest_replica_handle @@ -236,3 +248,19 @@ def get_handle(endpoint_name): from ray.experimental.serve.handle import RayServeHandle return RayServeHandle(global_state.router_actor_handle, endpoint_name) + + +def stat(percentiles=[50, 90, 95], + agg_windows_seconds=[10, 60, 300, 600, 3600]): + """Retrieve metric statistics about ray serve system. + + Args: + percentiles(List[int]): The percentiles for aggregation operations. + Default is 50th, 90th, 95th percentile. + agg_windows_seconds(List[int]): The aggregation windows in seconds. + The longest aggregation window must be shorter or equal to the + gc_window_seconds. + """ + return ray.get( + global_state.metric_monitor_handle.collect.remote( + percentiles, agg_windows_seconds)) diff --git a/python/ray/experimental/serve/examples/echo_full.py b/python/ray/experimental/serve/examples/echo_full.py index ffbfa9608..f8e939914 100644 --- a/python/ray/experimental/serve/examples/echo_full.py +++ b/python/ray/experimental/serve/examples/echo_full.py @@ -4,6 +4,7 @@ Full example of ray.serve module import ray import ray.experimental.serve as serve +from ray.experimental.serve.utils import pformat_color_json import requests import time @@ -56,3 +57,6 @@ for _ in range(10): # You can also scale each backend independently. serve.scale("echo:v1", 2) serve.scale("echo:v2", 2) + +# As well as retrieving relevant system metrics +print(pformat_color_json(serve.stat())) diff --git a/python/ray/experimental/serve/global_state.py b/python/ray/experimental/serve/global_state.py index 780d93b51..f85da82d5 100644 --- a/python/ray/experimental/serve/global_state.py +++ b/python/ray/experimental/serve/global_state.py @@ -8,6 +8,8 @@ from ray.experimental.serve.kv_store_service import KVStoreProxyActor from ray.experimental.serve.queues import CentralizedQueuesActor from ray.experimental.serve.utils import logger from ray.experimental.serve.server import HTTPActor +from ray.experimental.serve.metric import (MetricMonitor, + start_metric_monitor_loop) # TODO(simon): Global state currently is designed to resides in the driver # process. In the next iteration, we will move all mutable states into @@ -53,6 +55,9 @@ class GlobalState: # use random/available port in a pre-defined port range. TODO(simon) self.http_address = "" + #: Metric monitor handle + self.metric_monitor_handle = None + def init_api_server(self): logger.info(LOG_PREFIX + "Initalizing routing table") self.kv_store_actor_handle = KVStoreProxyActor.remote() @@ -72,6 +77,12 @@ class GlobalState: self.router_actor_handle.register_self_handle.remote( self.router_actor_handle) + def init_metric_monitor(self, gc_window_seconds=3600): + logger.info(LOG_PREFIX + "Initializing metric monitor") + self.metric_monitor_handle = MetricMonitor.remote(gc_window_seconds) + start_metric_monitor_loop.remote(self.metric_monitor_handle) + self.metric_monitor_handle.add_target.remote(self.router_actor_handle) + def wait_until_http_ready(self, num_retries=5, backoff_time_s=1): http_is_ready = False retries = num_retries diff --git a/python/ray/experimental/serve/kv_store_service.py b/python/ray/experimental/serve/kv_store_service.py index e472fee69..2c7dffef4 100644 --- a/python/ray/experimental/serve/kv_store_service.py +++ b/python/ray/experimental/serve/kv_store_service.py @@ -167,7 +167,10 @@ class KVStoreProxy: return self.request_count -@ray.remote +@ray.remote(num_cpus=0) class KVStoreProxyActor(KVStoreProxy): def __init__(self, kv_class=RayInternalKVStore): super().__init__(kv_class=kv_class) + + def is_ready(self): + return True diff --git a/python/ray/experimental/serve/metric.py b/python/ray/experimental/serve/metric.py new file mode 100644 index 000000000..b56bd9943 --- /dev/null +++ b/python/ray/experimental/serve/metric.py @@ -0,0 +1,155 @@ +import time + +import ray +import numpy as np +import pandas as pd + + +@ray.remote(num_cpus=0) +class MetricMonitor: + def __init__(self, gc_window_seconds=3600): + """Metric monitor scrapes metrics from ray serve actors + and allow windowed query operations. + + Args: + gc_window_seconds(int): How long will we keep the metric data in + memory. Data older than the gc_window will be deleted. + """ + #: Mapping actor ID (hex) -> actor handle + self.actor_handles = dict() + + self.data_entries = [] + + self.gc_window_seconds = gc_window_seconds + self.latest_gc_time = time.time() + + def is_ready(self): + return True + + def add_target(self, target_handle): + hex_id = target_handle._ray_actor_id.hex() + self.actor_handles[hex_id] = target_handle + + def remove_target(self, target_handle): + hex_id = target_handle._ray_actor_id.hex() + self.actor_handles.pop(hex_id) + + def scrape(self): + # If expected gc time has passed, we will perform metric value GC. + expected_gc_time = self.latest_gc_time + self.gc_window_seconds + if expected_gc_time < time.time(): + self._perform_gc() + self.latest_gc_time = time.time() + + curr_time = time.time() + result = [ + handle._serve_metric.remote() + for handle in self.actor_handles.values() + ] + for handle_result in ray.get(result): + for metric_name, metric_info in handle_result.items(): + data_entry = { + "retrieved_at": curr_time, + "name": metric_name, + "type": metric_info["type"], + } + + if metric_info["type"] == "counter": + data_entry["value"] = metric_info["value"] + self.data_entries.append(data_entry) + + elif metric_info["type"] == "list": + for metric_value in metric_info["value"]: + new_entry = data_entry.copy() + new_entry["value"] = metric_value + self.data_entries.append(new_entry) + + def _perform_gc(self): + curr_time = time.time() + earliest_time_allowed = curr_time - self.gc_window_seconds + + # If we don"t have any data at hand, no need to gc. + if len(self.data_entries) == 0: + return + + df = pd.DataFrame(self.data_entries) + df = df[df["retrieved_at"] >= earliest_time_allowed] + self.data_entries = df.to_dict(orient="record") + + def _get_dataframe(self): + return pd.DataFrame(self.data_entries) + + def collect(self, + percentiles=[50, 90, 95], + agg_windows_seconds=[10, 60, 300, 600, 3600]): + """Collect and perform aggregation on all metrics. + + Args: + percentiles(List[int]): The percentiles for aggregation operations. + Default is 50th, 90th, 95th percentile. + agg_windows_seconds(List[int]): The aggregation windows in seconds. + The longest aggregation window must be shorter or equal to the + gc_window_seconds. + """ + result = {} + df = pd.DataFrame(self.data_entries) + + if len(df) == 0: # no metric to report + return {} + + # Retrieve the {metric_name -> metric_type} mapping + metric_types = df[["name", + "type"]].set_index("name").squeeze().to_dict() + + for metric_name, metric_type in metric_types.items(): + if metric_type == "counter": + result[metric_name] = df.loc[df["name"] == metric_name, + "value"].tolist()[-1] + if metric_type == "list": + result.update( + self._aggregate(metric_name, percentiles, + agg_windows_seconds)) + return result + + def _aggregate(self, metric_name, percentiles, agg_windows_seconds): + """Perform aggregation over a metric. + + Note: + This metric must have type `list`. + """ + assert max(agg_windows_seconds) <= self.gc_window_seconds, ( + "Aggregation window exceeds gc window. You should set a longer gc " + "window or shorter aggregation window.") + + curr_time = time.time() + df = pd.DataFrame(self.data_entries) + filtered_df = df[df["name"] == metric_name] + if len(filtered_df) == 0: + return dict() + + data_types = filtered_df["type"].unique().tolist() + assert data_types == [ + "list" + ], ("Can't aggreagte over non-list type. {} has type {}".format( + metric_name, data_types)) + + aggregated_metric = {} + for window in agg_windows_seconds: + earliest_time = curr_time - window + windowed_df = filtered_df[ + filtered_df["retrieved_at"] > earliest_time] + percentile_values = np.percentile(windowed_df["value"], + percentiles) + for percentile, value in zip(percentiles, percentile_values): + result_key = "{name}_{perc}th_perc_{window}_window".format( + name=metric_name, perc=percentile, window=window) + aggregated_metric[result_key] = value + + return aggregated_metric + + +@ray.remote(num_cpus=0) +def start_metric_monitor_loop(monitor_handle, duration_s=5): + while True: + ray.get(monitor_handle.scrape.remote()) + time.sleep(duration_s) diff --git a/python/ray/experimental/serve/queues.py b/python/ray/experimental/serve/queues.py index 9b48ae2b2..175533d64 100644 --- a/python/ray/experimental/serve/queues.py +++ b/python/ray/experimental/serve/queues.py @@ -76,6 +76,18 @@ class CentralizedQueues: # backend_name -> worker queue self.workers = defaultdict(deque) + def is_ready(self): + return True + + def _serve_metric(self): + return { + "service_{}_queue_size".format(service_name): { + "value": len(queue), + "type": "counter", + } + for service_name, queue in self.queues.items() + } + def enqueue_request(self, service, request_args, request_kwargs, request_context): query = Query(request_args, request_kwargs, request_context) diff --git a/python/ray/experimental/serve/task_runner.py b/python/ray/experimental/serve/task_runner.py index 4aecaa1b4..d0fa4c7de 100644 --- a/python/ray/experimental/serve/task_runner.py +++ b/python/ray/experimental/serve/task_runner.py @@ -1,4 +1,6 @@ import traceback +import time + import ray from ray.experimental.serve import context as serve_context from ray.experimental.serve.context import TaskContext, FakeFlaskQuest @@ -50,6 +52,33 @@ class RayServeMixin: _ray_serve_setup_completed = False _ray_serve_dequeue_requestr_name = None + # Work token can be unfullfilled from last iteration. + # This cache will be used to determine whether or not we should + # work on the same task as previous iteration or we are ready to + # move on. + _ray_serve_cached_work_token = None + + _serve_metric_error_counter = 0 + _serve_metric_latency_list = [] + + def _serve_metric(self): + # Make a copy of the latency list and clear current list + latency_lst = self._serve_metric_latency_list[:] + self._serve_metric_latency_list = [] + + my_name = self._ray_serve_dequeue_requestr_name + + return { + "{}_error_counter".format(my_name): { + "value": self._serve_metric_error_counter, + "type": "counter", + }, + "{}_latency_s".format(my_name): { + "value": latency_lst, + "type": "list", + }, + } + def _ray_serve_setup(self, my_name, _ray_serve_router_handle): self._ray_serve_dequeue_requestr_name = my_name self._ray_serve_router_handle = _ray_serve_router_handle @@ -59,10 +88,24 @@ class RayServeMixin: assert self._ray_serve_setup_completed self._ray_serve_self_handle = my_handle - work_token = ray.get( - self._ray_serve_router_handle.dequeue_request.remote( - self._ray_serve_dequeue_requestr_name)) - work_item = ray.get(ray.ObjectID(work_token)) + # Only retrieve the next task if we have completed previous task. + if self._ray_serve_cached_work_token is None: + work_token = ray.get( + self._ray_serve_router_handle.dequeue_request.remote( + self._ray_serve_dequeue_requestr_name)) + else: + work_token = self._ray_serve_cached_work_token + + work_token_id = ray.ObjectID(work_token) + ready, not_ready = ray.wait( + [work_token_id], num_returns=1, timeout=0.5) + if len(ready) == 1: + work_item = ray.get(work_token_id) + self._ray_serve_cached_work_token = None + else: + self._ray_serve_cached_work_token = work_token + self._ray_serve_self_handle._ray_serve_main_loop.remote(my_handle) + return if work_item.request_context == TaskContext.Web: serve_context.web = True @@ -77,13 +120,16 @@ class RayServeMixin: result_object_id = work_item.result_object_id + start_timestamp = time.time() try: result = self.__call__(*args, **kwargs) ray.worker.global_worker.put_object(result_object_id, result) except Exception as e: wrapped_exception = wrap_to_ray_error(e) + self._serve_metric_error_counter += 1 ray.worker.global_worker.put_object(result_object_id, wrapped_exception) + self._serve_metric_latency_list.append(time.time() - start_timestamp) serve_context.web = False # The worker finished one unit of work. diff --git a/python/ray/experimental/serve/tests/test_metric.py b/python/ray/experimental/serve/tests/test_metric.py new file mode 100644 index 000000000..84c5f7f3d --- /dev/null +++ b/python/ray/experimental/serve/tests/test_metric.py @@ -0,0 +1,76 @@ +import numpy as np +import pytest + +import ray + +from ray.experimental.serve.metric import MetricMonitor + + +@pytest.fixture(scope="session") +def start_target_actor(ray_instance): + @ray.remote + class Target(): + def __init__(self): + self.counter_value = 0 + + def _serve_metric(self): + self.counter_value += 1 + return { + "latency_list": { + "type": "list", + # Generate 0 to 100 inclusive. + # This means total of 101 items. + "value": np.arange(101).tolist() + }, + "counter": { + "type": "counter", + "value": self.counter_value + } + } + + def get_counter_value(self): + return self.counter_value + + yield Target.remote() + + +def test_metric_gc(ray_instance, start_target_actor): + target_actor = start_target_actor + # this means when new scrapes are invoked, the + metric_monitor = MetricMonitor.remote(gc_window_seconds=0) + metric_monitor.add_target.remote(target_actor) + + ray.get(metric_monitor.scrape.remote()) + df = ray.get(metric_monitor._get_dataframe.remote()) + print(df) + assert len(df) == 102 + + # Old metric sould be cleared. So only 1 counter + 101 list values left. + ray.get(metric_monitor.scrape.remote()) + df = ray.get(metric_monitor._get_dataframe.remote()) + assert len(df) == 102 + + +def test_metric_system(ray_instance, start_target_actor): + target_actor = start_target_actor + + metric_monitor = MetricMonitor.remote() + + metric_monitor.add_target.remote(target_actor) + + # Scrape once + metric_monitor.scrape.remote() + + percentiles = [50, 90, 95] + agg_windows_seconds = [60] + result = ray.get( + metric_monitor.collect.remote(percentiles, agg_windows_seconds)) + real_counter_value = ray.get(target_actor.get_counter_value.remote()) + + expected_result = { + "counter": real_counter_value, + "latency_list_50th_perc_60_window": 50.0, + "latency_list_90th_perc_60_window": 90.0, + "latency_list_95th_perc_60_window": 95.0, + } + assert result == expected_result diff --git a/python/setup.py b/python/setup.py index 3e92b4999..886a53703 100644 --- a/python/setup.py +++ b/python/setup.py @@ -77,7 +77,7 @@ extras = { ], "debug": ["psutil", "setproctitle", "py-spy >= 0.2.0"], "dashboard": ["aiohttp", "psutil", "setproctitle"], - "serve": ["uvicorn", "pygments", "werkzeug", "flask"], + "serve": ["uvicorn", "pygments", "werkzeug", "flask", "pandas"], }