diff --git a/doc/source/package-ref.rst b/doc/source/package-ref.rst index 5fe98ccb7..cc3bec05f 100644 --- a/doc/source/package-ref.rst +++ b/doc/source/package-ref.rst @@ -167,6 +167,35 @@ get_current_placement_group .. autofunction:: ray.util.placement_group.get_current_placement_group +.. _custom-metric-api-ref: + +Custom Metrics APIs +------------------- + +Metric +~~~~~~ + +.. autoclass:: ray.util.metrics.Metric + :members: + +Count +~~~~~ + +.. autoclass:: ray.util.metrics.Count + :members: + +Gauge +~~~~~ + +.. autoclass:: ray.util.metrics.Gauge + :members: + +Histogram +~~~~~~~~~ + +.. autoclass:: ray.util.metrics.Histogram + :members: + Experimental APIs ----------------- diff --git a/doc/source/ray-metrics.rst b/doc/source/ray-metrics.rst index c6137ac38..cac3308cf 100644 --- a/doc/source/ray-metrics.rst +++ b/doc/source/ray-metrics.rst @@ -1,9 +1,10 @@ -Ray Monitoring with Prometheus -============================== +Ray Monitoring +============== To help monitoring Ray applications, Ray - Collects Ray's pre-selected system level metrics. - Exposes metrics in a Prometheus format. We'll call the endpoint to access these metrics a Prometheus endpoint. +- Supports custom metrics APIs that resemble Prometheus `metric types <https://prometheus.io/docs/concepts/metric_types/>`_. This page describes how to acces these metrics using Prometheus. @@ -151,3 +152,11 @@ Now, modify a Prometheus config to scrape the file for service discovery. - '/tmp/ray/prom_metrics_service_discovery.json' Prometheus will automatically detect that the file contents are changing and update addresses it scrapes to based on the service discovery file generated by Ray. + +Custom Metrics +-------------- +Ray supports custom metrics APIs to enable developers to gain visibility into their applications. + +It currently supports 3 metric types. All metric types have the same definition as `Prometheus metric types <https://prometheus.io/docs/concepts/metric_types/>`_. 
+ +:ref:`Custom Metrics APIs Package Reference <custom-metric-api-ref>` \ No newline at end of file diff --git a/python/ray/experimental/metrics.py b/python/ray/experimental/metrics.py deleted file mode 100644 index db7079828..000000000 --- a/python/ray/experimental/metrics.py +++ /dev/null @@ -1,38 +0,0 @@ -from ray._raylet import ( - Count, - Histogram, - Gauge, - Sum, -) # noqa: E402 -"""Metric/Stats module for worker. - -This module is responsible for providing four classes mapping from stats of -cpp. - -How to use: - For Count, Gauge and Sum, we may define a metric like this following: - gauge = Gauge( - 'ray.worker.metric', - 'description', - 'unit', - ['tagk1', 'tagk2']). - The last parameter is default tag map. You can use gauge.record(1.0) with - default tags or gauge.record(1.0, {'tagk1', 'tagv1'}) that means the tagk1 - is updating in tagv1. - - It's addtional boundaries to Histogram measurement, - histogram = Histogram( - 'ray.worker.histogram1', - 'a', 'b', [1.0, 2.0], - ['tagk1']) - - Recommended metric name pattern : ray.{component_name}.{module_name}, and - name format must be in [0-9a-zA-Z]. -""" - -__all__ = [ - "Count", - "Histogram", - "Gauge", - "Sum", -] diff --git a/python/ray/includes/metric.pxi b/python/ray/includes/metric.pxi index 443a476ad..ba89ba840 100644 --- a/python/ray/includes/metric.pxi +++ b/python/ray/includes/metric.pxi @@ -50,7 +50,8 @@ cdef class Metric: # Default tags will be exported if it's empty map. 
if tags: for tag_k, tag_v in tags.items(): - c_tags[tag_k.encode("ascii")] = tag_v.encode("ascii") + if tag_v is not None: + c_tags[tag_k.encode("ascii")] = tag_v.encode("ascii") c_value = value with nogil: self.metric.get().Record(c_value, c_tags) diff --git a/python/ray/serve/backend_worker.py b/python/ray/serve/backend_worker.py index c8e188f15..d686c30f5 100644 --- a/python/ray/serve/backend_worker.py +++ b/python/ray/serve/backend_worker.py @@ -12,7 +12,7 @@ from ray.async_compat import sync_to_async from ray.serve.utils import (parse_request_item, _get_logger, chain_future, unpack_future) from ray.serve.exceptions import RayServeException -from ray.experimental import metrics +from ray.util import metrics from ray.serve.config import BackendConfig from ray.serve.router import Query from ray.serve.constants import DEFAULT_LATENCY_BUCKET_MS @@ -159,43 +159,72 @@ class RayServeWorker: self.num_ongoing_requests = 0 self.request_counter = metrics.Count( - "backend_request_counter", ("Number of queries that have been " - "processed in this replica"), - "requests", ["backend"]) - self.error_counter = metrics.Count("backend_error_counter", - ("Number of exceptions that have " - "occurred in the backend"), - "errors", ["backend"]) + "backend_request_counter", + description=("Number of queries that have been " + "processed in this replica"), + tag_keys=("backend", )) + self.request_counter.set_default_tags({"backend": self.backend_tag}) + + self.error_counter = metrics.Count( + "backend_error_counter", + description=("Number of exceptions that have " + "occurred in the backend"), + tag_keys=("backend", )) + self.error_counter.set_default_tags({"backend": self.backend_tag}) + self.restart_counter = metrics.Count( "backend_worker_starts", - ("The number of time this replica workers " - "has been restarted due to failure."), "restarts", - ["backend", "replica_tag"]) - - self.queuing_latency_tracker = metrics.Histogram( - "backend_queuing_latency_ms", - ("The latency for 
queries waiting in the replica's queue " - "waiting to be processed or batched."), "ms", - DEFAULT_LATENCY_BUCKET_MS, ["backend", "replica_tag"]) - self.processing_latency_tracker = metrics.Histogram( - "backend_processing_latency_ms", - "The latency for queries to be processed", "ms", - DEFAULT_LATENCY_BUCKET_MS, - ["backend", "replica_tag", "batch_size"]) - self.num_queued_items = metrics.Gauge( - "replica_queued_queries", - "Current number of queries queued in the the backend replicas", - "requests", ["backend", "replica_tag"]) - self.num_processing_items = metrics.Gauge( - "replica_processing_queries", - "Current number of queries being processed", "requests", - ["backend", "replica_tag"]) - - self.restart_counter.record(1, { + description=("The number of time this replica workers " + "has been restarted due to failure."), + tag_keys=("backend", "replica_tag")) + self.restart_counter.set_default_tags({ "backend": self.backend_tag, "replica_tag": self.replica_tag }) + self.queuing_latency_tracker = metrics.Histogram( + "backend_queuing_latency_ms", + description=( + "The latency for queries waiting in the replica's queue " + "waiting to be processed or batched."), + boundaries=DEFAULT_LATENCY_BUCKET_MS, + tag_keys=("backend", "replica_tag")) + self.queuing_latency_tracker.set_default_tags({ + "backend": self.backend_tag, + "replica_tag": self.replica_tag + }) + + self.processing_latency_tracker = metrics.Histogram( + "backend_processing_latency_ms", + description="The latency for queries to be processed", + boundaries=DEFAULT_LATENCY_BUCKET_MS, + tag_keys=("backend", "replica_tag", "batch_size")) + self.processing_latency_tracker.set_default_tags({ + "backend": self.backend_tag, + "replica_tag": self.replica_tag + }) + + self.num_queued_items = metrics.Gauge( + "replica_queued_queries", + description=("Current number of queries queued in the " + "the backend replicas"), + tag_keys=("backend", "replica_tag")) + self.num_queued_items.set_default_tags({ + 
"backend": self.backend_tag, + "replica_tag": self.replica_tag + }) + + self.num_processing_items = metrics.Gauge( + "replica_processing_queries", + description="Current number of queries being processed", + tag_keys=("backend", "replica_tag")) + self.num_processing_items.set_default_tags({ + "backend": self.backend_tag, + "replica_tag": self.replica_tag + }) + + self.restart_counter.record(1) + asyncio.get_event_loop().create_task(self.main_loop()) def get_runner_method(self, request_item: Query) -> Callable: @@ -216,17 +245,13 @@ class RayServeWorker: start = time.time() try: result = await method_to_call(arg) - self.request_counter.record(1, {"backend": self.backend_tag}) + self.request_counter.record(1) except Exception as e: result = wrap_to_ray_error(e) - self.error_counter.record(1, {"backend": self.backend_tag}) + self.error_counter.record(1) self.processing_latency_tracker.record( - (time.time() - start) * 1000, { - "backend": self.backend_tag, - "replica": self.replica_tag, - "batch_size": "1" - }) + (time.time() - start) * 1000, tags={"batch_size": "1"}) return result @@ -248,8 +273,7 @@ class RayServeWorker: "Please only send the same type of requests in batching " "mode.") - self.request_counter.record(batch_size, - {"backend": self.backend_tag}) + self.request_counter.record(batch_size) call_method = ensure_async(call_methods.pop()) result_list = await call_method(args) @@ -274,15 +298,12 @@ class RayServeWorker: raise RayServeException(error_message) except Exception as e: wrapped_exception = wrap_to_ray_error(e) - self.error_counter.record(1, {"backend": self.backend_tag}) + self.error_counter.record(1) result_list = [wrapped_exception for _ in range(batch_size)] self.processing_latency_tracker.record( - (time.time() - timing_start) * 1000, { - "backend": self.backend_tag, - "replica_tag": self.replica_tag, - "batch_size": str(batch_size) - }) + (time.time() - timing_start) * 1000, + tags={"batch_size": str(batch_size)}) return result_list @@ 
-294,21 +315,12 @@ class RayServeWorker: batch = await self.batch_queue.wait_for_batch() # Record metrics - self.num_queued_items.record(self.batch_queue.qsize(), { - "backend": self.backend_tag, - "replica_tag": self.replica_tag - }) - self.num_processing_items.record( - self.num_ongoing_requests - self.batch_queue.qsize(), { - "backend": self.backend_tag, - "replica_tag": self.replica_tag - }) + self.num_queued_items.record(self.batch_queue.qsize()) + self.num_processing_items.record(self.num_ongoing_requests - + self.batch_queue.qsize()) for query in batch: queuing_time = (time.time() - query.tick_enter_replica) * 1000 - self.queuing_latency_tracker.record(queuing_time, { - "backend": self.backend_tag, - "replica_tag": self.replica_tag - }) + self.queuing_latency_tracker.record(queuing_time) all_evaluated_futures = [] diff --git a/python/ray/serve/http_proxy.py b/python/ray/serve/http_proxy.py index 2d0e2265f..59fb47f38 100644 --- a/python/ray/serve/http_proxy.py +++ b/python/ray/serve/http_proxy.py @@ -7,7 +7,7 @@ import uvicorn import ray from ray.exceptions import RayTaskError from ray.serve.context import TaskContext -from ray.experimental import metrics +from ray.util import metrics from ray.serve.http_util import Response from ray.serve.router import Router, RequestMetadata @@ -32,8 +32,9 @@ class HTTPProxy: self.route_table = await controller.get_router_config.remote() self.request_counter = metrics.Count( - "num_http_requests", "The number of HTTP requests processed", - "requests", ["route"]) + "num_http_requests", + description="The number of HTTP requests processed", + tag_keys=("route", )) self.router = Router() await self.router.setup(name, controller_name) @@ -80,7 +81,7 @@ class HTTPProxy: assert scope["type"] == "http" current_path = scope["path"] - self.request_counter.record(1, {"route": current_path}) + self.request_counter.record(1, tags={"route": current_path}) if current_path.startswith("/-/"): await self._handle_system_request(scope, 
receive, send) diff --git a/python/ray/serve/router.py b/python/ray/serve/router.py index aac4b8eac..4ff393044 100644 --- a/python/ray/serve/router.py +++ b/python/ray/serve/router.py @@ -9,7 +9,7 @@ from dataclasses import dataclass, field from ray.exceptions import RayTaskError import ray -from ray.experimental import metrics +from ray.util import metrics from ray.serve.context import TaskContext from ray.serve.endpoint_policy import RandomEndpointPolicy from ray.serve.utils import logger, chain_future @@ -139,21 +139,25 @@ class Router: # -- Metrics Registration -- # self.num_router_requests = metrics.Count( "num_router_requests", - "Number of requests processed by the router.", "requests", - ["endpoint"]) + description="Number of requests processed by the router.", + tag_keys=("endpoint", )) self.num_error_endpoint_requests = metrics.Count( "num_error_endpoint_requests", - ("Number of requests that errored when getting results " - "for the endpoint."), "requests", ["endpoint"]) + description=( + "Number of requests that errored when getting results " + "for the endpoint."), + tag_keys=("endpoint", )) self.num_error_backend_requests = metrics.Count( "num_error_backend_requests", - ("Number of requests that errored when getting result " - "from the backend."), "requests", ["backend"]) + description=("Number of requests that errored when getting result " + "from the backend."), + tag_keys=("backend", )) self.backend_queue_size = metrics.Gauge( "backend_queued_queries", - "Current number of queries queued in the router for a backend", - "requests", ["backend"]) + description=("Current number of queries queued " + "in the router for a backend"), + tag_keys=("backend", )) asyncio.get_event_loop().create_task(self.report_queue_lengths()) @@ -161,7 +165,7 @@ class Router: **request_kwargs): endpoint = request_meta.endpoint logger.debug("Received a request for endpoint {}".format(endpoint)) - self.num_router_requests.record(1, {"endpoint": endpoint}) + 
self.num_router_requests.record(1, tags={"endpoint": endpoint}) request_context = request_meta.request_context query = Query( @@ -177,7 +181,8 @@ class Router: try: result = await query.async_future except RayTaskError as e: - self.num_error_endpoint_requests.record(1, {"endpoint": endpoint}) + self.num_error_endpoint_requests.record( + 1, tags={"endpoint": endpoint}) result = e return result @@ -301,7 +306,8 @@ class Router: else: result = await object_ref except RayTaskError as error: - self.num_error_backend_requests.record(1, {"backend": backend}) + self.num_error_backend_requests.record( + 1, tags={"backend": backend}) result = error self.queries_counter[backend][backend_replica_tag] -= 1 await self.mark_worker_idle(backend, backend_replica_tag) @@ -358,6 +364,7 @@ class Router: self.name, queue_lengths) for backend, length in queue_lengths.items(): - self.backend_queue_size.record(length, {"backend": backend}) + self.backend_queue_size.record( + length, tags={"backend": backend}) await asyncio.sleep(REPORT_QUEUE_LENGTH_PERIOD_S) diff --git a/python/ray/tests/test_metrics_agent.py b/python/ray/tests/test_metrics_agent.py index 1c5627199..2c36a5f7d 100644 --- a/python/ray/tests/test_metrics_agent.py +++ b/python/ray/tests/test_metrics_agent.py @@ -1,5 +1,6 @@ import json from pprint import pformat +from unittest.mock import MagicMock import requests import pytest @@ -7,7 +8,7 @@ from prometheus_client.parser import text_string_to_metric_families import ray from ray.metrics_agent import PrometheusServiceDiscoveryWriter -from ray.experimental.metrics import Count, Histogram +from ray.util.metrics import Count, Histogram, Gauge from ray.test_utils import wait_for_condition, SignalActor @@ -59,16 +60,16 @@ def _setup_cluster_for_test(ray_start_cluster): # Generate some metrics from actor & tasks. 
@ray.remote def f(): - counter = Count(f"test_counter", "desc", "unit", []) - counter.record(1, {}) + counter = Count("test_counter", description="desc") + counter.record(1) ray.get(worker_should_exit.wait.remote()) @ray.remote class A: async def ping(self): - histogram = Histogram("test_histogram", "desc", "unit", [0.1, 1.6], - []) - histogram.record(1.5, {}) + histogram = Histogram( + "test_histogram", description="desc", boundaries=[0.1, 1.6]) + histogram.record(1.5) ray.get(worker_should_exit.wait.remote()) a = A.remote() @@ -181,6 +182,98 @@ def test_metrics_export_end_to_end(_setup_cluster_for_test): test_cases() # Should fail assert +@pytest.fixture +def metric_mock(): + mock = MagicMock() + mock.record.return_value = "haha" + yield mock + + +""" +Unit test custom metrics. +""" + + +def test_basic_custom_metrics(metric_mock): + # Make sure each of metric works as expected. + # -- Count -- + count = Count("count", tag_keys=("a", )) + count._metric = metric_mock + count.record(1) + metric_mock.record.assert_called_with(1, tags={}) + + # -- Gauge -- + gauge = Gauge("gauge", description="gauge") + gauge._metric = metric_mock + gauge.record(4) + metric_mock.record.assert_called_with(4, tags={}) + + # -- Histogram + histogram = Histogram( + "hist", description="hist", boundaries=[1.0, 3.0], tag_keys=("a", "b")) + histogram._metric = metric_mock + histogram.record(4) + metric_mock.record.assert_called_with(4, tags={}) + tags = {"a": "3"} + histogram.record(10, tags=tags) + metric_mock.record.assert_called_with(10, tags=tags) + tags = {"a": "10", "b": "b"} + histogram.record(8, tags=tags) + metric_mock.record.assert_called_with(8, tags=tags) + + +def test_custom_metrics_info(metric_mock): + # Make sure .info public method works. 
+ histogram = Histogram( + "hist", description="hist", boundaries=[1.0, 2.0], tag_keys=("a", "b")) + assert histogram.info["name"] == "hist" + assert histogram.info["description"] == "hist" + assert histogram.info["boundaries"] == [1.0, 2.0] + assert histogram.info["tag_keys"] == ("a", "b") + assert histogram.info["default_tags"] == {} + histogram.set_default_tags({"a": "a"}) + assert histogram.info["default_tags"] == {"a": "a"} + + +def test_custom_metrics_default_tags(metric_mock): + histogram = Histogram( + "hist", description="hist", boundaries=[1.0, 2.0], + tag_keys=("a", "b")).set_default_tags({ + "b": "b" + }) + histogram._metric = metric_mock + + # Check default tags. + histogram.record(4) + metric_mock.record.assert_called_with(4, tags={"b": "b"}) + + # Check specifying non-default tags. + histogram.record(10, tags={"a": "a"}) + metric_mock.record.assert_called_with(10, tags={"a": "a", "b": "b"}) + + # Check overriding default tags. + tags = {"a": "10", "b": "c"} + histogram.record(8, tags=tags) + metric_mock.record.assert_called_with(8, tags=tags) + + +def test_custom_metrics_edge_cases(metric_mock): + # None or empty boundaries are not allowed. + with pytest.raises(ValueError): + Histogram("hist") + + with pytest.raises(ValueError): + Histogram("hist", boundaries=[]) + + # Empty name is not allowed. + with pytest.raises(ValueError): + Count("") + + # The tag keys must be a tuple type. 
+    with pytest.raises(ValueError):
+        Count("name", tag_keys=("a"))
+
+
 if __name__ == "__main__":
     import sys
     sys.exit(pytest.main(["-v", __file__]))
