diff --git a/.gitignore b/.gitignore index d4b199839..31d4d847d 100644 --- a/.gitignore +++ b/.gitignore @@ -174,3 +174,4 @@ tools/prometheus* # ray project files project-id +.mypy_cache/ diff --git a/ci/travis/install-dependencies.sh b/ci/travis/install-dependencies.sh index d3151b41d..bed0e8465 100755 --- a/ci/travis/install-dependencies.sh +++ b/ci/travis/install-dependencies.sh @@ -211,7 +211,7 @@ install_dependencies() { opencv-python-headless pyyaml pandas==0.24.2 requests feather-format lxml openpyxl xlrd \ py-spy pytest pytest-timeout networkx tabulate aiohttp uvicorn dataclasses pygments werkzeug \ kubernetes flask grpcio pytest-sugar pytest-rerunfailures pytest-asyncio scikit-learn numba \ - Pillow) + Pillow prometheus_client) if [ "${OSTYPE}" != msys ]; then # These packages aren't Windows-compatible pip_packages+=(blist) # https://github.com/DanielStutzbach/blist/issues/81#issue-391460716 diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index eeba518cd..863ec474c 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -13,6 +13,7 @@ from ray.serve.config import BackendConfig, ReplicaConfig from ray.serve.policy import RoutePolicy from ray.serve.router import Query from ray.serve.request_params import RequestMetadata +from ray.serve.metric import InMemoryExporter master_actor = None @@ -21,6 +22,9 @@ def _get_master_actor(): """Used for internal purpose because using just import serve.global_state will always reference the original None object. """ + global master_actor + if master_actor is None: + master_actor = ray.util.get_actor(SERVE_MASTER_NAME) return master_actor @@ -57,19 +61,17 @@ def accept_batch(f): return f -def init( - blocking=False, - start_server=True, - http_host=DEFAULT_HTTP_HOST, - http_port=DEFAULT_HTTP_PORT, - ray_init_kwargs={ - "object_store_memory": int(1e8), - "num_cpus": max(cpu_count(), 8) - }, - gc_window_seconds=3600, - queueing_policy=RoutePolicy.Random, - policy_kwargs={}, -): +def init(blocking=False, + start_server=True, + http_host=DEFAULT_HTTP_HOST, + http_port=DEFAULT_HTTP_PORT, + ray_init_kwargs={ + "object_store_memory": int(1e8), + "num_cpus": max(cpu_count(), 8) + }, + queueing_policy=RoutePolicy.Random, + policy_kwargs={}, + metric_exporter=InMemoryExporter): """Initialize a serve cluster. If serve cluster has already initialized, this function will just return. @@ -88,12 +90,13 @@ def init( ray_init_kwargs (dict): Argument passed to ray.init, if there is no ray connection. Default to {"object_store_memory": int(1e8)} for performance stability reason - gc_window_seconds(int): How long will we keep the metric data in - memory. Data older than the gc_window will be deleted. The default - is 3600 seconds, which is 1 hour. queueing_policy(RoutePolicy): Define the queueing policy for selecting the backend for a service. (Default: RoutePolicy.Random) policy_kwargs: Arguments required to instantiate a queueing policy + metric_exporter(ExporterInterface): The class aggregates metrics from + all RayServe actors and optionally export them to external + services. RayServe has two options built in: InMemoryExporter and + PrometheusExporter """ global master_actor if master_actor is not None: @@ -126,7 +129,7 @@ def init( name=SERVE_MASTER_NAME, max_reconstructions=ray.ray_constants.INFINITE_RECONSTRUCTION, ).remote(queueing_policy.value, policy_kwargs, start_server, http_host, - http_port, gc_window_seconds) + http_port, metric_exporter) if start_server and blocking: block_until_http_ready("http://{}:{}/-/routes".format( @@ -278,16 +281,27 @@ def get_handle(endpoint_name, @_ensure_connected -def stat(percentiles=[50, 90, 95], - agg_windows_seconds=[10, 60, 300, 600, 3600]): +def stat(): """Retrieve metric statistics about ray serve system. - Args: - percentiles(List[int]): The percentiles for aggregation operations. - Default is 50th, 90th, 95th percentile. - agg_windows_seconds(List[int]): The aggregation windows in seconds. - The longest aggregation window must be shorter or equal to the - gc_window_seconds. + Returns: + metric_stats(Any): Metric information returned by the metric exporter. + This can vary by exporter. For the default InMemoryExporter, it + returns a list of the following format: + + .. code-block::python + [ + {"info": { + "name": ..., + "type": COUNTER|MEASURE, + "label_key": label_value, + "label_key": label_value, + ... + }, "value": float} + ] + + For PrometheusExporter, it returns the metrics in prometheus format + in plain text. """ - [monitor] = retry_actor_failures(master_actor.get_metric_monitor) - return ray.get(monitor.collect.remote(percentiles, agg_windows_seconds)) + [metric_exporter] = ray.get(master_actor.get_metric_exporter.remote()) + return ray.get(metric_exporter.inspect_metrics.remote()) diff --git a/python/ray/serve/backend_worker.py b/python/ray/serve/backend_worker.py index 524e2b60b..4cbc537e3 100644 --- a/python/ray/serve/backend_worker.py +++ b/python/ray/serve/backend_worker.py @@ -1,4 +1,3 @@ -import time import traceback import inspect @@ -7,10 +6,13 @@ from ray import serve from ray.serve import context as serve_context from ray.serve.context import FakeFlaskRequest from collections import defaultdict -from ray.serve.utils import parse_request_item +from ray.serve.utils import parse_request_item, _get_logger from ray.serve.exceptions import RayServeException +from ray.serve.metric import MetricClient from ray.async_compat import sync_to_async +logger = _get_logger() + def create_backend_worker(func_or_class): """Creates a worker class wrapping the provided function or class.""" @@ -30,10 +32,8 @@ def create_backend_worker(func_or_class): else: _callable = func_or_class(*init_args) - self.backend = RayServeWorker(backend_tag, _callable, is_function) - - def get_metrics(self): - return self.backend.get_metrics() + self.backend = RayServeWorker(backend_tag, replica_tag, _callable, + is_function) async def handle_request(self, request): return await self.backend.handle_request(request) @@ -66,37 +66,39 @@ def ensure_async(func): class RayServeWorker: """Handles requests with the provided callable.""" - def __init__(self, name, _callable, is_function): + def __init__(self, name, replica_tag, _callable, is_function): self.name = name + self.replica_tag = replica_tag self.callable = _callable self.is_function = is_function - self.error_counter = 0 - self.latency_list = [] + self.metric_client = MetricClient.connect_from_serve( + default_labels={"backend": self.name}) + self.request_counter = self.metric_client.new_counter( + "backend_request_counter", + description=("Number of queries that have been " + "processed in this replica"), + ) + self.error_counter = self.metric_client.new_counter( + "backend_error_counter", + description=("Number of exceptions that have " + "occurred in the backend"), + ) + self.restart_counter = self.metric_client.new_counter( + "backend_worker_starts", + description=("The number of time this replica workers " + "has been restarted due to failure."), + label_names=("replica_tag", )) - def get_metrics(self): - # Make a copy of the latency list and clear current list - latency_list = self.latency_list[:] - self.latency_list = [] - - return { - "{}_error_counter".format(self.name): { - "value": self.error_counter, - "type": "counter", - }, - "{}_latency_s".format(self.name): { - "value": latency_list, - "type": "list", - }, - } + self.restart_counter.labels(replica_tag=self.replica_tag).add() def get_runner_method(self, request_item): method_name = request_item.call_method if not hasattr(self.callable, method_name): raise RayServeException("Backend doesn't have method {} " "which is specified in the request. " - "The avaiable methods are {}".format( - method_name, dir(self))) + "The available methods are {}".format( + method_name, dir(self.callable))) return getattr(self.callable, method_name) def has_positional_args(self, f): @@ -116,18 +118,17 @@ class RayServeWorker: async def invoke_single(self, request_item): args, kwargs, is_web_context = parse_request_item(request_item) serve_context.web = is_web_context - start_timestamp = time.time() method_to_call = self.get_runner_method(request_item) args = args if self.has_positional_args(method_to_call) else [] method_to_call = ensure_async(method_to_call) try: result = await method_to_call(*args, **kwargs) + self.request_counter.add() except Exception as e: result = wrap_to_ray_error(e) - self.error_counter += 1 + self.error_counter.add() - self.latency_list.append(time.time() - start_timestamp) return result async def invoke_batch(self, request_item_list): @@ -191,10 +192,9 @@ class RayServeWorker: # Flask requests are passed to __call__ as a list arg_list = [arg_list] - start_timestamp = time.time() + self.request_counter.add(batch_size) result_list = await call_method(*arg_list, **kwargs_list) - self.latency_list.append(time.time() - start_timestamp) if (not isinstance(result_list, list)) or (len(result_list) != batch_size): error_message = ("Worker doesn't preserve batch size. The " @@ -206,7 +206,7 @@ class RayServeWorker: return result_list except Exception as e: wrapped_exception = wrap_to_ray_error(e) - self.error_counter += batch_size + self.error_counter.add() return [wrapped_exception for _ in range(batch_size)] async def handle_request(self, request): diff --git a/python/ray/serve/constants.py b/python/ray/serve/constants.py index a229c28df..5648f7051 100644 --- a/python/ray/serve/constants.py +++ b/python/ray/serve/constants.py @@ -8,7 +8,7 @@ SERVE_ROUTER_NAME = "SERVE_ROUTER_ACTOR" SERVE_PROXY_NAME = "SERVE_PROXY_ACTOR" #: Actor name used to register metric monitor actor -SERVE_METRIC_MONITOR_NAME = "SERVE_METRIC_MONITOR_ACTOR" +SERVE_METRIC_SINK_NAME = "SERVE_METRIC_SINK_ACTOR" #: HTTP Address DEFAULT_HTTP_ADDRESS = "http://127.0.0.1:8000" @@ -24,3 +24,6 @@ ASYNC_CONCURRENCY = int(1e6) #: Default latency SLO DEFAULT_LATENCY_SLO_MS = 1e9 + +#: Interval for metric client to push metrics to exporters +METRIC_PUSH_INTERVAL_S = 2 diff --git a/python/ray/serve/http_proxy.py b/python/ray/serve/http_proxy.py index 865f83d52..f3ebffaac 100644 --- a/python/ray/serve/http_proxy.py +++ b/python/ray/serve/http_proxy.py @@ -4,11 +4,12 @@ import socket import uvicorn import ray -from ray.serve.constants import SERVE_MASTER_NAME from ray.serve.context import TaskContext +from ray.serve.metric import MetricClient from ray.serve.request_params import RequestMetadata from ray.serve.http_util import Response from ray.serve.utils import logger, retry_actor_failures_async +from ray.serve.constants import SERVE_MASTER_NAME from urllib.parse import parse_qs @@ -29,10 +30,21 @@ class HTTPProxy: async def fetch_config_from_master(self): assert ray.is_initialized() master = ray.util.get_actor(SERVE_MASTER_NAME) + self.route_table, [ self.router_handle ] = await retry_actor_failures_async(master.get_http_proxy_config) + # The exporter is required to return results for /-/metrics endpoint. + [self.metric_exporter] = await retry_actor_failures_async( + master.get_metric_exporter) + + self.metric_client = MetricClient.connect_from_serve() + self.request_counter = self.metric_client.new_counter( + "num_http_requests", + description="The number of requests processed", + label_names=("route", )) + def set_route_table(self, route_table): self.route_table = route_table @@ -90,6 +102,19 @@ class HTTPProxy: return sender + async def _handle_system_request(self, scope, receive, send): + current_path = scope["path"] + if current_path == "/-/routes": + await Response(self.route_table).send(scope, receive, send) + elif current_path == "/-/metrics": + metric_info = await retry_actor_failures_async( + self.metric_exporter.inspect_metrics) + await Response(metric_info).send(scope, receive, send) + else: + await Response( + "System path {} not found".format(current_path), + status_code=404).send(scope, receive, send) + async def __call__(self, scope, receive, send): # NOTE: This implements ASGI protocol specified in # https://asgi.readthedocs.io/en/latest/specs/index.html @@ -104,8 +129,11 @@ class HTTPProxy: "Route table must be set via set_route_table.") assert scope["type"] == "http" current_path = scope["path"] - if current_path == "/-/routes": - await Response(self.route_table).send(scope, receive, send) + + self.request_counter.labels(route=current_path).add() + + if current_path.startswith("/-/"): + await self._handle_system_request(scope, receive, send) return try: @@ -120,7 +148,7 @@ class HTTPProxy: if scope["method"] not in methods_allowed: error_message = ("Methods {} not allowed. " - "Avaiable HTTP methods are {}.").format( + "Available HTTP methods are {}.").format( scope["method"], methods_allowed) await error_sender(error_message, 405) return diff --git a/python/ray/serve/master.py b/python/ray/serve/master.py index 6cd061a1d..a33871989 100644 --- a/python/ray/serve/master.py +++ b/python/ray/serve/master.py @@ -7,12 +7,12 @@ import time import ray import ray.cloudpickle as pickle from ray.serve.constants import (ASYNC_CONCURRENCY, SERVE_ROUTER_NAME, - SERVE_PROXY_NAME, SERVE_METRIC_MONITOR_NAME) + SERVE_PROXY_NAME, SERVE_METRIC_SINK_NAME) from ray.serve.http_proxy import HTTPProxyActor from ray.serve.kv_store import RayInternalKVStore -from ray.serve.metric import (MetricMonitor, start_metric_monitor_loop) from ray.serve.backend_worker import create_backend_worker from ray.serve.utils import async_retryable, get_random_letters, logger +from ray.serve.metric.exporter import MetricExporterActor import numpy as np @@ -49,7 +49,8 @@ class ServeMaster: """ async def __init__(self, router_class, router_kwargs, start_http_proxy, - http_proxy_host, http_proxy_port, metric_gc_window_s): + http_proxy_host, http_proxy_port, + metric_exporter_class): # Used to read/write checkpoints. # TODO(edoakes): namespace the master actor and its checkpoints. self.kv_store = RayInternalKVStore() @@ -82,14 +83,14 @@ class ServeMaster: # Cached handles to actors in the system. self.router = None self.http_proxy = None - self.metric_monitor = None + self.metric_exporter = None # If starting the actor for the first time, starts up the other system # components. If recovering, fetches their actor handles. + self._get_or_start_metric_exporter(metric_exporter_class) self._get_or_start_router(router_class, router_kwargs) if start_http_proxy: self._get_or_start_http_proxy(http_proxy_host, http_proxy_port) - self._get_or_start_metric_monitor(metric_gc_window_s) # NOTE(edoakes): unfortunately, we can't completely recover from a # checkpoint in the constructor because we block while waiting for @@ -157,26 +158,23 @@ class ServeMaster: """Called by the HTTP proxy on startup to fetch required state.""" return self.routes, self.get_router() - def _get_or_start_metric_monitor(self, gc_window_s): - """Get the metric monitor belonging to this serve cluster. + def _get_or_start_metric_exporter(self, metric_exporter_class): + """Get the metric exporter belonging to this serve cluster. - If the metric monitor does not already exist, it will be started. + If the metric exporter does not already exist, it will be started. """ try: - self.metric_monitor = ray.util.get_actor(SERVE_METRIC_MONITOR_NAME) + self.metric_exporter = ray.util.get_actor(SERVE_METRIC_SINK_NAME) except ValueError: - logger.info("Starting metric monitor with name '{}'".format( - SERVE_METRIC_MONITOR_NAME)) - self.metric_monitor = MetricMonitor.options( + logger.info("Starting metric exporter with name '{}'".format( + SERVE_METRIC_SINK_NAME)) + self.metric_exporter = MetricExporterActor.options( detached=True, - name=SERVE_METRIC_MONITOR_NAME).remote(gc_window_s) - # TODO(edoakes): move these into the constructor. - start_metric_monitor_loop.remote(self.metric_monitor) - self.metric_monitor.add_target.remote(self.router) + name=SERVE_METRIC_SINK_NAME).remote(metric_exporter_class) - def get_metric_monitor(self): - """Returns a handle to the metric monitor managed by this actor.""" - return [self.metric_monitor] + def get_metric_exporter(self): + """Returns a handle to the metric exporter managed by this actor.""" + return [self.metric_exporter] def _checkpoint(self): """Checkpoint internal state and write it to the KV store.""" @@ -213,10 +211,16 @@ class ServeMaster: logger.info("Recovering from checkpoint") # Load internal state from the checkpoint data. - (self.routes, self.backends, self.traffic_policies, self.replicas, - self.replicas_to_start, self.replicas_to_stop, - self.backends_to_remove, - self.endpoints_to_remove) = pickle.loads(checkpoint_bytes) + ( + self.routes, + self.backends, + self.traffic_policies, + self.replicas, + self.replicas_to_start, + self.replicas_to_stop, + self.backends_to_remove, + self.endpoints_to_remove, + ) = pickle.loads(checkpoint_bytes) # Fetch actor handles for all of the backend replicas in the system. # All of these workers are guaranteed to already exist because they @@ -323,9 +327,6 @@ class ServeMaster: await self.router.add_new_worker.remote( backend_tag, replica_tag, worker_handle) - # Register the worker with the metric monitor. - self.metric_monitor.add_target.remote(worker_handle) - self.replicas_to_start.clear() async def _stop_pending_replicas(self): diff --git a/python/ray/serve/metric.py b/python/ray/serve/metric.py deleted file mode 100644 index 84d89c839..000000000 --- a/python/ray/serve/metric.py +++ /dev/null @@ -1,157 +0,0 @@ -import time - -import numpy as np -import pandas as pd - -import ray - - -@ray.remote(num_cpus=0) -class MetricMonitor: - def __init__(self, gc_window_seconds=3600): - """Metric monitor scrapes metrics from ray serve actors - and allow windowed query operations. - - Args: - gc_window_seconds(int): How long will we keep the metric data in - memory. Data older than the gc_window will be deleted. - """ - #: Mapping actor ID (hex) -> actor handle - self.actor_handles = dict() - - self.data_entries = [] - - self.gc_window_seconds = gc_window_seconds - self.latest_gc_time = time.time() - - def add_target(self, target_handle): - hex_id = target_handle._actor_id.hex() - self.actor_handles[hex_id] = target_handle - - def remove_target(self, target_handle): - hex_id = target_handle._actor_id.hex() - self.actor_handles.pop(hex_id) - - def scrape(self): - # If expected gc time has passed, we will perform metric value GC. - expected_gc_time = self.latest_gc_time + self.gc_window_seconds - if expected_gc_time < time.time(): - self._perform_gc() - self.latest_gc_time = time.time() - - curr_time = time.time() - result = [ - handle.get_metrics.remote() - for handle in self.actor_handles.values() - ] - # TODO(simon): handle the possibility that an actor_handle is removed - for handle_result in ray.get(result): - for metric_name, metric_info in handle_result.items(): - data_entry = { - "retrieved_at": curr_time, - "name": metric_name, - "type": metric_info["type"], - } - - if metric_info["type"] == "counter": - data_entry["value"] = metric_info["value"] - self.data_entries.append(data_entry) - - elif metric_info["type"] == "list": - for metric_value in metric_info["value"]: - new_entry = data_entry.copy() - new_entry["value"] = metric_value - self.data_entries.append(new_entry) - - def _perform_gc(self): - curr_time = time.time() - earliest_time_allowed = curr_time - self.gc_window_seconds - - # If we don"t have any data at hand, no need to gc. - if len(self.data_entries) == 0: - return - - df = pd.DataFrame(self.data_entries) - df = df[df["retrieved_at"] >= earliest_time_allowed] - self.data_entries = df.to_dict(orient="record") - - def _get_dataframe(self): - return pd.DataFrame(self.data_entries) - - def collect(self, - percentiles=[50, 90, 95], - agg_windows_seconds=[10, 60, 300, 600, 3600]): - """Collect and perform aggregation on all metrics. - - Args: - percentiles(List[int]): The percentiles for aggregation operations. - Default is 50th, 90th, 95th percentile. - agg_windows_seconds(List[int]): The aggregation windows in seconds. - The longest aggregation window must be shorter or equal to the - gc_window_seconds. - """ - result = {} - df = pd.DataFrame(self.data_entries) - - if len(df) == 0: # no metric to report - return {} - - # Retrieve the {metric_name -> metric_type} mapping - metric_types = df[["name", - "type"]].set_index("name").squeeze().to_dict() - - for metric_name, metric_type in metric_types.items(): - if metric_type == "counter": - result[metric_name] = df.loc[df["name"] == metric_name, - "value"].tolist()[-1] - if metric_type == "list": - result.update( - self._aggregate(metric_name, percentiles, - agg_windows_seconds)) - return result - - def _aggregate(self, metric_name, percentiles, agg_windows_seconds): - """Perform aggregation over a metric. - - Note: - This metric must have type `list`. - """ - assert max(agg_windows_seconds) <= self.gc_window_seconds, ( - "Aggregation window exceeds gc window. You should set a longer gc " - "window or shorter aggregation window.") - - curr_time = time.time() - df = pd.DataFrame(self.data_entries) - filtered_df = df[df["name"] == metric_name] - if len(filtered_df) == 0: - return dict() - - data_types = filtered_df["type"].unique().tolist() - assert data_types == [ - "list" - ], ("Can't aggreagte over non-list type. {} has type {}".format( - metric_name, data_types)) - - aggregated_metric = {} - for window in agg_windows_seconds: - earliest_time = curr_time - window - windowed_df = filtered_df[ - filtered_df["retrieved_at"] > earliest_time] - percentile_values = np.percentile(windowed_df["value"], - percentiles) - for percentile, value in zip(percentiles, percentile_values): - result_key = "{name}_{perc}th_perc_{window}_window".format( - name=metric_name, perc=percentile, window=window) - aggregated_metric[result_key] = value - - return aggregated_metric - - -@ray.remote(num_cpus=0) -def start_metric_monitor_loop(monitor_handle, duration_s=5): - while True: - time.sleep(duration_s) - try: - ray.get(monitor_handle.scrape.remote()) - except ray.exceptions.RayActorError: - pass diff --git a/python/ray/serve/metric/__init__.py b/python/ray/serve/metric/__init__.py new file mode 100644 index 000000000..fe2dfcf3f --- /dev/null +++ b/python/ray/serve/metric/__init__.py @@ -0,0 +1,4 @@ +from ray.serve.metric.client import MetricClient +from ray.serve.metric.exporter import (InMemoryExporter, PrometheusExporter) + +__all__ = ["MetricClient", "InMemoryExporter", "PrometheusExporter"] diff --git a/python/ray/serve/metric/client.py b/python/ray/serve/metric/client.py new file mode 100644 index 000000000..457b92a1e --- /dev/null +++ b/python/ray/serve/metric/client.py @@ -0,0 +1,151 @@ +import asyncio +from typing import Dict, Optional, Tuple + +from ray.serve.metric.types import ( + MetricType, + convert_event_type_to_class, + MetricMetadata, +) +from ray.serve.utils import (retry_actor_failures, retry_actor_failures_async, + _get_logger) +from ray.serve.constants import METRIC_PUSH_INTERVAL_S + +logger = _get_logger() + + +class MetricClient: + def __init__( + self, + metric_exporter_actor, + push_interval: float = METRIC_PUSH_INTERVAL_S, + default_labels: Optional[Dict[str, str]] = None, + ): + """Initialize a client to push metrics to the exporter actor. + + Args: + metric_exporter_actor: The actor to push metrics to. + default_labels(dict): The set of labels to apply for all metrics + created by this actor. For example, {"source": "worker"}. + """ + self.exporter = metric_exporter_actor + self.default_labels = default_labels or dict() + + self.registered_metrics: Dict[str, MetricMetadata] = dict() + self.metric_records = [] + + assert asyncio.get_event_loop().is_running() + self.push_task = asyncio.get_event_loop().create_task( + self.push_to_exporter_forever(push_interval)) + logger.debug("Initialized client") + + @staticmethod + def connect_from_serve(default_labels: Optional[Dict[str, str]] = None): + """Create the metric client automatically when running inside serve.""" + from ray.serve.api import _get_master_actor + + master_actor = _get_master_actor() + [metric_exporter] = retry_actor_failures( + master_actor.get_metric_exporter) + return MetricClient( + metric_exporter_actor=metric_exporter, + default_labels=default_labels) + + def new_counter(self, + name: str, + *, + description: Optional[str] = "", + label_names: Optional[Tuple[str]] = ()): + """Create a new counter. + + Counters are used to capture changes in running sums. An essential + property of Counter instruments is that two events add(m) and add(n) + are semantically equivalent to one event add(m+n). This property means + that Counter events can be combined. + + Args: + name(str): The unique name for the counter. + description(Optional[str]): The description for the counter. + label_names(Optional[Tuple[str]]): The set of label names to be + added when recording the metrics. + + Usage: + >>> client = MetricClient(...) + >>> counter = client.new_counter( + "http_counter", + description="This is a simple counter for HTTP status", + label_names=("route", "status_code")) + >>> counter.labels(route="/hi", status_code=200).add() + """ + return self._new_metric(name, MetricType.COUNTER, description, + label_names) + + def new_measure(self, + name, + *, + description: Optional[str] = "", + label_names: Optional[Tuple[str]] = ()): + """Create a new measure. + + Measure instruments are independent. They cannot be combined as with + counters. Measures can be aggregated after recording to compute + statistics about the distribution along selected dimension. + + Args: + name(str): The unique name for the measure. + description(Optional[str]): The description for the measure. + label_names(Optional[Tuple[str]]): The set of label names to be + added when recording the metrics. + + Usage: + >>> client = MetricClient(...) + >>> measure = client.new_measure( + "latency_measure", + description="This is a simple measure for latency in ms", + label_names=("route")) + >>> measure.labels(route="/hi").record(42) + """ + return self._new_metric(name, MetricType.MEASURE, description, + label_names) + + def _new_metric( + self, + name, + metric_type: MetricType, + description: str, + label_names: Tuple[str] = (), + ): + if name in self.registered_metrics: + raise ValueError( + "Metric with name {} is already registered.".format(name)) + + if not isinstance(label_names, tuple): + raise ValueError("label_names need to be a tuple, it is {}".format( + type(label_names))) + + metric_metadata = MetricMetadata( + name=name, + type=metric_type, + description=description, + label_names=label_names, + default_labels=self.default_labels.copy(), + ) + metric_class = convert_event_type_to_class(metric_type) + metric_object = metric_class( + client=self, name=name, label_names=label_names) + + self.registered_metrics[name] = metric_metadata + return metric_object + + async def _push_to_exporter_once(self): + if len(self.metric_records) == 0: + return + + old_batch, self.metric_records = self.metric_records, [] + logger.debug("Pushing metric batch {}".format(old_batch)) + await retry_actor_failures_async(self.exporter.ingest, + self.registered_metrics, old_batch) + + async def push_to_exporter_forever(self, interval_s): + while True: + await self._push_to_exporter_once() + await asyncio.sleep(interval_s) diff --git a/python/ray/serve/metric/exporter.py b/python/ray/serve/metric/exporter.py new file mode 100644 index 000000000..29d9c1389 --- /dev/null +++ b/python/ray/serve/metric/exporter.py @@ -0,0 +1,150 @@ +from typing import Dict +from collections import Counter, namedtuple + +import ray +from ray.serve.metric.types import MetricType, MetricMetadata, MetricBatch +from ray.serve.utils import _get_logger + +logger = _get_logger() + + +def make_metric_namedtuple(metric_metadata: MetricMetadata, + record: MetricBatch): + fields = ["name", "type"] + fields += list(metric_metadata.default_labels.keys()) + fields += list(record.labels.keys()) + + merged_labels = metric_metadata.default_labels.copy() + merged_labels.update(record.labels) + + tuple_type = namedtuple(metric_metadata.name, fields) + return tuple_type( + name=metric_metadata.name, type=metric_metadata.type, **merged_labels) + + +class ExporterInterface: + def export(self, metric_metadata: Dict[str, MetricMetadata], + metric_batch: MetricBatch): + raise NotImplementedError( + "This method should be implemented by subclass.") + + def inspect_metrics(self): + raise NotImplementedError( + "This method should be implemented by subclass.") + + +@ray.remote(num_cpus=0) +class MetricExporterActor: + """Aggregate metrics from all RayServe actors and export them. + + Metrics are aggregated pushed from other actors in the system to + this actor. They can then be stored or pushed to an external monitoring + system. + """ + + def __init__(self, exporter_class: ExporterInterface): + # TODO(simon): Add support for initializer args and kwargs. + self.exporter = exporter_class() + + # Stores the mapping metric_name -> MetricMetadata + # This field is tolerant to failures since each client will always push + # an updated copy of the metadata for each ingest call. + self.metric_metadata = dict() + + logger.debug("Initialized with metric exporter of type {}".format( + type(self.exporter))) + + def ingest(self, metric_metadata: Dict[str, MetricMetadata], + batch: MetricBatch): + self.metric_metadata.update(metric_metadata) + self.exporter.export(self.metric_metadata, batch) + + def inspect_metrics(self): + return self.exporter.inspect_metrics() + + +class InMemoryExporter(ExporterInterface): + def __init__(self): + # Keep track of counters + self.counters: Counter[namedtuple, float] = Counter() + # Keep track of latest observation of measures + self.latest_measures: Dict[namedtuple, float] = dict() + + def export(self, metric_metadata, metric_batch): + for record in metric_batch: + metadata = metric_metadata[record.name] + metric_key = make_metric_namedtuple(metadata, record) + if metadata.type == MetricType.COUNTER: + self.counters[metric_key] += record.value + elif metadata.type == MetricType.MEASURE: + self.latest_measures[metric_key] = record.value + else: + raise RuntimeError("Unrecognized metric type {}".format( + metadata.type)) + + def inspect_metrics(self): + items = [] + metrics_to_collect = {**self.counters, **self.latest_measures} + for info_tuple, value in metrics_to_collect.items(): + # Represent the metric type as a human readable name + info_tuple = info_tuple._replace(type=str(info_tuple.type)) + items.append({"info": info_tuple._asdict(), "value": value}) + return items + + +class PrometheusExporter(ExporterInterface): + def __init__(self): + super().__init__() + from prometheus_client import (CollectorRegistry, Counter, Gauge, + generate_latest) + self.metric_type_to_prom_type = { + MetricType.COUNTER: Counter, + MetricType.MEASURE: Gauge + } + self.prom_generate_latest = generate_latest + + self.metrics_cache = dict() + self.default_labels = dict() + self.registry = CollectorRegistry() + + def inspect_metrics(self): + return self.prom_generate_latest(self.registry) + + def export(self, metric_metadata, metric_batch): + self._process_metric_metadata(metric_metadata) + self._process_batch(metric_batch) + + def _process_metric_metadata(self, metric_metadata): + for name, metric_metadata in metric_metadata.items(): + if name not in self.metrics_cache: + constructor = self.metric_type_to_prom_type[ + metric_metadata.type] + + default_labels = metric_metadata.default_labels + label_names = tuple( + default_labels.keys()) + metric_metadata.label_names + metric_object = constructor( + metric_metadata.name, + metric_metadata.description, + labelnames=label_names, + registry=self.registry, + ) + + self.metrics_cache[name] = (metric_object, + metric_metadata.type) + self.default_labels[name] = default_labels + + def _process_batch(self, batch): + for name, labels, value in batch: + assert name in self.metrics_cache, ( + "Metrics {} was not registered.".format(name)) + metric, metric_type = self.metrics_cache[name] + default_labels = self.default_labels[name] + merged_labels = {**default_labels, **labels} + if metric_type == MetricType.COUNTER: + metric.labels(**merged_labels).inc(value) + elif metric_type == MetricType.MEASURE: + metric.labels(**merged_labels).set(value) + else: + raise RuntimeError( + "Unrecognized metric type {}".format(metric_type)) diff --git a/python/ray/serve/metric/types.py b/python/ray/serve/metric/types.py new file mode 100644 index 000000000..80c2f796e --- /dev/null +++ b/python/ray/serve/metric/types.py @@ -0,0 +1,91 @@ +import enum +from typing import Tuple, List, Dict, Optional +from collections import namedtuple + +# We split the information about a metric into two parts: the MetricMetadata +# and MetricRecord. Metadata is declared at creation time and include names +# and the label names. The label values will be supplied at observation time. +MetricMetadata = namedtuple( + "MetricMetadata", + ["name", "type", "description", "label_names", "default_labels"]) +MetricRecord = namedtuple("MetricRecord", ["name", "labels", "value"]) +MetricBatch = List[MetricRecord] + + +class BaseMetric: + def __init__(self, + client, + name: str, + label_names: Tuple[str], + dynamic_labels: Optional[Dict[str, str]] = None): + """Represent a single metric stream + + Args: + client(MetricClient): The client object to push update to. + name(str): The name of the metric. + label_names(Tuple[str]): The names of the labels that must be set + before an observation. + dynamic_labels(Optional[Dict[str,str]]): A partially preset labels. + This fields make it possible to chain label calls together: + ``metric.labels(a=b).labels(c=d)``. + """ + self.client = client + self.name = name + self.dynamic_labels = dynamic_labels or dict() + self.label_names = label_names + + def check_all_labels_fulfilled_or_error(self): + unfulfilled = set(self.label_names) - set(self.dynamic_labels.keys()) + if len(unfulfilled) != 0: + raise ValueError("The following labels doesn't have associated " + "values: {}".format(unfulfilled)) + + def labels(self, **kwargs): + """Apply dynamic label to the metric + + Usage: + >>> metric = BaseMetric(..., label_names=("a", "b")) + >>> metric.labels(a=1, b=2) + >>> metric.labels(a=1).labels(b=2) # Equivalent + """ + new_dynamic_labels = self.dynamic_labels.copy() + for k, v in kwargs.items(): + if k not in self.label_names: + raise ValueError( + "Label {} was not part of registered " + "label names. Allowed label names are {}.".format( + k, self.label_names)) + new_dynamic_labels[k] = str(v) + return type(self)(self.client, self.name, self.label_names, + new_dynamic_labels) + + +# The metric types are inspired by OpenTelemetry spec: +# https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/metrics/api.md#three-kinds-of-instrument +class Counter(BaseMetric): + def add(self, increment=1): + """Increment the counter by some amount. Default is 1""" + self.check_all_labels_fulfilled_or_error() + self.client.metric_records.append( + MetricRecord(self.name, self.dynamic_labels, increment)) + + +class Measure(BaseMetric): + def record(self, value): + """Record the given value for the measure""" + self.check_all_labels_fulfilled_or_error() + self.client.metric_records.append( + MetricRecord(self.name, self.dynamic_labels, value)) + + +class MetricType(enum.IntEnum): + COUNTER = 1 + MEASURE = 2 + + +def convert_event_type_to_class(event_type: MetricType) -> BaseMetric: + if event_type == MetricType.COUNTER: + return Counter + if event_type == MetricType.MEASURE: + return Measure + raise RuntimeError("Unknown event type {}".format(event_type)) diff --git a/python/ray/serve/router.py b/python/ray/serve/router.py index b048182ad..b3551b495 100644 --- a/python/ray/serve/router.py +++ b/python/ray/serve/router.py @@ -14,6 +14,8 @@ import blist import ray import ray.cloudpickle as pickle +from ray.exceptions import RayTaskError +from ray.serve.metric import MetricClient from ray.serve.utils import logger, retry_actor_failures @@ -166,22 +168,30 @@ class Router: for backend, backend_config in backend_configs.items(): await self.set_backend_config(backend, backend_config) + self.metric_client = MetricClient.connect_from_serve() + self.num_router_requests = self.metric_client.new_counter( + "num_router_requests", + description="Number of requests processed by the router.", + label_names=("endpoint", )) + self.num_error_endpoint_request = self.metric_client.new_counter( + "num_error_endpoint_requests", + description=("Number of requests errored when getting result " + "for endpoint."), + label_names=("endpoint", )) + self.num_error_backend_request = self.metric_client.new_counter( + "num_error_backend_requests", + description=("Number of requests errored when getting result " + "from backend."), + label_names=("backend", )) + def is_ready(self): return True - def get_metrics(self): - return { - "backend_{}_queue_size".format(backend_name): { - "value": len(queue), - "type": "counter", - } - for backend_name, queue in self.buffer_queues.items() - } - async def enqueue_request(self, request_meta, *request_args, **request_kwargs): endpoint = request_meta.endpoint logger.debug("Received a request for endpoint {}".format(endpoint)) + self.num_router_requests.labels(endpoint=endpoint).add() # check if the slo specified is directly the # wall clock time @@ -202,7 +212,11 @@ class Router: # Note: a future change can be to directly return the ObjectID from # replica task submission - result = await query.async_future + try: + result = await query.async_future + except RayTaskError as e: + self.num_error_endpoint_request.labels(endpoint=endpoint).add() + result = e return result async def add_new_worker(self, backend_tag, replica_tag, worker_handle): @@ -341,9 +355,13 @@ class Router: logger.debug("Sending query to replica:" + backend_replica_tag) start = time.time() worker = self.replicas[backend_replica_tag] - result = await worker.handle_request.remote(req) + try: + result = await worker.handle_request.remote(req) + except RayTaskError as error: + self.num_error_backend_request.labels(backend=backend).add() + result = error await self.mark_worker_idle(backend, backend_replica_tag) - logger.debug("Got result in {:.2f}s", time.time() - start) + logger.debug("Got result in {:.2f}s".format(time.time() - start)) return result async def _assign_query_to_worker(self, diff --git a/python/ray/serve/tests/conftest.py b/python/ray/serve/tests/conftest.py index ec581698e..0d8f19811 100644 --- a/python/ray/serve/tests/conftest.py +++ b/python/ray/serve/tests/conftest.py @@ -2,7 +2,6 @@ import os import pytest -import ray from ray import serve if os.environ.get("RAY_SERVE_INTENTIONALLY_CRASH", False): @@ -13,13 +12,3 @@ if os.environ.get("RAY_SERVE_INTENTIONALLY_CRASH", False): def serve_instance(): serve.init(blocking=True, ray_init_kwargs={"num_cpus": 36}) yield - - -@pytest.fixture(scope="session") -def ray_instance(): - ray_already_initialized = ray.is_initialized() - if not ray_already_initialized: - ray.init(object_store_memory=int(1e8)) - yield - if not ray_already_initialized: - ray.shutdown() diff --git a/python/ray/serve/tests/test_backend_worker.py b/python/ray/serve/tests/test_backend_worker.py index 18c0e46bc..53f515939 100644 --- a/python/ray/serve/tests/test_backend_worker.py +++ b/python/ray/serve/tests/test_backend_worker.py @@ -26,9 +26,6 @@ def setup_worker(name, func_or_class, init_args=None): def ready(self): pass - def get_metrics(self): - return self.worker.get_metrics() - async def handle_request(self, *args, **kwargs): return await self.worker.handle_request(*args, **kwargs) diff --git a/python/ray/serve/tests/test_kv_store.py b/python/ray/serve/tests/test_kv_store.py index 82cd26b1c..964881f98 100644 --- a/python/ray/serve/tests/test_kv_store.py +++ b/python/ray/serve/tests/test_kv_store.py @@ -3,7 +3,7 @@ import pytest from ray.serve.kv_store import RayInternalKVStore -def test_ray_internal_kv(ray_instance): +def test_ray_internal_kv(serve_instance): with pytest.raises(TypeError): RayInternalKVStore(namespace=1) RayInternalKVStore(namespace=b"") @@ -26,7 +26,7 @@ def test_ray_internal_kv(ray_instance): assert kv.get("2") == b"4" -def test_ray_internal_kv_collisions(ray_instance): +def test_ray_internal_kv_collisions(serve_instance): kv1 = RayInternalKVStore() kv1.put("1", b"1") assert kv1.get("1") == b"1" diff --git a/python/ray/serve/tests/test_metric.py b/python/ray/serve/tests/test_metric.py index f3efb385f..0a04c9858 100644 --- a/python/ray/serve/tests/test_metric.py +++ b/python/ray/serve/tests/test_metric.py @@ -1,74 +1,197 @@ -import numpy as np +import time + import pytest +import requests -import ray -from ray.serve.metric import MetricMonitor +from ray import serve +from ray.serve.metric.client import MetricClient +from ray.serve.metric.exporter import (InMemoryExporter, PrometheusExporter, + MetricExporterActor) +from ray.serve.metric.types import MetricType, MetricMetadata + +pytestmark = pytest.mark.asyncio -@pytest.fixture(scope="session") -def start_target_actor(ray_instance): - @ray.remote - class Target: - def __init__(self): - self.counter_value = 0 +class MockExporterActor: + def __init__(self): + self.metadata = dict() + self.batches = [] - def get_metrics(self): - self.counter_value += 1 - return { - "latency_list": { - "type": "list", - # Generate 0 to 100 inclusive. - # This means total of 101 items. - "value": np.arange(101).tolist() - }, - "counter": { - "type": "counter", - "value": self.counter_value - } - } + @property + def ingest(self): + return self - def get_counter_value(self): - return self.counter_value - - yield Target.remote() + async def remote(self, metadata, batch): + self.metadata.update(metadata) + self.batches.extend(batch) -def test_metric_gc(ray_instance, start_target_actor): - target_actor = start_target_actor - # this means when new scrapes are invoked, the - metric_monitor = MetricMonitor.remote(gc_window_seconds=0) - ray.get(metric_monitor.add_target.remote(target_actor)) +async def test_client(): + exporter = MockExporterActor() + collector = MetricClient( + exporter, push_interval=2, default_labels={"default": "label"}) + counter = collector.new_counter(name="counter", label_names=("a", "b")) - ray.get(metric_monitor.scrape.remote()) - df = ray.get(metric_monitor._get_dataframe.remote()) - assert len(df) == 102 + with pytest.raises( + ValueError, match="labels doesn't have associated values"): + counter.add() - # Old metric sould be cleared. So only 1 counter + 101 list values left. - ray.get(metric_monitor.scrape.remote()) - df = ray.get(metric_monitor._get_dataframe.remote()) - assert len(df) == 102 + counter = counter.labels(a=1) + counter.labels(b=2).add() + counter.labels(b=3).add(42) -def test_metric_system(ray_instance, start_target_actor): - target_actor = start_target_actor + measure = collector.new_measure("measure") + measure.record(2) - metric_monitor = MetricMonitor.remote() + await collector._push_to_exporter_once() - ray.get(metric_monitor.add_target.remote(target_actor)) - - # Scrape once - ray.get(metric_monitor.scrape.remote()) - - percentiles = [50, 90, 95] - agg_windows_seconds = [60] - result = ray.get( - metric_monitor.collect.remote(percentiles, agg_windows_seconds)) - real_counter_value = ray.get(target_actor.get_counter_value.remote()) - - expected_result = { - "counter": real_counter_value, - "latency_list_50th_perc_60_window": 50.0, - "latency_list_90th_perc_60_window": 90.0, - "latency_list_95th_perc_60_window": 95.0, + assert exporter.metadata == { + "counter": MetricMetadata( + name="counter", + type=MetricType.COUNTER, + description="", + label_names=("a", "b"), + default_labels={"default": "label"}, + ), + "measure": MetricMetadata( + name="measure", + type=MetricType.MEASURE, + description="", + label_names=(), + default_labels={"default": "label"}, + ) } - assert result == expected_result + assert exporter.batches == [("counter", { + "a": "1", + "b": "2" + }, 1), ("counter", { + "a": "1", + "b": "3" + }, 42), ("measure", {}, 2)] + + +async def test_in_memory_exporter(serve_instance): + exporter = MetricExporterActor.remote(InMemoryExporter) + collector = MetricClient( + exporter, push_interval=2, default_labels={"default": "label"}) + + counter = collector.new_counter(name="my_counter", label_names=("a", )) + measure = collector.new_measure( + name="my_measure", description="help", label_names=("ray", "lang")) + measure = measure.labels(lang="C++") + + counter.labels(a="1").add() + measure.labels(ray="").record(0) + measure.labels(ray="").record(42) + + await collector._push_to_exporter_once() + + metric_stored = await exporter.inspect_metrics.remote() + assert metric_stored == [{ + "info": { + "name": "my_counter", + "type": "MetricType.COUNTER", + "default": "label", + "a": "1" + }, + "value": 1 + }, { + "info": { + "name": "my_measure", + "type": "MetricType.MEASURE", + "default": "label", + "lang": "C++", + "ray": "" + }, + "value": 42 + }] + + +async def test_prometheus_exporter(serve_instance): + exporter = MetricExporterActor.remote(PrometheusExporter) + collector = MetricClient( + exporter, push_interval=2, default_labels={"default": "label"}) + + counter = collector.new_counter(name="my_counter", label_names=("a", )) + measure = collector.new_measure( + name="my_measure", description="help", label_names=("ray", "lang")) + measure = measure.labels(lang="C++") + + counter.labels(a="1").add() + measure.labels(ray="").record(0) + measure.labels(ray="").record(42) + + await collector._push_to_exporter_once() + + metric_stored = await exporter.inspect_metrics.remote() + metric_stored = metric_stored.decode() + + fragments = [ + "# HELP my_counter_total", "# TYPE my_counter_total counter", + 'my_counter_total{a="1",default="label"} 1.0', + "# TYPE my_counter_created gauge", + 'my_counter_created{a="1",default="label"}', "# HELP my_measure help", + "# TYPE my_measure gauge", + 'my_measure{default="label",lang="C++",ray=""} 42.0' + ] + + for fragment in fragments: + assert fragment in metric_stored + + +async def test_system_metric_endpoints(serve_instance): + def test_error_counter(flask_request): + 1 / 0 + + serve.create_endpoint("test_metrics", "/measure") + serve.create_backend("m:v1", test_error_counter) + serve.set_traffic("test_metrics", {"m:v1": 1}) + + # Check metrics are exposed under http endpoint + def test_metric_endpoint(): + requests.get("http://127.0.0.1:8000/measure", timeout=5) + in_memory_metric = requests.get( + "http://127.0.0.1:8000/-/metrics", timeout=5).json() + + # We don't want to check the values since this check might be retried. + in_memory_metric_without_values = [] + for m in in_memory_metric: + m.pop("value") + in_memory_metric_without_values.append(m) + + target_metrics = [{ + "info": { + "name": "num_http_requests", + "type": "MetricType.COUNTER", + "route": "/measure" + }, + }, { + "info": { + "name": "num_router_requests", + "type": "MetricType.COUNTER", + "endpoint": "test_metrics" + }, + }, { + "info": { + "name": "backend_error_counter", + "type": "MetricType.COUNTER", + "backend": "m:v1" + }, + }] + + for target in target_metrics: + assert target in in_memory_metric_without_values + + success = False + for _ in range(3): + try: + test_metric_endpoint() + success = True + break + except (AssertionError, requests.ReadTimeout): + # Metrics may not have been propagated yet + time.sleep(2) + print("Metric not correct, retrying...") + if not success: + test_metric_endpoint() diff --git a/python/setup.py b/python/setup.py index 4e5638fd0..c36ad2e95 100644 --- a/python/setup.py +++ b/python/setup.py @@ -83,7 +83,7 @@ if "RAY_USE_NEW_GCS" in os.environ and os.environ["RAY_USE_NEW_GCS"] == "on": extras = { "debug": [], "dashboard": ["requests"], - "serve": ["uvicorn", "pygments", "werkzeug", "flask", "pandas", "blist"], + "serve": ["uvicorn", "flask", "blist"], "tune": ["tabulate", "tensorboardX", "pandas"] }