From a25472c657d6e3338c396d4261c30a060d608274 Mon Sep 17 00:00:00 2001
From: Simon Mo
Date: Thu, 2 Jul 2020 11:40:14 -0700
Subject: [PATCH] [Serve] Fix prometheus exporter label override (#9227)

---
Reviewer notes (this section is not part of the commit message):
- MetricMetadata.__eq__ now compares self.type with value.type. The
  previous revision of this patch compared self.type with itself, so two
  metadata objects differing only in metric type compared equal while
  their hashes (which include the type) differed.
- Fixed the comment wording "Stores the all recorded" in exporter.py.
- The blob ids on the "index" lines were not regenerated after these edits.

 python/ray/serve/metric/client.py     | 27 ++++++-----
 python/ray/serve/metric/exporter.py   | 41 ++++++++++-------
 python/ray/serve/metric/types.py      | 51 +++++++++++++++------
 python/ray/serve/tests/test_metric.py | 66 +++++++++++++++++----------
 4 files changed, 116 insertions(+), 69 deletions(-)

diff --git a/python/ray/serve/metric/client.py b/python/ray/serve/metric/client.py
index c0b0b9c53..b855da62e 100644
--- a/python/ray/serve/metric/client.py
+++ b/python/ray/serve/metric/client.py
@@ -1,11 +1,8 @@
 import asyncio
-from typing import Dict, Optional, Tuple
+from typing import Dict, Optional, Tuple, List
 
-from ray.serve.metric.types import (
-    MetricType,
-    convert_event_type_to_class,
-    MetricMetadata,
-)
+from ray.serve.metric.types import (MetricType, convert_event_type_to_class,
+                                    MetricMetadata, MetricRecord)
 from ray.serve.utils import _get_logger
 from ray.serve.constants import METRIC_PUSH_INTERVAL_S
 
@@ -29,8 +26,8 @@ class MetricClient:
         self.exporter = metric_exporter_actor
         self.default_labels = default_labels or dict()
 
-        self.registered_metrics: Dict[str, MetricMetadata] = dict()
-        self.metric_records = []
+        self.registered_metrics: Dict[int, MetricMetadata] = dict()
+        self.metric_records: List[MetricRecord] = []
 
         assert asyncio.get_event_loop().is_running()
         self.push_task = asyncio.get_event_loop().create_task(
@@ -101,10 +98,6 @@ class MetricClient:
             description: str,
             label_names: Tuple[str] = (),
     ):
-        if name in self.registered_metrics:
-            raise ValueError(
-                "Metric with name {} is already registered.".format(name))
-
         if not isinstance(label_names, tuple):
             raise ValueError("label_names need to be a tuple, it is {}".format(
                 type(label_names)))
@@ -116,11 +109,17 @@ class MetricClient:
             label_names=label_names,
             default_labels=self.default_labels.copy(),
         )
+
+        key = hash(metric_metadata)
+        if key in self.registered_metrics:
+            raise ValueError("Metric named {} and associated metadata "
+                             "is already registered.".format(name))
+        self.registered_metrics[key] = metric_metadata
+
         metric_class = convert_event_type_to_class(metric_type)
         metric_object = metric_class(
-            client=self, name=name, label_names=label_names)
+            client=self, key=key, label_names=label_names)
 
-        self.registered_metrics[name] = metric_metadata
         return metric_object
 
     async def _push_to_exporter_once(self):
diff --git a/python/ray/serve/metric/exporter.py b/python/ray/serve/metric/exporter.py
index 29d9c1389..23ee8f74f 100644
--- a/python/ray/serve/metric/exporter.py
+++ b/python/ray/serve/metric/exporter.py
@@ -1,4 +1,4 @@
-from typing import Dict
+from typing import Dict, Any
 from collections import Counter, namedtuple
 
 import ray
@@ -46,15 +46,15 @@ class MetricExporterActor:
         # TODO(simon): Add support for initializer args and kwargs.
         self.exporter = exporter_class()
 
-        # Stores the mapping metric_name -> MetricMetadata
+        # Stores all recorded MetricMetadata (keyed by hash key)
         # This field is tolerant to failures since each client will always push
         # an updated copy of the metadata for each ingest call.
-        self.metric_metadata = dict()
+        self.metric_metadata: Dict[int, MetricMetadata] = dict()
 
         logger.debug("Initialized with metric exporter of type {}".format(
             type(self.exporter)))
 
