[Serve] Refactor Metric System: Counter + Measure Support (#8114)

This commit is contained in:
Simon Mo
2020-05-06 17:44:02 -07:00
committed by GitHub
parent 1f312debbe
commit c5a5a5de89
18 changed files with 749 additions and 336 deletions
+41 -27
View File
@@ -13,6 +13,7 @@ from ray.serve.config import BackendConfig, ReplicaConfig
from ray.serve.policy import RoutePolicy
from ray.serve.router import Query
from ray.serve.request_params import RequestMetadata
from ray.serve.metric import InMemoryExporter
master_actor = None
@@ -21,6 +22,9 @@ def _get_master_actor():
"""Used for internal purpose because using just import serve.global_state
will always reference the original None object.
"""
global master_actor
if master_actor is None:
master_actor = ray.util.get_actor(SERVE_MASTER_NAME)
return master_actor
@@ -57,19 +61,17 @@ def accept_batch(f):
return f
def init(
blocking=False,
start_server=True,
http_host=DEFAULT_HTTP_HOST,
http_port=DEFAULT_HTTP_PORT,
ray_init_kwargs={
"object_store_memory": int(1e8),
"num_cpus": max(cpu_count(), 8)
},
gc_window_seconds=3600,
queueing_policy=RoutePolicy.Random,
policy_kwargs={},
):
def init(blocking=False,
start_server=True,
http_host=DEFAULT_HTTP_HOST,
http_port=DEFAULT_HTTP_PORT,
ray_init_kwargs={
"object_store_memory": int(1e8),
"num_cpus": max(cpu_count(), 8)
},
queueing_policy=RoutePolicy.Random,
policy_kwargs={},
metric_exporter=InMemoryExporter):
"""Initialize a serve cluster.
If serve cluster has already initialized, this function will just return.
@@ -88,12 +90,13 @@ def init(
ray_init_kwargs (dict): Argument passed to ray.init, if there is no ray
connection. Default to {"object_store_memory": int(1e8)} for
performance stability reason
gc_window_seconds(int): How long will we keep the metric data in
memory. Data older than the gc_window will be deleted. The default
is 3600 seconds, which is 1 hour.
queueing_policy(RoutePolicy): Define the queueing policy for selecting
the backend for a service. (Default: RoutePolicy.Random)
policy_kwargs: Arguments required to instantiate a queueing policy
metric_exporter(ExporterInterface): The class aggregates metrics from
all RayServe actors and optionally export them to external
services. RayServe has two options built in: InMemoryExporter and
PrometheusExporter
"""
global master_actor
if master_actor is not None:
@@ -126,7 +129,7 @@ def init(
name=SERVE_MASTER_NAME,
max_reconstructions=ray.ray_constants.INFINITE_RECONSTRUCTION,
).remote(queueing_policy.value, policy_kwargs, start_server, http_host,
http_port, gc_window_seconds)
http_port, metric_exporter)
if start_server and blocking:
block_until_http_ready("http://{}:{}/-/routes".format(
@@ -278,16 +281,27 @@ def get_handle(endpoint_name,
@_ensure_connected
def stat(percentiles=[50, 90, 95],
agg_windows_seconds=[10, 60, 300, 600, 3600]):
def stat():
"""Retrieve metric statistics about ray serve system.
Args:
percentiles(List[int]): The percentiles for aggregation operations.
Default is 50th, 90th, 95th percentile.
agg_windows_seconds(List[int]): The aggregation windows in seconds.
The longest aggregation window must be shorter or equal to the
gc_window_seconds.
Returns:
metric_stats(Any): Metric information returned by the metric exporter.
This can vary by exporter. For the default InMemoryExporter, it
returns a list of the following format:
.. code-block::python
[
{"info": {
"name": ...,
"type": COUNTER|MEASURE,
"label_key": label_value,
"label_key": label_value,
...
}, "value": float}
]
For PrometheusExporter, it returns the metrics in prometheus format
in plain text.
"""
[monitor] = retry_actor_failures(master_actor.get_metric_monitor)
return ray.get(monitor.collect.remote(percentiles, agg_windows_seconds))
[metric_exporter] = ray.get(master_actor.get_metric_exporter.remote())
return ray.get(metric_exporter.inspect_metrics.remote())
+32 -32
View File
@@ -1,4 +1,3 @@
import time
import traceback
import inspect
@@ -7,10 +6,13 @@ from ray import serve
from ray.serve import context as serve_context
from ray.serve.context import FakeFlaskRequest
from collections import defaultdict
from ray.serve.utils import parse_request_item
from ray.serve.utils import parse_request_item, _get_logger
from ray.serve.exceptions import RayServeException
from ray.serve.metric import MetricClient
from ray.async_compat import sync_to_async
logger = _get_logger()
def create_backend_worker(func_or_class):
"""Creates a worker class wrapping the provided function or class."""
@@ -30,10 +32,8 @@ def create_backend_worker(func_or_class):
else:
_callable = func_or_class(*init_args)
self.backend = RayServeWorker(backend_tag, _callable, is_function)
def get_metrics(self):
return self.backend.get_metrics()
self.backend = RayServeWorker(backend_tag, replica_tag, _callable,
is_function)
async def handle_request(self, request):
return await self.backend.handle_request(request)
@@ -66,37 +66,39 @@ def ensure_async(func):
class RayServeWorker:
"""Handles requests with the provided callable."""
def __init__(self, name, _callable, is_function):
def __init__(self, name, replica_tag, _callable, is_function):
self.name = name
self.replica_tag = replica_tag
self.callable = _callable
self.is_function = is_function
self.error_counter = 0
self.latency_list = []
self.metric_client = MetricClient.connect_from_serve(
default_labels={"backend": self.name})
self.request_counter = self.metric_client.new_counter(
"backend_request_counter",
description=("Number of queries that have been "
"processed in this replica"),
)
self.error_counter = self.metric_client.new_counter(
"backend_error_counter",
description=("Number of exceptions that have "
"occurred in the backend"),
)
self.restart_counter = self.metric_client.new_counter(
"backend_worker_starts",
description=("The number of time this replica workers "
"has been restarted due to failure."),
label_names=("replica_tag", ))
def get_metrics(self):
# Make a copy of the latency list and clear current list
latency_list = self.latency_list[:]
self.latency_list = []
return {
"{}_error_counter".format(self.name): {
"value": self.error_counter,
"type": "counter",
},
"{}_latency_s".format(self.name): {
"value": latency_list,
"type": "list",
},
}
self.restart_counter.labels(replica_tag=self.replica_tag).add()
def get_runner_method(self, request_item):
method_name = request_item.call_method
if not hasattr(self.callable, method_name):
raise RayServeException("Backend doesn't have method {} "
"which is specified in the request. "
"The avaiable methods are {}".format(
method_name, dir(self)))
"The available methods are {}".format(
method_name, dir(self.callable)))
return getattr(self.callable, method_name)
def has_positional_args(self, f):
@@ -116,18 +118,17 @@ class RayServeWorker:
async def invoke_single(self, request_item):
args, kwargs, is_web_context = parse_request_item(request_item)
serve_context.web = is_web_context
start_timestamp = time.time()
method_to_call = self.get_runner_method(request_item)
args = args if self.has_positional_args(method_to_call) else []
method_to_call = ensure_async(method_to_call)
try:
result = await method_to_call(*args, **kwargs)
self.request_counter.add()
except Exception as e:
result = wrap_to_ray_error(e)
self.error_counter += 1
self.error_counter.add()
self.latency_list.append(time.time() - start_timestamp)
return result
async def invoke_batch(self, request_item_list):
@@ -191,10 +192,9 @@ class RayServeWorker:
# Flask requests are passed to __call__ as a list
arg_list = [arg_list]
start_timestamp = time.time()
self.request_counter.add(batch_size)
result_list = await call_method(*arg_list, **kwargs_list)
self.latency_list.append(time.time() - start_timestamp)
if (not isinstance(result_list,
list)) or (len(result_list) != batch_size):
error_message = ("Worker doesn't preserve batch size. The "
@@ -206,7 +206,7 @@ class RayServeWorker:
return result_list
except Exception as e:
wrapped_exception = wrap_to_ray_error(e)
self.error_counter += batch_size
self.error_counter.add()
return [wrapped_exception for _ in range(batch_size)]
async def handle_request(self, request):
+4 -1
View File
@@ -8,7 +8,7 @@ SERVE_ROUTER_NAME = "SERVE_ROUTER_ACTOR"
SERVE_PROXY_NAME = "SERVE_PROXY_ACTOR"
#: Actor name used to register metric monitor actor
SERVE_METRIC_MONITOR_NAME = "SERVE_METRIC_MONITOR_ACTOR"
SERVE_METRIC_SINK_NAME = "SERVE_METRIC_SINK_ACTOR"
#: HTTP Address
DEFAULT_HTTP_ADDRESS = "http://127.0.0.1:8000"
@@ -24,3 +24,6 @@ ASYNC_CONCURRENCY = int(1e6)
#: Default latency SLO
DEFAULT_LATENCY_SLO_MS = 1e9
#: Interval for metric client to push metrics to exporters
METRIC_PUSH_INTERVAL_S = 2
+32 -4
View File
@@ -4,11 +4,12 @@ import socket
import uvicorn
import ray
from ray.serve.constants import SERVE_MASTER_NAME
from ray.serve.context import TaskContext
from ray.serve.metric import MetricClient
from ray.serve.request_params import RequestMetadata
from ray.serve.http_util import Response
from ray.serve.utils import logger, retry_actor_failures_async
from ray.serve.constants import SERVE_MASTER_NAME
from urllib.parse import parse_qs
@@ -29,10 +30,21 @@ class HTTPProxy:
async def fetch_config_from_master(self):
assert ray.is_initialized()
master = ray.util.get_actor(SERVE_MASTER_NAME)
self.route_table, [
self.router_handle
] = await retry_actor_failures_async(master.get_http_proxy_config)
# The exporter is required to return results for /-/metrics endpoint.
[self.metric_exporter] = await retry_actor_failures_async(
master.get_metric_exporter)
self.metric_client = MetricClient.connect_from_serve()
self.request_counter = self.metric_client.new_counter(
"num_http_requests",
description="The number of requests processed",
label_names=("route", ))
def set_route_table(self, route_table):
self.route_table = route_table
@@ -90,6 +102,19 @@ class HTTPProxy:
return sender
async def _handle_system_request(self, scope, receive, send):
current_path = scope["path"]
if current_path == "/-/routes":
await Response(self.route_table).send(scope, receive, send)
elif current_path == "/-/metrics":
metric_info = await retry_actor_failures_async(
self.metric_exporter.inspect_metrics)
await Response(metric_info).send(scope, receive, send)
else:
await Response(
"System path {} not found".format(current_path),
status_code=404).send(scope, receive, send)
async def __call__(self, scope, receive, send):
# NOTE: This implements ASGI protocol specified in
# https://asgi.readthedocs.io/en/latest/specs/index.html
@@ -104,8 +129,11 @@ class HTTPProxy:
"Route table must be set via set_route_table.")
assert scope["type"] == "http"
current_path = scope["path"]
if current_path == "/-/routes":
await Response(self.route_table).send(scope, receive, send)
self.request_counter.labels(route=current_path).add()
if current_path.startswith("/-/"):
await self._handle_system_request(scope, receive, send)
return
try:
@@ -120,7 +148,7 @@ class HTTPProxy:
if scope["method"] not in methods_allowed:
error_message = ("Methods {} not allowed. "
"Avaiable HTTP methods are {}.").format(
"Available HTTP methods are {}.").format(
scope["method"], methods_allowed)
await error_sender(error_message, 405)
return
+27 -26
View File
@@ -7,12 +7,12 @@ import time
import ray
import ray.cloudpickle as pickle
from ray.serve.constants import (ASYNC_CONCURRENCY, SERVE_ROUTER_NAME,
SERVE_PROXY_NAME, SERVE_METRIC_MONITOR_NAME)
SERVE_PROXY_NAME, SERVE_METRIC_SINK_NAME)
from ray.serve.http_proxy import HTTPProxyActor
from ray.serve.kv_store import RayInternalKVStore
from ray.serve.metric import (MetricMonitor, start_metric_monitor_loop)
from ray.serve.backend_worker import create_backend_worker
from ray.serve.utils import async_retryable, get_random_letters, logger
from ray.serve.metric.exporter import MetricExporterActor
import numpy as np
@@ -49,7 +49,8 @@ class ServeMaster:
"""
async def __init__(self, router_class, router_kwargs, start_http_proxy,
http_proxy_host, http_proxy_port, metric_gc_window_s):
http_proxy_host, http_proxy_port,
metric_exporter_class):
# Used to read/write checkpoints.
# TODO(edoakes): namespace the master actor and its checkpoints.
self.kv_store = RayInternalKVStore()
@@ -82,14 +83,14 @@ class ServeMaster:
# Cached handles to actors in the system.
self.router = None
self.http_proxy = None
self.metric_monitor = None
self.metric_exporter = None
# If starting the actor for the first time, starts up the other system
# components. If recovering, fetches their actor handles.
self._get_or_start_metric_exporter(metric_exporter_class)
self._get_or_start_router(router_class, router_kwargs)
if start_http_proxy:
self._get_or_start_http_proxy(http_proxy_host, http_proxy_port)
self._get_or_start_metric_monitor(metric_gc_window_s)
# NOTE(edoakes): unfortunately, we can't completely recover from a
# checkpoint in the constructor because we block while waiting for
@@ -157,26 +158,23 @@ class ServeMaster:
"""Called by the HTTP proxy on startup to fetch required state."""
return self.routes, self.get_router()
def _get_or_start_metric_monitor(self, gc_window_s):
"""Get the metric monitor belonging to this serve cluster.
def _get_or_start_metric_exporter(self, metric_exporter_class):
"""Get the metric exporter belonging to this serve cluster.
If the metric monitor does not already exist, it will be started.
If the metric exporter does not already exist, it will be started.
"""
try:
self.metric_monitor = ray.util.get_actor(SERVE_METRIC_MONITOR_NAME)
self.metric_exporter = ray.util.get_actor(SERVE_METRIC_SINK_NAME)
except ValueError:
logger.info("Starting metric monitor with name '{}'".format(
SERVE_METRIC_MONITOR_NAME))
self.metric_monitor = MetricMonitor.options(
logger.info("Starting metric exporter with name '{}'".format(
SERVE_METRIC_SINK_NAME))
self.metric_exporter = MetricExporterActor.options(
detached=True,
name=SERVE_METRIC_MONITOR_NAME).remote(gc_window_s)
# TODO(edoakes): move these into the constructor.
start_metric_monitor_loop.remote(self.metric_monitor)
self.metric_monitor.add_target.remote(self.router)
name=SERVE_METRIC_SINK_NAME).remote(metric_exporter_class)
def get_metric_monitor(self):
"""Returns a handle to the metric monitor managed by this actor."""
return [self.metric_monitor]
def get_metric_exporter(self):
"""Returns a handle to the metric exporter managed by this actor."""
return [self.metric_exporter]
def _checkpoint(self):
"""Checkpoint internal state and write it to the KV store."""
@@ -213,10 +211,16 @@ class ServeMaster:
logger.info("Recovering from checkpoint")
# Load internal state from the checkpoint data.
(self.routes, self.backends, self.traffic_policies, self.replicas,
self.replicas_to_start, self.replicas_to_stop,
self.backends_to_remove,
self.endpoints_to_remove) = pickle.loads(checkpoint_bytes)
(
self.routes,
self.backends,
self.traffic_policies,
self.replicas,
self.replicas_to_start,
self.replicas_to_stop,
self.backends_to_remove,
self.endpoints_to_remove,
) = pickle.loads(checkpoint_bytes)
# Fetch actor handles for all of the backend replicas in the system.
# All of these workers are guaranteed to already exist because they
@@ -323,9 +327,6 @@ class ServeMaster:
await self.router.add_new_worker.remote(
backend_tag, replica_tag, worker_handle)
# Register the worker with the metric monitor.
self.metric_monitor.add_target.remote(worker_handle)
self.replicas_to_start.clear()
async def _stop_pending_replicas(self):
-157
View File
@@ -1,157 +0,0 @@
import time
import numpy as np
import pandas as pd
import ray
@ray.remote(num_cpus=0)
class MetricMonitor:
def __init__(self, gc_window_seconds=3600):
"""Metric monitor scrapes metrics from ray serve actors
and allow windowed query operations.
Args:
gc_window_seconds(int): How long will we keep the metric data in
memory. Data older than the gc_window will be deleted.
"""
#: Mapping actor ID (hex) -> actor handle
self.actor_handles = dict()
self.data_entries = []
self.gc_window_seconds = gc_window_seconds
self.latest_gc_time = time.time()
def add_target(self, target_handle):
hex_id = target_handle._actor_id.hex()
self.actor_handles[hex_id] = target_handle
def remove_target(self, target_handle):
hex_id = target_handle._actor_id.hex()
self.actor_handles.pop(hex_id)
def scrape(self):
# If expected gc time has passed, we will perform metric value GC.
expected_gc_time = self.latest_gc_time + self.gc_window_seconds
if expected_gc_time < time.time():
self._perform_gc()
self.latest_gc_time = time.time()
curr_time = time.time()
result = [
handle.get_metrics.remote()
for handle in self.actor_handles.values()
]
# TODO(simon): handle the possibility that an actor_handle is removed
for handle_result in ray.get(result):
for metric_name, metric_info in handle_result.items():
data_entry = {
"retrieved_at": curr_time,
"name": metric_name,
"type": metric_info["type"],
}
if metric_info["type"] == "counter":
data_entry["value"] = metric_info["value"]
self.data_entries.append(data_entry)
elif metric_info["type"] == "list":
for metric_value in metric_info["value"]:
new_entry = data_entry.copy()
new_entry["value"] = metric_value
self.data_entries.append(new_entry)
def _perform_gc(self):
curr_time = time.time()
earliest_time_allowed = curr_time - self.gc_window_seconds
# If we don"t have any data at hand, no need to gc.
if len(self.data_entries) == 0:
return
df = pd.DataFrame(self.data_entries)
df = df[df["retrieved_at"] >= earliest_time_allowed]
self.data_entries = df.to_dict(orient="record")
def _get_dataframe(self):
return pd.DataFrame(self.data_entries)
def collect(self,
percentiles=[50, 90, 95],
agg_windows_seconds=[10, 60, 300, 600, 3600]):
"""Collect and perform aggregation on all metrics.
Args:
percentiles(List[int]): The percentiles for aggregation operations.
Default is 50th, 90th, 95th percentile.
agg_windows_seconds(List[int]): The aggregation windows in seconds.
The longest aggregation window must be shorter or equal to the
gc_window_seconds.
"""
result = {}
df = pd.DataFrame(self.data_entries)
if len(df) == 0: # no metric to report
return {}
# Retrieve the {metric_name -> metric_type} mapping
metric_types = df[["name",
"type"]].set_index("name").squeeze().to_dict()
for metric_name, metric_type in metric_types.items():
if metric_type == "counter":
result[metric_name] = df.loc[df["name"] == metric_name,
"value"].tolist()[-1]
if metric_type == "list":
result.update(
self._aggregate(metric_name, percentiles,
agg_windows_seconds))
return result
def _aggregate(self, metric_name, percentiles, agg_windows_seconds):
"""Perform aggregation over a metric.
Note:
This metric must have type `list`.
"""
assert max(agg_windows_seconds) <= self.gc_window_seconds, (
"Aggregation window exceeds gc window. You should set a longer gc "
"window or shorter aggregation window.")
curr_time = time.time()
df = pd.DataFrame(self.data_entries)
filtered_df = df[df["name"] == metric_name]
if len(filtered_df) == 0:
return dict()
data_types = filtered_df["type"].unique().tolist()
assert data_types == [
"list"
], ("Can't aggreagte over non-list type. {} has type {}".format(
metric_name, data_types))
aggregated_metric = {}
for window in agg_windows_seconds:
earliest_time = curr_time - window
windowed_df = filtered_df[
filtered_df["retrieved_at"] > earliest_time]
percentile_values = np.percentile(windowed_df["value"],
percentiles)
for percentile, value in zip(percentiles, percentile_values):
result_key = "{name}_{perc}th_perc_{window}_window".format(
name=metric_name, perc=percentile, window=window)
aggregated_metric[result_key] = value
return aggregated_metric
@ray.remote(num_cpus=0)
def start_metric_monitor_loop(monitor_handle, duration_s=5):
while True:
time.sleep(duration_s)
try:
ray.get(monitor_handle.scrape.remote())
except ray.exceptions.RayActorError:
pass
+4
View File
@@ -0,0 +1,4 @@
from ray.serve.metric.client import MetricClient
from ray.serve.metric.exporter import (InMemoryExporter, PrometheusExporter)
__all__ = ["MetricClient", "InMemoryExporter", "PrometheusExporter"]
+151
View File
@@ -0,0 +1,151 @@
import asyncio
from typing import Dict, Optional, Tuple
from ray.serve.metric.types import (
MetricType,
convert_event_type_to_class,
MetricMetadata,
)
from ray.serve.utils import (retry_actor_failures, retry_actor_failures_async,
_get_logger)
from ray.serve.constants import METRIC_PUSH_INTERVAL_S
logger = _get_logger()
class MetricClient:
def __init__(
self,
metric_exporter_actor,
push_interval: float = METRIC_PUSH_INTERVAL_S,
default_labels: Optional[Dict[str, str]] = None,
):
"""Initialize a client to push metrics to the exporter actor.
Args:
metric_exporter_actor: The actor to push metrics to.
default_labels(dict): The set of labels to apply for all metrics
created by this actor. For example, {"source": "worker"}.
"""
self.exporter = metric_exporter_actor
self.default_labels = default_labels or dict()
self.registered_metrics: Dict[str, MetricMetadata] = dict()
self.metric_records = []
assert asyncio.get_event_loop().is_running()
self.push_task = asyncio.get_event_loop().create_task(
self.push_to_exporter_forever(push_interval))
logger.debug("Initialized client")
@staticmethod
def connect_from_serve(default_labels: Optional[Dict[str, str]] = None):
"""Create the metric client automatically when running inside serve."""
from ray.serve.api import _get_master_actor
master_actor = _get_master_actor()
[metric_exporter] = retry_actor_failures(
master_actor.get_metric_exporter)
return MetricClient(
metric_exporter_actor=metric_exporter,
default_labels=default_labels)
def new_counter(self,
name: str,
*,
description: Optional[str] = "",
label_names: Optional[Tuple[str]] = ()):
"""Create a new counter.
Counters are used to capture changes in running sums. An essential
property of Counter instruments is that two events add(m) and add(n)
are semantically equivalent to one event add(m+n). This property means
that Counter events can be combined.
Args:
name(str): The unique name for the counter.
description(Optional[str]): The description for the counter.
label_names(Optional[Tuple[str]]): The set of label names to be
added when recording the metrics.
Usage:
>>> client = MetricClient(...)
>>> counter = client.new_counter(
"http_counter",
description="This is a simple counter for HTTP status",
label_names=("route", "status_code"))
>>> counter.labels(route="/hi", status_code=200).add()
"""
return self._new_metric(name, MetricType.COUNTER, description,
label_names)
def new_measure(self,
name,
*,
description: Optional[str] = "",
label_names: Optional[Tuple[str]] = ()):
"""Create a new measure.
Measure instruments are independent. They cannot be combined as with
counters. Measures can be aggregated after recording to compute
statistics about the distribution along selected dimension.
Args:
name(str): The unique name for the measure.
description(Optional[str]): The description for the measure.
label_names(Optional[Tuple[str]]): The set of label names to be
added when recording the metrics.
Usage:
>>> client = MetricClient(...)
>>> measure = client.new_measure(
"latency_measure",
description="This is a simple measure for latency in ms",
label_names=("route"))
>>> measure.labels(route="/hi").record(42)
"""
return self._new_metric(name, MetricType.MEASURE, description,
label_names)
def _new_metric(
self,
name,
metric_type: MetricType,
description: str,
label_names: Tuple[str] = (),
):
if name in self.registered_metrics:
raise ValueError(
"Metric with name {} is already registered.".format(name))
if not isinstance(label_names, tuple):
raise ValueError("label_names need to be a tuple, it is {}".format(
type(label_names)))
metric_metadata = MetricMetadata(
name=name,
type=metric_type,
description=description,
label_names=label_names,
default_labels=self.default_labels.copy(),
)
metric_class = convert_event_type_to_class(metric_type)
metric_object = metric_class(
client=self, name=name, label_names=label_names)
self.registered_metrics[name] = metric_metadata
return metric_object
async def _push_to_exporter_once(self):
if len(self.metric_records) == 0:
return
old_batch, self.metric_records = self.metric_records, []
logger.debug("Pushing metric batch {}".format(old_batch))
await retry_actor_failures_async(self.exporter.ingest,
self.registered_metrics, old_batch)
async def push_to_exporter_forever(self, interval_s):
while True:
await self._push_to_exporter_once()
await asyncio.sleep(interval_s)
+150
View File
@@ -0,0 +1,150 @@
from typing import Dict
from collections import Counter, namedtuple
import ray
from ray.serve.metric.types import MetricType, MetricMetadata, MetricBatch
from ray.serve.utils import _get_logger
logger = _get_logger()
def make_metric_namedtuple(metric_metadata: MetricMetadata,
record: MetricBatch):
fields = ["name", "type"]
fields += list(metric_metadata.default_labels.keys())
fields += list(record.labels.keys())
merged_labels = metric_metadata.default_labels.copy()
merged_labels.update(record.labels)
tuple_type = namedtuple(metric_metadata.name, fields)
return tuple_type(
name=metric_metadata.name, type=metric_metadata.type, **merged_labels)
class ExporterInterface:
def export(self, metric_metadata: Dict[str, MetricMetadata],
metric_batch: MetricBatch):
raise NotImplementedError(
"This method should be implemented by subclass.")
def inspect_metrics(self):
raise NotImplementedError(
"This method should be implemented by subclass.")
@ray.remote(num_cpus=0)
class MetricExporterActor:
"""Aggregate metrics from all RayServe actors and export them.
Metrics are aggregated pushed from other actors in the system to
this actor. They can then be stored or pushed to an external monitoring
system.
"""
def __init__(self, exporter_class: ExporterInterface):
# TODO(simon): Add support for initializer args and kwargs.
self.exporter = exporter_class()
# Stores the mapping metric_name -> MetricMetadata
# This field is tolerant to failures since each client will always push
# an updated copy of the metadata for each ingest call.
self.metric_metadata = dict()
logger.debug("Initialized with metric exporter of type {}".format(
type(self.exporter)))
def ingest(self, metric_metadata: Dict[str, MetricMetadata],
batch: MetricBatch):
self.metric_metadata.update(metric_metadata)
self.exporter.export(self.metric_metadata, batch)
def inspect_metrics(self):
return self.exporter.inspect_metrics()
class InMemoryExporter(ExporterInterface):
def __init__(self):
# Keep track of counters
self.counters: Counter[namedtuple, float] = Counter()
# Keep track of latest observation of measures
self.latest_measures: Dict[namedtuple, float] = dict()
def export(self, metric_metadata, metric_batch):
for record in metric_batch:
metadata = metric_metadata[record.name]
metric_key = make_metric_namedtuple(metadata, record)
if metadata.type == MetricType.COUNTER:
self.counters[metric_key] += record.value
elif metadata.type == MetricType.MEASURE:
self.latest_measures[metric_key] = record.value
else:
raise RuntimeError("Unrecognized metric type {}".format(
metadata.type))
def inspect_metrics(self):
items = []
metrics_to_collect = {**self.counters, **self.latest_measures}
for info_tuple, value in metrics_to_collect.items():
# Represent the metric type as a human readable name
info_tuple = info_tuple._replace(type=str(info_tuple.type))
items.append({"info": info_tuple._asdict(), "value": value})
return items
class PrometheusExporter(ExporterInterface):
def __init__(self):
super().__init__()
from prometheus_client import (CollectorRegistry, Counter, Gauge,
generate_latest)
self.metric_type_to_prom_type = {
MetricType.COUNTER: Counter,
MetricType.MEASURE: Gauge
}
self.prom_generate_latest = generate_latest
self.metrics_cache = dict()
self.default_labels = dict()
self.registry = CollectorRegistry()
def inspect_metrics(self):
return self.prom_generate_latest(self.registry)
def export(self, metric_metadata, metric_batch):
self._process_metric_metadata(metric_metadata)
self._process_batch(metric_batch)
def _process_metric_metadata(self, metric_metadata):
for name, metric_metadata in metric_metadata.items():
if name not in self.metrics_cache:
constructor = self.metric_type_to_prom_type[
metric_metadata.type]
default_labels = metric_metadata.default_labels
label_names = tuple(
default_labels.keys()) + metric_metadata.label_names
metric_object = constructor(
metric_metadata.name,
metric_metadata.description,
labelnames=label_names,
registry=self.registry,
)
self.metrics_cache[name] = (metric_object,
metric_metadata.type)
self.default_labels[name] = default_labels
def _process_batch(self, batch):
for name, labels, value in batch:
assert name in self.metrics_cache, (
"Metrics {} was not registered.".format(name))
metric, metric_type = self.metrics_cache[name]
default_labels = self.default_labels[name]
merged_labels = {**default_labels, **labels}
if metric_type == MetricType.COUNTER:
metric.labels(**merged_labels).inc(value)
elif metric_type == MetricType.MEASURE:
metric.labels(**merged_labels).set(value)
else:
raise RuntimeError(
"Unrecognized metric type {}".format(metric_type))
+91
View File
@@ -0,0 +1,91 @@
import enum
from typing import Tuple, List, Dict, Optional
from collections import namedtuple
# We split the information about a metric into two parts: the MetricMetadata
# and MetricRecord. Metadata is declared at creation time and include names
# and the label names. The label values will be supplied at observation time.
MetricMetadata = namedtuple(
"MetricMetadata",
["name", "type", "description", "label_names", "default_labels"])
MetricRecord = namedtuple("MetricRecord", ["name", "labels", "value"])
MetricBatch = List[MetricRecord]
class BaseMetric:
def __init__(self,
client,
name: str,
label_names: Tuple[str],
dynamic_labels: Optional[Dict[str, str]] = None):
"""Represent a single metric stream
Args:
client(MetricClient): The client object to push update to.
name(str): The name of the metric.
label_names(Tuple[str]): The names of the labels that must be set
before an observation.
dynamic_labels(Optional[Dict[str,str]]): A partially preset labels.
This fields make it possible to chain label calls together:
``metric.labels(a=b).labels(c=d)``.
"""
self.client = client
self.name = name
self.dynamic_labels = dynamic_labels or dict()
self.label_names = label_names
def check_all_labels_fulfilled_or_error(self):
unfulfilled = set(self.label_names) - set(self.dynamic_labels.keys())
if len(unfulfilled) != 0:
raise ValueError("The following labels doesn't have associated "
"values: {}".format(unfulfilled))
def labels(self, **kwargs):
"""Apply dynamic label to the metric
Usage:
>>> metric = BaseMetric(..., label_names=("a", "b"))
>>> metric.labels(a=1, b=2)
>>> metric.labels(a=1).labels(b=2) # Equivalent
"""
new_dynamic_labels = self.dynamic_labels.copy()
for k, v in kwargs.items():
if k not in self.label_names:
raise ValueError(
"Label {} was not part of registered "
"label names. Allowed label names are {}.".format(
k, self.label_names))
new_dynamic_labels[k] = str(v)
return type(self)(self.client, self.name, self.label_names,
new_dynamic_labels)
# The metric types are inspired by OpenTelemetry spec:
# https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/metrics/api.md#three-kinds-of-instrument
class Counter(BaseMetric):
def add(self, increment=1):
"""Increment the counter by some amount. Default is 1"""
self.check_all_labels_fulfilled_or_error()
self.client.metric_records.append(
MetricRecord(self.name, self.dynamic_labels, increment))
class Measure(BaseMetric):
def record(self, value):
"""Record the given value for the measure"""
self.check_all_labels_fulfilled_or_error()
self.client.metric_records.append(
MetricRecord(self.name, self.dynamic_labels, value))
class MetricType(enum.IntEnum):
COUNTER = 1
MEASURE = 2
def convert_event_type_to_class(event_type: MetricType) -> BaseMetric:
if event_type == MetricType.COUNTER:
return Counter
if event_type == MetricType.MEASURE:
return Measure
raise RuntimeError("Unknown event type {}".format(event_type))
+30 -12
View File
@@ -14,6 +14,8 @@ import blist
import ray
import ray.cloudpickle as pickle
from ray.exceptions import RayTaskError
from ray.serve.metric import MetricClient
from ray.serve.utils import logger, retry_actor_failures
@@ -166,22 +168,30 @@ class Router:
for backend, backend_config in backend_configs.items():
await self.set_backend_config(backend, backend_config)
self.metric_client = MetricClient.connect_from_serve()
self.num_router_requests = self.metric_client.new_counter(
"num_router_requests",
description="Number of requests processed by the router.",
label_names=("endpoint", ))
self.num_error_endpoint_request = self.metric_client.new_counter(
"num_error_endpoint_requests",
description=("Number of requests errored when getting result "
"for endpoint."),
label_names=("endpoint", ))
self.num_error_backend_request = self.metric_client.new_counter(
"num_error_backend_requests",
description=("Number of requests errored when getting result "
"from backend."),
label_names=("backend", ))
def is_ready(self):
return True
def get_metrics(self):
return {
"backend_{}_queue_size".format(backend_name): {
"value": len(queue),
"type": "counter",
}
for backend_name, queue in self.buffer_queues.items()
}
async def enqueue_request(self, request_meta, *request_args,
**request_kwargs):
endpoint = request_meta.endpoint
logger.debug("Received a request for endpoint {}".format(endpoint))
self.num_router_requests.labels(endpoint=endpoint).add()
# check if the slo specified is directly the
# wall clock time
@@ -202,7 +212,11 @@ class Router:
# Note: a future change can be to directly return the ObjectID from
# replica task submission
result = await query.async_future
try:
result = await query.async_future
except RayTaskError as e:
self.num_error_endpoint_request.labels(endpoint=endpoint).add()
result = e
return result
async def add_new_worker(self, backend_tag, replica_tag, worker_handle):
@@ -341,9 +355,13 @@ class Router:
logger.debug("Sending query to replica:" + backend_replica_tag)
start = time.time()
worker = self.replicas[backend_replica_tag]
result = await worker.handle_request.remote(req)
try:
result = await worker.handle_request.remote(req)
except RayTaskError as error:
self.num_error_backend_request.labels(backend=backend).add()
result = error
await self.mark_worker_idle(backend, backend_replica_tag)
logger.debug("Got result in {:.2f}s", time.time() - start)
logger.debug("Got result in {:.2f}s".format(time.time() - start))
return result
async def _assign_query_to_worker(self,
-11
View File
@@ -2,7 +2,6 @@ import os
import pytest
import ray
from ray import serve
if os.environ.get("RAY_SERVE_INTENTIONALLY_CRASH", False):
@@ -13,13 +12,3 @@ if os.environ.get("RAY_SERVE_INTENTIONALLY_CRASH", False):
def serve_instance():
serve.init(blocking=True, ray_init_kwargs={"num_cpus": 36})
yield
@pytest.fixture(scope="session")
def ray_instance():
ray_already_initialized = ray.is_initialized()
if not ray_already_initialized:
ray.init(object_store_memory=int(1e8))
yield
if not ray_already_initialized:
ray.shutdown()
@@ -26,9 +26,6 @@ def setup_worker(name, func_or_class, init_args=None):
def ready(self):
pass
def get_metrics(self):
return self.worker.get_metrics()
async def handle_request(self, *args, **kwargs):
return await self.worker.handle_request(*args, **kwargs)
+2 -2
View File
@@ -3,7 +3,7 @@ import pytest
from ray.serve.kv_store import RayInternalKVStore
def test_ray_internal_kv(ray_instance):
def test_ray_internal_kv(serve_instance):
with pytest.raises(TypeError):
RayInternalKVStore(namespace=1)
RayInternalKVStore(namespace=b"")
@@ -26,7 +26,7 @@ def test_ray_internal_kv(ray_instance):
assert kv.get("2") == b"4"
def test_ray_internal_kv_collisions(ray_instance):
def test_ray_internal_kv_collisions(serve_instance):
kv1 = RayInternalKVStore()
kv1.put("1", b"1")
assert kv1.get("1") == b"1"
+182 -59
View File
@@ -1,74 +1,197 @@
import numpy as np
import time
import pytest
import requests
import ray
from ray.serve.metric import MetricMonitor
from ray import serve
from ray.serve.metric.client import MetricClient
from ray.serve.metric.exporter import (InMemoryExporter, PrometheusExporter,
MetricExporterActor)
from ray.serve.metric.types import MetricType, MetricMetadata
pytestmark = pytest.mark.asyncio
@pytest.fixture(scope="session")
def start_target_actor(ray_instance):
@ray.remote
class Target:
def __init__(self):
self.counter_value = 0
class MockExporterActor:
def __init__(self):
self.metadata = dict()
self.batches = []
def get_metrics(self):
self.counter_value += 1
return {
"latency_list": {
"type": "list",
# Generate 0 to 100 inclusive.
# This means total of 101 items.
"value": np.arange(101).tolist()
},
"counter": {
"type": "counter",
"value": self.counter_value
}
}
@property
def ingest(self):
return self
def get_counter_value(self):
return self.counter_value
yield Target.remote()
async def remote(self, metadata, batch):
self.metadata.update(metadata)
self.batches.extend(batch)
def test_metric_gc(ray_instance, start_target_actor):
target_actor = start_target_actor
# this means when new scrapes are invoked, the
metric_monitor = MetricMonitor.remote(gc_window_seconds=0)
ray.get(metric_monitor.add_target.remote(target_actor))
async def test_client():
exporter = MockExporterActor()
collector = MetricClient(
exporter, push_interval=2, default_labels={"default": "label"})
counter = collector.new_counter(name="counter", label_names=("a", "b"))
ray.get(metric_monitor.scrape.remote())
df = ray.get(metric_monitor._get_dataframe.remote())
assert len(df) == 102
with pytest.raises(
ValueError, match="labels doesn't have associated values"):
counter.add()
# Old metric sould be cleared. So only 1 counter + 101 list values left.
ray.get(metric_monitor.scrape.remote())
df = ray.get(metric_monitor._get_dataframe.remote())
assert len(df) == 102
counter = counter.labels(a=1)
counter.labels(b=2).add()
counter.labels(b=3).add(42)
def test_metric_system(ray_instance, start_target_actor):
target_actor = start_target_actor
measure = collector.new_measure("measure")
measure.record(2)
metric_monitor = MetricMonitor.remote()
await collector._push_to_exporter_once()
ray.get(metric_monitor.add_target.remote(target_actor))
# Scrape once
ray.get(metric_monitor.scrape.remote())
percentiles = [50, 90, 95]
agg_windows_seconds = [60]
result = ray.get(
metric_monitor.collect.remote(percentiles, agg_windows_seconds))
real_counter_value = ray.get(target_actor.get_counter_value.remote())
expected_result = {
"counter": real_counter_value,
"latency_list_50th_perc_60_window": 50.0,
"latency_list_90th_perc_60_window": 90.0,
"latency_list_95th_perc_60_window": 95.0,
assert exporter.metadata == {
"counter": MetricMetadata(
name="counter",
type=MetricType.COUNTER,
description="",
label_names=("a", "b"),
default_labels={"default": "label"},
),
"measure": MetricMetadata(
name="measure",
type=MetricType.MEASURE,
description="",
label_names=(),
default_labels={"default": "label"},
)
}
assert result == expected_result
assert exporter.batches == [("counter", {
"a": "1",
"b": "2"
}, 1), ("counter", {
"a": "1",
"b": "3"
}, 42), ("measure", {}, 2)]
async def test_in_memory_exporter(serve_instance):
exporter = MetricExporterActor.remote(InMemoryExporter)
collector = MetricClient(
exporter, push_interval=2, default_labels={"default": "label"})
counter = collector.new_counter(name="my_counter", label_names=("a", ))
measure = collector.new_measure(
name="my_measure", description="help", label_names=("ray", "lang"))
measure = measure.labels(lang="C++")
counter.labels(a="1").add()
measure.labels(ray="").record(0)
measure.labels(ray="").record(42)
await collector._push_to_exporter_once()
metric_stored = await exporter.inspect_metrics.remote()
assert metric_stored == [{
"info": {
"name": "my_counter",
"type": "MetricType.COUNTER",
"default": "label",
"a": "1"
},
"value": 1
}, {
"info": {
"name": "my_measure",
"type": "MetricType.MEASURE",
"default": "label",
"lang": "C++",
"ray": ""
},
"value": 42
}]
async def test_prometheus_exporter(serve_instance):
exporter = MetricExporterActor.remote(PrometheusExporter)
collector = MetricClient(
exporter, push_interval=2, default_labels={"default": "label"})
counter = collector.new_counter(name="my_counter", label_names=("a", ))
measure = collector.new_measure(
name="my_measure", description="help", label_names=("ray", "lang"))
measure = measure.labels(lang="C++")
counter.labels(a="1").add()
measure.labels(ray="").record(0)
measure.labels(ray="").record(42)
await collector._push_to_exporter_once()
metric_stored = await exporter.inspect_metrics.remote()
metric_stored = metric_stored.decode()
fragments = [
"# HELP my_counter_total", "# TYPE my_counter_total counter",
'my_counter_total{a="1",default="label"} 1.0',
"# TYPE my_counter_created gauge",
'my_counter_created{a="1",default="label"}', "# HELP my_measure help",
"# TYPE my_measure gauge",
'my_measure{default="label",lang="C++",ray=""} 42.0'
]
for fragment in fragments:
assert fragment in metric_stored
async def test_system_metric_endpoints(serve_instance):
def test_error_counter(flask_request):
1 / 0
serve.create_endpoint("test_metrics", "/measure")
serve.create_backend("m:v1", test_error_counter)
serve.set_traffic("test_metrics", {"m:v1": 1})
# Check metrics are exposed under http endpoint
def test_metric_endpoint():
requests.get("http://127.0.0.1:8000/measure", timeout=5)
in_memory_metric = requests.get(
"http://127.0.0.1:8000/-/metrics", timeout=5).json()
# We don't want to check the values since this check might be retried.
in_memory_metric_without_values = []
for m in in_memory_metric:
m.pop("value")
in_memory_metric_without_values.append(m)
target_metrics = [{
"info": {
"name": "num_http_requests",
"type": "MetricType.COUNTER",
"route": "/measure"
},
}, {
"info": {
"name": "num_router_requests",
"type": "MetricType.COUNTER",
"endpoint": "test_metrics"
},
}, {
"info": {
"name": "backend_error_counter",
"type": "MetricType.COUNTER",
"backend": "m:v1"
},
}]
for target in target_metrics:
assert target in in_memory_metric_without_values
success = False
for _ in range(3):
try:
test_metric_endpoint()
success = True
break
except (AssertionError, requests.ReadTimeout):
# Metrics may not have been propagated yet
time.sleep(2)
print("Metric not correct, retrying...")
if not success:
test_metric_endpoint()
+1 -1
View File
@@ -83,7 +83,7 @@ if "RAY_USE_NEW_GCS" in os.environ and os.environ["RAY_USE_NEW_GCS"] == "on":
extras = {
"debug": [],
"dashboard": ["requests"],
"serve": ["uvicorn", "pygments", "werkzeug", "flask", "pandas", "blist"],
"serve": ["uvicorn", "flask", "blist"],
"tune": ["tabulate", "tensorboardX", "pandas"]
}