diff --git a/python/ray/util/metrics.py b/python/ray/util/metrics.py
new file mode 100644
index 000000000..e7a07b950
--- /dev/null
+++ b/python/ray/util/metrics.py
@@ -0,0 +1,198 @@
+import logging
+
+from typing import Dict, Any, List, Optional, Tuple
+
+from ray._raylet import (
+    Count as CythonCount,
+    Histogram as CythonHistogram,
+    Gauge as CythonGauge,
+)  # noqa: E402
+
+logger = logging.getLogger(__name__)
+
+
+class Metric:
+    """The parent class of custom metrics.
+
+    Ray's custom metrics APIs are rooted from this class and share
+    the same public methods.
+
+    Args:
+        name(str): Name of the metric. It must not be empty.
+        description(str): Description of the metric.
+        tag_keys(tuple): Tag keys of the metric.
+    """
+
+    def __init__(self,
+                 name: str,
+                 description: str = "",
+                 tag_keys: Optional[Tuple[str, ...]] = None):
+        if not name:
+            raise ValueError("Empty name is not allowed. "
+                             "Please provide a metric name.")
+        self._name = name
+        self._description = description
+        # We don't specify unit because it won't be
+        # exported to Prometheus anyway.
+        self._unit = ""
+        # The default tags key-value pair.
+        self._default_tags = {}
+        # Keys of tags.
+        self._tag_keys = tag_keys or tuple()
+        # The Cython metric class. This should be set in the child class.
+        self._metric = None
+
+        # ("a") is a plain str rather than a 1-tuple, so a forgotten
+        # trailing comma is rejected here instead of reaching Cython.
+        if not isinstance(self._tag_keys, tuple):
+            raise ValueError(
+                "tag_keys should be a tuple type, "
+                f"but {type(self._tag_keys)} type was specified instead.")
+
+    def set_default_tags(self, default_tags: Dict[str, str]) -> "Metric":
+        """Set default tags of metrics.
+
+        Example:
+            >>> # Note that set_default_tags returns the instance itself.
+            >>> counter = Count("name", tag_keys=("a", ))
+            >>> counter2 = counter.set_default_tags({"a": "b"})
+            >>> assert counter is counter2
+            >>> # this means you can instantiate it in this way.
+            >>> counter = Count("name", tag_keys=("a", )).set_default_tags(
+            ...     {"a": "b"})
+
+        Args:
+            default_tags(dict): Default tags that are
+                used for every record method.
+
+        Returns:
+            Metric: it returns the instance itself.
+        """
+        # Store a private copy so that later mutation of the caller's
+        # dict cannot silently change this metric's default tags.
+        self._default_tags = dict(default_tags)
+        return self
+
+    def record(self, value: float,
+               tags: Optional[Dict[str, str]] = None) -> None:
+        """Record the metric point of the metric.
+
+        Args:
+            value(float): The value to be recorded as a metric point.
+            tags(dict): Tags of this metric point. They are merged on
+                top of the default tags, so a key given here overrides
+                the default value of the same key.
+        """
+        assert self._metric is not None
+        merged_tags = self._default_tags.copy()
+        merged_tags.update(tags or {})
+        self._metric.record(value, tags=merged_tags)
+
+    @property
+    def info(self) -> Dict[str, Any]:
+        """Return the information of this metric.
+
+        Example:
+            >>> counter = Count("name", description="desc")
+            >>> counter.info["name"]
+            'name'
+            >>> counter.info["description"]
+            'desc'
+
+        Returns:
+            dict: A dict with the keys "name", "description",
+                "tag_keys" and "default_tags".
+        """
+        return {
+            "name": self._name,
+            "description": self._description,
+            "tag_keys": self._tag_keys,
+            "default_tags": self._default_tags
+        }
+
+
+class Count(Metric):
+    """The count of the number of metric points.
+
+    This is corresponding to Prometheus' Counter metric.
+
+    Args:
+        name(str): Name of the metric.
+        description(str): Description of the metric.
+        tag_keys(tuple): Tag keys of the metric.
+    """
+
+    def __init__(self,
+                 name: str,
+                 description: str = "",
+                 tag_keys: Optional[Tuple[str, ...]] = None):
+        super().__init__(name, description, tag_keys)
+        self._metric = CythonCount(self._name, self._description, self._unit,
+                                   self._tag_keys)
+
+
+class Histogram(Metric):
+    """Histogram distribution of metric points.
+
+    This is corresponding to Prometheus' Histogram metric.
+    Recording metrics with histogram will enable you to import
+    min, mean, max, 25, 50, 95, 99 percentile latency.
+
+    Args:
+        name(str): Name of the metric.
+        description(str): Description of the metric.
+        boundaries(list): Boundaries of histogram buckets.
+        tag_keys(tuple): Tag keys of the metric.
+
+    Raises:
+        ValueError: If boundaries is None or empty.
+    """
+
+    def __init__(self,
+                 name: str,
+                 description: str = "",
+                 boundaries: Optional[List[float]] = None,
+                 tag_keys: Optional[Tuple[str, ...]] = None):
+        super().__init__(name, description, tag_keys)
+        if boundaries is None or len(boundaries) == 0:
+            raise ValueError(
+                "boundaries argument should be provided when using the "
+                "Histogram class. EX) Histogram(boundaries=[1.0, 2.0])")
+        self.boundaries = boundaries
+        self._metric = CythonHistogram(self._name, self._description,
+                                       self._unit, self.boundaries,
+                                       self._tag_keys)
+
+    @property
+    def info(self):
+        """Return information about histogram metric."""
+        info = super().info
+        info.update({"boundaries": self.boundaries})
+        return info
+
+
+class Gauge(Metric):
+    """Gauge Keeps the last recorded value, drops everything before.
+
+    This is corresponding to Prometheus' Gauge metric.
+
+    Args:
+        name(str): Name of the metric.
+        description(str): Description of the metric.
+        tag_keys(tuple): Tag keys of the metric.
+    """
+
+    def __init__(self,
+                 name: str,
+                 description: str = "",
+                 tag_keys: Optional[Tuple[str, ...]] = None):
+        super().__init__(name, description, tag_keys)
+        self._metric = CythonGauge(self._name, self._description, self._unit,
+                                   self._tag_keys)
+
+
+__all__ = [
+    "Count",
+    "Histogram",
+    "Gauge",
+]