[Metric] custom metrics refinement (#10861)

* In progress

* In Progress.

* Addressed code review.

* Add unit tests.

* Add a simple doc.

* Fixed test failure.

* Fix all test failures from serve.

* Addressed code review.
This commit is contained in:
SangBin Cho
2020-09-25 09:10:28 -07:00
committed by GitHub
parent 609c1b8acd
commit 109481afd9
9 changed files with 421 additions and 124 deletions
-38
View File
@@ -1,38 +0,0 @@
from ray._raylet import (
Count,
Histogram,
Gauge,
Sum,
) # noqa: E402
"""Metric/Stats module for worker.
This module is responsible for providing four classes mapping from stats of
cpp.
How to use:
For Count, Gauge and Sum, we may define a metric like this following:
gauge = Gauge(
'ray.worker.metric',
'description',
'unit',
['tagk1', 'tagk2']).
The last parameter is default tag map. You can use gauge.record(1.0) with
default tags or gauge.record(1.0, {'tagk1', 'tagv1'}) that means the tagk1
is updating in tagv1.
It's addtional boundaries to Histogram measurement,
histogram = Histogram(
'ray.worker.histogram1',
'a', 'b', [1.0, 2.0],
['tagk1'])
Recommended metric name pattern : ray.{component_name}.{module_name}, and
name format must be in [0-9a-zA-Z].
"""
__all__ = [
"Count",
"Histogram",
"Gauge",
"Sum",
]
+2 -1
View File
@@ -50,7 +50,8 @@ cdef class Metric:
# Default tags will be exported if it's empty map.
if tags:
for tag_k, tag_v in tags.items():
c_tags[tag_k.encode("ascii")] = tag_v.encode("ascii")
if tag_v is not None:
c_tags[tag_k.encode("ascii")] = tag_v.encode("ascii")
c_value = value
with nogil:
self.metric.get().Record(c_value, c_tags)
+72 -60
View File
@@ -12,7 +12,7 @@ from ray.async_compat import sync_to_async
from ray.serve.utils import (parse_request_item, _get_logger, chain_future,
unpack_future)
from ray.serve.exceptions import RayServeException
from ray.experimental import metrics
from ray.util import metrics
from ray.serve.config import BackendConfig
from ray.serve.router import Query
from ray.serve.constants import DEFAULT_LATENCY_BUCKET_MS
@@ -159,43 +159,72 @@ class RayServeWorker:
self.num_ongoing_requests = 0
self.request_counter = metrics.Count(
"backend_request_counter", ("Number of queries that have been "
"processed in this replica"),
"requests", ["backend"])
self.error_counter = metrics.Count("backend_error_counter",
("Number of exceptions that have "
"occurred in the backend"),
"errors", ["backend"])
"backend_request_counter",
description=("Number of queries that have been "
"processed in this replica"),
tag_keys=("backend", ))
self.request_counter.set_default_tags({"backend": self.backend_tag})
self.error_counter = metrics.Count(
"backend_error_counter",
description=("Number of exceptions that have "
"occurred in the backend"),
tag_keys=("backend", ))
self.error_counter.set_default_tags({"backend": self.backend_tag})
self.restart_counter = metrics.Count(
"backend_worker_starts",
("The number of time this replica workers "
"has been restarted due to failure."), "restarts",
["backend", "replica_tag"])
self.queuing_latency_tracker = metrics.Histogram(
"backend_queuing_latency_ms",
("The latency for queries waiting in the replica's queue "
"waiting to be processed or batched."), "ms",
DEFAULT_LATENCY_BUCKET_MS, ["backend", "replica_tag"])
self.processing_latency_tracker = metrics.Histogram(
"backend_processing_latency_ms",
"The latency for queries to be processed", "ms",
DEFAULT_LATENCY_BUCKET_MS,
["backend", "replica_tag", "batch_size"])
self.num_queued_items = metrics.Gauge(
"replica_queued_queries",
"Current number of queries queued in the the backend replicas",
"requests", ["backend", "replica_tag"])
self.num_processing_items = metrics.Gauge(
"replica_processing_queries",
"Current number of queries being processed", "requests",
["backend", "replica_tag"])
self.restart_counter.record(1, {
description=("The number of time this replica workers "
"has been restarted due to failure."),
tag_keys=("backend", "replica_tag"))
self.restart_counter.set_default_tags({
"backend": self.backend_tag,
"replica_tag": self.replica_tag
})
self.queuing_latency_tracker = metrics.Histogram(
"backend_queuing_latency_ms",
description=(
"The latency for queries waiting in the replica's queue "
"waiting to be processed or batched."),
boundaries=DEFAULT_LATENCY_BUCKET_MS,
tag_keys=("backend", "replica_tag"))
self.queuing_latency_tracker.set_default_tags({
"backend": self.backend_tag,
"replica_tag": self.replica_tag
})
self.processing_latency_tracker = metrics.Histogram(
"backend_processing_latency_ms",
description="The latency for queries to be processed",
boundaries=DEFAULT_LATENCY_BUCKET_MS,
tag_keys=("backend", "replica_tag", "batch_size"))
self.processing_latency_tracker.set_default_tags({
"backend": self.backend_tag,
"replica_tag": self.replica_tag
})
self.num_queued_items = metrics.Gauge(
"replica_queued_queries",
description=("Current number of queries queued in the "
"the backend replicas"),
tag_keys=("backend", "replica_tag"))
self.num_queued_items.set_default_tags({
"backend": self.backend_tag,
"replica_tag": self.replica_tag
})
self.num_processing_items = metrics.Gauge(
"replica_processing_queries",
description="Current number of queries being processed",
tag_keys=("backend", "replica_tag"))
self.num_processing_items.set_default_tags({
"backend": self.backend_tag,
"replica_tag": self.replica_tag
})
self.restart_counter.record(1)
asyncio.get_event_loop().create_task(self.main_loop())
def get_runner_method(self, request_item: Query) -> Callable:
@@ -216,17 +245,13 @@ class RayServeWorker:
start = time.time()
try:
result = await method_to_call(arg)
self.request_counter.record(1, {"backend": self.backend_tag})
self.request_counter.record(1)
except Exception as e:
result = wrap_to_ray_error(e)
self.error_counter.record(1, {"backend": self.backend_tag})
self.error_counter.record(1)
self.processing_latency_tracker.record(
(time.time() - start) * 1000, {
"backend": self.backend_tag,
"replica": self.replica_tag,
"batch_size": "1"
})
(time.time() - start) * 1000, tags={"batch_size": "1"})
return result
@@ -248,8 +273,7 @@ class RayServeWorker:
"Please only send the same type of requests in batching "
"mode.")
self.request_counter.record(batch_size,
{"backend": self.backend_tag})
self.request_counter.record(batch_size)
call_method = ensure_async(call_methods.pop())
result_list = await call_method(args)
@@ -274,15 +298,12 @@ class RayServeWorker:
raise RayServeException(error_message)
except Exception as e:
wrapped_exception = wrap_to_ray_error(e)
self.error_counter.record(1, {"backend": self.backend_tag})
self.error_counter.record(1)
result_list = [wrapped_exception for _ in range(batch_size)]
self.processing_latency_tracker.record(
(time.time() - timing_start) * 1000, {
"backend": self.backend_tag,
"replica_tag": self.replica_tag,
"batch_size": str(batch_size)
})
(time.time() - timing_start) * 1000,
tags={"batch_size": str(batch_size)})
return result_list
@@ -294,21 +315,12 @@ class RayServeWorker:
batch = await self.batch_queue.wait_for_batch()
# Record metrics
self.num_queued_items.record(self.batch_queue.qsize(), {
"backend": self.backend_tag,
"replica_tag": self.replica_tag
})
self.num_processing_items.record(
self.num_ongoing_requests - self.batch_queue.qsize(), {
"backend": self.backend_tag,
"replica_tag": self.replica_tag
})
self.num_queued_items.record(self.batch_queue.qsize())
self.num_processing_items.record(self.num_ongoing_requests -
self.batch_queue.qsize())
for query in batch:
queuing_time = (time.time() - query.tick_enter_replica) * 1000
self.queuing_latency_tracker.record(queuing_time, {
"backend": self.backend_tag,
"replica_tag": self.replica_tag
})
self.queuing_latency_tracker.record(queuing_time)
all_evaluated_futures = []
+5 -4
View File
@@ -7,7 +7,7 @@ import uvicorn
import ray
from ray.exceptions import RayTaskError
from ray.serve.context import TaskContext
from ray.experimental import metrics
from ray.util import metrics
from ray.serve.http_util import Response
from ray.serve.router import Router, RequestMetadata
@@ -32,8 +32,9 @@ class HTTPProxy:
self.route_table = await controller.get_router_config.remote()
self.request_counter = metrics.Count(
"num_http_requests", "The number of HTTP requests processed",
"requests", ["route"])
"num_http_requests",
description="The number of HTTP requests processed",
tag_keys=("route", ))
self.router = Router()
await self.router.setup(name, controller_name)
@@ -80,7 +81,7 @@ class HTTPProxy:
assert scope["type"] == "http"
current_path = scope["path"]
self.request_counter.record(1, {"route": current_path})
self.request_counter.record(1, tags={"route": current_path})
if current_path.startswith("/-/"):
await self._handle_system_request(scope, receive, send)
+20 -13
View File
@@ -9,7 +9,7 @@ from dataclasses import dataclass, field
from ray.exceptions import RayTaskError
import ray
from ray.experimental import metrics
from ray.util import metrics
from ray.serve.context import TaskContext
from ray.serve.endpoint_policy import RandomEndpointPolicy
from ray.serve.utils import logger, chain_future
@@ -139,21 +139,25 @@ class Router:
# -- Metrics Registration -- #
self.num_router_requests = metrics.Count(
"num_router_requests",
"Number of requests processed by the router.", "requests",
["endpoint"])
description="Number of requests processed by the router.",
tag_keys=("endpoint", ))
self.num_error_endpoint_requests = metrics.Count(
"num_error_endpoint_requests",
("Number of requests that errored when getting results "
"for the endpoint."), "requests", ["endpoint"])
description=(
"Number of requests that errored when getting results "
"for the endpoint."),
tag_keys=("endpoint", ))
self.num_error_backend_requests = metrics.Count(
"num_error_backend_requests",
("Number of requests that errored when getting result "
"from the backend."), "requests", ["backend"])
description=("Number of requests that errored when getting result "
"from the backend."),
tag_keys=("backend", ))
self.backend_queue_size = metrics.Gauge(
"backend_queued_queries",
"Current number of queries queued in the router for a backend",
"requests", ["backend"])
description=("Current number of queries queued "
"in the router for a backend"),
tag_keys=("backend", ))
asyncio.get_event_loop().create_task(self.report_queue_lengths())
@@ -161,7 +165,7 @@ class Router:
**request_kwargs):
endpoint = request_meta.endpoint
logger.debug("Received a request for endpoint {}".format(endpoint))
self.num_router_requests.record(1, {"endpoint": endpoint})
self.num_router_requests.record(1, tags={"endpoint": endpoint})
request_context = request_meta.request_context
query = Query(
@@ -177,7 +181,8 @@ class Router:
try:
result = await query.async_future
except RayTaskError as e:
self.num_error_endpoint_requests.record(1, {"endpoint": endpoint})
self.num_error_endpoint_requests.record(
1, tags={"endpoint": endpoint})
result = e
return result
@@ -301,7 +306,8 @@ class Router:
else:
result = await object_ref
except RayTaskError as error:
self.num_error_backend_requests.record(1, {"backend": backend})
self.num_error_backend_requests.record(
1, tags={"backend": backend})
result = error
self.queries_counter[backend][backend_replica_tag] -= 1
await self.mark_worker_idle(backend, backend_replica_tag)
@@ -358,6 +364,7 @@ class Router:
self.name, queue_lengths)
for backend, length in queue_lengths.items():
self.backend_queue_size.record(length, {"backend": backend})
self.backend_queue_size.record(
length, tags={"backend": backend})
await asyncio.sleep(REPORT_QUEUE_LENGTH_PERIOD_S)
+99 -6
View File
@@ -1,5 +1,6 @@
import json
from pprint import pformat
from unittest.mock import MagicMock
import requests
import pytest
@@ -7,7 +8,7 @@ from prometheus_client.parser import text_string_to_metric_families
import ray
from ray.metrics_agent import PrometheusServiceDiscoveryWriter
from ray.experimental.metrics import Count, Histogram
from ray.util.metrics import Count, Histogram, Gauge
from ray.test_utils import wait_for_condition, SignalActor
@@ -59,16 +60,16 @@ def _setup_cluster_for_test(ray_start_cluster):
# Generate some metrics from actor & tasks.
@ray.remote
def f():
counter = Count(f"test_counter", "desc", "unit", [])
counter.record(1, {})
counter = Count("test_counter", description="desc")
counter.record(1)
ray.get(worker_should_exit.wait.remote())
@ray.remote
class A:
async def ping(self):
histogram = Histogram("test_histogram", "desc", "unit", [0.1, 1.6],
[])
histogram.record(1.5, {})
histogram = Histogram(
"test_histogram", description="desc", boundaries=[0.1, 1.6])
histogram.record(1.5)
ray.get(worker_should_exit.wait.remote())
a = A.remote()
@@ -181,6 +182,98 @@ def test_metrics_export_end_to_end(_setup_cluster_for_test):
test_cases() # Should fail assert
@pytest.fixture
def metric_mock():
mock = MagicMock()
mock.record.return_value = "haha"
yield mock
"""
Unit test custom metrics.
"""
def test_basic_custom_metrics(metric_mock):
# Make sure each of metric works as expected.
# -- Count --
count = Count("count", tag_keys=("a", ))
count._metric = metric_mock
count.record(1)
metric_mock.record.assert_called_with(1, tags={})
# -- Gauge --
gauge = Gauge("gauge", description="gauge")
gauge._metric = metric_mock
gauge.record(4)
metric_mock.record.assert_called_with(4, tags={})
# -- Histogram
histogram = Histogram(
"hist", description="hist", boundaries=[1.0, 3.0], tag_keys=("a", "b"))
histogram._metric = metric_mock
histogram.record(4)
metric_mock.record.assert_called_with(4, tags={})
tags = {"a": "3"}
histogram.record(10, tags=tags)
metric_mock.record.assert_called_with(10, tags=tags)
tags = {"a": "10", "b": "b"}
histogram.record(8, tags=tags)
metric_mock.record.assert_called_with(8, tags=tags)
def test_custom_metrics_info(metric_mock):
# Make sure .info public method works.
histogram = Histogram(
"hist", description="hist", boundaries=[1.0, 2.0], tag_keys=("a", "b"))
assert histogram.info["name"] == "hist"
assert histogram.info["description"] == "hist"
assert histogram.info["boundaries"] == [1.0, 2.0]
assert histogram.info["tag_keys"] == ("a", "b")
assert histogram.info["default_tags"] == {}
histogram.set_default_tags({"a": "a"})
assert histogram.info["default_tags"] == {"a": "a"}
def test_custom_metrics_default_tags(metric_mock):
histogram = Histogram(
"hist", description="hist", boundaries=[1.0, 2.0],
tag_keys=("a", "b")).set_default_tags({
"b": "b"
})
histogram._metric = metric_mock
# Check default tags.
histogram.record(4)
metric_mock.record.assert_called_with(4, tags={"b": "b"})
# Check specifying non-default tags.
histogram.record(10, tags={"a": "a"})
metric_mock.record.assert_called_with(10, tags={"a": "a", "b": "b"})
# Check overriding default tags.
tags = {"a": "10", "b": "c"}
histogram.record(8, tags=tags)
metric_mock.record.assert_called_with(8, tags=tags)
def test_custom_metrics_edge_cases(metric_mock):
# None or empty boundaries are not allowed.
with pytest.raises(ValueError):
Histogram("hist")
with pytest.raises(ValueError):
Histogram("hist", boundaries=[])
# Empty name is not allowed.
with pytest.raises(ValueError):
Count("")
# The tag keys must be a tuple type.
with pytest.raises(ValueError):
Count("name", tag_keys=("a"))
if __name__ == "__main__":
import sys
sys.exit(pytest.main(["-v", __file__]))
+183
View File
@@ -0,0 +1,183 @@
import logging
from typing import Dict, Any, List, Optional, Tuple
from ray._raylet import (
Count as CythonCount,
Histogram as CythonHistogram,
Gauge as CythonGauge,
) # noqa: E402
logger = logging.getLogger(__name__)
class Metric:
"""The parent class of custom metrics.
Ray's custom metrics APIs are rooted from this class and share
the same public methods.
"""
def __init__(self,
name: str,
description: str = "",
tag_keys: Optional[Tuple[str]] = None):
if len(name) == 0:
raise ValueError("Empty name is not allowed. "
"Please provide a metric name.")
self._name = name
self._description = description
# We don't specify unit because it won't be
# exported to Prometheus anyway.
self._unit = ""
# The default tags key-value pair.
self._default_tags = {}
# Keys of tags.
self._tag_keys = tag_keys or tuple()
# The Cython metric class. This should be set in the child class.
self._metric = None
if type(self._tag_keys) != tuple:
raise ValueError(
"tag_keys should be a tuple type, "
f"but {type(self._tag_keys)} type was specified instead.")
def set_default_tags(self, default_tags: Dict[str, str]):
"""Set default tags of metrics.
Example:
>>> # Note that set_default_tags returns the instance itself.
>>> counter = Counter("name")
>>> counter2 = counter.set_default_tags({"a": "b"})
>>> assert counter is counter2
>>> # this means you can instantiate it in this way.
>>> counter = Counter("name").set_default_tags({"a": "b"})
Args:
default_tags(dict): Default tags that are
used for every record method.
Returns:
Metric: it returns the instance itself.
"""
self._default_tags = default_tags
return self
def record(self, value: float, tags: dict = None) -> None:
"""Record the metric point of the metric.
Args:
value(float): The value to be recorded as a metric point.
"""
assert self._metric is not None
default_tag_copy = self._default_tags.copy()
default_tag_copy.update(tags or {})
self._metric.record(value, tags=default_tag_copy)
@property
def info(self) -> Dict[str, Any]:
"""Return the information of this metric.
Example:
>>> counter = Counter("name", description="desc")
print(counter.info)
\"""
{
"name": "name",
"description": "desc"
"tag_keys": ("ray.key")
"default_tags": {"ray.key": "abc"}
}
\"""
"""
return {
"name": self._name,
"description": self._description,
"tag_keys": self._tag_keys,
"default_tags": self._default_tags
}
class Count(Metric):
"""The count of the number of metric points.
This is corresponding to Prometheus' Count metric.
Args:
name(str): Name of the metric.
description(str): Description of the metric.
tag_keys(tuple): Tag keys of the metric.
"""
def __init__(self,
name: str,
description: str = "",
tag_keys: Optional[Tuple[str]] = None):
super().__init__(name, description, tag_keys)
self._metric = CythonCount(self._name, self._description, self._unit,
self._tag_keys)
class Histogram(Metric):
"""Histogram distribution of metric points.
This is corresponding to Prometheus' Histogram metric.
Recording metrics with histogram will enable you to import
min, mean, max, 25, 50, 95, 99 percentile latency.
Args:
name(str): Name of the metric.
description(str): Description of the metric.
boundaries(list): Boundaries of histogram buckets.
tag_keys(tuple): Tag keys of the metric.
"""
def __init__(self,
name: str,
description: str = "",
boundaries: List[float] = None,
tag_keys: Optional[Tuple[str]] = None):
super().__init__(name, description, tag_keys)
if boundaries is None or len(boundaries) == 0:
raise ValueError(
"boundaries argument should be provided when using the "
"Histogram class. EX) Histgoram(boundaries=[1.0, 2.0])")
self.boundaries = boundaries
self._metric = CythonHistogram(self._name, self._description,
self._unit, self.boundaries,
self._tag_keys)
@property
def info(self):
"""Return information about histogram metric."""
info = super().info
info.update({"boundaries": self.boundaries})
return info
class Gauge(Metric):
"""Gauge Keeps the last recorded value, drops everything before.
This is corresponding to Prometheus' Gauge metric.
Args:
name(str): Name of the metric.
description(str): Description of the metric.
tag_keys(tuple): Tag keys of the metric.
tags(dict): Dictionary of default tag values.
"""
def __init__(self,
name: str,
description: str = "",
tag_keys: Optional[Tuple[str]] = None):
super().__init__(name, description, tag_keys)
self._metric = CythonGauge(self._name, self._description, self._unit,
self._tag_keys)
__all__ = [
"Count",
"Histogram",
"Gauge",
]