From 7e3ba289dc2c62ba8aef2eae891be5d48de5f44a Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Tue, 28 Jul 2020 10:28:01 -0700 Subject: [PATCH] [Stats] Basic Metrics Infrastructure (Metrics Agent + Prometheus Exporter) (#9607) --- LICENSE | 17 ++ python/ray/dashboard/util.py | 30 +- python/ray/includes/metric.pxi | 7 +- python/ray/metrics_agent.py | 183 ++++++++++++ python/ray/prometheus_exporter.py | 363 ++++++++++++++++++++++++ python/ray/reporter.py | 36 ++- python/ray/tests/BUILD | 4 +- python/ray/tests/test_metrics_agent.py | 185 ++++++++++++ python/ray/tests/test_metrics_export.py | 261 ----------------- python/setup.py | 2 + src/ray/protobuf/common.proto | 8 + src/ray/protobuf/reporter.proto | 3 +- src/ray/stats/metric.cc | 7 + src/ray/stats/metric.h | 2 + src/ray/stats/metric_exporter.cc | 24 +- src/ray/stats/metric_exporter.h | 7 +- src/ray/stats/metric_exporter_client.cc | 16 +- src/ray/stats/metric_exporter_client.h | 2 + 18 files changed, 868 insertions(+), 289 deletions(-) create mode 100644 python/ray/metrics_agent.py create mode 100644 python/ray/prometheus_exporter.py create mode 100644 python/ray/tests/test_metrics_agent.py delete mode 100644 python/ray/tests/test_metrics_export.py diff --git a/LICENSE b/LICENSE index 1dcfa84a3..c5a80d91e 100644 --- a/LICENSE +++ b/LICENSE @@ -270,3 +270,20 @@ LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ +------------------ +Code in python/ray/prometheus_exporter.py is adapted from https://github.com/census-instrumentation/opencensus-python/blob/master/contrib/opencensus-ext-prometheus/opencensus/ext/prometheus/stats_exporter/__init__.py + +# Copyright 2018, OpenCensus Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/python/ray/dashboard/util.py b/python/ray/dashboard/util.py index d3c7b82d4..fe9e44b59 100644 --- a/python/ray/dashboard/util.py +++ b/python/ray/dashboard/util.py @@ -1,5 +1,9 @@ -from base64 import b64decode import datetime +import random +import socket + +from base64 import b64decode + import ray @@ -45,3 +49,27 @@ def measures_to_dict(measures): elif "doubleValue" in measure: measures_dict[tags] = measure["doubleValue"] return measures_dict + + +def get_unused_port(): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.bind(("", 0)) + port = s.getsockname()[1] + + # Try to generate a port that is far above the 'next available' one. + # This solves issue #8254 where GRPC fails because the port assigned + # from this method has been used by a different process. 
+ for _ in range(30): + new_port = random.randint(port, 65535) + new_s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + new_s.bind(("", new_port)) + except OSError: + new_s.close() + continue + s.close() + new_s.close() + return new_port + print("Unable to succeed in selecting a random port.") + s.close() + return port diff --git a/python/ray/includes/metric.pxi b/python/ray/includes/metric.pxi index 0880ce78f..dec481197 100644 --- a/python/ray/includes/metric.pxi +++ b/python/ray/includes/metric.pxi @@ -18,7 +18,7 @@ cdef class TagKey: def __init__(self, name): self.name = name.encode("ascii") CTagKey.Register(self.name) - + def name(self): return self.name @@ -34,8 +34,9 @@ cdef class Metric: def __init__(self, tag_keys): for tag_key in tag_keys: - self.c_tag_keys.push_back(CTagKey.Register(tag_key.encode("ascii"))) - + self.c_tag_keys.push_back( + CTagKey.Register(tag_key.encode("ascii"))) + def record(self, value, tags=None): """Record a measurement of metric. diff --git a/python/ray/metrics_agent.py b/python/ray/metrics_agent.py new file mode 100644 index 000000000..4d84ea6d5 --- /dev/null +++ b/python/ray/metrics_agent.py @@ -0,0 +1,183 @@ +import logging +import threading + +from collections import defaultdict +from typing import List + +from opencensus.stats import aggregation +from opencensus.stats import measure as measure_module +from opencensus.stats.measurement_map import MeasurementMap +from opencensus.stats import stats as stats_module +from opencensus.tags import tag_key as tag_key_module +from opencensus.tags import tag_map as tag_map_module +from opencensus.tags import tag_value as tag_value_module +from opencensus.stats import view + +from ray import prometheus_exporter +from ray.core.generated.common_pb2 import MetricPoint + +logger = logging.getLogger(__name__) + + +# We don't need counter, histogram, or sum because reporter just needs to +# collect momental values (gauge) that are already counted or sampled +# (histogram for 
example), or summed inside cpp processes. +class Gauge(view.View): + def __init__(self, name, description, unit, + tags: List[tag_key_module.TagKey]): + self._measure = measure_module.MeasureInt(name, description, unit) + self._view = view.View(name, description, tags, self.measure, + aggregation.LastValueAggregation()) + + @property + def measure(self): + return self._measure + + @property + def view(self): + return self._view + + @property + def name(self): + return self.measure.name + + @property + def description(self): + return self.measure.description + + @property + def units(self): + return self.measure.unit + + @property + def tags(self): + return self.view.columns + + def __dict__(self): + return { + "name": self.measure.name, + "description": self.measure.description, + "units": self.measure.unit, + "tags": self.view.columns, + } + + def __str__(self): + return self.__repr__() + + def __repr__(self): + return str(self.__dict__()) + + +class MetricsAgent: + def __init__(self, metrics_export_port): + assert metrics_export_port is not None + # OpenCensus classes. + self.view_manager = stats_module.stats.view_manager + self.stats_recorder = stats_module.stats.stats_recorder + # Port where we will expose metrics. + self.metrics_export_port = metrics_export_port + # metric name(str) -> view (view.View) + self._registry = defaultdict(lambda: None) + # Lock required because gRPC server uses + # multiple threads to process requests. + self._lock = threading.Lock() + # Whether or not there are metrics that are missing description and + # units information. This is used to dynamically update registry. + self._missing_information = False + + # Configure exporter. (We currently only support prometheus). + self.view_manager.register_exporter( + prometheus_exporter.new_stats_exporter( + prometheus_exporter.Options( + namespace="ray", port=metrics_export_port))) + + @property + def registry(self): + """Return metric definition registry. 
+ + Metrics definition registry is dynamically updated + by metrics reported by Ray processes. + """ + return self._registry + + def record_metrics_points(self, metrics_points: List[MetricPoint]): + with self._lock: + measurement_map = self.stats_recorder.new_measurement_map() + for metric_point in metrics_points: + self._register_if_needed(metric_point) + self._record(metric_point, measurement_map) + return self._missing_information + + def _record(self, metric_point: MetricPoint, + measurement_map: MeasurementMap): + """Record a single metric point to export. + + NOTE: When this method is called, the caller should acquire a lock. + + Args: + metric_point(MetricPoint) metric point defined in common.proto + measurement_map(MeasurementMap): Measurement map to record metrics. + """ + metric_name = metric_point.metric_name + tags = metric_point.tags + + metric = self._registry.get(metric_name) + # Metrics should be always registered dynamically. + assert metric + + tag_map = tag_map_module.TagMap() + for key, value in tags.items(): + tag_key = tag_key_module.TagKey(key) + tag_value = tag_value_module.TagValue(value) + tag_map.insert(tag_key, tag_value) + + metric_value = metric_point.value + measurement_map.measure_float_put(metric.measure, metric_value) + # NOTE: When we record this metric, timestamp will be renewed. + measurement_map.record(tag_map) + + def _register_if_needed(self, metric_point: MetricPoint): + """Register metrics if they are not registered. + + NOTE: When this method is called, the caller should acquire a lock. + + Unseen metrics: + Register it with Gauge type metrics. Note that all metrics in + the agent will be gauge because sampling is already done + within cpp processes. + Metrics that are missing description & units: + In this case, we will notify cpp processes that we need this + information. Cpp processes will then report description and units + of all metrics they have. 
+ + Args: + metric_point metric point defined in common.proto + Return: + True if given metrics are missing description and units. + False otherwise. + """ + metric_name = metric_point.metric_name + metric_description = metric_point.description + metric_units = metric_point.units + if self._registry[metric_name] is None: + tags = metric_point.tags + metric_tags = [] + for tag_key in tags: + metric_tags.append(tag_key_module.TagKey(tag_key)) + + metric = Gauge(metric_name, metric_description, metric_units, + metric_tags) + self._registry[metric_name] = metric + self.view_manager.register_view(metric.view) + + # If there are missing description & unit information, + # we should notify cpp processes that we need them. + if not metric_description or not metric_units: + self._missing_information = True + + if metric_description and metric_units: + self._registry[metric_name].view._description = metric_description + self._registry[ + metric_name].view.measure._description = metric_description + self._registry[metric_name].view.measure._unit = metric_units + self._missing_information = False diff --git a/python/ray/prometheus_exporter.py b/python/ray/prometheus_exporter.py new file mode 100644 index 000000000..3e203bae0 --- /dev/null +++ b/python/ray/prometheus_exporter.py @@ -0,0 +1,363 @@ +# NOTE: This file has been copied from OpenCensus Python exporter. +# It is because OpenCensus Prometheus exporter hasn't released for a while +# and the latest version has a compatibility issue with the latest OpenCensus +# library. 
+ +import re + +from prometheus_client import start_http_server +from prometheus_client.core import ( + REGISTRY, + CollectorRegistry, + CounterMetricFamily, + GaugeMetricFamily, + HistogramMetricFamily, + UnknownMetricFamily, +) + +from opencensus.common.transports import sync +from opencensus.stats import aggregation_data as aggregation_data_module +from opencensus.stats import base_exporter + + +class Options(object): + """ Options contains options for configuring the exporter. + The address can be empty as the prometheus client will + assume it's localhost + :type namespace: str + :param namespace: The prometheus namespace to be used. Defaults to ''. + :type port: int + :param port: The Prometheus port to be used. Defaults to 8000. + :type address: str + :param address: The Prometheus address to be used. Defaults to ''. + :type registry: registry + :param registry: The Prometheus address to be used. Defaults to ''. + :type registry: :class:`~prometheus_client.core.CollectorRegistry` + :param registry: A Prometheus collector registry instance. 
+ """ + + def __init__(self, + namespace="", + port=8000, + address="", + registry=CollectorRegistry()): + self._namespace = namespace + self._registry = registry + self._port = int(port) + self._address = address + + @property + def registry(self): + """ Prometheus Collector Registry instance + """ + return self._registry + + @property + def namespace(self): + """ Prefix to be used with view name + """ + return self._namespace + + @property + def port(self): + """ Port number to listen + """ + return self._port + + @property + def address(self): + """ Endpoint address (default is localhost) + """ + return self._address + + +class Collector(object): + """ Collector represents the Prometheus Collector object + """ + + def __init__(self, options=Options(), view_name_to_data_map=None): + if view_name_to_data_map is None: + view_name_to_data_map = {} + self._options = options + self._registry = options.registry + self._view_name_to_data_map = view_name_to_data_map + self._registered_views = {} + + @property + def options(self): + """ Options to be used to configure the exporter + """ + return self._options + + @property + def registry(self): + """ Prometheus Collector Registry instance + """ + return self._registry + + @property + def view_name_to_data_map(self): + """ Map with all view data objects + that will be sent to Prometheus + """ + return self._view_name_to_data_map + + @property + def registered_views(self): + """ Map with all registered views + """ + return self._registered_views + + def register_view(self, view): + """ register_view will create the needed structure + in order to be able to sent all data to Prometheus + """ + v_name = get_view_name(self.options.namespace, view) + + if v_name not in self.registered_views: + desc = { + "name": v_name, + "documentation": view.description, + "labels": list(map(sanitize, view.columns)), + "units": view.measure.unit + } + self.registered_views[v_name] = desc + self.registry.register(self) + + def 
add_view_data(self, view_data): + """ Add view data object to be sent to server + """ + self.register_view(view_data.view) + v_name = get_view_name(self.options.namespace, view_data.view) + self.view_name_to_data_map[v_name] = view_data + + # TODO: add start and end timestamp + def to_metric(self, desc, tag_values, agg_data): + """ to_metric translate the data that OpenCensus create + to Prometheus format, using Prometheus Metric object + :type desc: dict + :param desc: The map that describes view definition + :type tag_values: tuple of :class: + `~opencensus.tags.tag_value.TagValue` + :param object of opencensus.tags.tag_value.TagValue: + TagValue object used as label values + :type agg_data: object of :class: + `~opencensus.stats.aggregation_data.AggregationData` + :param object of opencensus.stats.aggregation_data.AggregationData: + Aggregated data that needs to be converted as Prometheus samples + :rtype: :class:`~prometheus_client.core.CounterMetricFamily` or + :class:`~prometheus_client.core.HistogramMetricFamily` or + :class:`~prometheus_client.core.UnknownMetricFamily` or + :class:`~prometheus_client.core.GaugeMetricFamily` + :returns: A Prometheus metric object + """ + metric_name = desc["name"] + metric_description = desc["documentation"] + label_keys = desc["labels"] + metric_units = desc["units"] + + assert (len(tag_values) == len(label_keys)) + # Prometheus requires that all tag values be strings hence + # the need to cast none to the empty string before exporting. 
See + # https://github.com/census-instrumentation/opencensus-python/issues/480 + tag_values = [tv if tv else "" for tv in tag_values] + + if isinstance(agg_data, aggregation_data_module.CountAggregationData): + metric = CounterMetricFamily( + name=metric_name, + documentation=metric_description, + unit=metric_units, + labels=label_keys) + metric.add_metric(labels=tag_values, value=agg_data.count_data) + return metric + + elif isinstance(agg_data, + aggregation_data_module.DistributionAggregationData): + + assert (agg_data.bounds == sorted(agg_data.bounds)) + # buckets are a list of buckets. Each bucket is another list with + # a pair of bucket name and value, or a triple of bucket name, + # value, and exemplar. buckets need to be in order. + buckets = [] + cum_count = 0 # Prometheus buckets expect cumulative count. + for ii, bound in enumerate(agg_data.bounds): + cum_count += agg_data.counts_per_bucket[ii] + bucket = [str(bound), cum_count] + buckets.append(bucket) + # Prometheus requires buckets to be sorted, and +Inf present. + # In OpenCensus we don't have +Inf in the bucket bounds so need to + # append it here. 
+ buckets.append(["+Inf", agg_data.count_data]) + metric = HistogramMetricFamily( + name=metric_name, + documentation=metric_description, + labels=label_keys) + metric.add_metric( + labels=tag_values, + buckets=buckets, + sum_value=agg_data.sum, + ) + return metric + + elif isinstance(agg_data, aggregation_data_module.SumAggregationData): + metric = UnknownMetricFamily( + name=metric_name, + documentation=metric_description, + labels=label_keys) + metric.add_metric(labels=tag_values, value=agg_data.sum_data) + return metric + + elif isinstance(agg_data, + aggregation_data_module.LastValueAggregationData): + metric = GaugeMetricFamily( + name=metric_name, + documentation=metric_description, + labels=label_keys) + metric.add_metric(labels=tag_values, value=agg_data.value) + return metric + + else: + raise ValueError( + "unsupported aggregation type %s" % type(agg_data)) + + def collect(self): # pragma: NO COVER + """Collect fetches the statistics from OpenCensus + and delivers them as Prometheus Metrics. + Collect is invoked every time a prometheus.Gatherer is run + for example when the HTTP endpoint is invoked by Prometheus. + """ + for v_name, view_data in self.view_name_to_data_map.items(): + if v_name not in self.registered_views: + continue + desc = self.registered_views[v_name] + for tag_values in view_data.tag_value_aggregation_data_map: + agg_data = view_data.tag_value_aggregation_data_map[tag_values] + metric = self.to_metric(desc, tag_values, agg_data) + yield metric + + +class PrometheusStatsExporter(base_exporter.StatsExporter): + """ Exporter exports stats to Prometheus, users need + to register the exporter as an HTTP Handler to be + able to export. + :type options: + :class:`~opencensus.ext.prometheus.stats_exporter.Options` + :param options: An options object with the parameters to instantiate the + prometheus exporter. + :type gatherer: :class:`~prometheus_client.core.CollectorRegistry` + :param gatherer: A Prometheus collector registry instance. 
+ :type transport: + :class:`opencensus.common.transports.sync.SyncTransport` or + :class:`opencensus.common.transports.async_.AsyncTransport` + :param transport: An instance of a Transport to send data with. + :type collector: + :class:`~opencensus.ext.prometheus.stats_exporter.Collector` + :param collector: An instance of the Prometheus Collector object. + """ + + def __init__(self, + options, + gatherer, + transport=sync.SyncTransport, + collector=Collector()): + self._options = options + self._gatherer = gatherer + self._collector = collector + self._transport = transport(self) + self.serve_http() + REGISTRY.register(self._collector) + + @property + def transport(self): + """ The transport way to be sent data to server + (default is sync). + """ + return self._transport + + @property + def collector(self): + """ Collector class instance to be used + to communicate with Prometheus + """ + return self._collector + + @property + def gatherer(self): + """ Prometheus Collector Registry instance + """ + return self._gatherer + + @property + def options(self): + """ Options to be used to configure the exporter + """ + return self._options + + def export(self, view_data): + """ export send the data to the transport class + in order to be sent to Prometheus in a sync or async way. + """ + if view_data is not None: # pragma: NO COVER + self.transport.export(view_data) + + def on_register_view(self, view): + return NotImplementedError("Not supported by Prometheus") + + def emit(self, view_data): # pragma: NO COVER + """ Emit exports to the Prometheus if view data has one or more rows. + Each OpenCensus AggregationData will be converted to + corresponding Prometheus Metric: SumData will be converted + to Untyped Metric, CountData will be a Counter Metric + DistributionData will be a Histogram Metric. 
+ """ + + for v_data in view_data: + if v_data.tag_value_aggregation_data_map: + self.collector.add_view_data(v_data) + + def serve_http(self): + """ serve_http serves the Prometheus endpoint. + """ + start_http_server( + port=self.options.port, addr=str(self.options.address)) + + +def new_stats_exporter(option): + """ new_stats_exporter returns an exporter + that exports stats to Prometheus. + """ + if option.namespace == "": + raise ValueError("Namespace can not be empty string.") + + collector = new_collector(option) + + exporter = PrometheusStatsExporter( + options=option, gatherer=option.registry, collector=collector) + return exporter + + +def new_collector(options): + """ new_collector should be used + to create instance of Collector class in order to + prevent the usage of constructor directly + """ + return Collector(options=options) + + +def get_view_name(namespace, view): + """ create the name for the view + """ + name = "" + if namespace != "": + name = namespace + "_" + return sanitize(name + view.name) + + +_NON_LETTERS_NOR_DIGITS_RE = re.compile(r"[^\w]", re.UNICODE | re.IGNORECASE) + + +def sanitize(key): + """ sanitize the given metric name or label according to Prometheus rule. + Replace all characters other than [A-Za-z0-9_] with '_'. + """ + return _NON_LETTERS_NOR_DIGITS_RE.sub("_", key) diff --git a/python/ray/reporter.py b/python/ray/reporter.py index 9b307c874..544d2cc8a 100644 --- a/python/ray/reporter.py +++ b/python/ray/reporter.py @@ -10,13 +10,17 @@ import platform import subprocess import sys from concurrent import futures + import ray import psutil + import ray.ray_constants as ray_constants import ray.services import ray.utils from ray.core.generated import reporter_pb2 from ray.core.generated import reporter_pb2_grpc +from ray.dashboard.util import get_unused_port +from ray.metrics_agent import MetricsAgent # Logger for this module. It should be configured at the entry point # into the program using Ray. 
Ray provides a default configuration at @@ -32,8 +36,8 @@ except ImportError: class ReporterServer(reporter_pb2_grpc.ReporterServiceServicer): - def __init__(self): - pass + def __init__(self, metrics_agent): + self.metrics_agent = metrics_agent def GetProfilingStats(self, request, context): pid = request.pid @@ -56,8 +60,22 @@ class ReporterServer(reporter_pb2_grpc.ReporterServiceServicer): profiling_stats=profiling_stats, std_out=stdout, std_err=stderr) def ReportMetrics(self, request, context): - # TODO(sang): Process metrics here. - return reporter_pb2.ReportMetricsReply() + # NOTE: Exceptions are not propagated properly + # when we don't catch them here. + try: + metrcs_description_required = ( + self.metrics_agent.record_metrics_points( + request.metrics_points)) + except Exception as e: + logger.error(e) + logger.error(traceback.format_exc()) + + # If metrics description is missing, we should notify cpp processes + # that we need them. Cpp processes will then report them to here. + # We need it when (1) a new metric is reported (application metric) + # (2) a reporter goes down and restarted (currently not implemented). 
+ return reporter_pb2.ReportMetricsReply( + metrcs_description_required=metrcs_description_required) def recursive_asdict(o): @@ -104,6 +122,11 @@ class Reporter: self.ip = ray.services.get_node_ip_address() self.hostname = platform.node() self.port = port + metrics_agent_port = os.getenv("METRICS_AGENT_PORT") + if not metrics_agent_port: + metrics_agent_port = get_unused_port() + self.metrics_agent = MetricsAgent(metrics_agent_port) + self.reporter_grpc_server = ReporterServer(self.metrics_agent) _ = psutil.cpu_percent() # For initialization @@ -225,13 +248,14 @@ class Reporter: ) def run(self): - """Publish the port.""" thread_pool = futures.ThreadPoolExecutor(max_workers=10) server = grpc.server(thread_pool, options=(("grpc.so_reuseport", 0), )) reporter_pb2_grpc.add_ReporterServiceServicer_to_server( - ReporterServer(), server) + self.reporter_grpc_server, server) port = server.add_insecure_port("[::]:{}".format(self.port)) + server.start() + # Publish the port. self.redis_client.set("REPORTER_PORT:{}".format(self.ip), port) """Run the reporter.""" while True: diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index eadb90dae..1437b2e16 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -301,9 +301,9 @@ py_test( ) py_test( - name = "test_metrics_export", + name = "test_metrics_agent", size = "small", - srcs = SRCS + ["test_metrics_export.py"], + srcs = SRCS + ["test_metrics_agent.py"], tags = ["exclusive"], deps = ["//:ray_lib"], ) diff --git a/python/ray/tests/test_metrics_agent.py b/python/ray/tests/test_metrics_agent.py new file mode 100644 index 000000000..98e0cc896 --- /dev/null +++ b/python/ray/tests/test_metrics_agent.py @@ -0,0 +1,185 @@ +import pytest + +from collections import defaultdict + +import requests + +from opencensus.tags import tag_key as tag_key_module +from prometheus_client.parser import text_string_to_metric_families + +from ray.core.generated.common_pb2 import MetricPoint +from ray.dashboard.util import 
get_unused_port +from ray.metrics_agent import Gauge, MetricsAgent + + +def generate_metrics_point(name: str, + value: float, + timestamp: int, + tags: dict, + description: str = None, + units: str = None): + return MetricPoint( + metric_name=name, + timestamp=timestamp, + value=value, + tags=tags, + description=description, + units=units) + + +# NOTE: Opencensus metrics is a singleton per process. +# That said, we should re-use the same agent for all tests. +# Please be careful when you add new tests here. If each +# test doesn't use different metrics, it can cause conflicts. +metrics_agent = None + + +@pytest.fixture +def cleanup_agent(): + global metrics_agent + if not metrics_agent: + metrics_agent = MetricsAgent(get_unused_port()) + yield + metrics_agent._registry = defaultdict(lambda: None) + + +def test_gauge(): + tags = [tag_key_module.TagKey(str(i)) for i in range(10)] + name = "name" + description = "description" + units = "units" + gauge = Gauge(name, description, units, tags) + assert gauge.__dict__()["name"] == name + assert gauge.__dict__()["description"] == description + assert gauge.__dict__()["units"] == units + assert gauge.__dict__()["tags"] == tags + + +def test_basic_e2e(cleanup_agent): + # Test the basic end to end workflow. This includes: + # - Metrics are reported. + # - Metrics are dynamically registered to registry. + # - Metrics are accessible from Prometheus. + POINTS_DEF = [0, 1, 2] + tag = {"TAG_KEY": "TAG_VALUE"} + metrics_points = [ + generate_metrics_point( + str(i), float(i), i, tag, description=str(i), units=str(i)) + for i in POINTS_DEF + ] + metrics_points_dict = { + metric_point.metric_name: metric_point + for metric_point in metrics_points + } + assert metrics_agent.record_metrics_points(metrics_points) is False + # Make sure all metrics are registered. 
+ for i, metric_entry in zip(POINTS_DEF, metrics_agent.registry.items()): + metric_name, metric_entry = metric_entry + assert metric_name == metric_entry.name + assert metric_entry.name == str(i) + assert metric_entry.description == str(i) + assert metric_entry.units == str(i) + assert metric_entry.tags == [tag_key_module.TagKey(key) for key in tag] + + # Make sure all metrics are available through a port. + response = requests.get("http://localhost:{}".format( + metrics_agent.metrics_export_port)) + response.raise_for_status() + for line in response.text.split("\n"): + for family in text_string_to_metric_families(line): + metric_name = family.name + + if metric_name not in metrics_points_dict: + continue + + if line.startswith("# HELP"): + # description + assert (family.documentation == metrics_points_dict[ + metric_name].description) + else: + for sample in family.samples: + metrics_points_dict[metric_name].value == sample.value + + +def test_missing_def(cleanup_agent): + # Make sure when metrics with description and units are reported, + # agent updates its registry to include them. + POINTS_DEF = [4, 5, 6] + tag = {"TAG_KEY": "TAG_VALUE"} + metrics_points = [ + generate_metrics_point( + str(i), + float(i), + i, + tag, + ) for i in POINTS_DEF + ] + + # At first, metrics shouldn't have description and units. + assert metrics_agent.record_metrics_points(metrics_points) is True + for i, metric_entry in zip(POINTS_DEF, metrics_agent.registry.items()): + metric_name, metric_entry = metric_entry + assert metric_name == metric_entry.name + assert metric_entry.name == str(i) + assert metric_entry.description == "" + assert metric_entry.units == "" + assert metric_entry.tags == [tag_key_module.TagKey(key) for key in tag] + + # The points are coming again with description and units. + # Make sure they are updated. 
+ metrics_points = [ + generate_metrics_point( + str(i), float(i), i, tag, description=str(i), units=str(i)) + for i in POINTS_DEF + ] + assert metrics_agent.record_metrics_points(metrics_points) is False + for i, metric_entry in zip(POINTS_DEF, metrics_agent.registry.items()): + metric_name, metric_entry = metric_entry + assert metric_name == metric_entry.name + assert metric_entry.name == str(i) + assert metric_entry.description == str(i) + assert metric_entry.units == str(i) + assert metric_entry.tags == [tag_key_module.TagKey(key) for key in tag] + + +def test_multiple_record(cleanup_agent): + # Make sure prometheus export data properly when multiple points with + # the same name is reported. + TOTAL_POINTS = 10 + NAME = "TEST" + values = list(range(TOTAL_POINTS)) + tags = [{"TAG_KEY": str(i)} for i in range(TOTAL_POINTS)] + timestamps = list(range(TOTAL_POINTS)) + points = [] + + for i in range(TOTAL_POINTS): + points.append( + generate_metrics_point( + name=NAME, + value=values[i], + timestamp=timestamps[i], + tags=tags[i])) + for point in points: + metrics_agent.record_metrics_points([point]) + + # Make sure data is available at prometheus. + response = requests.get("http://localhost:{}".format( + metrics_agent.metrics_export_port)) + response.raise_for_status() + + sample_values = [] + for line in response.text.split("\n"): + for family in text_string_to_metric_families(line): + metric_name = family.name + name_without_prefix = metric_name.split("_")[1] + if name_without_prefix != NAME: + continue + # Lines for recorded metrics values. 
+ for sample in family.samples: + sample_values.append(sample.value) + assert sample_values == [point.value for point in points] + + +if __name__ == "__main__": + import sys + sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/tests/test_metrics_export.py b/python/ray/tests/test_metrics_export.py deleted file mode 100644 index 2a888f6a3..000000000 --- a/python/ray/tests/test_metrics_export.py +++ /dev/null @@ -1,261 +0,0 @@ -import pytest -import requests - -from unittest.mock import patch - -from ray.dashboard.metrics_exporter.actions import ActionHandler -from ray.dashboard.metrics_exporter.client import MetricsExportClient -from ray.dashboard.metrics_exporter.client import Exporter -from ray.dashboard.metrics_exporter.schema import (AuthResponse, BaseModel, - ValidationError, Field) - -MOCK_DASHBOARD_ID = "1234" -MOCK_DASHBOARD_ADDRESS = "http://127.0.0.1:9081" -MOCK_ACCESS_TOKEN = "1234" - - -def _setup_client_and_exporter(controller): - exporter = Exporter(MOCK_DASHBOARD_ID, MOCK_DASHBOARD_ADDRESS, controller) - client = MetricsExportClient(MOCK_DASHBOARD_ADDRESS, controller, - MOCK_DASHBOARD_ID, exporter) - return exporter, client - - -""" -Test Exporter -""" - - -@patch("ray.dashboard.dashboard.DashboardController") -def test_verify_exporter_cannot_run_without_access_token(mock_controller): - exporter, client = _setup_client_and_exporter(mock_controller) - # Should raise an assertion error because there's no access token set. - with pytest.raises(AssertionError): - exporter.run() - - -""" -Test Client -""" - - -@patch("ray.dashboard.dashboard.DashboardController") -@patch( - "ray.dashboard.metrics_exporter.api.authentication_request", - side_effect=requests.exceptions.HTTPError) -def test_client_invalid_request_status_returned(auth_request, mock_controller): - """ - If authentication request fails with an invalid status code, - `start_exporting_metrics` should fail. 
- """ - exporter, client = _setup_client_and_exporter(mock_controller) - - # authenticate should throw an exception because API request fails. - with pytest.raises(requests.exceptions.HTTPError): - client._authenticate() - - # This should fail because authentication throws an exception. - result, error = client.start_exporting_metrics() - assert result is False - - -@patch("ray.dashboard.dashboard.DashboardController") -@patch("ray.dashboard.metrics_exporter.api.authentication_request") -def test_authentication(auth_request, mock_controller): - auth_request.return_value = AuthResponse( - access_token_dashboard=MOCK_ACCESS_TOKEN, - access_token_ingest=MOCK_ACCESS_TOKEN) - exporter, client = _setup_client_and_exporter(mock_controller) - - assert client.enabled is False - client._authenticate() - assert client.dashboard_url == "{address}/dashboard/{access_token}".format( - address=MOCK_DASHBOARD_ADDRESS, access_token=MOCK_ACCESS_TOKEN) - assert client.enabled is True - - -@patch.object(Exporter, "start") -@patch("ray.dashboard.dashboard.DashboardController") -@patch("ray.dashboard.metrics_exporter.api.authentication_request") -def test_start_exporting_metrics_without_authentication( - auth_request, mock_controller, start): - """ - `start_exporting_metrics` should trigger authentication if users - are not authenticated. - """ - auth_request.return_value = AuthResponse( - access_token_dashboard=MOCK_ACCESS_TOKEN, - access_token_ingest=MOCK_ACCESS_TOKEN) - exporter, client = _setup_client_and_exporter(mock_controller) - - # start_exporting_metrics should succeed. 
- result, error = client.start_exporting_metrics() - assert result is True - assert error is None - assert client.enabled is True - - -@patch.object(Exporter, "start") -@patch("ray.dashboard.dashboard.DashboardController") -@patch("ray.dashboard.metrics_exporter.api.authentication_request") -def test_start_exporting_metrics_with_authentication(auth_request, - mock_controller, start): - """ - If users are already authenticated, `start_exporting_metrics` - should not authenticate users. - """ - auth_request.return_value = AuthResponse( - access_token_dashboard=MOCK_ACCESS_TOKEN, - access_token_ingest=MOCK_ACCESS_TOKEN) - exporter, client = _setup_client_and_exporter(mock_controller) - # Already authenticated. - client._authenticate() - assert client.enabled is True - - result, error = client.start_exporting_metrics() - # Auth request should be called only once because - # it was already authenticated. - auth_request.call_count == 1 - assert result is True - assert error is None - - -@patch.object(Exporter, "start") -@patch("ray.dashboard.dashboard.DashboardController") -@patch("ray.dashboard.metrics_exporter.api.authentication_request") -def test_start_exporting_metrics_succeed(auth_request, mock_controller, start): - auth_request.return_value = AuthResponse( - access_token_dashboard=MOCK_ACCESS_TOKEN, - access_token_ingest=MOCK_ACCESS_TOKEN) - exporter, client = _setup_client_and_exporter(mock_controller) - - result, error = client.start_exporting_metrics() - assert result is True - assert error is None - assert client.is_exporting_started is True - start.call_count == 1 - - with pytest.raises(AssertionError): - client.start_exporting_metrics() - - -""" -BaseModel Test -""" - - -def test_base_model(): - DEFAULT_VALUE = "default" - - class A(BaseModel): - __schema__ = { - "a": Field(required=True, default=None, type=str), - "b": Field(required=False, default=DEFAULT_VALUE, type=str) - } - - # Test the correct case. 
- obj = {"a": "1", "b": "1"} - a = A.parse_obj(obj) - assert a.a == "1" - assert a.b == "1" - assert a._dict == obj - string = "{name}\n{dict}".format(name=A.__name__, dict=str(obj)) - assert str(a) == string - - # Test wrong types. It is not checked in the current implementation. - obj = {"a": 1, "b": 2} - a = A.parse_obj(obj) - assert a.a == 1 - assert a.b == 2 - - # Test wrong types. parse_obj can only parse dictionary. - obj = None - with pytest.raises(AssertionError): - a = A.parse_obj(obj) - - # Test when required fields are not provided. - obj = {"b": "1"} - with pytest.raises(ValidationError): - a = A.parse_obj(obj) - - # Test optional fields are set to default when fields are not given. - obj = {"a": "1"} - a = A.parse_obj(obj) - assert a.b == DEFAULT_VALUE - - # Test when fields that are not defined in the schema is given. - # It should be ignoered - obj = {"a": "a", "b": "b", "c": "c"} - a = A.parse_obj(obj) - assert a.a == "a" - assert a.b == "b" - assert a.c == "c" - - -""" -Test Action Handler -""" - - -def _get_mock_kill_action(): - return { - "type": "KILL_ACTOR", - "actor_id": "1234", - "ip_address": "1234", - "port": 30 - } - - -@patch("ray.dashboard.dashboard.DashboardController") -def test_handle_kill_action(mock_controller): - action_handler = ActionHandler(mock_controller) - kill_action = _get_mock_kill_action() - action_handler.handle_kill_action(kill_action) - assert mock_controller.kill_actor.call_count == 1 - - -@patch("ray.dashboard.dashboard.DashboardController") -def test_handle_kill_action_invalid_dict(mock_controller): - action_handler = ActionHandler(mock_controller) - kill_action = {"type": "KILL_ACTOR", "ip_address": "1234", "port": 30} - - with pytest.raises(ValidationError): - action_handler.handle_kill_action(kill_action) - - -@patch("ray.dashboard.dashboard.DashboardController") -def test_handle_actions_many_kill_actor(mock_controller): - action_handler = ActionHandler(mock_controller) - # 10 actions required. 
- actions = [_get_mock_kill_action() for _ in range(10)] - - action_handler.handle_actions(actions) - assert mock_controller.kill_actor.call_count == 10 - - -@patch("ray.dashboard.dashboard.DashboardController") -def test_handle_actions_kill_actor_and_mixed_type(mock_controller): - action_handler = ActionHandler(mock_controller) - wrong_type_action = {"type": "NON_EXIST"} - actions = [ - _get_mock_kill_action(), wrong_type_action, - _get_mock_kill_action() - ] - - action_handler.handle_actions(actions) - assert mock_controller.kill_actor.call_count == 2 - - -@patch("ray.dashboard.dashboard.DashboardController") -def test_handle_actions_only_wrong_type(mock_controller): - action_handler = ActionHandler(mock_controller) - wrong_type_action = {"type": "NON_EXIST"} - actions = [wrong_type_action for _ in range(10)] - - action_handler.handle_actions(actions) - assert mock_controller.kill_actor.call_count == 0 - - -if __name__ == "__main__": - import sys - sys.exit(pytest.main(["-v", __file__])) diff --git a/python/setup.py b/python/setup.py index 60f66d02f..e417d96ba 100644 --- a/python/setup.py +++ b/python/setup.py @@ -317,6 +317,8 @@ install_requires = [ "pyyaml", "requests", "redis >= 3.3.2, < 3.5.0", + "opencensus", + "prometheus_client >= 0.7.1", ] diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 5c9d66ab3..8ac33a109 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -334,8 +334,16 @@ message CoreWorkerStats { } message MetricPoint { + // Name of the metric. string metric_name = 1; + // Timestamp when metric is exported. int64 timestamp = 2; + // Value of the metric point. double value = 3; + // Tags of the metric. map tags = 4; + // [Optional] Description of the metric. + string description = 5; + // [Optional] Unit of the metric. 
+ string units = 6; } diff --git a/src/ray/protobuf/reporter.proto b/src/ray/protobuf/reporter.proto index b0bcc9ca4..686a70e0f 100644 --- a/src/ray/protobuf/reporter.proto +++ b/src/ray/protobuf/reporter.proto @@ -35,10 +35,11 @@ message GetProfilingStatsReply { } message ReportMetricsRequest { - repeated MetricPoint metrics_point = 1; + repeated MetricPoint metrics_points = 1; } message ReportMetricsReply { + bool metrcs_description_required = 1; } // Service for communicating with the reporter.py process on a remote node. diff --git a/src/ray/stats/metric.cc b/src/ray/stats/metric.cc index 472b8209d..2cfe236b9 100644 --- a/src/ray/stats/metric.cc +++ b/src/ray/stats/metric.cc @@ -36,6 +36,10 @@ static void RegisterAsView(opencensus::stats::ViewDescriptor view_descriptor, view_descriptor.RegisterForExport(); } +/// +/// Stats Config +/// + StatsConfig &StatsConfig::instance() { static StatsConfig instance; return instance; @@ -71,6 +75,9 @@ void StatsConfig::SetIsInitialized(bool initialized) { is_initialized_ = initial bool StatsConfig::IsInitialized() const { return is_initialized_; } +/// +/// Metric +/// void Metric::Record(double value, const TagsType &tags) { if (StatsConfig::instance().IsStatsDisabled()) { return; diff --git a/src/ray/stats/metric.h b/src/ray/stats/metric.h index b20482c03..4a5b7ff70 100644 --- a/src/ray/stats/metric.h +++ b/src/ray/stats/metric.h @@ -185,7 +185,9 @@ struct MetricPoint { int64_t timestamp; double value; std::unordered_map tags; + const opencensus::stats::MeasureDescriptor &measure_descriptor; }; + } // namespace stats } // namespace ray diff --git a/src/ray/stats/metric_exporter.cc b/src/ray/stats/metric_exporter.cc index 84fd27ac2..da69ec94f 100644 --- a/src/ray/stats/metric_exporter.cc +++ b/src/ray/stats/metric_exporter.cc @@ -23,13 +23,15 @@ template <> void MetricExporter::ExportToPoints( const opencensus::stats::ViewData::DataMap &view_data, - const std::string &metric_name, std::vector &keys, - std::vector &points) 
{ + const opencensus::stats::MeasureDescriptor &measure_descriptor, + std::vector &keys, std::vector &points) { // Return if no raw data found in view map. if (view_data.size() == 0) { return; } + const auto &metric_name = measure_descriptor.name(); + // NOTE(lingxuan.zlx): No sampling in histogram data, so all points all be filled in. std::unordered_map tags; for (size_t i = 0; i < view_data.begin()->first.size(); ++i) { @@ -53,10 +55,12 @@ void MetricExporter::ExportToPoints( } } hist_mean /= view_data.size(); - MetricPoint mean_point = {metric_name + ".mean", current_sys_time_ms(), hist_mean, - tags}; - MetricPoint max_point = {metric_name + ".max", current_sys_time_ms(), hist_max, tags}; - MetricPoint min_point = {metric_name + ".min", current_sys_time_ms(), hist_min, tags}; + MetricPoint mean_point = {metric_name + ".mean", current_sys_time_ms(), hist_mean, tags, + measure_descriptor}; + MetricPoint max_point = {metric_name + ".max", current_sys_time_ms(), hist_max, tags, + measure_descriptor}; + MetricPoint min_point = {metric_name + ".min", current_sys_time_ms(), hist_min, tags, + measure_descriptor}; points.push_back(std::move(mean_point)); points.push_back(std::move(max_point)); points.push_back(std::move(min_point)); @@ -85,17 +89,17 @@ void MetricExporter::ExportViewData( for (size_t i = 0; i < descriptor.columns().size(); ++i) { keys.push_back(descriptor.columns()[i].name()); } - auto &metric_name = descriptor.name(); + const auto &measure_descriptor = descriptor.measure_descriptor(); switch (view_data.type()) { case opencensus::stats::ViewData::Type::kDouble: - ExportToPoints(view_data.double_data(), metric_name, keys, points); + ExportToPoints(view_data.double_data(), measure_descriptor, keys, points); break; case opencensus::stats::ViewData::Type::kInt64: - ExportToPoints(view_data.int_data(), metric_name, keys, points); + ExportToPoints(view_data.int_data(), measure_descriptor, keys, points); break; case 
opencensus::stats::ViewData::Type::kDistribution: ExportToPoints(view_data.distribution_data(), - metric_name, keys, points); + measure_descriptor, keys, points); break; default: RAY_LOG(FATAL) << "Unknown view data type."; diff --git a/src/ray/stats/metric_exporter.h b/src/ray/stats/metric_exporter.h index f08937439..e07dcfff8 100644 --- a/src/ray/stats/metric_exporter.h +++ b/src/ray/stats/metric_exporter.h @@ -57,8 +57,9 @@ class MetricExporter final : public opencensus::stats::StatsExporter::Handler { /// \param keys, metric tags map /// \param points, memory metric vector instance void ExportToPoints(const opencensus::stats::ViewData::DataMap &view_data, - const std::string &metric_name, std::vector &keys, - std::vector &points) { + const opencensus::stats::MeasureDescriptor &measure_descriptor, + std::vector &keys, std::vector &points) { + const auto &metric_name = measure_descriptor.name(); for (const auto &row : view_data) { std::unordered_map tags; for (size_t i = 0; i < keys.size(); ++i) { @@ -66,7 +67,7 @@ class MetricExporter final : public opencensus::stats::StatsExporter::Handler { } // Current timestamp is used for point not view data time. 
MetricPoint point{metric_name, current_sys_time_ms(), - static_cast(row.second), tags}; + static_cast(row.second), tags, measure_descriptor}; RAY_LOG(DEBUG) << "Metric name " << metric_name << ", value " << point.value; points.push_back(std::move(point)); if (points.size() >= report_batch_size_) { diff --git a/src/ray/stats/metric_exporter_client.cc b/src/ray/stats/metric_exporter_client.cc index 9a774c517..7a1779536 100644 --- a/src/ray/stats/metric_exporter_client.cc +++ b/src/ray/stats/metric_exporter_client.cc @@ -54,7 +54,7 @@ void MetricsAgentExporter::ReportMetrics(const std::vector &points) MetricExporterDecorator::ReportMetrics(points); rpc::ReportMetricsRequest request; for (auto point : points) { - auto metric_point = request.add_metrics_point(); + auto metric_point = request.add_metrics_points(); metric_point->set_metric_name(point.metric_name); metric_point->set_timestamp(point.timestamp); metric_point->set_value(point.value); @@ -62,10 +62,22 @@ void MetricsAgentExporter::ReportMetrics(const std::vector &points) for (auto &tag : point.tags) { (*mutable_tags)[tag.first] = tag.second; } + // If description and units information is requested from + // the metrics agent, append the information. + // TODO(sang): It can be inefficient if there are lots of new registered metrics. + // We should make it more efficient if there's compelling use cases. + if (should_update_description_) { + metric_point->set_description(point.measure_descriptor.description()); + metric_point->set_units(point.measure_descriptor.units()); + } } + should_update_description_ = false; // TODO(sang): Should retry metrics report if it fails. 
- client_->ReportMetrics(request, nullptr); + client_->ReportMetrics( + request, [this](const Status &status, const rpc::ReportMetricsReply &reply) { + should_update_description_ = reply.metrcs_description_required(); + }); } } // namespace stats diff --git a/src/ray/stats/metric_exporter_client.h b/src/ray/stats/metric_exporter_client.h index bce4d9b62..be6e6bfec 100644 --- a/src/ray/stats/metric_exporter_client.h +++ b/src/ray/stats/metric_exporter_client.h @@ -69,6 +69,8 @@ class MetricsAgentExporter : public MetricExporterDecorator { std::unique_ptr client_; /// Call Manager for gRPC client. rpc::ClientCallManager client_call_manager_; + /// Whether or not description and units information for metrics should be updated. + bool should_update_description_ = true; }; } // namespace stats