[Serve] Fix prometheus exporter label override (#9227)

This commit is contained in:
Simon Mo
2020-07-02 11:40:14 -07:00
committed by GitHub
parent f1173d55e0
commit a25472c657
4 changed files with 116 additions and 69 deletions
+13 -14
View File
@@ -1,11 +1,8 @@
import asyncio
from typing import Dict, Optional, Tuple
from typing import Dict, Optional, Tuple, List
from ray.serve.metric.types import (
MetricType,
convert_event_type_to_class,
MetricMetadata,
)
from ray.serve.metric.types import (MetricType, convert_event_type_to_class,
MetricMetadata, MetricRecord)
from ray.serve.utils import _get_logger
from ray.serve.constants import METRIC_PUSH_INTERVAL_S
@@ -29,8 +26,8 @@ class MetricClient:
self.exporter = metric_exporter_actor
self.default_labels = default_labels or dict()
self.registered_metrics: Dict[str, MetricMetadata] = dict()
self.metric_records = []
self.registered_metrics: Dict[int, MetricMetadata] = dict()
self.metric_records: List[MetricRecord] = []
assert asyncio.get_event_loop().is_running()
self.push_task = asyncio.get_event_loop().create_task(
@@ -101,10 +98,6 @@ class MetricClient:
description: str,
label_names: Tuple[str] = (),
):
if name in self.registered_metrics:
raise ValueError(
"Metric with name {} is already registered.".format(name))
if not isinstance(label_names, tuple):
raise ValueError("label_names need to be a tuple, it is {}".format(
type(label_names)))
@@ -116,11 +109,17 @@ class MetricClient:
label_names=label_names,
default_labels=self.default_labels.copy(),
)
key = hash(metric_metadata)
if key in self.registered_metrics:
raise ValueError("Metric named {} and associated metadata "
"is already registered.".format(name))
self.registered_metrics[key] = metric_metadata
metric_class = convert_event_type_to_class(metric_type)
metric_object = metric_class(
client=self, name=name, label_names=label_names)
client=self, key=key, label_names=label_names)
self.registered_metrics[name] = metric_metadata
return metric_object
async def _push_to_exporter_once(self):
+24 -17
View File
@@ -1,4 +1,4 @@
from typing import Dict
from typing import Dict, Any
from collections import Counter, namedtuple
import ray
@@ -46,15 +46,15 @@ class MetricExporterActor:
# TODO(simon): Add support for initializer args and kwargs.
self.exporter = exporter_class()
# Stores the mapping metric_name -> MetricMetadata
# Stores the all recorded MetricMetadata (keyed by hash key)
# This field is tolerant to failures since each client will always push
# an updated copy of the metadata for each ingest call.
self.metric_metadata = dict()
self.metric_metadata: Dict[int, MetricMetadata] = dict()
logger.debug("Initialized with metric exporter of type {}".format(
type(self.exporter)))
def ingest(self, metric_metadata: Dict[str, MetricMetadata],
def ingest(self, metric_metadata: Dict[int, MetricMetadata],
batch: MetricBatch):
self.metric_metadata.update(metric_metadata)
self.exporter.export(self.metric_metadata, batch)
@@ -70,9 +70,10 @@ class InMemoryExporter(ExporterInterface):
# Keep track of latest observation of measures
self.latest_measures: Dict[namedtuple, float] = dict()
def export(self, metric_metadata, metric_batch):
def export(self, metric_metadata: Dict[int, MetricMetadata],
metric_batch: MetricBatch):
for record in metric_batch:
metadata = metric_metadata[record.name]
metadata = metric_metadata[record.key]
metric_key = make_metric_namedtuple(metadata, record)
if metadata.type == MetricType.COUNTER:
self.counters[metric_key] += record.value
@@ -88,7 +89,8 @@ class InMemoryExporter(ExporterInterface):
for info_tuple, value in metrics_to_collect.items():
# Represent the metric type as a human readable name
info_tuple = info_tuple._replace(type=str(info_tuple.type))
items.append({"info": info_tuple._asdict(), "value": value})
# _asdict returns OrderedDict, we just need to return regular dict
items.append({"info": dict(info_tuple._asdict()), "value": value})
return items
@@ -103,7 +105,7 @@ class PrometheusExporter(ExporterInterface):
}
self.prom_generate_latest = generate_latest
self.metrics_cache = dict()
self.metrics_cache: Dict[str, Any] = dict()
self.default_labels = dict()
self.registry = CollectorRegistry()
@@ -112,10 +114,12 @@ class PrometheusExporter(ExporterInterface):
def export(self, metric_metadata, metric_batch):
self._process_metric_metadata(metric_metadata)
self._process_batch(metric_batch)
self._process_batch(metric_metadata, metric_batch)
def _process_metric_metadata(self, metric_metadata):
for name, metric_metadata in metric_metadata.items():
for key, metric_metadata in metric_metadata.items():
name = metric_metadata.name
if name not in self.metrics_cache:
constructor = self.metric_type_to_prom_type[
metric_metadata.type]
@@ -124,7 +128,7 @@ class PrometheusExporter(ExporterInterface):
label_names = tuple(
default_labels.keys()) + metric_metadata.label_names
metric_object = constructor(
metric_metadata.name,
name,
metric_metadata.description,
labelnames=label_names,
registry=self.registry,
@@ -132,14 +136,17 @@ class PrometheusExporter(ExporterInterface):
self.metrics_cache[name] = (metric_object,
metric_metadata.type)
self.default_labels[name] = default_labels
def _process_batch(self, batch):
for name, labels, value in batch:
assert name in self.metrics_cache, (
"Metrics {} was not registered.".format(name))
self.default_labels[key] = metric_metadata.default_labels
def _process_batch(self, metric_metadata, batch):
for key, labels, value in batch:
if key not in metric_metadata:
continue
name = metric_metadata[key].name
metric, metric_type = self.metrics_cache[name]
default_labels = self.default_labels[name]
default_labels = self.default_labels[key]
merged_labels = {**default_labels, **labels}
if metric_type == MetricType.COUNTER:
metric.labels(**merged_labels).inc(value)
+36 -15
View File
@@ -2,27 +2,53 @@ import enum
from typing import Tuple, List, Dict, Optional
from collections import namedtuple
class MetricType(enum.IntEnum):
COUNTER = 1
MEASURE = 2
# We split the information about a metric into two parts: the MetricMetadata
# and MetricRecord. Metadata is declared at creation time and include names
# and the label names. The label values will be supplied at observation time.
MetricMetadata = namedtuple(
"MetricMetadata",
["name", "type", "description", "label_names", "default_labels"])
MetricRecord = namedtuple("MetricRecord", ["name", "labels", "value"])
class MetricMetadata:
def __init__(self, name: str, type: MetricType, description: str,
label_names: Tuple[str], default_labels: Dict[str, str]):
self.name = name
self.type = type
self.description = description
self.label_names = label_names
self.default_labels = default_labels
def __eq__(self, value: "MetricMetadata"):
if not isinstance(value, MetricMetadata):
return False
return (self.name == value.name and self.type == self.type
and self.description == value.description
and self.label_names == value.label_names
and self.default_labels == value.default_labels)
def __hash__(self):
return hash((self.name, self.type, self.description, self.label_names,
frozenset(self.default_labels.items())))
MetricRecord = namedtuple("MetricRecord", ["key", "labels", "value"])
MetricBatch = List[MetricRecord]
class BaseMetric:
def __init__(self,
client,
name: str,
key: int,
label_names: Tuple[str],
dynamic_labels: Optional[Dict[str, str]] = None):
"""Represent a single metric stream
Args:
client(MetricClient): The client object to push update to.
name(str): The name of the metric.
key(int): The unique hash key of the metric.
label_names(Tuple[str]): The names of the labels that must be set
before an observation.
dynamic_labels(Optional[Dict[str,str]]): A partially preset labels.
@@ -30,7 +56,7 @@ class BaseMetric:
``metric.labels(a=b).labels(c=d)``.
"""
self.client = client
self.name = name
self.key = key
self.dynamic_labels = dynamic_labels or dict()
self.label_names = label_names
@@ -56,7 +82,7 @@ class BaseMetric:
"label names. Allowed label names are {}.".format(
k, self.label_names))
new_dynamic_labels[k] = str(v)
return type(self)(self.client, self.name, self.label_names,
return type(self)(self.client, self.key, self.label_names,
new_dynamic_labels)
@@ -67,7 +93,7 @@ class Counter(BaseMetric):
"""Increment the counter by some amount. Default is 1"""
self.check_all_labels_fulfilled_or_error()
self.client.metric_records.append(
MetricRecord(self.name, self.dynamic_labels, increment))
MetricRecord(self.key, self.dynamic_labels, increment))
class Measure(BaseMetric):
@@ -75,12 +101,7 @@ class Measure(BaseMetric):
"""Record the given value for the measure"""
self.check_all_labels_fulfilled_or_error()
self.client.metric_records.append(
MetricRecord(self.name, self.dynamic_labels, value))
class MetricType(enum.IntEnum):
COUNTER = 1
MEASURE = 2
MetricRecord(self.key, self.dynamic_labels, value))
def convert_event_type_to_class(event_type: MetricType) -> BaseMetric:
+43 -23
View File
@@ -46,29 +46,29 @@ async def test_client():
await collector._push_to_exporter_once()
assert exporter.metadata == {
"counter": MetricMetadata(
name="counter",
type=MetricType.COUNTER,
description="",
label_names=("a", "b"),
default_labels={"default": "label"},
),
"measure": MetricMetadata(
name="measure",
type=MetricType.MEASURE,
description="",
label_names=(),
default_labels={"default": "label"},
)
}
assert exporter.batches == [("counter", {
"a": "1",
"b": "2"
}, 1), ("counter", {
"a": "1",
"b": "3"
}, 42), ("measure", {}, 2)]
assert MetricMetadata(
name="counter",
type=MetricType.COUNTER,
description="",
label_names=("a", "b"),
default_labels={"default": "label"},
) in exporter.metadata.values()
assert MetricMetadata(
name="measure",
type=MetricType.MEASURE,
description="",
label_names=(),
default_labels={"default": "label"},
) in exporter.metadata.values()
metric_values = [item.value for item in exporter.batches]
assert set(metric_values) == {1, 42, 2}
metric_labels = [
frozenset(item.labels.items()) for item in exporter.batches
]
assert frozenset(dict(a="1", b="2").items()) in metric_labels
assert frozenset(dict(a="1", b="3").items()) in metric_labels
async def test_in_memory_exporter(serve_instance):
@@ -140,6 +140,26 @@ async def test_prometheus_exporter(serve_instance):
assert fragment in metric_stored
async def test_prometheus_conflicting_labels(serve_instance):
exporter = MetricExporterActor.remote(PrometheusExporter)
collector_a = MetricClient(
exporter, push_interval=2, default_labels={"default": "a"})
collector_b = MetricClient(
exporter, push_interval=2, default_labels={"default": "b"})
for collector in [collector_a, collector_b]:
counter = collector.new_counter("num")
counter.add()
await collector._push_to_exporter_once()
metric_stored = (await exporter.inspect_metrics.remote()).decode()
fragments = ['num_total{default="a"}', 'num_total{default="b"}']
for fragment in fragments:
assert fragment in metric_stored
async def test_system_metric_endpoints(serve_instance):
def test_error_counter(flask_request):
1 / 0