-    def ingest(self, metric_metadata: Dict[str, MetricMetadata],
+    def ingest(self, metric_metadata: Dict[int, MetricMetadata],
                batch: MetricBatch):
         self.metric_metadata.update(metric_metadata)
         self.exporter.export(self.metric_metadata, batch)
@@ -70,9 +70,10 @@ class InMemoryExporter(ExporterInterface):
         # Keep track of latest observation of measures
         self.latest_measures: Dict[namedtuple, float] = dict()
 
-    def export(self, metric_metadata, metric_batch):
+    def export(self, metric_metadata: Dict[int, MetricMetadata],
+               metric_batch: MetricBatch):
         for record in metric_batch:
-            metadata = metric_metadata[record.name]
+            metadata = metric_metadata[record.key]
             metric_key = make_metric_namedtuple(metadata, record)
             if metadata.type == MetricType.COUNTER:
                 self.counters[metric_key] += record.value
@@ -88,7 +89,8 @@ class InMemoryExporter(ExporterInterface):
         for info_tuple, value in metrics_to_collect.items():
             # Represent the metric type as a human readable name
             info_tuple = info_tuple._replace(type=str(info_tuple.type))
-            items.append({"info": info_tuple._asdict(), "value": value})
+            # _asdict returns OrderedDict, we just need to return regular dict
+            items.append({"info": dict(info_tuple._asdict()), "value": value})
         return items
 
 
@@ -103,7 +105,7 @@ class PrometheusExporter(ExporterInterface):
         }
         self.prom_generate_latest = generate_latest
 
-        self.metrics_cache = dict()
+        self.metrics_cache: Dict[str, Any] = dict()
         self.default_labels = dict()
 
         self.registry = CollectorRegistry()
@@ -112,10 +114,12 @@ class PrometheusExporter(ExporterInterface):
 
     def export(self, metric_metadata, metric_batch):
         self._process_metric_metadata(metric_metadata)
-        self._process_batch(metric_batch)
+        self._process_batch(metric_metadata, metric_batch)
 
     def _process_metric_metadata(self, metric_metadata):
-        for name, metric_metadata in metric_metadata.items():
+        for key, metric_metadata in metric_metadata.items():
+            name = metric_metadata.name
+
             if name not in self.metrics_cache:
                 constructor = self.metric_type_to_prom_type[
                     metric_metadata.type]
@@ -124,7 +128,7 @@ class PrometheusExporter(ExporterInterface):
                 label_names = tuple(
                     default_labels.keys()) + metric_metadata.label_names
                 metric_object = constructor(
-                    metric_metadata.name,
+                    name,
                     metric_metadata.description,
                     labelnames=label_names,
                     registry=self.registry,
@@ -132,14 +136,17 @@ class PrometheusExporter(ExporterInterface):
 
                 self.metrics_cache[name] = (metric_object,
                                             metric_metadata.type)
-                self.default_labels[name] = default_labels
 
-    def _process_batch(self, batch):
-        for name, labels, value in batch:
-            assert name in self.metrics_cache, (
-                "Metrics {} was not registered.".format(name))
+            self.default_labels[key] = metric_metadata.default_labels
+
+    def _process_batch(self, metric_metadata, batch):
+        for key, labels, value in batch:
+            if key not in metric_metadata:
+                continue
+
+            name = metric_metadata[key].name
             metric, metric_type = self.metrics_cache[name]
-            default_labels = self.default_labels[name]
+            default_labels = self.default_labels[key]
             merged_labels = {**default_labels, **labels}
             if metric_type == MetricType.COUNTER:
                 metric.labels(**merged_labels).inc(value)
diff --git a/python/ray/serve/metric/types.py b/python/ray/serve/metric/types.py
index 80c2f796e..93b3fb6ce 100644
--- a/python/ray/serve/metric/types.py
+++ b/python/ray/serve/metric/types.py
@@ -2,27 +2,53 @@ import enum
 from typing import Tuple, List, Dict, Optional
 from collections import namedtuple
 
+
+class MetricType(enum.IntEnum):
+    COUNTER = 1
+    MEASURE = 2
+
+
 # We split the information about a metric into two parts: the MetricMetadata
 # and MetricRecord. Metadata is declared at creation time and include names
 # and the label names. The label values will be supplied at observation time.
-MetricMetadata = namedtuple(
-    "MetricMetadata",
-    ["name", "type", "description", "label_names", "default_labels"])
-MetricRecord = namedtuple("MetricRecord", ["name", "labels", "value"])
+class MetricMetadata:
+    def __init__(self, name: str, type: MetricType, description: str,
+                 label_names: Tuple[str], default_labels: Dict[str, str]):
+        self.name = name
+        self.type = type
+        self.description = description
+        self.label_names = label_names
+        self.default_labels = default_labels
+
+    def __eq__(self, value: "MetricMetadata"):
+        if not isinstance(value, MetricMetadata):
+            return False
+
+        return (self.name == value.name and self.type == value.type
+                and self.description == value.description
+                and self.label_names == value.label_names
+                and self.default_labels == value.default_labels)
+
+    def __hash__(self):
+        return hash((self.name, self.type, self.description, self.label_names,
+                     frozenset(self.default_labels.items())))
+
+
+MetricRecord = namedtuple("MetricRecord", ["key", "labels", "value"])
 MetricBatch = List[MetricRecord]
 
 
 class BaseMetric:
     def __init__(self,
                  client,
-                 name: str,
+                 key: int,
                  label_names: Tuple[str],
                  dynamic_labels: Optional[Dict[str, str]] = None):
         """Represent a single metric stream
 
         Args:
             client(MetricClient): The client object to push update to.
-            name(str): The name of the metric.
+            key(int): The unique hash key of the metric.
             label_names(Tuple[str]): The names of the labels that must be set
                 before an observation.
             dynamic_labels(Optional[Dict[str,str]]): A partially preset labels.
@@ -30,7 +56,7 @@ class BaseMetric:
                 ``metric.labels(a=b).labels(c=d)``.
         """
         self.client = client
-        self.name = name
+        self.key = key
         self.dynamic_labels = dynamic_labels or dict()
         self.label_names = label_names
 
@@ -56,7 +82,7 @@ class BaseMetric:
                     "label names. Allowed label names are {}.".format(
                         k, self.label_names))
             new_dynamic_labels[k] = str(v)
