[Stats] Basic Metrics Infrastructure (Metrics Agent + Prometheus Exporter) (#9607)

This commit is contained in:
SangBin Cho
2020-07-28 10:28:01 -07:00
committed by GitHub
parent feb3751824
commit 7e3ba289dc
18 changed files with 868 additions and 289 deletions
+17
View File
@@ -270,3 +270,20 @@ LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
------------------
Code in python/ray/prometheus_exporter.py is adapted from https://github.com/census-instrumentation/opencensus-python/blob/master/contrib/opencensus-ext-prometheus/opencensus/ext/prometheus/stats_exporter/__init__.py
# Copyright 2018, OpenCensus Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+29 -1
View File
@@ -1,5 +1,9 @@
from base64 import b64decode
import datetime
import random
import socket
from base64 import b64decode
import ray
@@ -45,3 +49,27 @@ def measures_to_dict(measures):
elif "doubleValue" in measure:
measures_dict[tags] = measure["doubleValue"]
return measures_dict
def get_unused_port():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("", 0))
port = s.getsockname()[1]
# Try to generate a port that is far above the 'next available' one.
# This solves issue #8254 where GRPC fails because the port assigned
# from this method has been used by a different process.
for _ in range(30):
new_port = random.randint(port, 65535)
new_s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
new_s.bind(("", new_port))
except OSError:
new_s.close()
continue
s.close()
new_s.close()
return new_port
print("Unable to succeed in selecting a random port.")
s.close()
return port
+4 -3
View File
@@ -18,7 +18,7 @@ cdef class TagKey:
def __init__(self, name):
self.name = name.encode("ascii")
CTagKey.Register(self.name)
def name(self):
return self.name
@@ -34,8 +34,9 @@ cdef class Metric:
def __init__(self, tag_keys):
for tag_key in tag_keys:
self.c_tag_keys.push_back(CTagKey.Register(tag_key.encode("ascii")))
self.c_tag_keys.push_back(
CTagKey.Register(tag_key.encode("ascii")))
def record(self, value, tags=None):
"""Record a measurement of metric.
+183
View File
@@ -0,0 +1,183 @@
import logging
import threading
from collections import defaultdict
from typing import List
from opencensus.stats import aggregation
from opencensus.stats import measure as measure_module
from opencensus.stats.measurement_map import MeasurementMap
from opencensus.stats import stats as stats_module
from opencensus.tags import tag_key as tag_key_module
from opencensus.tags import tag_map as tag_map_module
from opencensus.tags import tag_value as tag_value_module
from opencensus.stats import view
from ray import prometheus_exporter
from ray.core.generated.common_pb2 import MetricPoint
logger = logging.getLogger(__name__)
# We don't need counter, histogram, or sum because reporter just needs to
# collect momental values (gauge) that are already counted or sampled
# (histogram for example), or summed inside cpp processes.
class Gauge(view.View):
def __init__(self, name, description, unit,
tags: List[tag_key_module.TagKey]):
self._measure = measure_module.MeasureInt(name, description, unit)
self._view = view.View(name, description, tags, self.measure,
aggregation.LastValueAggregation())
@property
def measure(self):
return self._measure
@property
def view(self):
return self._view
@property
def name(self):
return self.measure.name
@property
def description(self):
return self.measure.description
@property
def units(self):
return self.measure.unit
@property
def tags(self):
return self.view.columns
def __dict__(self):
return {
"name": self.measure.name,
"description": self.measure.description,
"units": self.measure.unit,
"tags": self.view.columns,
}
def __str__(self):
return self.__repr__()
def __repr__(self):
return str(self.__dict__())
class MetricsAgent:
def __init__(self, metrics_export_port):
assert metrics_export_port is not None
# OpenCensus classes.
self.view_manager = stats_module.stats.view_manager
self.stats_recorder = stats_module.stats.stats_recorder
# Port where we will expose metrics.
self.metrics_export_port = metrics_export_port
# metric name(str) -> view (view.View)
self._registry = defaultdict(lambda: None)
# Lock required because gRPC server uses
# multiple threads to process requests.
self._lock = threading.Lock()
# Whether or not there are metrics that are missing description and
# units information. This is used to dynamically update registry.
self._missing_information = False
# Configure exporter. (We currently only support prometheus).
self.view_manager.register_exporter(
prometheus_exporter.new_stats_exporter(
prometheus_exporter.Options(
namespace="ray", port=metrics_export_port)))
@property
def registry(self):
"""Return metric definition registry.
Metrics definition registry is dynamically updated
by metrics reported by Ray processes.
"""
return self._registry
def record_metrics_points(self, metrics_points: List[MetricPoint]):
with self._lock:
measurement_map = self.stats_recorder.new_measurement_map()
for metric_point in metrics_points:
self._register_if_needed(metric_point)
self._record(metric_point, measurement_map)
return self._missing_information
def _record(self, metric_point: MetricPoint,
measurement_map: MeasurementMap):
"""Record a single metric point to export.
NOTE: When this method is called, the caller should acquire a lock.
Args:
metric_point(MetricPoint) metric point defined in common.proto
measurement_map(MeasurementMap): Measurement map to record metrics.
"""
metric_name = metric_point.metric_name
tags = metric_point.tags
metric = self._registry.get(metric_name)
# Metrics should be always registered dynamically.
assert metric
tag_map = tag_map_module.TagMap()
for key, value in tags.items():
tag_key = tag_key_module.TagKey(key)
tag_value = tag_value_module.TagValue(value)
tag_map.insert(tag_key, tag_value)
metric_value = metric_point.value
measurement_map.measure_float_put(metric.measure, metric_value)
# NOTE: When we record this metric, timestamp will be renewed.
measurement_map.record(tag_map)
def _register_if_needed(self, metric_point: MetricPoint):
"""Register metrics if they are not registered.
NOTE: When this method is called, the caller should acquire a lock.
Unseen metrics:
Register it with Gauge type metrics. Note that all metrics in
the agent will be gauge because sampling is already done
within cpp processes.
Metrics that are missing description & units:
In this case, we will notify cpp proceses that we need this
information. Cpp processes will then report description and units
of all metrics they have.
Args:
metric_point metric point defined in common.proto
Return:
True if given metrics are missing description and units.
False otherwise.
"""
metric_name = metric_point.metric_name
metric_description = metric_point.description
metric_units = metric_point.units
if self._registry[metric_name] is None:
tags = metric_point.tags
metric_tags = []
for tag_key in tags:
metric_tags.append(tag_key_module.TagKey(tag_key))
metric = Gauge(metric_name, metric_description, metric_units,
metric_tags)
self._registry[metric_name] = metric
self.view_manager.register_view(metric.view)
# If there are missing description & unit information,
# we should notify cpp processes that we need them.
if not metric_description or not metric_units:
self._missing_information = True
if metric_description and metric_units:
self._registry[metric_name].view._description = metric_description
self._registry[
metric_name].view.measure._description = metric_description
self._registry[metric_name].view.measure._unit = metric_units
self._missing_information = False
+363
View File
@@ -0,0 +1,363 @@
# NOTE: This file has been copied from OpenCensus Python exporter.
# It is because OpenCensus Prometheus exporter hasn't released for a while
# and the latest version has a compatibility issue with the latest OpenCensus
# library.
import re
from prometheus_client import start_http_server
from prometheus_client.core import (
REGISTRY,
CollectorRegistry,
CounterMetricFamily,
GaugeMetricFamily,
HistogramMetricFamily,
UnknownMetricFamily,
)
from opencensus.common.transports import sync
from opencensus.stats import aggregation_data as aggregation_data_module
from opencensus.stats import base_exporter
class Options(object):
""" Options contains options for configuring the exporter.
The address can be empty as the prometheus client will
assume it's localhost
:type namespace: str
:param namespace: The prometheus namespace to be used. Defaults to ''.
:type port: int
:param port: The Prometheus port to be used. Defaults to 8000.
:type address: str
:param address: The Prometheus address to be used. Defaults to ''.
:type registry: registry
:param registry: The Prometheus address to be used. Defaults to ''.
:type registry: :class:`~prometheus_client.core.CollectorRegistry`
:param registry: A Prometheus collector registry instance.
"""
def __init__(self,
namespace="",
port=8000,
address="",
registry=CollectorRegistry()):
self._namespace = namespace
self._registry = registry
self._port = int(port)
self._address = address
@property
def registry(self):
""" Prometheus Collector Registry instance
"""
return self._registry
@property
def namespace(self):
""" Prefix to be used with view name
"""
return self._namespace
@property
def port(self):
""" Port number to listen
"""
return self._port
@property
def address(self):
""" Endpoint address (default is localhost)
"""
return self._address
class Collector(object):
""" Collector represents the Prometheus Collector object
"""
def __init__(self, options=Options(), view_name_to_data_map=None):
if view_name_to_data_map is None:
view_name_to_data_map = {}
self._options = options
self._registry = options.registry
self._view_name_to_data_map = view_name_to_data_map
self._registered_views = {}
@property
def options(self):
""" Options to be used to configure the exporter
"""
return self._options
@property
def registry(self):
""" Prometheus Collector Registry instance
"""
return self._registry
@property
def view_name_to_data_map(self):
""" Map with all view data objects
that will be sent to Prometheus
"""
return self._view_name_to_data_map
@property
def registered_views(self):
""" Map with all registered views
"""
return self._registered_views
def register_view(self, view):
""" register_view will create the needed structure
in order to be able to sent all data to Prometheus
"""
v_name = get_view_name(self.options.namespace, view)
if v_name not in self.registered_views:
desc = {
"name": v_name,
"documentation": view.description,
"labels": list(map(sanitize, view.columns)),
"units": view.measure.unit
}
self.registered_views[v_name] = desc
self.registry.register(self)
def add_view_data(self, view_data):
""" Add view data object to be sent to server
"""
self.register_view(view_data.view)
v_name = get_view_name(self.options.namespace, view_data.view)
self.view_name_to_data_map[v_name] = view_data
# TODO: add start and end timestamp
def to_metric(self, desc, tag_values, agg_data):
""" to_metric translate the data that OpenCensus create
to Prometheus format, using Prometheus Metric object
:type desc: dict
:param desc: The map that describes view definition
:type tag_values: tuple of :class:
`~opencensus.tags.tag_value.TagValue`
:param object of opencensus.tags.tag_value.TagValue:
TagValue object used as label values
:type agg_data: object of :class:
`~opencensus.stats.aggregation_data.AggregationData`
:param object of opencensus.stats.aggregation_data.AggregationData:
Aggregated data that needs to be converted as Prometheus samples
:rtype: :class:`~prometheus_client.core.CounterMetricFamily` or
:class:`~prometheus_client.core.HistogramMetricFamily` or
:class:`~prometheus_client.core.UnknownMetricFamily` or
:class:`~prometheus_client.core.GaugeMetricFamily`
:returns: A Prometheus metric object
"""
metric_name = desc["name"]
metric_description = desc["documentation"]
label_keys = desc["labels"]
metric_units = desc["units"]
assert (len(tag_values) == len(label_keys))
# Prometheus requires that all tag values be strings hence
# the need to cast none to the empty string before exporting. See
# https://github.com/census-instrumentation/opencensus-python/issues/480
tag_values = [tv if tv else "" for tv in tag_values]
if isinstance(agg_data, aggregation_data_module.CountAggregationData):
metric = CounterMetricFamily(
name=metric_name,
documentation=metric_description,
unit=metric_units,
labels=label_keys)
metric.add_metric(labels=tag_values, value=agg_data.count_data)
return metric
elif isinstance(agg_data,
aggregation_data_module.DistributionAggregationData):
assert (agg_data.bounds == sorted(agg_data.bounds))
# buckets are a list of buckets. Each bucket is another list with
# a pair of bucket name and value, or a triple of bucket name,
# value, and exemplar. buckets need to be in order.
buckets = []
cum_count = 0 # Prometheus buckets expect cumulative count.
for ii, bound in enumerate(agg_data.bounds):
cum_count += agg_data.counts_per_bucket[ii]
bucket = [str(bound), cum_count]
buckets.append(bucket)
# Prometheus requires buckets to be sorted, and +Inf present.
# In OpenCensus we don't have +Inf in the bucket bonds so need to
# append it here.
buckets.append(["+Inf", agg_data.count_data])
metric = HistogramMetricFamily(
name=metric_name,
documentation=metric_description,
labels=label_keys)
metric.add_metric(
labels=tag_values,
buckets=buckets,
sum_value=agg_data.sum,
)
return metric
elif isinstance(agg_data, aggregation_data_module.SumAggregationData):
metric = UnknownMetricFamily(
name=metric_name,
documentation=metric_description,
labels=label_keys)
metric.add_metric(labels=tag_values, value=agg_data.sum_data)
return metric
elif isinstance(agg_data,
aggregation_data_module.LastValueAggregationData):
metric = GaugeMetricFamily(
name=metric_name,
documentation=metric_description,
labels=label_keys)
metric.add_metric(labels=tag_values, value=agg_data.value)
return metric
else:
raise ValueError(
"unsupported aggregation type %s" % type(agg_data))
def collect(self): # pragma: NO COVER
"""Collect fetches the statistics from OpenCensus
and delivers them as Prometheus Metrics.
Collect is invoked every time a prometheus.Gatherer is run
for example when the HTTP endpoint is invoked by Prometheus.
"""
for v_name, view_data in self.view_name_to_data_map.items():
if v_name not in self.registered_views:
continue
desc = self.registered_views[v_name]
for tag_values in view_data.tag_value_aggregation_data_map:
agg_data = view_data.tag_value_aggregation_data_map[tag_values]
metric = self.to_metric(desc, tag_values, agg_data)
yield metric
class PrometheusStatsExporter(base_exporter.StatsExporter):
""" Exporter exports stats to Prometheus, users need
to register the exporter as an HTTP Handler to be
able to export.
:type options:
:class:`~opencensus.ext.prometheus.stats_exporter.Options`
:param options: An options object with the parameters to instantiate the
prometheus exporter.
:type gatherer: :class:`~prometheus_client.core.CollectorRegistry`
:param gatherer: A Prometheus collector registry instance.
:type transport:
:class:`opencensus.common.transports.sync.SyncTransport` or
:class:`opencensus.common.transports.async_.AsyncTransport`
:param transport: An instance of a Transpor to send data with.
:type collector:
:class:`~opencensus.ext.prometheus.stats_exporter.Collector`
:param collector: An instance of the Prometheus Collector object.
"""
def __init__(self,
options,
gatherer,
transport=sync.SyncTransport,
collector=Collector()):
self._options = options
self._gatherer = gatherer
self._collector = collector
self._transport = transport(self)
self.serve_http()
REGISTRY.register(self._collector)
@property
def transport(self):
""" The transport way to be sent data to server
(default is sync).
"""
return self._transport
@property
def collector(self):
""" Collector class instance to be used
to communicate with Prometheus
"""
return self._collector
@property
def gatherer(self):
""" Prometheus Collector Registry instance
"""
return self._gatherer
@property
def options(self):
""" Options to be used to configure the exporter
"""
return self._options
def export(self, view_data):
""" export send the data to the transport class
in order to be sent to Prometheus in a sync or async way.
"""
if view_data is not None: # pragma: NO COVER
self.transport.export(view_data)
def on_register_view(self, view):
return NotImplementedError("Not supported by Prometheus")
def emit(self, view_data): # pragma: NO COVER
""" Emit exports to the Prometheus if view data has one or more rows.
Each OpenCensus AggregationData will be converted to
corresponding Prometheus Metric: SumData will be converted
to Untyped Metric, CountData will be a Counter Metric
DistributionData will be a Histogram Metric.
"""
for v_data in view_data:
if v_data.tag_value_aggregation_data_map:
self.collector.add_view_data(v_data)
def serve_http(self):
""" serve_http serves the Prometheus endpoint.
"""
start_http_server(
port=self.options.port, addr=str(self.options.address))
def new_stats_exporter(option):
""" new_stats_exporter returns an exporter
that exports stats to Prometheus.
"""
if option.namespace == "":
raise ValueError("Namespace can not be empty string.")
collector = new_collector(option)
exporter = PrometheusStatsExporter(
options=option, gatherer=option.registry, collector=collector)
return exporter
def new_collector(options):
""" new_collector should be used
to create instance of Collector class in order to
prevent the usage of constructor directly
"""
return Collector(options=options)
def get_view_name(namespace, view):
""" create the name for the view
"""
name = ""
if namespace != "":
name = namespace + "_"
return sanitize(name + view.name)
_NON_LETTERS_NOR_DIGITS_RE = re.compile(r"[^\w]", re.UNICODE | re.IGNORECASE)
def sanitize(key):
""" sanitize the given metric name or label according to Prometheus rule.
Replace all characters other than [A-Za-z0-9_] with '_'.
"""
return _NON_LETTERS_NOR_DIGITS_RE.sub("_", key)
+30 -6
View File
@@ -10,13 +10,17 @@ import platform
import subprocess
import sys
from concurrent import futures
import ray
import psutil
import ray.ray_constants as ray_constants
import ray.services
import ray.utils
from ray.core.generated import reporter_pb2
from ray.core.generated import reporter_pb2_grpc
from ray.dashboard.util import get_unused_port
from ray.metrics_agent import MetricsAgent
# Logger for this module. It should be configured at the entry point
# into the program using Ray. Ray provides a default configuration at
@@ -32,8 +36,8 @@ except ImportError:
class ReporterServer(reporter_pb2_grpc.ReporterServiceServicer):
def __init__(self):
pass
def __init__(self, metrics_agent):
self.metrics_agent = metrics_agent
def GetProfilingStats(self, request, context):
pid = request.pid
@@ -56,8 +60,22 @@ class ReporterServer(reporter_pb2_grpc.ReporterServiceServicer):
profiling_stats=profiling_stats, std_out=stdout, std_err=stderr)
def ReportMetrics(self, request, context):
# TODO(sang): Process metrics here.
return reporter_pb2.ReportMetricsReply()
# NOTE: Exceptions are not propagated properly
# when we don't catch them here.
try:
metrcs_description_required = (
self.metrics_agent.record_metrics_points(
request.metrics_points))
except Exception as e:
logger.error(e)
logger.error(traceback.format_exc())
# If metrics description is missing, we should notify cpp processes
# that we need them. Cpp processes will then report them to here.
# We need it when (1) a new metric is reported (application metric)
# (2) a reporter goes down and restarted (currently not implemented).
return reporter_pb2.ReportMetricsReply(
metrcs_description_required=metrcs_description_required)
def recursive_asdict(o):
@@ -104,6 +122,11 @@ class Reporter:
self.ip = ray.services.get_node_ip_address()
self.hostname = platform.node()
self.port = port
metrics_agent_port = os.getenv("METRICS_AGENT_PORT")
if not metrics_agent_port:
metrics_agent_port = get_unused_port()
self.metrics_agent = MetricsAgent(metrics_agent_port)
self.reporter_grpc_server = ReporterServer(self.metrics_agent)
_ = psutil.cpu_percent() # For initialization
@@ -225,13 +248,14 @@ class Reporter:
)
def run(self):
"""Publish the port."""
thread_pool = futures.ThreadPoolExecutor(max_workers=10)
server = grpc.server(thread_pool, options=(("grpc.so_reuseport", 0), ))
reporter_pb2_grpc.add_ReporterServiceServicer_to_server(
ReporterServer(), server)
self.reporter_grpc_server, server)
port = server.add_insecure_port("[::]:{}".format(self.port))
server.start()
# Publish the port.
self.redis_client.set("REPORTER_PORT:{}".format(self.ip), port)
"""Run the reporter."""
while True:
+2 -2
View File
@@ -301,9 +301,9 @@ py_test(
)
py_test(
name = "test_metrics_export",
name = "test_metrics_agent",
size = "small",
srcs = SRCS + ["test_metrics_export.py"],
srcs = SRCS + ["test_metrics_agent.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
+185
View File
@@ -0,0 +1,185 @@
import pytest
from collections import defaultdict
import requests
from opencensus.tags import tag_key as tag_key_module
from prometheus_client.parser import text_string_to_metric_families
from ray.core.generated.common_pb2 import MetricPoint
from ray.dashboard.util import get_unused_port
from ray.metrics_agent import Gauge, MetricsAgent
def generate_metrics_point(name: str,
value: float,
timestamp: int,
tags: dict,
description: str = None,
units: str = None):
return MetricPoint(
metric_name=name,
timestamp=timestamp,
value=value,
tags=tags,
description=description,
units=units)
# NOTE: Opencensus metrics is a singleton per process.
# That says, we should re-use the same agent for all tests.
# Please be careful when you add new tests here. If each
# test doesn't use different metrics, it can have some confliction.
metrics_agent = None
@pytest.fixture
def cleanup_agent():
global metrics_agent
if not metrics_agent:
metrics_agent = MetricsAgent(get_unused_port())
yield
metrics_agent._registry = defaultdict(lambda: None)
def test_gauge():
tags = [tag_key_module.TagKey(str(i)) for i in range(10)]
name = "name"
description = "description"
units = "units"
gauge = Gauge(name, description, units, tags)
assert gauge.__dict__()["name"] == name
assert gauge.__dict__()["description"] == description
assert gauge.__dict__()["units"] == units
assert gauge.__dict__()["tags"] == tags
def test_basic_e2e(cleanup_agent):
# Test the basic end to end workflow. This includes.
# - Metrics are reported.
# - Metrics are dynamically registered to registry.
# - Metrics are accessbiel from Prometheus.
POINTS_DEF = [0, 1, 2]
tag = {"TAG_KEY": "TAG_VALUE"}
metrics_points = [
generate_metrics_point(
str(i), float(i), i, tag, description=str(i), units=str(i))
for i in POINTS_DEF
]
metrics_points_dict = {
metric_point.metric_name: metric_point
for metric_point in metrics_points
}
assert metrics_agent.record_metrics_points(metrics_points) is False
# Make sure all metrics are registered.
for i, metric_entry in zip(POINTS_DEF, metrics_agent.registry.items()):
metric_name, metric_entry = metric_entry
assert metric_name == metric_entry.name
assert metric_entry.name == str(i)
assert metric_entry.description == str(i)
assert metric_entry.units == str(i)
assert metric_entry.tags == [tag_key_module.TagKey(key) for key in tag]
# Make sure all metrics are available through a port.
response = requests.get("http://localhost:{}".format(
metrics_agent.metrics_export_port))
response.raise_for_status()
for line in response.text.split("\n"):
for family in text_string_to_metric_families(line):
metric_name = family.name
if metric_name not in metrics_points_dict:
continue
if line.startswith("# HELP"):
# description
assert (family.documentation == metrics_points_dict[
metric_name].description)
else:
for sample in family.samples:
metrics_points_dict[metric_name].value == sample.value
def test_missing_def(cleanup_agent):
# Make sure when metrics with description and units are reported,
# agent updates its registry to include them.
POINTS_DEF = [4, 5, 6]
tag = {"TAG_KEY": "TAG_VALUE"}
metrics_points = [
generate_metrics_point(
str(i),
float(i),
i,
tag,
) for i in POINTS_DEF
]
# At first, metrics shouldn't have description and units.
assert metrics_agent.record_metrics_points(metrics_points) is True
for i, metric_entry in zip(POINTS_DEF, metrics_agent.registry.items()):
metric_name, metric_entry = metric_entry
assert metric_name == metric_entry.name
assert metric_entry.name == str(i)
assert metric_entry.description == ""
assert metric_entry.units == ""
assert metric_entry.tags == [tag_key_module.TagKey(key) for key in tag]
# The points are coming again with description and units.
# Make sure they are updated.
metrics_points = [
generate_metrics_point(
str(i), float(i), i, tag, description=str(i), units=str(i))
for i in POINTS_DEF
]
assert metrics_agent.record_metrics_points(metrics_points) is False
for i, metric_entry in zip(POINTS_DEF, metrics_agent.registry.items()):
metric_name, metric_entry = metric_entry
assert metric_name == metric_entry.name
assert metric_entry.name == str(i)
assert metric_entry.description == str(i)
assert metric_entry.units == str(i)
assert metric_entry.tags == [tag_key_module.TagKey(key) for key in tag]
def test_multiple_record(cleanup_agent):
# Make sure prometheus export data properly when multiple points with
# the same name is reported.
TOTAL_POINTS = 10
NAME = "TEST"
values = list(range(TOTAL_POINTS))
tags = [{"TAG_KEY": str(i)} for i in range(TOTAL_POINTS)]
timestamps = list(range(TOTAL_POINTS))
points = []
for i in range(TOTAL_POINTS):
points.append(
generate_metrics_point(
name=NAME,
value=values[i],
timestamp=timestamps[i],
tags=tags[i]))
for point in points:
metrics_agent.record_metrics_points([point])
# Make sure data is available at prometheus.
response = requests.get("http://localhost:{}".format(
metrics_agent.metrics_export_port))
response.raise_for_status()
sample_values = []
for line in response.text.split("\n"):
for family in text_string_to_metric_families(line):
metric_name = family.name
name_without_prefix = metric_name.split("_")[1]
if name_without_prefix != NAME:
continue
# Lines for recorded metrics values.
for sample in family.samples:
sample_values.append(sample.value)
assert sample_values == [point.value for point in points]
if __name__ == "__main__":
import sys
sys.exit(pytest.main(["-v", __file__]))
-261
View File
@@ -1,261 +0,0 @@
import pytest
import requests
from unittest.mock import patch
from ray.dashboard.metrics_exporter.actions import ActionHandler
from ray.dashboard.metrics_exporter.client import MetricsExportClient
from ray.dashboard.metrics_exporter.client import Exporter
from ray.dashboard.metrics_exporter.schema import (AuthResponse, BaseModel,
ValidationError, Field)
MOCK_DASHBOARD_ID = "1234"
MOCK_DASHBOARD_ADDRESS = "http://127.0.0.1:9081"
MOCK_ACCESS_TOKEN = "1234"
def _setup_client_and_exporter(controller):
exporter = Exporter(MOCK_DASHBOARD_ID, MOCK_DASHBOARD_ADDRESS, controller)
client = MetricsExportClient(MOCK_DASHBOARD_ADDRESS, controller,
MOCK_DASHBOARD_ID, exporter)
return exporter, client
"""
Test Exporter
"""
@patch("ray.dashboard.dashboard.DashboardController")
def test_verify_exporter_cannot_run_without_access_token(mock_controller):
exporter, client = _setup_client_and_exporter(mock_controller)
# Should raise an assertion error because there's no access token set.
with pytest.raises(AssertionError):
exporter.run()
"""
Test Client
"""
@patch("ray.dashboard.dashboard.DashboardController")
@patch(
"ray.dashboard.metrics_exporter.api.authentication_request",
side_effect=requests.exceptions.HTTPError)
def test_client_invalid_request_status_returned(auth_request, mock_controller):
"""
If authentication request fails with an invalid status code,
`start_exporting_metrics` should fail.
"""
exporter, client = _setup_client_and_exporter(mock_controller)
# authenticate should throw an exception because API request fails.
with pytest.raises(requests.exceptions.HTTPError):
client._authenticate()
# This should fail because authentication throws an exception.
result, error = client.start_exporting_metrics()
assert result is False
@patch("ray.dashboard.dashboard.DashboardController")
@patch("ray.dashboard.metrics_exporter.api.authentication_request")
def test_authentication(auth_request, mock_controller):
auth_request.return_value = AuthResponse(
access_token_dashboard=MOCK_ACCESS_TOKEN,
access_token_ingest=MOCK_ACCESS_TOKEN)
exporter, client = _setup_client_and_exporter(mock_controller)
assert client.enabled is False
client._authenticate()
assert client.dashboard_url == "{address}/dashboard/{access_token}".format(
address=MOCK_DASHBOARD_ADDRESS, access_token=MOCK_ACCESS_TOKEN)
assert client.enabled is True
@patch.object(Exporter, "start")
@patch("ray.dashboard.dashboard.DashboardController")
@patch("ray.dashboard.metrics_exporter.api.authentication_request")
def test_start_exporting_metrics_without_authentication(
auth_request, mock_controller, start):
"""
`start_exporting_metrics` should trigger authentication if users
are not authenticated.
"""
auth_request.return_value = AuthResponse(
access_token_dashboard=MOCK_ACCESS_TOKEN,
access_token_ingest=MOCK_ACCESS_TOKEN)
exporter, client = _setup_client_and_exporter(mock_controller)
# start_exporting_metrics should succeed.
result, error = client.start_exporting_metrics()
assert result is True
assert error is None
assert client.enabled is True
@patch.object(Exporter, "start")
@patch("ray.dashboard.dashboard.DashboardController")
@patch("ray.dashboard.metrics_exporter.api.authentication_request")
def test_start_exporting_metrics_with_authentication(auth_request,
mock_controller, start):
"""
If users are already authenticated, `start_exporting_metrics`
should not authenticate users.
"""
auth_request.return_value = AuthResponse(
access_token_dashboard=MOCK_ACCESS_TOKEN,
access_token_ingest=MOCK_ACCESS_TOKEN)
exporter, client = _setup_client_and_exporter(mock_controller)
# Already authenticated.
client._authenticate()
assert client.enabled is True
result, error = client.start_exporting_metrics()
# Auth request should be called only once because
# it was already authenticated.
auth_request.call_count == 1
assert result is True
assert error is None
@patch.object(Exporter, "start")
@patch("ray.dashboard.dashboard.DashboardController")
@patch("ray.dashboard.metrics_exporter.api.authentication_request")
def test_start_exporting_metrics_succeed(auth_request, mock_controller, start):
auth_request.return_value = AuthResponse(
access_token_dashboard=MOCK_ACCESS_TOKEN,
access_token_ingest=MOCK_ACCESS_TOKEN)
exporter, client = _setup_client_and_exporter(mock_controller)
result, error = client.start_exporting_metrics()
assert result is True
assert error is None
assert client.is_exporting_started is True
start.call_count == 1
with pytest.raises(AssertionError):
client.start_exporting_metrics()
"""
BaseModel Test
"""
def test_base_model():
DEFAULT_VALUE = "default"
class A(BaseModel):
__schema__ = {
"a": Field(required=True, default=None, type=str),
"b": Field(required=False, default=DEFAULT_VALUE, type=str)
}
# Test the correct case.
obj = {"a": "1", "b": "1"}
a = A.parse_obj(obj)
assert a.a == "1"
assert a.b == "1"
assert a._dict == obj
string = "{name}\n{dict}".format(name=A.__name__, dict=str(obj))
assert str(a) == string
# Test wrong types. It is not checked in the current implementation.
obj = {"a": 1, "b": 2}
a = A.parse_obj(obj)
assert a.a == 1
assert a.b == 2
# Test wrong types. parse_obj can only parse dictionary.
obj = None
with pytest.raises(AssertionError):
a = A.parse_obj(obj)
# Test when required fields are not provided.
obj = {"b": "1"}
with pytest.raises(ValidationError):
a = A.parse_obj(obj)
# Test optional fields are set to default when fields are not given.
obj = {"a": "1"}
a = A.parse_obj(obj)
assert a.b == DEFAULT_VALUE
# Test when fields that are not defined in the schema is given.
# It should be ignoered
obj = {"a": "a", "b": "b", "c": "c"}
a = A.parse_obj(obj)
assert a.a == "a"
assert a.b == "b"
assert a.c == "c"
"""
Test Action Handler
"""
def _get_mock_kill_action():
return {
"type": "KILL_ACTOR",
"actor_id": "1234",
"ip_address": "1234",
"port": 30
}
@patch("ray.dashboard.dashboard.DashboardController")
def test_handle_kill_action(mock_controller):
action_handler = ActionHandler(mock_controller)
kill_action = _get_mock_kill_action()
action_handler.handle_kill_action(kill_action)
assert mock_controller.kill_actor.call_count == 1
@patch("ray.dashboard.dashboard.DashboardController")
def test_handle_kill_action_invalid_dict(mock_controller):
action_handler = ActionHandler(mock_controller)
kill_action = {"type": "KILL_ACTOR", "ip_address": "1234", "port": 30}
with pytest.raises(ValidationError):
action_handler.handle_kill_action(kill_action)
@patch("ray.dashboard.dashboard.DashboardController")
def test_handle_actions_many_kill_actor(mock_controller):
action_handler = ActionHandler(mock_controller)
# 10 actions required.
actions = [_get_mock_kill_action() for _ in range(10)]
action_handler.handle_actions(actions)
assert mock_controller.kill_actor.call_count == 10
@patch("ray.dashboard.dashboard.DashboardController")
def test_handle_actions_kill_actor_and_mixed_type(mock_controller):
action_handler = ActionHandler(mock_controller)
wrong_type_action = {"type": "NON_EXIST"}
actions = [
_get_mock_kill_action(), wrong_type_action,
_get_mock_kill_action()
]
action_handler.handle_actions(actions)
assert mock_controller.kill_actor.call_count == 2
@patch("ray.dashboard.dashboard.DashboardController")
def test_handle_actions_only_wrong_type(mock_controller):
action_handler = ActionHandler(mock_controller)
wrong_type_action = {"type": "NON_EXIST"}
actions = [wrong_type_action for _ in range(10)]
action_handler.handle_actions(actions)
assert mock_controller.kill_actor.call_count == 0
if __name__ == "__main__":
import sys
sys.exit(pytest.main(["-v", __file__]))
+2
View File
@@ -317,6 +317,8 @@ install_requires = [
"pyyaml",
"requests",
"redis >= 3.3.2, < 3.5.0",
"opencensus",
"prometheus_client >= 0.7.1",
]
+8
View File
@@ -334,8 +334,16 @@ message CoreWorkerStats {
}
message MetricPoint {
// Name of the metric.
string metric_name = 1;
// Timestamp when metric is exported.
int64 timestamp = 2;
// Value of the metric point.
double value = 3;
// Tags of the metric.
map<string, string> tags = 4;
// [Optional] Description of the metric.
string description = 5;
// [Optional] Unit of the metric.
string units = 6;
}
+2 -1
View File
@@ -35,10 +35,11 @@ message GetProfilingStatsReply {
}
message ReportMetricsRequest {
repeated MetricPoint metrics_point = 1;
repeated MetricPoint metrics_points = 1;
}
message ReportMetricsReply {
bool metrcs_description_required = 1;
}
// Service for communicating with the reporter.py process on a remote node.
+7
View File
@@ -36,6 +36,10 @@ static void RegisterAsView(opencensus::stats::ViewDescriptor view_descriptor,
view_descriptor.RegisterForExport();
}
///
/// Stats Config
///
StatsConfig &StatsConfig::instance() {
static StatsConfig instance;
return instance;
@@ -71,6 +75,9 @@ void StatsConfig::SetIsInitialized(bool initialized) { is_initialized_ = initial
bool StatsConfig::IsInitialized() const { return is_initialized_; }
///
/// Metric
///
void Metric::Record(double value, const TagsType &tags) {
if (StatsConfig::instance().IsStatsDisabled()) {
return;
+2
View File
@@ -185,7 +185,9 @@ struct MetricPoint {
int64_t timestamp;
double value;
std::unordered_map<std::string, std::string> tags;
const opencensus::stats::MeasureDescriptor &measure_descriptor;
};
} // namespace stats
} // namespace ray
+14 -10
View File
@@ -23,13 +23,15 @@ template <>
void MetricExporter::ExportToPoints(
const opencensus::stats::ViewData::DataMap<opencensus::stats::Distribution>
&view_data,
const std::string &metric_name, std::vector<std::string> &keys,
std::vector<MetricPoint> &points) {
const opencensus::stats::MeasureDescriptor &measure_descriptor,
std::vector<std::string> &keys, std::vector<MetricPoint> &points) {
// Return if no raw data found in view map.
if (view_data.size() == 0) {
return;
}
const auto &metric_name = measure_descriptor.name();
// NOTE(lingxuan.zlx): No sampling in histogram data, so all points all be filled in.
std::unordered_map<std::string, std::string> tags;
for (size_t i = 0; i < view_data.begin()->first.size(); ++i) {
@@ -53,10 +55,12 @@ void MetricExporter::ExportToPoints(
}
}
hist_mean /= view_data.size();
MetricPoint mean_point = {metric_name + ".mean", current_sys_time_ms(), hist_mean,
tags};
MetricPoint max_point = {metric_name + ".max", current_sys_time_ms(), hist_max, tags};
MetricPoint min_point = {metric_name + ".min", current_sys_time_ms(), hist_min, tags};
MetricPoint mean_point = {metric_name + ".mean", current_sys_time_ms(), hist_mean, tags,
measure_descriptor};
MetricPoint max_point = {metric_name + ".max", current_sys_time_ms(), hist_max, tags,
measure_descriptor};
MetricPoint min_point = {metric_name + ".min", current_sys_time_ms(), hist_min, tags,
measure_descriptor};
points.push_back(std::move(mean_point));
points.push_back(std::move(max_point));
points.push_back(std::move(min_point));
@@ -85,17 +89,17 @@ void MetricExporter::ExportViewData(
for (size_t i = 0; i < descriptor.columns().size(); ++i) {
keys.push_back(descriptor.columns()[i].name());
}
auto &metric_name = descriptor.name();
const auto &measure_descriptor = descriptor.measure_descriptor();
switch (view_data.type()) {
case opencensus::stats::ViewData::Type::kDouble:
ExportToPoints<double>(view_data.double_data(), metric_name, keys, points);
ExportToPoints<double>(view_data.double_data(), measure_descriptor, keys, points);
break;
case opencensus::stats::ViewData::Type::kInt64:
ExportToPoints<int64_t>(view_data.int_data(), metric_name, keys, points);
ExportToPoints<int64_t>(view_data.int_data(), measure_descriptor, keys, points);
break;
case opencensus::stats::ViewData::Type::kDistribution:
ExportToPoints<opencensus::stats::Distribution>(view_data.distribution_data(),
metric_name, keys, points);
measure_descriptor, keys, points);
break;
default:
RAY_LOG(FATAL) << "Unknown view data type.";
+4 -3
View File
@@ -57,8 +57,9 @@ class MetricExporter final : public opencensus::stats::StatsExporter::Handler {
/// \param keys, metric tags map
/// \param points, memory metric vector instance
void ExportToPoints(const opencensus::stats::ViewData::DataMap<DTYPE> &view_data,
const std::string &metric_name, std::vector<std::string> &keys,
std::vector<MetricPoint> &points) {
const opencensus::stats::MeasureDescriptor &measure_descriptor,
std::vector<std::string> &keys, std::vector<MetricPoint> &points) {
const auto &metric_name = measure_descriptor.name();
for (const auto &row : view_data) {
std::unordered_map<std::string, std::string> tags;
for (size_t i = 0; i < keys.size(); ++i) {
@@ -66,7 +67,7 @@ class MetricExporter final : public opencensus::stats::StatsExporter::Handler {
}
// Current timestamp is used for point not view data time.
MetricPoint point{metric_name, current_sys_time_ms(),
static_cast<double>(row.second), tags};
static_cast<double>(row.second), tags, measure_descriptor};
RAY_LOG(DEBUG) << "Metric name " << metric_name << ", value " << point.value;
points.push_back(std::move(point));
if (points.size() >= report_batch_size_) {
+14 -2
View File
@@ -54,7 +54,7 @@ void MetricsAgentExporter::ReportMetrics(const std::vector<MetricPoint> &points)
MetricExporterDecorator::ReportMetrics(points);
rpc::ReportMetricsRequest request;
for (auto point : points) {
auto metric_point = request.add_metrics_point();
auto metric_point = request.add_metrics_points();
metric_point->set_metric_name(point.metric_name);
metric_point->set_timestamp(point.timestamp);
metric_point->set_value(point.value);
@@ -62,10 +62,22 @@ void MetricsAgentExporter::ReportMetrics(const std::vector<MetricPoint> &points)
for (auto &tag : point.tags) {
(*mutable_tags)[tag.first] = tag.second;
}
// If description and units information is requested from
// the metrics agent, append the information.
// TODO(sang): It can be inefficient if there are lots of new registered metrics.
// We should make it more efficient if there's compelling use cases.
if (should_update_description_) {
metric_point->set_description(point.measure_descriptor.description());
metric_point->set_units(point.measure_descriptor.units());
}
}
should_update_description_ = false;
// TODO(sang): Should retry metrics report if it fails.
client_->ReportMetrics(request, nullptr);
client_->ReportMetrics(
request, [this](const Status &status, const rpc::ReportMetricsReply &reply) {
should_update_description_ = reply.metrcs_description_required();
});
}
} // namespace stats
+2
View File
@@ -69,6 +69,8 @@ class MetricsAgentExporter : public MetricExporterDecorator {
std::unique_ptr<rpc::MetricsAgentClient> client_;
/// Call Manager for gRPC client.
rpc::ClientCallManager client_call_manager_;
/// Whether or not description and units information for metrics should be updated.
bool should_update_description_ = true;
};
} // namespace stats