diff --git a/doc/source/serve/advanced.rst b/doc/source/serve/advanced.rst index 91bfd29b4..9e6c7c539 100644 --- a/doc/source/serve/advanced.rst +++ b/doc/source/serve/advanced.rst @@ -245,66 +245,10 @@ That's it. Let's take a look at an example: Monitoring ========== -Ray Serve exposes system metrics like number of requests through Python API -``serve.stat`` and HTTP ``/-/metrics`` API. By default, it uses a custom -structured format for easy parsing and debugging. - -Via python: - -.. code-block:: python - - serve.stat() - """ - [..., { - "info": { - "name": "num_http_requests", - "route": "/-/routes", - "type": "MetricType.COUNTER" - }, - "value": 1 - }, - { - "info": { - "name": "num_http_requests", - "route": "/echo", - "type": "MetricType.COUNTER" - }, - "value": 10 - }, ...] - """ - -Via HTTP: - -.. code-block:: - - curl http://localhost:8000/-/metrics - # Returns the same output as above in JSON format. - -You can also access the result in `Prometheus `_ format, -by setting the ``metric_exporter`` option in :mod:`serve.init `. - -.. code-block:: python - - from ray.serve.metric import PrometheusExporter - serve.init(metric_exporter=PrometheusExporter) - -.. code-block:: - - curl http://localhost:8000/-/metrics - - # HELP backend_request_counter_total Number of queries that have been processed in this replica - # TYPE backend_request_counter_total counter - backend_request_counter_total{backend="echo:v1"} 5.0 - backend_request_counter_total{backend="echo:v2"} 5.0 - ... - -The metric exporter is extensible and you can customize it for your own metric -infrastructure. We are gathering feedback and welcome contribution! Feel free -to submit a github issue to chat with us in #serve channel in `community slack `_. - -Here's an simple example of a dummy exporter that writes metrics to file: - -.. literalinclude:: ../../../python/ray/serve/examples/doc/snippet_metric_export.py +Ray Serve exposes important system metrics like the number of successful and +errored requests through the Ray metrics monitoring infrastructure. By default, +the metrics are exposed in Prometheus format on each node. See the +`Ray Monitoring documentation <../ray-metrics.html>`__ for more information. .. _serve-faq: diff --git a/doc/source/serve/package-ref.rst b/doc/source/serve/package-ref.rst index 2d4ca81a7..719e9ef20 100644 --- a/doc/source/serve/package-ref.rst +++ b/doc/source/serve/package-ref.rst @@ -32,9 +32,6 @@ Advanced APIs .. autofunction:: ray.serve.get_handle .. autoclass:: ray.serve.handle.RayServeHandle -``serve.stat`` queries Ray Serve's built-in metric monitor. -.. autofunction:: ray.serve.stat - ``serve.accept_batch`` marks your backend API does accept list of input instead of just single input. diff --git a/python/ray/serve/BUILD b/python/ray/serve/BUILD index 360feda86..178298f66 100644 --- a/python/ray/serve/BUILD +++ b/python/ray/serve/BUILD @@ -64,15 +64,6 @@ py_test( ) -py_test( - name = "test_metric", - size = "small", - srcs = serve_tests_srcs, - tags = ["exclusive"], - deps = [":serve_lib"], -) - - py_test( name = "test_persistence", size = "small", @@ -201,14 +192,6 @@ py_test( deps = [":serve_lib"] ) -py_test( - name = "snippet_metric_export", - size = "small", - srcs = glob(["examples/doc/*.py"]), - tags = ["exclusive"], - deps = [":serve_lib"] -) - # Disable the deployment tutorial test because it requires # ray start --head in the background. # py_test( diff --git a/python/ray/serve/__init__.py b/python/ray/serve/__init__.py index 91845cca2..f8b86c20a 100644 --- a/python/ray/serve/__init__.py +++ b/python/ray/serve/__init__.py @@ -1,8 +1,8 @@ -from ray.serve.api import ( - init, create_backend, delete_backend, create_endpoint, delete_endpoint, - set_traffic, shadow_traffic, get_handle, stat, update_backend_config, - get_backend_config, accept_batch, list_backends, list_endpoints, - shutdown) # noqa: E402 +from ray.serve.api import (init, create_backend, delete_backend, + create_endpoint, delete_endpoint, set_traffic, + shadow_traffic, get_handle, update_backend_config, + get_backend_config, accept_batch, list_backends, + list_endpoints, shutdown) # noqa: E402 __all__ = [ "init", @@ -13,7 +13,6 @@ __all__ = [ "set_traffic", "shadow_traffic", "get_handle", - "stat", "update_backend_config", "get_backend_config", "accept_batch", diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index c23498a0f..a125430d2 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -8,7 +8,6 @@ from ray.serve.handle import RayServeHandle from ray.serve.utils import (block_until_http_ready, format_actor_name) from ray.serve.exceptions import RayServeException from ray.serve.config import BackendConfig, ReplicaConfig -from ray.serve.metric import InMemoryExporter controller = None @@ -58,7 +57,6 @@ def accept_batch(f): def init(name=None, http_host=DEFAULT_HTTP_HOST, http_port=DEFAULT_HTTP_PORT, - metric_exporter=InMemoryExporter, _http_middlewares=[]): """Initialize or connect to a serve cluster. @@ -75,10 +73,6 @@ def init(name=None, http_host (str): Host for HTTP servers. Default to "0.0.0.0". Serve starts one HTTP server per node in the Ray cluster. http_port (int, List[int]): Port for HTTP server. Default to 8000. - metric_exporter(ExporterInterface): The class aggregates metrics from - all RayServe actors and optionally export them to external - services. Ray Serve has two options built in: InMemoryExporter and - PrometheusExporter """ if name is not None and not isinstance(name, str): raise TypeError("name must be a string.") @@ -100,7 +94,7 @@ def init(name=None, name=controller_name, max_restarts=-1, max_task_retries=-1, - ).remote(name, http_host, http_port, metric_exporter, _http_middlewares) + ).remote(name, http_host, http_port, _http_middlewares) futures = [] for node_id in ray.state.node_ids(): @@ -376,30 +370,3 @@ def get_handle(endpoint_name, missing_ok=False): list(routers.values())[0], endpoint_name, ) - - -@_ensure_connected -def stat(): - """Retrieve metric statistics about ray serve system. - - Returns: - metric_stats(Any): Metric information returned by the metric exporter. - This can vary by exporter. For the default InMemoryExporter, it - returns a list of the following format: - - .. code-block::python - [ - {"info": { - "name": ..., - "type": COUNTER|MEASURE, - "label_key": label_value, - "label_key": label_value, - ... - }, "value": float} - ] - - For PrometheusExporter, it returns the metrics in prometheus format - in plain text. - """ - [metric_exporter] = ray.get(controller.get_metric_exporter.remote()) - return ray.get(metric_exporter.inspect_metrics.remote()) diff --git a/python/ray/serve/backend_worker.py b/python/ray/serve/backend_worker.py index 8a43a49ab..736458699 100644 --- a/python/ray/serve/backend_worker.py +++ b/python/ray/serve/backend_worker.py @@ -17,7 +17,7 @@ from ray.serve.context import FakeFlaskRequest from ray.serve.utils import (parse_request_item, _get_logger, chain_future, unpack_future) from ray.serve.exceptions import RayServeException -from ray.serve.metric import MetricClient +from ray.experimental import metrics from ray.serve.config import BackendConfig from ray.serve.router import Query @@ -113,14 +113,8 @@ def create_backend_worker(func_or_class): else: _callable = func_or_class(*init_args) - controller = serve.api._get_controller() - [metric_exporter] = ray.get( - controller.get_metric_exporter.remote()) - metric_client = MetricClient( - metric_exporter, default_labels={"backend": backend_tag}) self.backend = RayServeWorker(backend_tag, replica_tag, _callable, - backend_config, is_function, - metric_client) + backend_config, is_function) async def handle_request(self, request): return await self.backend.handle_request(request) @@ -157,7 +151,7 @@ class RayServeWorker: """Handles requests with the provided callable.""" def __init__(self, backend_tag, replica_tag, _callable, - backend_config: BackendConfig, is_function, metric_client): + backend_config: BackendConfig, is_function): self.backend_tag = backend_tag self.replica_tag = replica_tag self.callable = _callable @@ -167,24 +161,24 @@ class RayServeWorker: self.batch_queue = BatchQueue(self.config.max_batch_size or 1, self.config.batch_wait_timeout) - self.metric_client = metric_client - self.request_counter = self.metric_client.new_counter( - "backend_request_counter", - description=("Number of queries that have been " - "processed in this replica"), - ) - self.error_counter = self.metric_client.new_counter( - "backend_error_counter", - description=("Number of exceptions that have " - "occurred in the backend"), - ) - self.restart_counter = self.metric_client.new_counter( + self.request_counter = metrics.Count( + "backend_request_counter", ("Number of queries that have been " + "processed in this replica"), + "requests", ["backend"]) + self.error_counter = metrics.Count("backend_error_counter", + ("Number of exceptions that have " + "occurred in the backend"), + "errors", ["backend"]) + self.restart_counter = metrics.Count( "backend_worker_starts", - description=("The number of time this replica workers " - "has been restarted due to failure."), - label_names=("replica_tag", )) + ("The number of time this replica workers " + "has been restarted due to failure."), "restarts", + ["backend", "replica_tag"]) - self.restart_counter.labels(replica_tag=self.replica_tag).add() + self.restart_counter.record(1, { + "backend": self.backend_tag, + "replica_tag": self.replica_tag + }) asyncio.get_event_loop().create_task(self.main_loop()) @@ -228,10 +222,10 @@ class RayServeWorker: method_to_call = ensure_async(method_to_call) try: result = await method_to_call(*args, **kwargs) - self.request_counter.add() + self.request_counter.record(1, {"backend": self.backend_tag}) except Exception as e: result = wrap_to_ray_error(e) - self.error_counter.add() + self.error_counter.record(1, {"backend": self.backend_tag}) finally: self._reset_context() @@ -284,7 +278,8 @@ class RayServeWorker: # Flask requests are passed to __call__ as a list arg_list = [arg_list] - self.request_counter.add(batch_size) + self.request_counter.record(batch_size, + {"backend": self.backend_tag}) result_list = await call_method(*arg_list, **kwargs_list) if not isinstance(result_list, Iterable) or isinstance( @@ -309,7 +304,7 @@ class RayServeWorker: return result_list except Exception as e: wrapped_exception = wrap_to_ray_error(e) - self.error_counter.add() + self.error_counter.record(1, {"backend": self.backend_tag}) self._reset_context() return [wrapped_exception for _ in range(batch_size)] diff --git a/python/ray/serve/constants.py b/python/ray/serve/constants.py index 3de75037f..ff950503c 100644 --- a/python/ray/serve/constants.py +++ b/python/ray/serve/constants.py @@ -4,9 +4,6 @@ SERVE_CONTROLLER_NAME = "SERVE_CONTROLLER_ACTOR" #: Actor name used to register HTTP proxy actor SERVE_PROXY_NAME = "SERVE_PROXY_ACTOR" -#: Actor name used to register metric monitor actor -SERVE_METRIC_SINK_NAME = "SERVE_METRIC_SINK_ACTOR" - #: HTTP Address DEFAULT_HTTP_ADDRESS = "http://127.0.0.1:8000" @@ -19,8 +16,5 @@ DEFAULT_HTTP_PORT = 8000 #: Max concurrency ASYNC_CONCURRENCY = int(1e6) -#: Interval for metric client to push metrics to exporters -METRIC_PUSH_INTERVAL_S = 2 - #: Time to wait for HTTP proxy in `serve.init()` HTTP_PROXY_TIMEOUT = 60 diff --git a/python/ray/serve/controller.py b/python/ray/serve/controller.py index 2e2c7adaf..c66f2c53b 100644 --- a/python/ray/serve/controller.py +++ b/python/ray/serve/controller.py @@ -8,11 +8,9 @@ import ray import ray.cloudpickle as pickle from ray.serve.autoscaling_policy import BasicAutoscalingPolicy from ray.serve.backend_worker import create_backend_worker -from ray.serve.constants import (ASYNC_CONCURRENCY, SERVE_PROXY_NAME, - SERVE_METRIC_SINK_NAME) +from ray.serve.constants import ASYNC_CONCURRENCY, SERVE_PROXY_NAME from ray.serve.http_proxy import HTTPProxyActor from ray.serve.kv_store import RayInternalKVStore -from ray.serve.metric.exporter import MetricExporterActor from ray.serve.exceptions import RayServeException from ray.serve.utils import (format_actor_name, get_random_letters, logger, try_schedule_resources_on_nodes, get_all_node_ids) @@ -92,7 +90,7 @@ class ServeController: """ async def __init__(self, instance_name, http_host, http_port, - metric_exporter_class, _http_middlewares): + _http_middlewares): # Unique name of the serve instance managed by this actor. Used to # namespace child actors and checkpoints. self.instance_name = instance_name @@ -131,7 +129,6 @@ class ServeController: # Cached handles to actors in the system. # node_id -> actor_handle self.routers = dict() - self.metric_exporter = None self.http_host = http_host self.http_port = http_port @@ -139,7 +136,6 @@ class ServeController: # If starting the actor for the first time, starts up the other system # components. If recovering, fetches their actor handles. - self._start_metric_exporter(metric_exporter_class) self._start_routers_if_needed() # NOTE(edoakes): unfortunately, we can't completely recover from a @@ -226,25 +222,6 @@ class ServeController: """Called by the router on startup to fetch required state.""" return self.routes - def _start_metric_exporter(self, metric_exporter_class): - """Get the metric exporter belonging to this serve instance. - - If the metric exporter does not already exist, it will be started. - """ - metric_sink_name = format_actor_name(SERVE_METRIC_SINK_NAME, - self.instance_name) - try: - self.metric_exporter = ray.get_actor(metric_sink_name) - except ValueError: - logger.info("Starting metric exporter with name '{}'".format( - metric_sink_name)) - self.metric_exporter = MetricExporterActor.options( - name=metric_sink_name).remote(metric_exporter_class) - - def get_metric_exporter(self): - """Returns a handle to the metric exporter managed by this actor.""" - return [self.metric_exporter] - def _checkpoint(self): """Checkpoint internal state and write it to the KV store.""" assert self.write_lock.locked() @@ -879,7 +856,6 @@ class ServeController: async with self.write_lock: for router in self.routers.values(): ray.kill(router, no_restart=True) - ray.kill(self.metric_exporter, no_restart=True) for replica_dict in self.workers.values(): for replica in replica_dict.values(): ray.kill(replica, no_restart=True) diff --git a/python/ray/serve/examples/doc/snippet_metric_export.py b/python/ray/serve/examples/doc/snippet_metric_export.py deleted file mode 100644 index 059301955..000000000 --- a/python/ray/serve/examples/doc/snippet_metric_export.py +++ /dev/null @@ -1,55 +0,0 @@ -import json -import time - -import requests - -from ray import serve -from ray.serve.metric.exporter import ExporterInterface - - -class FileExporter(ExporterInterface): - def __init__(self): - self.file = open("/tmp/serve_metrics.log", "w") - - def export(self, metric_metadata, metric_batch): - for metric_item in metric_batch: - data = metric_metadata[metric_item.key].__dict__ - data["labels"] = metric_item.labels - data["values"] = metric_item.value - self.file.write(json.dumps(data)) - self.file.write("\n") - self.file.flush() - - def inspect_metrics(self): - return "Metric is located at /tmp/serve_metrics.log" - - -serve.init(metric_exporter=FileExporter) - - -def echo(flask_request): - return "hello " + flask_request.args.get("name", "serve!") - - -serve.create_backend("hello", echo) -serve.create_endpoint("hello", backend="hello", route="/hello") - -for _ in range(5): - requests.get("http://127.0.0.1:8000/hello").text - time.sleep(0.2) - -print("Retrieving metrics from file...") -with open("/tmp/serve_metrics.log") as metric_log: - for line in metric_log: - print(line) - -# Retrieving metrics from file... -# {"name": "backend_worker_starts", -# "type": 1, -# "description": "The number of time this replica workers ...", -# "label_names": ["replica_tag"], -# "default_labels": {"backend": "hello"}, " -# labels": {"replica_tag": "hello#XwzPQn"}, -# "values": 1 -# } -# ... diff --git a/python/ray/serve/examples/echo_full.py b/python/ray/serve/examples/echo_full.py index 52c80c16a..6c89291bb 100644 --- a/python/ray/serve/examples/echo_full.py +++ b/python/ray/serve/examples/echo_full.py @@ -4,11 +4,10 @@ import requests import ray import ray.serve as serve -from ray.serve.metric import PrometheusExporter # initialize ray serve system. ray.init(num_cpus=10) -serve.init(metric_exporter=PrometheusExporter) +serve.init() # a backend can be a function or class. @@ -52,6 +51,3 @@ for _ in range(10): # You can also change number of replicas for each backend independently. serve.update_backend_config("echo:v1", {"num_replicas": 2}) serve.update_backend_config("echo:v2", {"num_replicas": 2}) - -# As well as retrieving relevant system metrics -print(serve.stat().decode()) diff --git a/python/ray/serve/http_proxy.py b/python/ray/serve/http_proxy.py index 775938e37..243b79a26 100644 --- a/python/ray/serve/http_proxy.py +++ b/python/ray/serve/http_proxy.py @@ -8,7 +8,7 @@ import ray from ray.exceptions import RayTaskError from ray import serve from ray.serve.context import TaskContext -from ray.serve.metric import MetricClient +from ray.experimental import metrics from ray.serve.request_params import RequestMetadata from ray.serve.http_util import Response from ray.serve.router import Router @@ -33,14 +33,9 @@ class HTTPProxy: self.route_table = await controller.get_router_config.remote() - # The exporter is required to return results for /-/metrics endpoint. - [self.metric_exporter] = await controller.get_metric_exporter.remote() - - self.metric_client = MetricClient(self.metric_exporter) - self.request_counter = self.metric_client.new_counter( - "num_http_requests", - description="The number of requests processed", - label_names=("route", )) + self.request_counter = metrics.Count( + "num_http_requests", "The number of HTTP requests processed", + "requests", ["route"]) self.router = Router() await self.router.setup(name, instance_name) @@ -71,9 +66,6 @@ class HTTPProxy: current_path = scope["path"] if current_path == "/-/routes": await Response(self.route_table).send(scope, receive, send) - elif current_path == "/-/metrics": - metric_info = await self.metric_exporter.inspect_metrics.remote() - await Response(metric_info).send(scope, receive, send) else: await Response( "System path {} not found".format(current_path), @@ -90,7 +82,7 @@ class HTTPProxy: assert scope["type"] == "http" current_path = scope["path"] - self.request_counter.labels(route=current_path).add() + self.request_counter.record(1, {"route": current_path}) if current_path.startswith("/-/"): await self._handle_system_request(scope, receive, send) diff --git a/python/ray/serve/metric/__init__.py b/python/ray/serve/metric/__init__.py deleted file mode 100644 index fe2dfcf3f..000000000 --- a/python/ray/serve/metric/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from ray.serve.metric.client import MetricClient -from ray.serve.metric.exporter import (InMemoryExporter, PrometheusExporter) - -__all__ = ["MetricClient", "InMemoryExporter", "PrometheusExporter"] diff --git a/python/ray/serve/metric/client.py b/python/ray/serve/metric/client.py deleted file mode 100644 index b855da62e..000000000 --- a/python/ray/serve/metric/client.py +++ /dev/null @@ -1,136 +0,0 @@ -import asyncio -from typing import Dict, Optional, Tuple, List - -from ray.serve.metric.types import (MetricType, convert_event_type_to_class, - MetricMetadata, MetricRecord) -from ray.serve.utils import _get_logger -from ray.serve.constants import METRIC_PUSH_INTERVAL_S - -logger = _get_logger() - - -class MetricClient: - def __init__( - self, - metric_exporter_actor, - push_interval: float = METRIC_PUSH_INTERVAL_S, - default_labels: Optional[Dict[str, str]] = None, - ): - """Initialize a client to push metrics to the exporter actor. - - Args: - metric_exporter_actor: The actor to push metrics to. - default_labels(dict): The set of labels to apply for all metrics - created by this actor. For example, {"source": "worker"}. - """ - self.exporter = metric_exporter_actor - self.default_labels = default_labels or dict() - - self.registered_metrics: Dict[int, MetricMetadata] = dict() - self.metric_records: List[MetricRecord] = [] - - assert asyncio.get_event_loop().is_running() - self.push_task = asyncio.get_event_loop().create_task( - self.push_to_exporter_forever(push_interval)) - logger.debug("Initialized client") - - def new_counter(self, - name: str, - *, - description: Optional[str] = "", - label_names: Optional[Tuple[str]] = ()): - """Create a new counter. - - Counters are used to capture changes in running sums. An essential - property of Counter instruments is that two events add(m) and add(n) - are semantically equivalent to one event add(m+n). This property means - that Counter events can be combined. - - Args: - name(str): The unique name for the counter. - description(Optional[str]): The description for the counter. - label_names(Optional[Tuple[str]]): The set of label names to be - added when recording the metrics. - - Usage: - >>> client = MetricClient(...) - >>> counter = client.new_counter( - "http_counter", - description="This is a simple counter for HTTP status", - label_names=("route", "status_code")) - >>> counter.labels(route="/hi", status_code=200).add() - """ - return self._new_metric(name, MetricType.COUNTER, description, - label_names) - - def new_measure(self, - name, - *, - description: Optional[str] = "", - label_names: Optional[Tuple[str]] = ()): - """Create a new measure. - - Measure instruments are independent. They cannot be combined as with - counters. Measures can be aggregated after recording to compute - statistics about the distribution along selected dimension. - - Args: - name(str): The unique name for the measure. - description(Optional[str]): The description for the measure. - label_names(Optional[Tuple[str]]): The set of label names to be - added when recording the metrics. - - Usage: - >>> client = MetricClient(...) - >>> measure = client.new_measure( - "latency_measure", - description="This is a simple measure for latency in ms", - label_names=("route")) - >>> measure.labels(route="/hi").record(42) - """ - return self._new_metric(name, MetricType.MEASURE, description, - label_names) - - def _new_metric( - self, - name, - metric_type: MetricType, - description: str, - label_names: Tuple[str] = (), - ): - if not isinstance(label_names, tuple): - raise ValueError("label_names need to be a tuple, it is {}".format( - type(label_names))) - - metric_metadata = MetricMetadata( - name=name, - type=metric_type, - description=description, - label_names=label_names, - default_labels=self.default_labels.copy(), - ) - - key = hash(metric_metadata) - if key in self.registered_metrics: - raise ValueError("Metric named {} and associated metadata " - "is already registered.".format(name)) - self.registered_metrics[key] = metric_metadata - - metric_class = convert_event_type_to_class(metric_type) - metric_object = metric_class( - client=self, key=key, label_names=label_names) - - return metric_object - - async def _push_to_exporter_once(self): - if len(self.metric_records) == 0: - return - - old_batch, self.metric_records = self.metric_records, [] - logger.debug("Pushing metric batch {}".format(old_batch)) - await self.exporter.ingest.remote(self.registered_metrics, old_batch) - - async def push_to_exporter_forever(self, interval_s): - while True: - await self._push_to_exporter_once() - await asyncio.sleep(interval_s) diff --git a/python/ray/serve/metric/exporter.py b/python/ray/serve/metric/exporter.py deleted file mode 100644 index 23ee8f74f..000000000 --- a/python/ray/serve/metric/exporter.py +++ /dev/null @@ -1,157 +0,0 @@ -from typing import Dict, Any -from collections import Counter, namedtuple - -import ray -from ray.serve.metric.types import MetricType, MetricMetadata, MetricBatch -from ray.serve.utils import _get_logger - -logger = _get_logger() - - -def make_metric_namedtuple(metric_metadata: MetricMetadata, - record: MetricBatch): - fields = ["name", "type"] - fields += list(metric_metadata.default_labels.keys()) - fields += list(record.labels.keys()) - - merged_labels = metric_metadata.default_labels.copy() - merged_labels.update(record.labels) - - tuple_type = namedtuple(metric_metadata.name, fields) - return tuple_type( - name=metric_metadata.name, type=metric_metadata.type, **merged_labels) - - -class ExporterInterface: - def export(self, metric_metadata: Dict[str, MetricMetadata], - metric_batch: MetricBatch): - raise NotImplementedError( - "This method should be implemented by subclass.") - - def inspect_metrics(self): - raise NotImplementedError( - "This method should be implemented by subclass.") - - -@ray.remote(num_cpus=0) -class MetricExporterActor: - """Aggregate metrics from all RayServe actors and export them. - - Metrics are aggregated pushed from other actors in the system to - this actor. They can then be stored or pushed to an external monitoring - system. - """ - - def __init__(self, exporter_class: ExporterInterface): - # TODO(simon): Add support for initializer args and kwargs. - self.exporter = exporter_class() - - # Stores the all recorded MetricMetadata (keyed by hash key) - # This field is tolerant to failures since each client will always push - # an updated copy of the metadata for each ingest call. - self.metric_metadata: Dict[int, MetricMetadata] = dict() - - logger.debug("Initialized with metric exporter of type {}".format( - type(self.exporter))) - - def ingest(self, metric_metadata: Dict[int, MetricMetadata], - batch: MetricBatch): - self.metric_metadata.update(metric_metadata) - self.exporter.export(self.metric_metadata, batch) - - def inspect_metrics(self): - return self.exporter.inspect_metrics() - - -class InMemoryExporter(ExporterInterface): - def __init__(self): - # Keep track of counters - self.counters: Counter[namedtuple, float] = Counter() - # Keep track of latest observation of measures - self.latest_measures: Dict[namedtuple, float] = dict() - - def export(self, metric_metadata: Dict[int, MetricMetadata], - metric_batch: MetricBatch): - for record in metric_batch: - metadata = metric_metadata[record.key] - metric_key = make_metric_namedtuple(metadata, record) - if metadata.type == MetricType.COUNTER: - self.counters[metric_key] += record.value - elif metadata.type == MetricType.MEASURE: - self.latest_measures[metric_key] = record.value - else: - raise RuntimeError("Unrecognized metric type {}".format( - metadata.type)) - - def inspect_metrics(self): - items = [] - metrics_to_collect = {**self.counters, **self.latest_measures} - for info_tuple, value in metrics_to_collect.items(): - # Represent the metric type as a human readable name - info_tuple = info_tuple._replace(type=str(info_tuple.type)) - # _asdict returns OrderedDict, we just need to return regular dict - items.append({"info": dict(info_tuple._asdict()), "value": value}) - return items - - -class PrometheusExporter(ExporterInterface): - def __init__(self): - super().__init__() - from prometheus_client import (CollectorRegistry, Counter, Gauge, - generate_latest) - self.metric_type_to_prom_type = { - MetricType.COUNTER: Counter, - MetricType.MEASURE: Gauge - } - self.prom_generate_latest = generate_latest - - self.metrics_cache: Dict[str, Any] = dict() - self.default_labels = dict() - self.registry = CollectorRegistry() - - def inspect_metrics(self): - return self.prom_generate_latest(self.registry) - - def export(self, metric_metadata, metric_batch): - self._process_metric_metadata(metric_metadata) - self._process_batch(metric_metadata, metric_batch) - - def _process_metric_metadata(self, metric_metadata): - for key, metric_metadata in metric_metadata.items(): - name = metric_metadata.name - - if name not in self.metrics_cache: - constructor = self.metric_type_to_prom_type[ - metric_metadata.type] - - default_labels = metric_metadata.default_labels - label_names = tuple( - default_labels.keys()) + metric_metadata.label_names - metric_object = constructor( - name, - metric_metadata.description, - labelnames=label_names, - registry=self.registry, - ) - - self.metrics_cache[name] = (metric_object, - metric_metadata.type) - - self.default_labels[key] = metric_metadata.default_labels - - def _process_batch(self, metric_metadata, batch): - for key, labels, value in batch: - if key not in metric_metadata: - continue - - name = metric_metadata[key].name - metric, metric_type = self.metrics_cache[name] - default_labels = self.default_labels[key] - merged_labels = {**default_labels, **labels} - if metric_type == MetricType.COUNTER: - metric.labels(**merged_labels).inc(value) - elif metric_type == MetricType.MEASURE: - metric.labels(**merged_labels).set(value) - else: - raise RuntimeError( - "Unrecognized metric type {}".format(metric_type)) diff --git a/python/ray/serve/metric/types.py b/python/ray/serve/metric/types.py deleted file mode 100644 index 93b3fb6ce..000000000 --- a/python/ray/serve/metric/types.py +++ /dev/null @@ -1,112 +0,0 @@ -import enum -from typing import Tuple, List, Dict, Optional -from collections import namedtuple - - -class MetricType(enum.IntEnum): - COUNTER = 1 - MEASURE = 2 - - -# We split the information about a metric into two parts: the MetricMetadata -# and MetricRecord. Metadata is declared at creation time and include names -# and the label names. The label values will be supplied at observation time. -class MetricMetadata: - def __init__(self, name: str, type: MetricType, description: str, - label_names: Tuple[str], default_labels: Dict[str, str]): - self.name = name - self.type = type - self.description = description - self.label_names = label_names - self.default_labels = default_labels - - def __eq__(self, value: "MetricMetadata"): - if not isinstance(value, MetricMetadata): - return False - - return (self.name == value.name and self.type == self.type - and self.description == value.description - and self.label_names == value.label_names - and self.default_labels == value.default_labels) - - def __hash__(self): - return hash((self.name, self.type, self.description, self.label_names, - frozenset(self.default_labels.items()))) - - -MetricRecord = namedtuple("MetricRecord", ["key", "labels", "value"]) -MetricBatch = List[MetricRecord] - - -class BaseMetric: - def __init__(self, - client, - key: int, - label_names: Tuple[str], - dynamic_labels: Optional[Dict[str, str]] = None): - """Represent a single metric stream - - Args: - client(MetricClient): The client object to push update to. - key(int): The unique hash key of the metric. - label_names(Tuple[str]): The names of the labels that must be set - before an observation. - dynamic_labels(Optional[Dict[str,str]]): A partially preset labels. - This fields make it possible to chain label calls together: - ``metric.labels(a=b).labels(c=d)``. - """ - self.client = client - self.key = key - self.dynamic_labels = dynamic_labels or dict() - self.label_names = label_names - - def check_all_labels_fulfilled_or_error(self): - unfulfilled = set(self.label_names) - set(self.dynamic_labels.keys()) - if len(unfulfilled) != 0: - raise ValueError("The following labels doesn't have associated " - "values: {}".format(unfulfilled)) - - def labels(self, **kwargs): - """Apply dynamic label to the metric - - Usage: - >>> metric = BaseMetric(..., label_names=("a", "b")) - >>> metric.labels(a=1, b=2) - >>> metric.labels(a=1).labels(b=2) # Equivalent - """ - new_dynamic_labels = self.dynamic_labels.copy() - for k, v in kwargs.items(): - if k not in self.label_names: - raise ValueError( - "Label {} was not part of registered " - "label names. Allowed label names are {}.".format( - k, self.label_names)) - new_dynamic_labels[k] = str(v) - return type(self)(self.client, self.key, self.label_names, - new_dynamic_labels) - - -# The metric types are inspired by OpenTelemetry spec: -# https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/metrics/api.md#three-kinds-of-instrument -class Counter(BaseMetric): - def add(self, increment=1): - """Increment the counter by some amount. Default is 1""" - self.check_all_labels_fulfilled_or_error() - self.client.metric_records.append( - MetricRecord(self.key, self.dynamic_labels, increment)) - - -class Measure(BaseMetric): - def record(self, value): - """Record the given value for the measure""" - self.check_all_labels_fulfilled_or_error() - self.client.metric_records.append( - MetricRecord(self.key, self.dynamic_labels, value)) - - -def convert_event_type_to_class(event_type: MetricType) -> BaseMetric: - if event_type == MetricType.COUNTER: - return Counter - if event_type == MetricType.MEASURE: - return Measure - raise RuntimeError("Unknown event type {}".format(event_type)) diff --git a/python/ray/serve/router.py b/python/ray/serve/router.py index fd6fdfd02..501e04453 100644 --- a/python/ray/serve/router.py +++ b/python/ray/serve/router.py @@ -9,7 +9,7 @@ from ray.exceptions import RayTaskError import ray from ray import serve -from ray.serve.metric import MetricClient +from ray.experimental import metrics from ray.serve.endpoint_policy import RandomEndpointPolicy from ray.serve.utils import logger, chain_future @@ -145,24 +145,19 @@ class Router: for backend, backend_config in backend_configs.items(): await self.set_backend_config(backend, backend_config) - # -- Metric Registration -- # - [metric_exporter] = ray.get( - self.controller.get_metric_exporter.remote()) - self.metric_client = MetricClient(metric_exporter) - self.num_router_requests = self.metric_client.new_counter( + # -- Metrics Registration -- # + self.num_router_requests = metrics.Count( "num_router_requests", - description="Number of requests processed by the router.", - label_names=("endpoint", )) - self.num_error_endpoint_request = self.metric_client.new_counter( + "Number of requests processed by the router.", "requests", + ["endpoint"]) + self.num_error_endpoint_requests = metrics.Count( "num_error_endpoint_requests", - description=("Number of requests errored when getting result " - "for endpoint."), - label_names=("endpoint", )) - self.num_error_backend_request = self.metric_client.new_counter( + ("Number of requests that errored when getting results " + "for the endpoint."), "requests", ["endpoint"]) + self.num_error_backend_requests = metrics.Count( "num_error_backend_requests", - description=("Number of requests errored when getting result " - "from backend."), - label_names=("backend", )) + ("Number of requests that errored when getting result " + "from the backend."), "requests", ["backend"]) asyncio.get_event_loop().create_task(self.report_queue_lengths()) @@ -170,7 +165,7 @@ class Router: **request_kwargs): endpoint = request_meta.endpoint logger.debug("Received a request for endpoint {}".format(endpoint)) - self.num_router_requests.labels(endpoint=endpoint).add() + self.num_router_requests.record(1, {"endpoint": endpoint}) request_context = request_meta.request_context query = Query( @@ -187,7 +182,7 @@ class Router: try: result = await query.async_future except RayTaskError as e: - self.num_error_endpoint_request.labels(endpoint=endpoint).add() + self.num_error_endpoint_requests.record(1, {"endpoint": endpoint}) result = e return result @@ -311,7 +306,7 @@ class Router: else: result = await object_ref except RayTaskError as error: - self.num_error_backend_request.labels(backend=backend).add() + self.num_error_backend_requests.record(1, {"backend": backend}) result = error self.queries_counter[backend][backend_replica_tag] -= 1 await self.mark_worker_idle(backend, backend_replica_tag) diff --git a/python/ray/serve/tests/test_api.py b/python/ray/serve/tests/test_api.py index f023fd65a..c1b141724 100644 --- a/python/ray/serve/tests/test_api.py +++ b/python/ray/serve/tests/test_api.py @@ -1,5 +1,6 @@ -import time import asyncio +from collections import defaultdict +import time import pytest import requests @@ -597,8 +598,7 @@ def test_shutdown(serve_instance): def check_dead(): for actor_name in [ - constants.SERVE_CONTROLLER_NAME, constants.SERVE_PROXY_NAME, - constants.SERVE_METRIC_SINK_NAME + constants.SERVE_CONTROLLER_NAME, constants.SERVE_PROXY_NAME ]: try: ray.get_actor(format_actor_name(actor_name, instance_name)) @@ -611,16 +611,39 @@ def test_shutdown(serve_instance): def test_shadow_traffic(serve_instance): + @ray.remote + class RequestCounter: + def __init__(self): + self.requests = defaultdict(int) + + def record(self, backend): + self.requests[backend] += 1 + + def get(self, backend): + return self.requests[backend] + + counter = RequestCounter.remote() + def f(): + ray.get(counter.record.remote("backend1")) return "hello" - def f_shadow(): + def f_shadow_1(): + ray.get(counter.record.remote("backend2")) + return "oops" + + def f_shadow_2(): + ray.get(counter.record.remote("backend3")) + return "oops" + + def f_shadow_3(): + ray.get(counter.record.remote("backend4")) return "oops" serve.create_backend("backend1", f) - serve.create_backend("backend2", f_shadow) - serve.create_backend("backend3", f_shadow) - serve.create_backend("backend4", f_shadow) + serve.create_backend("backend2", f_shadow_1) + serve.create_backend("backend3", f_shadow_2) + serve.create_backend("backend4", f_shadow_3) serve.create_endpoint("endpoint", backend="backend1", route="/api") serve.shadow_traffic("endpoint", "backend2", 1.0) @@ -634,12 +657,7 @@ def test_shadow_traffic(serve_instance): print("Finished 100 requests in {}s.".format(time.time() - start)) def requests_to_backend(backend): - for entry in serve.stat(): - if entry["info"]["name"] == "backend_request_counter": - if entry["info"]["backend"] == backend: - return entry["value"] - - return 0 + return ray.get(counter.get.remote(backend)) def check_requests(): return all([ diff --git a/python/ray/serve/tests/test_metric.py b/python/ray/serve/tests/test_metric.py deleted file mode 100644 index 0aea41fd2..000000000 --- a/python/ray/serve/tests/test_metric.py +++ /dev/null @@ -1,222 +0,0 @@ -import time - -import pytest -import requests - -from ray import serve -from ray.serve.metric.client import MetricClient -from ray.serve.metric.exporter import (InMemoryExporter, PrometheusExporter, - MetricExporterActor) -from ray.serve.metric.types import MetricType, MetricMetadata - -pytestmark = pytest.mark.asyncio - - -class MockExporterActor: - def __init__(self): - self.metadata = dict() - self.batches = [] - - @property - def ingest(self): - return self - - async def remote(self, metadata, batch): - self.metadata.update(metadata) - self.batches.extend(batch) - - -async def test_client(): - exporter = MockExporterActor() - collector = MetricClient( - exporter, push_interval=2, default_labels={"default": "label"}) - counter = collector.new_counter(name="counter", label_names=("a", "b")) - - with pytest.raises( - ValueError, match="labels doesn't have associated values"): - counter.add() - - counter = counter.labels(a=1) - - counter.labels(b=2).add() - counter.labels(b=3).add(42) - - measure = collector.new_measure("measure") - measure.record(2) - - await collector._push_to_exporter_once() - - assert MetricMetadata( - name="counter", - type=MetricType.COUNTER, - description="", - label_names=("a", "b"), - default_labels={"default": "label"}, - ) in exporter.metadata.values() - assert MetricMetadata( - name="measure", - type=MetricType.MEASURE, - description="", - label_names=(), - default_labels={"default": "label"}, - ) in exporter.metadata.values() - - metric_values = [item.value for item in exporter.batches] - assert set(metric_values) == {1, 42, 2} - - metric_labels = [ - frozenset(item.labels.items()) for item in exporter.batches - ] - assert frozenset(dict(a="1", b="2").items()) in metric_labels - assert frozenset(dict(a="1", b="3").items()) in metric_labels - - -async def test_in_memory_exporter(serve_instance): - exporter = MetricExporterActor.remote(InMemoryExporter) - collector = MetricClient( - exporter, push_interval=2, default_labels={"default": "label"}) - - counter = collector.new_counter(name="my_counter", label_names=("a", )) - measure = collector.new_measure( - name="my_measure", description="help", label_names=("ray", "lang")) - measure = measure.labels(lang="C++") - - counter.labels(a="1").add() - measure.labels(ray="").record(0) - measure.labels(ray="").record(42) - - await collector._push_to_exporter_once() - - metric_stored = await exporter.inspect_metrics.remote() - assert metric_stored == [{ - "info": { - "name": "my_counter", - "type": "MetricType.COUNTER", - "default": "label", - "a": "1" - }, - "value": 1 - }, { - "info": { - "name": "my_measure", - "type": "MetricType.MEASURE", - "default": "label", - "lang": "C++", - "ray": "" - }, - "value": 42 - }] - - -async def test_prometheus_exporter(serve_instance): - exporter = MetricExporterActor.remote(PrometheusExporter) - collector = MetricClient( - exporter, push_interval=2, default_labels={"default": "label"}) - - counter = collector.new_counter(name="my_counter", label_names=("a", )) - measure = collector.new_measure( - name="my_measure", description="help", label_names=("ray", "lang")) - measure = measure.labels(lang="C++") - - counter.labels(a="1").add() - measure.labels(ray="").record(0) - measure.labels(ray="").record(42) - - await collector._push_to_exporter_once() - - metric_stored = await exporter.inspect_metrics.remote() - metric_stored = metric_stored.decode() - - fragments = [ - "# HELP my_counter_total", "# TYPE my_counter_total counter", - 'my_counter_total{a="1",default="label"} 1.0', - "# TYPE my_counter_created gauge", - 'my_counter_created{a="1",default="label"}', "# HELP my_measure help", - "# TYPE my_measure gauge", - 'my_measure{default="label",lang="C++",ray=""} 42.0' - ] - - for fragment in fragments: - assert fragment in metric_stored - - -async def test_prometheus_conflicting_labels(serve_instance): - exporter = MetricExporterActor.remote(PrometheusExporter) - - collector_a = MetricClient( - exporter, push_interval=2, default_labels={"default": "a"}) - collector_b = MetricClient( - exporter, push_interval=2, default_labels={"default": "b"}) - - for collector in [collector_a, collector_b]: - counter = collector.new_counter("num") - counter.add() - await collector._push_to_exporter_once() - - metric_stored = (await exporter.inspect_metrics.remote()).decode() - - fragments = ['num_total{default="a"}', 'num_total{default="b"}'] - for fragment in fragments: - assert fragment in metric_stored - - -async def test_system_metric_endpoints(serve_instance): - def test_error_counter(flask_request): - 1 / 0 - - serve.create_backend("m:v1", test_error_counter) - serve.create_endpoint("test_metrics", backend="m:v1", route="/measure") - serve.set_traffic("test_metrics", {"m:v1": 1}) - - # Check metrics are exposed under http endpoint - def test_metric_endpoint(): - requests.get("http://127.0.0.1:8000/measure", timeout=5) - in_memory_metric = requests.get( - "http://127.0.0.1:8000/-/metrics", timeout=5).json() - - # We don't want to check the values since this check might be retried. - in_memory_metric_without_values = [] - for m in in_memory_metric: - m.pop("value") - in_memory_metric_without_values.append(m) - - target_metrics = [{ - "info": { - "name": "num_http_requests", - "type": "MetricType.COUNTER", - "route": "/measure" - }, - }, { - "info": { - "name": "num_router_requests", - "type": "MetricType.COUNTER", - "endpoint": "test_metrics" - }, - }, { - "info": { - "name": "backend_error_counter", - "type": "MetricType.COUNTER", - "backend": "m:v1" - }, - }] - - for target in target_metrics: - assert target in in_memory_metric_without_values - - success = False - for _ in range(3): - try: - test_metric_endpoint() - success = True - break - except (AssertionError, requests.ReadTimeout): - # Metrics may not have been propagated yet - time.sleep(2) - print("Metric not correct, retrying...") - if not success: - test_metric_endpoint() - - -if __name__ == "__main__": - import sys - sys.exit(pytest.main(["-v", "-s", __file__]))