-        return type(self)(self.client, self.name, self.label_names,
+        return type(self)(self.client, self.key, self.label_names,
                           new_dynamic_labels)
 
 
@@ -67,7 +93,7 @@ class Counter(BaseMetric):
         """Increment the counter by some amount. Default is 1"""
         self.check_all_labels_fulfilled_or_error()
         self.client.metric_records.append(
-            MetricRecord(self.name, self.dynamic_labels, increment))
+            MetricRecord(self.key, self.dynamic_labels, increment))
 
 
 class Measure(BaseMetric):
@@ -75,12 +101,7 @@ class Measure(BaseMetric):
         """Record the given value for the measure"""
         self.check_all_labels_fulfilled_or_error()
         self.client.metric_records.append(
-            MetricRecord(self.name, self.dynamic_labels, value))
-
-
-class MetricType(enum.IntEnum):
-    COUNTER = 1
-    MEASURE = 2
+            MetricRecord(self.key, self.dynamic_labels, value))
 
 
 def convert_event_type_to_class(event_type: MetricType) -> BaseMetric:
diff --git a/python/ray/serve/tests/test_metric.py b/python/ray/serve/tests/test_metric.py
index 11773cc6e..0aea41fd2 100644
--- a/python/ray/serve/tests/test_metric.py
+++ b/python/ray/serve/tests/test_metric.py
@@ -46,29 +46,29 @@ async def test_client():
 
     await collector._push_to_exporter_once()
 
-    assert exporter.metadata == {
-        "counter": MetricMetadata(
-            name="counter",
-            type=MetricType.COUNTER,
-            description="",
-            label_names=("a", "b"),
-            default_labels={"default": "label"},
-        ),
-        "measure": MetricMetadata(
-            name="measure",
-            type=MetricType.MEASURE,
-            description="",
-            label_names=(),
-            default_labels={"default": "label"},
-        )
-    }
-    assert exporter.batches == [("counter", {
-        "a": "1",
-        "b": "2"
-    }, 1), ("counter", {
-        "a": "1",
-        "b": "3"
-    }, 42), ("measure", {}, 2)]
+    assert MetricMetadata(
+        name="counter",
+        type=MetricType.COUNTER,
+        description="",
+        label_names=("a", "b"),
+        default_labels={"default": "label"},
+    ) in exporter.metadata.values()
+    assert MetricMetadata(
+        name="measure",
+        type=MetricType.MEASURE,
+        description="",
+        label_names=(),
+        default_labels={"default": "label"},
+    ) in exporter.metadata.values()
+
+    metric_values = [item.value for item in exporter.batches]
+    assert set(metric_values) == {1, 42, 2}
+
+    metric_labels = [
+        frozenset(item.labels.items()) for item in exporter.batches
+    ]
+    assert frozenset(dict(a="1", b="2").items()) in metric_labels
+    assert frozenset(dict(a="1", b="3").items()) in metric_labels
 
 
 async def test_in_memory_exporter(serve_instance):
@@ -140,6 +140,26 @@ async def test_prometheus_exporter(serve_instance):
         assert fragment in metric_stored
 
 
+async def test_prometheus_conflicting_labels(serve_instance):
+    exporter = MetricExporterActor.remote(PrometheusExporter)
+
+    collector_a = MetricClient(
+        exporter, push_interval=2, default_labels={"default": "a"})
+    collector_b = MetricClient(
+        exporter, push_interval=2, default_labels={"default": "b"})
+
+    for collector in [collector_a, collector_b]:
+        counter = collector.new_counter("num")
+        counter.add()
+        await collector._push_to_exporter_once()
+
+    metric_stored = (await exporter.inspect_metrics.remote()).decode()
+
+    fragments = ['num_total{default="a"}', 'num_total{default="b"}']
+    for fragment in fragments:
+        assert fragment in metric_stored
+
+
 async def test_system_metric_endpoints(serve_instance):
     def test_error_counter(flask_request):
         1 / 0