[serve] Use ray.experimental.metrics (#10185)

This commit is contained in:
Edward Oakes
2020-08-19 13:03:22 -05:00
committed by GitHub
parent de46464aa3
commit 888f0a2c60
18 changed files with 87 additions and 917 deletions
-17
View File
@@ -64,15 +64,6 @@ py_test(
)
py_test(
name = "test_metric",
size = "small",
srcs = serve_tests_srcs,
tags = ["exclusive"],
deps = [":serve_lib"],
)
py_test(
name = "test_persistence",
size = "small",
@@ -201,14 +192,6 @@ py_test(
deps = [":serve_lib"]
)
py_test(
name = "snippet_metric_export",
size = "small",
srcs = glob(["examples/doc/*.py"]),
tags = ["exclusive"],
deps = [":serve_lib"]
)
# Disable the deployment tutorial test because it requires
# ray start --head in the background.
# py_test(
+5 -6
View File
@@ -1,8 +1,8 @@
from ray.serve.api import (
init, create_backend, delete_backend, create_endpoint, delete_endpoint,
set_traffic, shadow_traffic, get_handle, stat, update_backend_config,
get_backend_config, accept_batch, list_backends, list_endpoints,
shutdown) # noqa: E402
from ray.serve.api import (init, create_backend, delete_backend,
create_endpoint, delete_endpoint, set_traffic,
shadow_traffic, get_handle, update_backend_config,
get_backend_config, accept_batch, list_backends,
list_endpoints, shutdown) # noqa: E402
__all__ = [
"init",
@@ -13,7 +13,6 @@ __all__ = [
"set_traffic",
"shadow_traffic",
"get_handle",
"stat",
"update_backend_config",
"get_backend_config",
"accept_batch",
+1 -34
View File
@@ -8,7 +8,6 @@ from ray.serve.handle import RayServeHandle
from ray.serve.utils import (block_until_http_ready, format_actor_name)
from ray.serve.exceptions import RayServeException
from ray.serve.config import BackendConfig, ReplicaConfig
from ray.serve.metric import InMemoryExporter
controller = None
@@ -58,7 +57,6 @@ def accept_batch(f):
def init(name=None,
http_host=DEFAULT_HTTP_HOST,
http_port=DEFAULT_HTTP_PORT,
metric_exporter=InMemoryExporter,
_http_middlewares=[]):
"""Initialize or connect to a serve cluster.
@@ -75,10 +73,6 @@ def init(name=None,
http_host (str): Host for HTTP servers. Default to "0.0.0.0". Serve
starts one HTTP server per node in the Ray cluster.
http_port (int, List[int]): Port for HTTP server. Default to 8000.
metric_exporter(ExporterInterface): The class aggregates metrics from
all RayServe actors and optionally export them to external
services. Ray Serve has two options built in: InMemoryExporter and
PrometheusExporter
"""
if name is not None and not isinstance(name, str):
raise TypeError("name must be a string.")
@@ -100,7 +94,7 @@ def init(name=None,
name=controller_name,
max_restarts=-1,
max_task_retries=-1,
).remote(name, http_host, http_port, metric_exporter, _http_middlewares)
).remote(name, http_host, http_port, _http_middlewares)
futures = []
for node_id in ray.state.node_ids():
@@ -376,30 +370,3 @@ def get_handle(endpoint_name, missing_ok=False):
list(routers.values())[0],
endpoint_name,
)
@_ensure_connected
def stat():
"""Retrieve metric statistics about ray serve system.
Returns:
metric_stats(Any): Metric information returned by the metric exporter.
This can vary by exporter. For the default InMemoryExporter, it
returns a list of the following format:
.. code-block::python
[
{"info": {
"name": ...,
"type": COUNTER|MEASURE,
"label_key": label_value,
"label_key": label_value,
...
}, "value": float}
]
For PrometheusExporter, it returns the metrics in prometheus format
in plain text.
"""
[metric_exporter] = ray.get(controller.get_metric_exporter.remote())
return ray.get(metric_exporter.inspect_metrics.remote())
+24 -29
View File
@@ -17,7 +17,7 @@ from ray.serve.context import FakeFlaskRequest
from ray.serve.utils import (parse_request_item, _get_logger, chain_future,
unpack_future)
from ray.serve.exceptions import RayServeException
from ray.serve.metric import MetricClient
from ray.experimental import metrics
from ray.serve.config import BackendConfig
from ray.serve.router import Query
@@ -113,14 +113,8 @@ def create_backend_worker(func_or_class):
else:
_callable = func_or_class(*init_args)
controller = serve.api._get_controller()
[metric_exporter] = ray.get(
controller.get_metric_exporter.remote())
metric_client = MetricClient(
metric_exporter, default_labels={"backend": backend_tag})
self.backend = RayServeWorker(backend_tag, replica_tag, _callable,
backend_config, is_function,
metric_client)
backend_config, is_function)
async def handle_request(self, request):
return await self.backend.handle_request(request)
@@ -157,7 +151,7 @@ class RayServeWorker:
"""Handles requests with the provided callable."""
def __init__(self, backend_tag, replica_tag, _callable,
backend_config: BackendConfig, is_function, metric_client):
backend_config: BackendConfig, is_function):
self.backend_tag = backend_tag
self.replica_tag = replica_tag
self.callable = _callable
@@ -167,24 +161,24 @@ class RayServeWorker:
self.batch_queue = BatchQueue(self.config.max_batch_size or 1,
self.config.batch_wait_timeout)
self.metric_client = metric_client
self.request_counter = self.metric_client.new_counter(
"backend_request_counter",
description=("Number of queries that have been "
"processed in this replica"),
)
self.error_counter = self.metric_client.new_counter(
"backend_error_counter",
description=("Number of exceptions that have "
"occurred in the backend"),
)
self.restart_counter = self.metric_client.new_counter(
self.request_counter = metrics.Count(
"backend_request_counter", ("Number of queries that have been "
"processed in this replica"),
"requests", ["backend"])
self.error_counter = metrics.Count("backend_error_counter",
("Number of exceptions that have "
"occurred in the backend"),
"errors", ["backend"])
self.restart_counter = metrics.Count(
"backend_worker_starts",
description=("The number of time this replica workers "
"has been restarted due to failure."),
label_names=("replica_tag", ))
("The number of time this replica workers "
"has been restarted due to failure."), "restarts",
["backend", "replica_tag"])
self.restart_counter.labels(replica_tag=self.replica_tag).add()
self.restart_counter.record(1, {
"backend": self.backend_tag,
"replica_tag": self.replica_tag
})
asyncio.get_event_loop().create_task(self.main_loop())
@@ -228,10 +222,10 @@ class RayServeWorker:
method_to_call = ensure_async(method_to_call)
try:
result = await method_to_call(*args, **kwargs)
self.request_counter.add()
self.request_counter.record(1, {"backend": self.backend_tag})
except Exception as e:
result = wrap_to_ray_error(e)
self.error_counter.add()
self.error_counter.record(1, {"backend": self.backend_tag})
finally:
self._reset_context()
@@ -284,7 +278,8 @@ class RayServeWorker:
# Flask requests are passed to __call__ as a list
arg_list = [arg_list]
self.request_counter.add(batch_size)
self.request_counter.record(batch_size,
{"backend": self.backend_tag})
result_list = await call_method(*arg_list, **kwargs_list)
if not isinstance(result_list, Iterable) or isinstance(
@@ -309,7 +304,7 @@ class RayServeWorker:
return result_list
except Exception as e:
wrapped_exception = wrap_to_ray_error(e)
self.error_counter.add()
self.error_counter.record(1, {"backend": self.backend_tag})
self._reset_context()
return [wrapped_exception for _ in range(batch_size)]
-6
View File
@@ -4,9 +4,6 @@ SERVE_CONTROLLER_NAME = "SERVE_CONTROLLER_ACTOR"
#: Actor name used to register HTTP proxy actor
SERVE_PROXY_NAME = "SERVE_PROXY_ACTOR"
#: Actor name used to register metric monitor actor
SERVE_METRIC_SINK_NAME = "SERVE_METRIC_SINK_ACTOR"
#: HTTP Address
DEFAULT_HTTP_ADDRESS = "http://127.0.0.1:8000"
@@ -19,8 +16,5 @@ DEFAULT_HTTP_PORT = 8000
#: Max concurrency
ASYNC_CONCURRENCY = int(1e6)
#: Interval for metric client to push metrics to exporters
METRIC_PUSH_INTERVAL_S = 2
#: Time to wait for HTTP proxy in `serve.init()`
HTTP_PROXY_TIMEOUT = 60
+2 -26
View File
@@ -8,11 +8,9 @@ import ray
import ray.cloudpickle as pickle
from ray.serve.autoscaling_policy import BasicAutoscalingPolicy
from ray.serve.backend_worker import create_backend_worker
from ray.serve.constants import (ASYNC_CONCURRENCY, SERVE_PROXY_NAME,
SERVE_METRIC_SINK_NAME)
from ray.serve.constants import ASYNC_CONCURRENCY, SERVE_PROXY_NAME
from ray.serve.http_proxy import HTTPProxyActor
from ray.serve.kv_store import RayInternalKVStore
from ray.serve.metric.exporter import MetricExporterActor
from ray.serve.exceptions import RayServeException
from ray.serve.utils import (format_actor_name, get_random_letters, logger,
try_schedule_resources_on_nodes, get_all_node_ids)
@@ -92,7 +90,7 @@ class ServeController:
"""
async def __init__(self, instance_name, http_host, http_port,
metric_exporter_class, _http_middlewares):
_http_middlewares):
# Unique name of the serve instance managed by this actor. Used to
# namespace child actors and checkpoints.
self.instance_name = instance_name
@@ -131,7 +129,6 @@ class ServeController:
# Cached handles to actors in the system.
# node_id -> actor_handle
self.routers = dict()
self.metric_exporter = None
self.http_host = http_host
self.http_port = http_port
@@ -139,7 +136,6 @@ class ServeController:
# If starting the actor for the first time, starts up the other system
# components. If recovering, fetches their actor handles.
self._start_metric_exporter(metric_exporter_class)
self._start_routers_if_needed()
# NOTE(edoakes): unfortunately, we can't completely recover from a
@@ -226,25 +222,6 @@ class ServeController:
"""Called by the router on startup to fetch required state."""
return self.routes
def _start_metric_exporter(self, metric_exporter_class):
"""Get the metric exporter belonging to this serve instance.
If the metric exporter does not already exist, it will be started.
"""
metric_sink_name = format_actor_name(SERVE_METRIC_SINK_NAME,
self.instance_name)
try:
self.metric_exporter = ray.get_actor(metric_sink_name)
except ValueError:
logger.info("Starting metric exporter with name '{}'".format(
metric_sink_name))
self.metric_exporter = MetricExporterActor.options(
name=metric_sink_name).remote(metric_exporter_class)
def get_metric_exporter(self):
"""Returns a handle to the metric exporter managed by this actor."""
return [self.metric_exporter]
def _checkpoint(self):
"""Checkpoint internal state and write it to the KV store."""
assert self.write_lock.locked()
@@ -879,7 +856,6 @@ class ServeController:
async with self.write_lock:
for router in self.routers.values():
ray.kill(router, no_restart=True)
ray.kill(self.metric_exporter, no_restart=True)
for replica_dict in self.workers.values():
for replica in replica_dict.values():
ray.kill(replica, no_restart=True)
@@ -1,55 +0,0 @@
import json
import time
import requests
from ray import serve
from ray.serve.metric.exporter import ExporterInterface
class FileExporter(ExporterInterface):
def __init__(self):
self.file = open("/tmp/serve_metrics.log", "w")
def export(self, metric_metadata, metric_batch):
for metric_item in metric_batch:
data = metric_metadata[metric_item.key].__dict__
data["labels"] = metric_item.labels
data["values"] = metric_item.value
self.file.write(json.dumps(data))
self.file.write("\n")
self.file.flush()
def inspect_metrics(self):
return "Metric is located at /tmp/serve_metrics.log"
serve.init(metric_exporter=FileExporter)
def echo(flask_request):
return "hello " + flask_request.args.get("name", "serve!")
serve.create_backend("hello", echo)
serve.create_endpoint("hello", backend="hello", route="/hello")
for _ in range(5):
requests.get("http://127.0.0.1:8000/hello").text
time.sleep(0.2)
print("Retrieving metrics from file...")
with open("/tmp/serve_metrics.log") as metric_log:
for line in metric_log:
print(line)
# Retrieving metrics from file...
# {"name": "backend_worker_starts",
# "type": 1,
# "description": "The number of time this replica workers ...",
# "label_names": ["replica_tag"],
# "default_labels": {"backend": "hello"}, "
# labels": {"replica_tag": "hello#XwzPQn"},
# "values": 1
# }
# ...
+1 -5
View File
@@ -4,11 +4,10 @@ import requests
import ray
import ray.serve as serve
from ray.serve.metric import PrometheusExporter
# initialize ray serve system.
ray.init(num_cpus=10)
serve.init(metric_exporter=PrometheusExporter)
serve.init()
# a backend can be a function or class.
@@ -52,6 +51,3 @@ for _ in range(10):
# You can also change number of replicas for each backend independently.
serve.update_backend_config("echo:v1", {"num_replicas": 2})
serve.update_backend_config("echo:v2", {"num_replicas": 2})
# As well as retrieving relevant system metrics
print(serve.stat().decode())
+5 -13
View File
@@ -8,7 +8,7 @@ import ray
from ray.exceptions import RayTaskError
from ray import serve
from ray.serve.context import TaskContext
from ray.serve.metric import MetricClient
from ray.experimental import metrics
from ray.serve.request_params import RequestMetadata
from ray.serve.http_util import Response
from ray.serve.router import Router
@@ -33,14 +33,9 @@ class HTTPProxy:
self.route_table = await controller.get_router_config.remote()
# The exporter is required to return results for /-/metrics endpoint.
[self.metric_exporter] = await controller.get_metric_exporter.remote()
self.metric_client = MetricClient(self.metric_exporter)
self.request_counter = self.metric_client.new_counter(
"num_http_requests",
description="The number of requests processed",
label_names=("route", ))
self.request_counter = metrics.Count(
"num_http_requests", "The number of HTTP requests processed",
"requests", ["route"])
self.router = Router()
await self.router.setup(name, instance_name)
@@ -71,9 +66,6 @@ class HTTPProxy:
current_path = scope["path"]
if current_path == "/-/routes":
await Response(self.route_table).send(scope, receive, send)
elif current_path == "/-/metrics":
metric_info = await self.metric_exporter.inspect_metrics.remote()
await Response(metric_info).send(scope, receive, send)
else:
await Response(
"System path {} not found".format(current_path),
@@ -90,7 +82,7 @@ class HTTPProxy:
assert scope["type"] == "http"
current_path = scope["path"]
self.request_counter.labels(route=current_path).add()
self.request_counter.record(1, {"route": current_path})
if current_path.startswith("/-/"):
await self._handle_system_request(scope, receive, send)
-4
View File
@@ -1,4 +0,0 @@
from ray.serve.metric.client import MetricClient
from ray.serve.metric.exporter import (InMemoryExporter, PrometheusExporter)
__all__ = ["MetricClient", "InMemoryExporter", "PrometheusExporter"]
-136
View File
@@ -1,136 +0,0 @@
import asyncio
from typing import Dict, Optional, Tuple, List
from ray.serve.metric.types import (MetricType, convert_event_type_to_class,
MetricMetadata, MetricRecord)
from ray.serve.utils import _get_logger
from ray.serve.constants import METRIC_PUSH_INTERVAL_S
logger = _get_logger()
class MetricClient:
def __init__(
self,
metric_exporter_actor,
push_interval: float = METRIC_PUSH_INTERVAL_S,
default_labels: Optional[Dict[str, str]] = None,
):
"""Initialize a client to push metrics to the exporter actor.
Args:
metric_exporter_actor: The actor to push metrics to.
default_labels(dict): The set of labels to apply for all metrics
created by this actor. For example, {"source": "worker"}.
"""
self.exporter = metric_exporter_actor
self.default_labels = default_labels or dict()
self.registered_metrics: Dict[int, MetricMetadata] = dict()
self.metric_records: List[MetricRecord] = []
assert asyncio.get_event_loop().is_running()
self.push_task = asyncio.get_event_loop().create_task(
self.push_to_exporter_forever(push_interval))
logger.debug("Initialized client")
def new_counter(self,
name: str,
*,
description: Optional[str] = "",
label_names: Optional[Tuple[str]] = ()):
"""Create a new counter.
Counters are used to capture changes in running sums. An essential
property of Counter instruments is that two events add(m) and add(n)
are semantically equivalent to one event add(m+n). This property means
that Counter events can be combined.
Args:
name(str): The unique name for the counter.
description(Optional[str]): The description for the counter.
label_names(Optional[Tuple[str]]): The set of label names to be
added when recording the metrics.
Usage:
>>> client = MetricClient(...)
>>> counter = client.new_counter(
"http_counter",
description="This is a simple counter for HTTP status",
label_names=("route", "status_code"))
>>> counter.labels(route="/hi", status_code=200).add()
"""
return self._new_metric(name, MetricType.COUNTER, description,
label_names)
def new_measure(self,
name,
*,
description: Optional[str] = "",
label_names: Optional[Tuple[str]] = ()):
"""Create a new measure.
Measure instruments are independent. They cannot be combined as with
counters. Measures can be aggregated after recording to compute
statistics about the distribution along selected dimension.
Args:
name(str): The unique name for the measure.
description(Optional[str]): The description for the measure.
label_names(Optional[Tuple[str]]): The set of label names to be
added when recording the metrics.
Usage:
>>> client = MetricClient(...)
>>> measure = client.new_measure(
"latency_measure",
description="This is a simple measure for latency in ms",
label_names=("route"))
>>> measure.labels(route="/hi").record(42)
"""
return self._new_metric(name, MetricType.MEASURE, description,
label_names)
def _new_metric(
self,
name,
metric_type: MetricType,
description: str,
label_names: Tuple[str] = (),
):
if not isinstance(label_names, tuple):
raise ValueError("label_names need to be a tuple, it is {}".format(
type(label_names)))
metric_metadata = MetricMetadata(
name=name,
type=metric_type,
description=description,
label_names=label_names,
default_labels=self.default_labels.copy(),
)
key = hash(metric_metadata)
if key in self.registered_metrics:
raise ValueError("Metric named {} and associated metadata "
"is already registered.".format(name))
self.registered_metrics[key] = metric_metadata
metric_class = convert_event_type_to_class(metric_type)
metric_object = metric_class(
client=self, key=key, label_names=label_names)
return metric_object
async def _push_to_exporter_once(self):
if len(self.metric_records) == 0:
return
old_batch, self.metric_records = self.metric_records, []
logger.debug("Pushing metric batch {}".format(old_batch))
await self.exporter.ingest.remote(self.registered_metrics, old_batch)
async def push_to_exporter_forever(self, interval_s):
while True:
await self._push_to_exporter_once()
await asyncio.sleep(interval_s)
-157
View File
@@ -1,157 +0,0 @@
from typing import Dict, Any
from collections import Counter, namedtuple
import ray
from ray.serve.metric.types import MetricType, MetricMetadata, MetricBatch
from ray.serve.utils import _get_logger
logger = _get_logger()
def make_metric_namedtuple(metric_metadata: MetricMetadata,
record: MetricBatch):
fields = ["name", "type"]
fields += list(metric_metadata.default_labels.keys())
fields += list(record.labels.keys())
merged_labels = metric_metadata.default_labels.copy()
merged_labels.update(record.labels)
tuple_type = namedtuple(metric_metadata.name, fields)
return tuple_type(
name=metric_metadata.name, type=metric_metadata.type, **merged_labels)
class ExporterInterface:
def export(self, metric_metadata: Dict[str, MetricMetadata],
metric_batch: MetricBatch):
raise NotImplementedError(
"This method should be implemented by subclass.")
def inspect_metrics(self):
raise NotImplementedError(
"This method should be implemented by subclass.")
@ray.remote(num_cpus=0)
class MetricExporterActor:
"""Aggregate metrics from all RayServe actors and export them.
Metrics are aggregated pushed from other actors in the system to
this actor. They can then be stored or pushed to an external monitoring
system.
"""
def __init__(self, exporter_class: ExporterInterface):
# TODO(simon): Add support for initializer args and kwargs.
self.exporter = exporter_class()
# Stores the all recorded MetricMetadata (keyed by hash key)
# This field is tolerant to failures since each client will always push
# an updated copy of the metadata for each ingest call.
self.metric_metadata: Dict[int, MetricMetadata] = dict()
logger.debug("Initialized with metric exporter of type {}".format(
type(self.exporter)))
def ingest(self, metric_metadata: Dict[int, MetricMetadata],
batch: MetricBatch):
self.metric_metadata.update(metric_metadata)
self.exporter.export(self.metric_metadata, batch)
def inspect_metrics(self):
return self.exporter.inspect_metrics()
class InMemoryExporter(ExporterInterface):
def __init__(self):
# Keep track of counters
self.counters: Counter[namedtuple, float] = Counter()
# Keep track of latest observation of measures
self.latest_measures: Dict[namedtuple, float] = dict()
def export(self, metric_metadata: Dict[int, MetricMetadata],
metric_batch: MetricBatch):
for record in metric_batch:
metadata = metric_metadata[record.key]
metric_key = make_metric_namedtuple(metadata, record)
if metadata.type == MetricType.COUNTER:
self.counters[metric_key] += record.value
elif metadata.type == MetricType.MEASURE:
self.latest_measures[metric_key] = record.value
else:
raise RuntimeError("Unrecognized metric type {}".format(
metadata.type))
def inspect_metrics(self):
items = []
metrics_to_collect = {**self.counters, **self.latest_measures}
for info_tuple, value in metrics_to_collect.items():
# Represent the metric type as a human readable name
info_tuple = info_tuple._replace(type=str(info_tuple.type))
# _asdict returns OrderedDict, we just need to return regular dict
items.append({"info": dict(info_tuple._asdict()), "value": value})
return items
class PrometheusExporter(ExporterInterface):
def __init__(self):
super().__init__()
from prometheus_client import (CollectorRegistry, Counter, Gauge,
generate_latest)
self.metric_type_to_prom_type = {
MetricType.COUNTER: Counter,
MetricType.MEASURE: Gauge
}
self.prom_generate_latest = generate_latest
self.metrics_cache: Dict[str, Any] = dict()
self.default_labels = dict()
self.registry = CollectorRegistry()
def inspect_metrics(self):
return self.prom_generate_latest(self.registry)
def export(self, metric_metadata, metric_batch):
self._process_metric_metadata(metric_metadata)
self._process_batch(metric_metadata, metric_batch)
def _process_metric_metadata(self, metric_metadata):
for key, metric_metadata in metric_metadata.items():
name = metric_metadata.name
if name not in self.metrics_cache:
constructor = self.metric_type_to_prom_type[
metric_metadata.type]
default_labels = metric_metadata.default_labels
label_names = tuple(
default_labels.keys()) + metric_metadata.label_names
metric_object = constructor(
name,
metric_metadata.description,
labelnames=label_names,
registry=self.registry,
)
self.metrics_cache[name] = (metric_object,
metric_metadata.type)
self.default_labels[key] = metric_metadata.default_labels
def _process_batch(self, metric_metadata, batch):
for key, labels, value in batch:
if key not in metric_metadata:
continue
name = metric_metadata[key].name
metric, metric_type = self.metrics_cache[name]
default_labels = self.default_labels[key]
merged_labels = {**default_labels, **labels}
if metric_type == MetricType.COUNTER:
metric.labels(**merged_labels).inc(value)
elif metric_type == MetricType.MEASURE:
metric.labels(**merged_labels).set(value)
else:
raise RuntimeError(
"Unrecognized metric type {}".format(metric_type))
-112
View File
@@ -1,112 +0,0 @@
import enum
from typing import Tuple, List, Dict, Optional
from collections import namedtuple
class MetricType(enum.IntEnum):
COUNTER = 1
MEASURE = 2
# We split the information about a metric into two parts: the MetricMetadata
# and MetricRecord. Metadata is declared at creation time and include names
# and the label names. The label values will be supplied at observation time.
class MetricMetadata:
def __init__(self, name: str, type: MetricType, description: str,
label_names: Tuple[str], default_labels: Dict[str, str]):
self.name = name
self.type = type
self.description = description
self.label_names = label_names
self.default_labels = default_labels
def __eq__(self, value: "MetricMetadata"):
if not isinstance(value, MetricMetadata):
return False
return (self.name == value.name and self.type == self.type
and self.description == value.description
and self.label_names == value.label_names
and self.default_labels == value.default_labels)
def __hash__(self):
return hash((self.name, self.type, self.description, self.label_names,
frozenset(self.default_labels.items())))
MetricRecord = namedtuple("MetricRecord", ["key", "labels", "value"])
MetricBatch = List[MetricRecord]
class BaseMetric:
def __init__(self,
client,
key: int,
label_names: Tuple[str],
dynamic_labels: Optional[Dict[str, str]] = None):
"""Represent a single metric stream
Args:
client(MetricClient): The client object to push update to.
key(int): The unique hash key of the metric.
label_names(Tuple[str]): The names of the labels that must be set
before an observation.
dynamic_labels(Optional[Dict[str,str]]): A partially preset labels.
This fields make it possible to chain label calls together:
``metric.labels(a=b).labels(c=d)``.
"""
self.client = client
self.key = key
self.dynamic_labels = dynamic_labels or dict()
self.label_names = label_names
def check_all_labels_fulfilled_or_error(self):
unfulfilled = set(self.label_names) - set(self.dynamic_labels.keys())
if len(unfulfilled) != 0:
raise ValueError("The following labels doesn't have associated "
"values: {}".format(unfulfilled))
def labels(self, **kwargs):
"""Apply dynamic label to the metric
Usage:
>>> metric = BaseMetric(..., label_names=("a", "b"))
>>> metric.labels(a=1, b=2)
>>> metric.labels(a=1).labels(b=2) # Equivalent
"""
new_dynamic_labels = self.dynamic_labels.copy()
for k, v in kwargs.items():
if k not in self.label_names:
raise ValueError(
"Label {} was not part of registered "
"label names. Allowed label names are {}.".format(
k, self.label_names))
new_dynamic_labels[k] = str(v)
return type(self)(self.client, self.key, self.label_names,
new_dynamic_labels)
# The metric types are inspired by OpenTelemetry spec:
# https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/metrics/api.md#three-kinds-of-instrument
class Counter(BaseMetric):
def add(self, increment=1):
"""Increment the counter by some amount. Default is 1"""
self.check_all_labels_fulfilled_or_error()
self.client.metric_records.append(
MetricRecord(self.key, self.dynamic_labels, increment))
class Measure(BaseMetric):
def record(self, value):
"""Record the given value for the measure"""
self.check_all_labels_fulfilled_or_error()
self.client.metric_records.append(
MetricRecord(self.key, self.dynamic_labels, value))
def convert_event_type_to_class(event_type: MetricType) -> BaseMetric:
if event_type == MetricType.COUNTER:
return Counter
if event_type == MetricType.MEASURE:
return Measure
raise RuntimeError("Unknown event type {}".format(event_type))
+14 -19
View File
@@ -9,7 +9,7 @@ from ray.exceptions import RayTaskError
import ray
from ray import serve
from ray.serve.metric import MetricClient
from ray.experimental import metrics
from ray.serve.endpoint_policy import RandomEndpointPolicy
from ray.serve.utils import logger, chain_future
@@ -145,24 +145,19 @@ class Router:
for backend, backend_config in backend_configs.items():
await self.set_backend_config(backend, backend_config)
# -- Metric Registration -- #
[metric_exporter] = ray.get(
self.controller.get_metric_exporter.remote())
self.metric_client = MetricClient(metric_exporter)
self.num_router_requests = self.metric_client.new_counter(
# -- Metrics Registration -- #
self.num_router_requests = metrics.Count(
"num_router_requests",
description="Number of requests processed by the router.",
label_names=("endpoint", ))
self.num_error_endpoint_request = self.metric_client.new_counter(
"Number of requests processed by the router.", "requests",
["endpoint"])
self.num_error_endpoint_requests = metrics.Count(
"num_error_endpoint_requests",
description=("Number of requests errored when getting result "
"for endpoint."),
label_names=("endpoint", ))
self.num_error_backend_request = self.metric_client.new_counter(
("Number of requests that errored when getting results "
"for the endpoint."), "requests", ["endpoint"])
self.num_error_backend_requests = metrics.Count(
"num_error_backend_requests",
description=("Number of requests errored when getting result "
"from backend."),
label_names=("backend", ))
("Number of requests that errored when getting result "
"from the backend."), "requests", ["backend"])
asyncio.get_event_loop().create_task(self.report_queue_lengths())
@@ -170,7 +165,7 @@ class Router:
**request_kwargs):
endpoint = request_meta.endpoint
logger.debug("Received a request for endpoint {}".format(endpoint))
self.num_router_requests.labels(endpoint=endpoint).add()
self.num_router_requests.record(1, {"endpoint": endpoint})
request_context = request_meta.request_context
query = Query(
@@ -187,7 +182,7 @@ class Router:
try:
result = await query.async_future
except RayTaskError as e:
self.num_error_endpoint_request.labels(endpoint=endpoint).add()
self.num_error_endpoint_requests.record(1, {"endpoint": endpoint})
result = e
return result
@@ -311,7 +306,7 @@ class Router:
else:
result = await object_ref
except RayTaskError as error:
self.num_error_backend_request.labels(backend=backend).add()
self.num_error_backend_requests.record(1, {"backend": backend})
result = error
self.queries_counter[backend][backend_replica_tag] -= 1
await self.mark_worker_idle(backend, backend_replica_tag)
+31 -13
View File
@@ -1,5 +1,6 @@
import time
import asyncio
from collections import defaultdict
import time
import pytest
import requests
@@ -597,8 +598,7 @@ def test_shutdown(serve_instance):
def check_dead():
for actor_name in [
constants.SERVE_CONTROLLER_NAME, constants.SERVE_PROXY_NAME,
constants.SERVE_METRIC_SINK_NAME
constants.SERVE_CONTROLLER_NAME, constants.SERVE_PROXY_NAME
]:
try:
ray.get_actor(format_actor_name(actor_name, instance_name))
@@ -611,16 +611,39 @@ def test_shutdown(serve_instance):
def test_shadow_traffic(serve_instance):
@ray.remote
class RequestCounter:
def __init__(self):
self.requests = defaultdict(int)
def record(self, backend):
self.requests[backend] += 1
def get(self, backend):
return self.requests[backend]
counter = RequestCounter.remote()
def f():
ray.get(counter.record.remote("backend1"))
return "hello"
def f_shadow():
def f_shadow_1():
ray.get(counter.record.remote("backend2"))
return "oops"
def f_shadow_2():
ray.get(counter.record.remote("backend3"))
return "oops"
def f_shadow_3():
ray.get(counter.record.remote("backend4"))
return "oops"
serve.create_backend("backend1", f)
serve.create_backend("backend2", f_shadow)
serve.create_backend("backend3", f_shadow)
serve.create_backend("backend4", f_shadow)
serve.create_backend("backend2", f_shadow_1)
serve.create_backend("backend3", f_shadow_2)
serve.create_backend("backend4", f_shadow_3)
serve.create_endpoint("endpoint", backend="backend1", route="/api")
serve.shadow_traffic("endpoint", "backend2", 1.0)
@@ -634,12 +657,7 @@ def test_shadow_traffic(serve_instance):
print("Finished 100 requests in {}s.".format(time.time() - start))
def requests_to_backend(backend):
for entry in serve.stat():
if entry["info"]["name"] == "backend_request_counter":
if entry["info"]["backend"] == backend:
return entry["value"]
return 0
return ray.get(counter.get.remote(backend))
def check_requests():
return all([
-222
View File
@@ -1,222 +0,0 @@
import time
import pytest
import requests
from ray import serve
from ray.serve.metric.client import MetricClient
from ray.serve.metric.exporter import (InMemoryExporter, PrometheusExporter,
MetricExporterActor)
from ray.serve.metric.types import MetricType, MetricMetadata
pytestmark = pytest.mark.asyncio
class MockExporterActor:
def __init__(self):
self.metadata = dict()
self.batches = []
@property
def ingest(self):
return self
async def remote(self, metadata, batch):
self.metadata.update(metadata)
self.batches.extend(batch)
async def test_client():
exporter = MockExporterActor()
collector = MetricClient(
exporter, push_interval=2, default_labels={"default": "label"})
counter = collector.new_counter(name="counter", label_names=("a", "b"))
with pytest.raises(
ValueError, match="labels doesn't have associated values"):
counter.add()
counter = counter.labels(a=1)
counter.labels(b=2).add()
counter.labels(b=3).add(42)
measure = collector.new_measure("measure")
measure.record(2)
await collector._push_to_exporter_once()
assert MetricMetadata(
name="counter",
type=MetricType.COUNTER,
description="",
label_names=("a", "b"),
default_labels={"default": "label"},
) in exporter.metadata.values()
assert MetricMetadata(
name="measure",
type=MetricType.MEASURE,
description="",
label_names=(),
default_labels={"default": "label"},
) in exporter.metadata.values()
metric_values = [item.value for item in exporter.batches]
assert set(metric_values) == {1, 42, 2}
metric_labels = [
frozenset(item.labels.items()) for item in exporter.batches
]
assert frozenset(dict(a="1", b="2").items()) in metric_labels
assert frozenset(dict(a="1", b="3").items()) in metric_labels
async def test_in_memory_exporter(serve_instance):
exporter = MetricExporterActor.remote(InMemoryExporter)
collector = MetricClient(
exporter, push_interval=2, default_labels={"default": "label"})
counter = collector.new_counter(name="my_counter", label_names=("a", ))
measure = collector.new_measure(
name="my_measure", description="help", label_names=("ray", "lang"))
measure = measure.labels(lang="C++")
counter.labels(a="1").add()
measure.labels(ray="").record(0)
measure.labels(ray="").record(42)
await collector._push_to_exporter_once()
metric_stored = await exporter.inspect_metrics.remote()
assert metric_stored == [{
"info": {
"name": "my_counter",
"type": "MetricType.COUNTER",
"default": "label",
"a": "1"
},
"value": 1
}, {
"info": {
"name": "my_measure",
"type": "MetricType.MEASURE",
"default": "label",
"lang": "C++",
"ray": ""
},
"value": 42
}]
async def test_prometheus_exporter(serve_instance):
exporter = MetricExporterActor.remote(PrometheusExporter)
collector = MetricClient(
exporter, push_interval=2, default_labels={"default": "label"})
counter = collector.new_counter(name="my_counter", label_names=("a", ))
measure = collector.new_measure(
name="my_measure", description="help", label_names=("ray", "lang"))
measure = measure.labels(lang="C++")
counter.labels(a="1").add()
measure.labels(ray="").record(0)
measure.labels(ray="").record(42)
await collector._push_to_exporter_once()
metric_stored = await exporter.inspect_metrics.remote()
metric_stored = metric_stored.decode()
fragments = [
"# HELP my_counter_total", "# TYPE my_counter_total counter",
'my_counter_total{a="1",default="label"} 1.0',
"# TYPE my_counter_created gauge",
'my_counter_created{a="1",default="label"}', "# HELP my_measure help",
"# TYPE my_measure gauge",
'my_measure{default="label",lang="C++",ray=""} 42.0'
]
for fragment in fragments:
assert fragment in metric_stored
async def test_prometheus_conflicting_labels(serve_instance):
exporter = MetricExporterActor.remote(PrometheusExporter)
collector_a = MetricClient(
exporter, push_interval=2, default_labels={"default": "a"})
collector_b = MetricClient(
exporter, push_interval=2, default_labels={"default": "b"})
for collector in [collector_a, collector_b]:
counter = collector.new_counter("num")
counter.add()
await collector._push_to_exporter_once()
metric_stored = (await exporter.inspect_metrics.remote()).decode()
fragments = ['num_total{default="a"}', 'num_total{default="b"}']
for fragment in fragments:
assert fragment in metric_stored
async def test_system_metric_endpoints(serve_instance):
def test_error_counter(flask_request):
1 / 0
serve.create_backend("m:v1", test_error_counter)
serve.create_endpoint("test_metrics", backend="m:v1", route="/measure")
serve.set_traffic("test_metrics", {"m:v1": 1})
# Check metrics are exposed under http endpoint
def test_metric_endpoint():
requests.get("http://127.0.0.1:8000/measure", timeout=5)
in_memory_metric = requests.get(
"http://127.0.0.1:8000/-/metrics", timeout=5).json()
# We don't want to check the values since this check might be retried.
in_memory_metric_without_values = []
for m in in_memory_metric:
m.pop("value")
in_memory_metric_without_values.append(m)
target_metrics = [{
"info": {
"name": "num_http_requests",
"type": "MetricType.COUNTER",
"route": "/measure"
},
}, {
"info": {
"name": "num_router_requests",
"type": "MetricType.COUNTER",
"endpoint": "test_metrics"
},
}, {
"info": {
"name": "backend_error_counter",
"type": "MetricType.COUNTER",
"backend": "m:v1"
},
}]
for target in target_metrics:
assert target in in_memory_metric_without_values
success = False
for _ in range(3):
try:
test_metric_endpoint()
success = True
break
except (AssertionError, requests.ReadTimeout):
# Metrics may not have been propagated yet
time.sleep(2)
print("Metric not correct, retrying...")
if not success:
test_metric_endpoint()
if __name__ == "__main__":
import sys
sys.exit(pytest.main(["-v", "-s", __file__]))