From bedc2c24c805b8aa80bcb0c10fc3a62e5dbfeee8 Mon Sep 17 00:00:00 2001 From: Simon Mo Date: Tue, 18 Aug 2020 11:32:42 -0700 Subject: [PATCH] Export Metrics in OpenCensus Protobuf Format (#10080) --- BUILD.bazel | 14 +- bazel/ray_deps_setup.bzl | 7 + python/ray/metrics_agent.py | 209 +++++----------- python/ray/reporter.py | 19 +- python/ray/tests/test_metrics_agent.py | 307 +++++++----------------- src/ray/protobuf/BUILD | 5 +- src/ray/protobuf/reporter.proto | 11 + src/ray/rpc/metrics_agent_client.h | 6 + src/ray/stats/metric_exporter.cc | 107 ++++++++- src/ray/stats/metric_exporter.h | 38 ++- src/ray/stats/metric_exporter_client.cc | 38 +-- src/ray/stats/metric_exporter_client.h | 14 +- src/ray/stats/stats.h | 7 +- 13 files changed, 329 insertions(+), 453 deletions(-) diff --git a/BUILD.bazel b/BUILD.bazel index 4994aca8c..7116f1154 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -1902,17 +1902,11 @@ genrule( fi # NOTE(hchen): Protobuf doesn't allow specifying Python package name. So we use this `sed` # command to change the import path in the generated file. - files=( - python/ray/core/generated/gcs_pb2.py - python/ray/core/generated/common_pb2.py - python/ray/core/generated/node_manager_pb2.py - python/ray/core/generated/node_manager_pb2_grpc.py - python/ray/core/generated/reporter_pb2.py - python/ray/core/generated/reporter_pb2_grpc.py - python/ray/core/generated/core_worker_pb2.py - python/ray/core/generated/core_worker_pb2_grpc.py - ) + # shellcheck disable=SC2006 + files=(`ls python/ray/core/generated/*_pb2*.py`) sed -i -E 's/from src.ray.protobuf/from ./' "$${files[@]}" + sed -i -E 's/from opencensus.proto.metrics.v1 import/from . import/' "$${files[@]}" + sed -i -E 's/from opencensus.proto.resource.v1 import/from . import/' "$${files[@]}" echo "$${PWD}" > $@ """, local = 1, diff --git a/bazel/ray_deps_setup.bzl b/bazel/ray_deps_setup.bzl index 05bad9cf6..b69fddf57 100644 --- a/bazel/ray_deps_setup.bzl +++ b/bazel/ray_deps_setup.bzl @@ -264,3 +264,10 @@ def ray_deps_setup(): "//thirdparty/patches:msgpack-windows-iovec.patch", ], ) + + http_archive( + name = "io_opencensus_proto", + strip_prefix = "opencensus-proto-0.3.0/src", + urls = ["https://github.com/census-instrumentation/opencensus-proto/archive/v0.3.0.tar.gz"], + sha256 = "b7e13f0b4259e80c3070b583c2f39e53153085a6918718b1c710caf7037572b0", + ) diff --git a/python/ray/metrics_agent.py b/python/ray/metrics_agent.py index 1d64ee2fa..70d728dd0 100644 --- a/python/ray/metrics_agent.py +++ b/python/ray/metrics_agent.py @@ -5,91 +5,35 @@ import threading import time import traceback -from collections import defaultdict from typing import List -from opencensus.stats import aggregation from opencensus.stats import measure as measure_module -from opencensus.stats.measurement_map import MeasurementMap from opencensus.stats import stats as stats_module -from opencensus.tags import tag_key as tag_key_module -from opencensus.tags import tag_map as tag_map_module -from opencensus.tags import tag_value as tag_value_module -from opencensus.stats import view +from opencensus.stats.view import View +from opencensus.stats.view_data import ViewData +from opencensus.stats.aggregation_data import (CountAggregationData, + DistributionAggregationData, + LastValueAggregationData) +from opencensus.metrics.export.value import ValueDouble import ray from ray import prometheus_exporter -from ray.core.generated.common_pb2 import MetricPoint +from ray.core.generated.metrics_pb2 import Metric logger = logging.getLogger(__name__) -# We don't need counter, histogram, or sum because reporter just needs to -# collect momental values (gauge) that are already counted or sampled -# (histogram for example), or summed inside cpp processes. -class Gauge(view.View): - def __init__(self, name, description, unit, - tags: List[tag_key_module.TagKey]): - self._measure = measure_module.MeasureInt(name, description, unit) - self._view = view.View(name, description, tags, self.measure, - aggregation.LastValueAggregation()) - - @property - def measure(self): - return self._measure - - @property - def view(self): - return self._view - - @property - def name(self): - return self.measure.name - - @property - def description(self): - return self.measure.description - - @property - def units(self): - return self.measure.unit - - @property - def tags(self): - return self.view.columns - - def __dict__(self): - return { - "name": self.measure.name, - "description": self.measure.description, - "units": self.measure.unit, - "tags": self.view.columns, - } - - def __str__(self): - return self.__repr__() - - def __repr__(self): - return str(self.__dict__()) - - class MetricsAgent: def __init__(self, metrics_export_port): assert metrics_export_port is not None # OpenCensus classes. self.view_manager = stats_module.stats.view_manager - self.stats_recorder = stats_module.stats.stats_recorder # Port where we will expose metrics. self.metrics_export_port = metrics_export_port - # metric name(str) -> view (view.View) - self._registry = defaultdict(lambda: None) # Lock required because gRPC server uses # multiple threads to process requests. self._lock = threading.Lock() - # Whether or not there are metrics that are missing description and - # units information. This is used to dynamically update registry. - self._missing_information = False # Configure exporter. (We currently only support prometheus). self.view_manager.register_exporter( @@ -97,96 +41,73 @@ class MetricsAgent: prometheus_exporter.Options( namespace="ray", port=metrics_export_port))) - @property - def registry(self): - """Return metric definition registry. - - Metrics definition registry is dynamically updated - by metrics reported by Ray processes. - """ - return self._registry - - def record_metrics_points(self, metrics_points: List[MetricPoint]): + def record_metric_points_from_protobuf(self, metrics: List[Metric]): + """Record metrics from Opencensus Protobuf""" with self._lock: - measurement_map = self.stats_recorder.new_measurement_map() - for metric_point in metrics_points: - self._register_if_needed(metric_point) - self._record(metric_point, measurement_map) - return self._missing_information + self._record_metrics(metrics) - def _record(self, metric_point: MetricPoint, - measurement_map: MeasurementMap): - """Record a single metric point to export. + def _record_metrics(self, metrics): + # The list of view data is what we are going to use for the + # final export to exporter. + view_data_changed: List[ViewData] = [] - NOTE: When this method is called, the caller should acquire a lock. + # Walk the protobufs and convert them to ViewData + for metric in metrics: + descriptor = metric.metric_descriptor + timeseries = metric.timeseries - Args: - metric_point(MetricPoint) metric point defined in common.proto - measurement_map(MeasurementMap): Measurement map to record metrics. - """ - metric_name = metric_point.metric_name - tags = metric_point.tags + if len(timeseries) == 0: + continue - metric = self._registry.get(metric_name) - # Metrics should be always registered dynamically. - assert metric + columns = [label_key.key for label_key in descriptor.label_keys] + start_time = timeseries[0].start_timestamp.seconds - tag_map = tag_map_module.TagMap() - for key, value in tags.items(): - tag_key = tag_key_module.TagKey(key) - tag_value = tag_value_module.TagValue(value) - tag_map.insert(tag_key, tag_value) + # Create the view and view_data + measure = measure_module.BaseMeasure( + descriptor.name, descriptor.description, descriptor.unit) + view = self.view_manager.measure_to_view_map.get_view( + descriptor.name, None) + if not view: + view = View( + descriptor.name, + descriptor.description, + columns, + measure, + aggregation=None) + self.view_manager.measure_to_view_map.register_view( + view, start_time) + view_data = (self.view_manager.measure_to_view_map. + _measure_to_view_data_list_map[measure.name][-1]) + view_data_changed.append(view_data) - metric_value = metric_point.value - measurement_map.measure_float_put(metric.measure, metric_value) - # NOTE: When we record this metric, timestamp will be renewed. - measurement_map.record(tag_map) + # Create the aggregation and fill it in the our stats + for series in timeseries: + tag_vals = tuple(val.value for val in series.label_values) + for point in series.points: + if point.HasField("int64_value"): + data = CountAggregationData(point.int64_value) + elif point.HasField("double_value"): + data = LastValueAggregationData( + ValueDouble, point.double_value) + elif point.HasField("distribution_value"): + dist_value = point.distribution_value + counts_per_bucket = [ + bucket.count for bucket in dist_value.buckets + ] + bucket_bounds = ( + dist_value.bucket_options.explicit.bounds) + data = DistributionAggregationData( + dist_value.sum / dist_value.count, + dist_value.count, + dist_value.sum_of_squared_deviation, + counts_per_bucket, bucket_bounds) + else: + raise ValueError("Summary is not supported") - def _register_if_needed(self, metric_point: MetricPoint): - """Register metrics if they are not registered. + view_data.tag_value_aggregation_data_map[tag_vals] = data - NOTE: When this method is called, the caller should acquire a lock. - - Unseen metrics: - Register it with Gauge type metrics. Note that all metrics in - the agent will be gauge because sampling is already done - within cpp processes. - Metrics that are missing description & units: - In this case, we will notify cpp proceses that we need this - information. Cpp processes will then report description and units - of all metrics they have. - - Args: - metric_point metric point defined in common.proto - Return: - True if given metrics are missing description and units. - False otherwise. - """ - metric_name = metric_point.metric_name - metric_description = metric_point.description - metric_units = metric_point.units - if self._registry[metric_name] is None: - tags = metric_point.tags - metric_tags = [] - for tag_key in tags: - metric_tags.append(tag_key_module.TagKey(tag_key)) - - metric = Gauge(metric_name, metric_description, metric_units, - metric_tags) - self._registry[metric_name] = metric - self.view_manager.register_view(metric.view) - - # If there are missing description & unit information, - # we should notify cpp processes that we need them. - if not metric_description or not metric_units: - self._missing_information = True - - if metric_description and metric_units: - self._registry[metric_name].view._description = metric_description - self._registry[ - metric_name].view.measure._description = metric_description - self._registry[metric_name].view.measure._unit = metric_units - self._missing_information = False + # Finally, export all the values + self.view_manager.measure_to_view_map.export(view_data_changed) class PrometheusServiceDiscoveryWriter(threading.Thread): diff --git a/python/ray/reporter.py b/python/ray/reporter.py index 0ef81d10d..382206445 100644 --- a/python/ray/reporter.py +++ b/python/ray/reporter.py @@ -58,23 +58,14 @@ class ReporterServer(reporter_pb2_grpc.ReporterServiceServicer): return reporter_pb2.GetProfilingStatsReply( profiling_stats=profiling_stats, std_out=stdout, std_err=stderr) - def ReportMetrics(self, request, context): - # NOTE: Exceptions are not propagated properly - # when we don't catch them here. + def ReportOCMetrics(self, request, context): try: - metrcs_description_required = ( - self.metrics_agent.record_metrics_points( - request.metrics_points)) - except Exception as e: - logger.error(e) + self.metrics_agent.record_metric_points_from_protobuf( + request.metrics) + except Exception: logger.error(traceback.format_exc()) - # If metrics description is missing, we should notify cpp processes - # that we need them. Cpp processes will then report them to here. - # We need it when (1) a new metric is reported (application metric) - # (2) a reporter goes down and restarted (currently not implemented). - return reporter_pb2.ReportMetricsReply( - metrcs_description_required=metrcs_description_required) + return reporter_pb2.ReportOCMetricsReply() def recursive_asdict(o): diff --git a/python/ray/tests/test_metrics_agent.py b/python/ray/tests/test_metrics_agent.py index 9012da973..9b0f63b2d 100644 --- a/python/ray/tests/test_metrics_agent.py +++ b/python/ray/tests/test_metrics_agent.py @@ -1,190 +1,16 @@ -import asyncio import json -import time -from collections import defaultdict +from pprint import pformat import requests import pytest -from opencensus.tags import tag_key as tag_key_module from prometheus_client.parser import text_string_to_metric_families import ray -from ray.core.generated.common_pb2 import MetricPoint -from ray.dashboard.util import get_unused_port -from ray.metrics_agent import (Gauge, MetricsAgent, - PrometheusServiceDiscoveryWriter) +from ray.metrics_agent import PrometheusServiceDiscoveryWriter from ray.experimental.metrics import Count, Histogram from ray.test_utils import wait_for_condition, SignalActor -def generate_metrics_point(name: str, - value: float, - timestamp: int, - tags: dict, - description: str = None, - units: str = None): - return MetricPoint( - metric_name=name, - timestamp=timestamp, - value=value, - tags=tags, - description=description, - units=units) - - -# NOTE: Opencensus metrics is a singleton per process. -# That says, we should re-use the same agent for all tests. -# Please be careful when you add new tests here. If each -# test doesn't use different metrics, it can have some confliction. -metrics_agent = None - - -@pytest.fixture -def cleanup_agent(): - global metrics_agent - if not metrics_agent: - metrics_agent = MetricsAgent(get_unused_port()) - yield - metrics_agent._registry = defaultdict(lambda: None) - - -def test_gauge(): - tags = [tag_key_module.TagKey(str(i)) for i in range(10)] - name = "name" - description = "description" - units = "units" - gauge = Gauge(name, description, units, tags) - assert gauge.__dict__()["name"] == name - assert gauge.__dict__()["description"] == description - assert gauge.__dict__()["units"] == units - assert gauge.__dict__()["tags"] == tags - - -def test_basic_e2e(cleanup_agent): - # Test the basic end to end workflow. This includes. - # - Metrics are reported. - # - Metrics are dynamically registered to registry. - # - Metrics are accessbiel from Prometheus. - POINTS_DEF = [0, 1, 2] - tag = {"TAG_KEY": "TAG_VALUE"} - metrics_points = [ - generate_metrics_point( - str(i), float(i), i, tag, description=str(i), units=str(i)) - for i in POINTS_DEF - ] - metrics_points_dict = { - metric_point.metric_name: metric_point - for metric_point in metrics_points - } - assert metrics_agent.record_metrics_points(metrics_points) is False - # Make sure all metrics are registered. - for i, metric_entry in zip(POINTS_DEF, metrics_agent.registry.items()): - metric_name, metric_entry = metric_entry - assert metric_name == metric_entry.name - assert metric_entry.name == str(i) - assert metric_entry.description == str(i) - assert metric_entry.units == str(i) - assert metric_entry.tags == [tag_key_module.TagKey(key) for key in tag] - - # Make sure all metrics are available through a port. - response = requests.get("http://localhost:{}".format( - metrics_agent.metrics_export_port)) - response.raise_for_status() - for line in response.text.split("\n"): - for family in text_string_to_metric_families(line): - metric_name = family.name - - if metric_name not in metrics_points_dict: - continue - - if line.startswith("# HELP"): - # description - assert (family.documentation == metrics_points_dict[ - metric_name].description) - else: - for sample in family.samples: - metrics_points_dict[metric_name].value == sample.value - - -def test_missing_def(cleanup_agent): - # Make sure when metrics with description and units are reported, - # agent updates its registry to include them. - POINTS_DEF = [4, 5, 6] - tag = {"TAG_KEY": "TAG_VALUE"} - metrics_points = [ - generate_metrics_point( - str(i), - float(i), - i, - tag, - ) for i in POINTS_DEF - ] - - # At first, metrics shouldn't have description and units. - assert metrics_agent.record_metrics_points(metrics_points) is True - for i, metric_entry in zip(POINTS_DEF, metrics_agent.registry.items()): - metric_name, metric_entry = metric_entry - assert metric_name == metric_entry.name - assert metric_entry.name == str(i) - assert metric_entry.description == "" - assert metric_entry.units == "" - assert metric_entry.tags == [tag_key_module.TagKey(key) for key in tag] - - # The points are coming again with description and units. - # Make sure they are updated. - metrics_points = [ - generate_metrics_point( - str(i), float(i), i, tag, description=str(i), units=str(i)) - for i in POINTS_DEF - ] - assert metrics_agent.record_metrics_points(metrics_points) is False - for i, metric_entry in zip(POINTS_DEF, metrics_agent.registry.items()): - metric_name, metric_entry = metric_entry - assert metric_name == metric_entry.name - assert metric_entry.name == str(i) - assert metric_entry.description == str(i) - assert metric_entry.units == str(i) - assert metric_entry.tags == [tag_key_module.TagKey(key) for key in tag] - - -def test_multiple_record(cleanup_agent): - # Make sure prometheus export data properly when multiple points with - # the same name is reported. - TOTAL_POINTS = 10 - NAME = "TEST" - values = list(range(TOTAL_POINTS)) - tags = [{"TAG_KEY": str(i)} for i in range(TOTAL_POINTS)] - timestamps = list(range(TOTAL_POINTS)) - points = [] - - for i in range(TOTAL_POINTS): - points.append( - generate_metrics_point( - name=NAME, - value=values[i], - timestamp=timestamps[i], - tags=tags[i])) - for point in points: - metrics_agent.record_metrics_points([point]) - - # Make sure data is available at prometheus. - response = requests.get("http://localhost:{}".format( - metrics_agent.metrics_export_port)) - response.raise_for_status() - - sample_values = [] - for line in response.text.split("\n"): - for family in text_string_to_metric_families(line): - metric_name = family.name - name_without_prefix = metric_name.split("_")[1] - if name_without_prefix != NAME: - continue - # Lines for recorded metrics values. - for sample in family.samples: - sample_values.append(sample.value) - assert sample_values == [point.value for point in points] - - def test_prometheus_file_based_service_discovery(ray_start_cluster): # Make sure Prometheus service discovery file is correctly written # when number of nodes are dynamically changed. @@ -217,8 +43,8 @@ def test_prometheus_file_based_service_discovery(ray_start_cluster): loaded_json_data["targets"])) -@pytest.mark.skip("This test is flaky right now. Will be fixed in #10080") -def test_metrics_export_end_to_end(ray_start_cluster): +@pytest.fixture +def _setup_cluster_for_test(ray_start_cluster): NUM_NODES = 2 cluster = ray_start_cluster # Add a head node. @@ -231,36 +57,25 @@ def test_metrics_export_end_to_end(ray_start_cluster): cluster.wait_for_nodes() ray.init(address=cluster.address) - signal = SignalActor.remote() + worker_should_exit = SignalActor.remote() - # Generate some metrics around actor & tasks. + # Generate some metrics from actor & tasks. @ray.remote def f(): - counter = Count("test_counter", "desc", "unit", []) - ray.get(signal.send.remote()) - while True: - counter.record(1, {}) - time.sleep(0.1) + counter = Count(f"test_counter", "desc", "unit", []) + counter.record(1, {}) + ray.get(worker_should_exit.wait.remote()) @ray.remote class A: - async def ready(self): - pass - async def ping(self): - histogram = Histogram("test_histogram", "desc", "unit", [0, 1, 2], + histogram = Histogram("test_histogram", "desc", "unit", [0.1, 1.6], []) - while True: - histogram.record(1, {}) - await asyncio.sleep(0.1) + histogram.record(1.5, {}) + ray.get(worker_should_exit.wait.remote()) - obj_refs = [f.remote() for _ in range(30)] a = A.remote() - obj_refs.append(a.ping.remote()) - - # Make sure both histogram and counter are created - ray.get(a.ready.remote()) - ray.get(signal.wait.remote()) + obj_refs = [f.remote(), a.ping.remote()] node_info_list = ray.nodes() prom_addresses = [] @@ -269,62 +84,104 @@ def test_metrics_export_end_to_end(ray_start_cluster): addr = node_info["NodeManagerAddress"] prom_addresses.append(f"{addr}:{metrics_export_port}") + yield prom_addresses + + ray.get(worker_should_exit.send.remote()) + ray.get(obj_refs) + ray.shutdown() + cluster.shutdown() + + +def test_metrics_export_end_to_end(_setup_cluster_for_test): + TEST_TIMEOUT_S = 20 + + prom_addresses = _setup_cluster_for_test + # Make sure we can ping Prometheus endpoints. def fetch_prometheus(prom_addresses): components_dict = {} metric_names = set() + metric_samples = [] for address in prom_addresses: if address not in components_dict: components_dict[address] = set() try: - response = requests.get( - "http://localhost:{}".format(metrics_export_port)) + response = requests.get(f"http://{address}/metrics") except requests.exceptions.ConnectionError: - return components_dict, metric_names + continue for line in response.text.split("\n"): for family in text_string_to_metric_families(line): for sample in family.samples: - # print(sample) metric_names.add(sample.name) + metric_samples.append(sample) if "Component" in sample.labels: components_dict[address].add( sample.labels["Component"]) - return components_dict, metric_names + return components_dict, metric_names, metric_samples - def test_prometheus_endpoint(): - # TODO(Simon): Add a gcs_server after fixing metrics. - components_dict, metric_names = fetch_prometheus(prom_addresses) + def test_cases(): + components_dict, metric_names, metric_samples = fetch_prometheus( + prom_addresses) # Raylet should be on every node - expected_components = {"raylet"} - components_found = all( - expected_components.issubset(components) - for components in components_dict.values()) + assert all( + "raylet" in components for components in components_dict.values()) - # Core worker should be on at least one node - components_found = components_found and any( - "core_worker" in components - for components in components_dict.values()) + # GCS server should be on one node + assert any("gcs_server" in components + for components in components_dict.values()) - expected_metric_names = {"ray_test_counter", "ray_test_histogram_max"} - metric_names_found = expected_metric_names.issubset(metric_names) + # Core worker should be on at least on node + assert any("core_worker" in components + for components in components_dict.values()) - return components_found and metric_names_found + # Make sure our user defined metrics exist + for metric_name in ["test_counter", "test_histogram"]: + assert any(metric_name in full_name for full_name in metric_names) + + # Make sure the numeric value is correct + test_counter_sample = [ + m for m in metric_samples if "test_counter" in m.name + ][0] + assert test_counter_sample.value == 1.0 + + # Make sure the numeric value is correct + test_histogram_samples = [ + m for m in metric_samples if "test_histogram" in m.name + ] + buckets = { + m.labels["le"]: m.value + for m in test_histogram_samples if "_bucket" in m.name + } + # We recorded value 1.5 for the histogram. In Prometheus data model + # the histogram is cumulative. So we expect the count to appear in + # <1.1 and <+Inf buckets. + assert buckets == {"0.1": 0.0, "1.6": 1.0, "+Inf": 1.0} + hist_count = [m for m in test_histogram_samples + if "_count" in m.name][0].value + hist_sum = [m for m in test_histogram_samples + if "_sum" in m.name][0].value + assert hist_count == 1 + assert hist_sum == 1.5 + + def wrap_test_case_for_retry(): + try: + test_cases() + return True + except AssertionError: + return False try: wait_for_condition( - test_prometheus_endpoint, - timeout=20, + wrap_test_case_for_retry, + timeout=TEST_TIMEOUT_S, retry_interval_ms=1000, # Yield resource for other processes ) except RuntimeError: - # This is for debugging when test failed. - raise RuntimeError( - "All components were not visible to " - "prometheus endpoints on time. " - f"The compoenents are {fetch_prometheus(prom_addresses)}") - ray.shutdown() + print( + f"The compoenents are {pformat(fetch_prometheus(prom_addresses))}") + test_cases() # Should fail assert if __name__ == "__main__": diff --git a/src/ray/protobuf/BUILD b/src/ray/protobuf/BUILD index a08b76fee..1dc7f0eb0 100644 --- a/src/ray/protobuf/BUILD +++ b/src/ray/protobuf/BUILD @@ -56,7 +56,10 @@ python_grpc_compile( proto_library( name = "reporter_proto", srcs = ["reporter.proto"], - deps = [":common_proto"], + deps = [ + ":common_proto", + "@io_opencensus_proto//opencensus/proto/metrics/v1:metrics_proto" + ], ) cc_proto_library( diff --git a/src/ray/protobuf/reporter.proto b/src/ray/protobuf/reporter.proto index 686a70e0f..225c52048 100644 --- a/src/ray/protobuf/reporter.proto +++ b/src/ray/protobuf/reporter.proto @@ -16,6 +16,8 @@ syntax = "proto3"; package ray.rpc; +import "opencensus/proto/metrics/v1/metrics.proto"; + import "src/ray/protobuf/common.proto"; message GetProfilingStatsRequest { @@ -42,10 +44,19 @@ message ReportMetricsReply { bool metrcs_description_required = 1; } +message ReportOCMetricsRequest { + repeated opencensus.proto.metrics.v1.Metric metrics = 1; +} + +message ReportOCMetricsReply { +} + // Service for communicating with the reporter.py process on a remote node. service ReporterService { // Get the profiling stats. rpc GetProfilingStats(GetProfilingStatsRequest) returns (GetProfilingStatsReply); // Report metrics to the local metrics agents. rpc ReportMetrics(ReportMetricsRequest) returns (ReportMetricsReply); + // Report OpenCensus metrics to the local metrics agent. + rpc ReportOCMetrics(ReportOCMetricsRequest) returns (ReportOCMetricsReply); } diff --git a/src/ray/rpc/metrics_agent_client.h b/src/ray/rpc/metrics_agent_client.h index 09cdd4ee1..9a78b6d9d 100644 --- a/src/ray/rpc/metrics_agent_client.h +++ b/src/ray/rpc/metrics_agent_client.h @@ -47,6 +47,12 @@ class MetricsAgentClient { /// \param[in] callback The callback function that handles reply. VOID_RPC_CLIENT_METHOD(ReporterService, ReportMetrics, grpc_client_, ) + /// Report open census protobuf metrics to metrics agent. + /// + /// \param[in] request The request message. + /// \param[in] callback The callback function that handles reply. + VOID_RPC_CLIENT_METHOD(ReporterService, ReportOCMetrics, grpc_client_, ) + private: /// The RPC client. std::unique_ptr> grpc_client_; diff --git a/src/ray/stats/metric_exporter.cc b/src/ray/stats/metric_exporter.cc index bdf9f48c9..ec7bf5836 100644 --- a/src/ray/stats/metric_exporter.cc +++ b/src/ray/stats/metric_exporter.cc @@ -20,7 +20,7 @@ namespace ray { namespace stats { template <> -void MetricExporter::ExportToPoints( +void MetricPointExporter::ExportToPoints( const opencensus::stats::ViewData::DataMap &view_data, const opencensus::stats::MeasureDescriptor &measure_descriptor, @@ -75,7 +75,7 @@ void MetricExporter::ExportToPoints( } } -void MetricExporter::ExportViewData( +void MetricPointExporter::ExportViewData( const std::vector> &data) { std::vector points; @@ -110,5 +110,108 @@ void MetricExporter::ExportViewData( metric_exporter_client_->ReportMetrics(points); } +OpenCensusProtoExporter::OpenCensusProtoExporter(const int port, + boost::asio::io_service &io_service, + const std::string address) + : client_call_manager_(io_service) { + client_.reset(new rpc::MetricsAgentClient(address, port, client_call_manager_)); +}; + +void OpenCensusProtoExporter::ExportViewData( + const std::vector> &data) { + // Start converting opencensus data into their protobuf format. + // The format can be found here + // https://github.com/census-instrumentation/opencensus-proto/blob/master/src/opencensus/proto/metrics/v1/metrics.proto + rpc::ReportOCMetricsRequest request_proto; + + for (const auto &datum : data) { + // Unpack the fields we need for in memory data structure. + auto &view_descriptor = datum.first; + auto &view_data = datum.second; + auto &measure_descriptor = view_descriptor.measure_descriptor(); + + // Create one metric `Point` in protobuf. + auto request_point_proto = request_proto.add_metrics(); + + // Write the `MetricDescriptor`. + auto metric_descriptor_proto = request_point_proto->mutable_metric_descriptor(); + metric_descriptor_proto->set_name(measure_descriptor.name()); + metric_descriptor_proto->set_description(measure_descriptor.description()); + metric_descriptor_proto->set_unit(measure_descriptor.units()); + for (const auto &tag_key : view_descriptor.columns()) { + metric_descriptor_proto->add_label_keys()->set_key(tag_key.name()); + }; + + // Helpers for writing the actual `TimeSeries`. + auto start_time = absl::ToUnixSeconds(view_data.start_time()); + auto end_time = absl::ToUnixSeconds(view_data.end_time()); + auto make_new_data_point_proto = [&request_point_proto, start_time, end_time]( + const std::vector &tag_values) { + auto metric_timeseries_proto = request_point_proto->add_timeseries(); + metric_timeseries_proto->mutable_start_timestamp()->set_seconds(start_time); + + for (const auto &value : tag_values) { + metric_timeseries_proto->add_label_values()->set_value(value); + }; + + auto point_proto = metric_timeseries_proto->add_points(); + point_proto->mutable_timestamp()->set_seconds(end_time); + return point_proto; + }; + + // Write the `TimeSeries` for the given aggregated data type. + switch (view_data.type()) { + case opencensus::stats::ViewData::Type::kDouble: + for (const auto &row : view_data.double_data()) { + auto point_proto = make_new_data_point_proto(row.first /*tag_values*/); + point_proto->set_double_value(row.second); + } + break; + case opencensus::stats::ViewData::Type::kInt64: + for (const auto &row : view_data.int_data()) { + auto point_proto = make_new_data_point_proto(row.first /*tag_values*/); + point_proto->set_int64_value(row.second); + } + break; + case opencensus::stats::ViewData::Type::kDistribution: + for (const auto &row : view_data.distribution_data()) { + opencensus::stats::Distribution dist_value = row.second; + + auto point_proto = make_new_data_point_proto(row.first /*tag_values*/); + + // Copy in memory data into `DistributionValue` protobuf. + auto distribution_proto = point_proto->mutable_distribution_value(); + distribution_proto->set_count(dist_value.count()); + distribution_proto->set_sum(dist_value.count() * dist_value.mean()); + distribution_proto->set_sum_of_squared_deviation( + dist_value.sum_of_squared_deviation()); + + // Write the `BucketOption` and `Bucket` data. + auto bucket_opt_proto = + distribution_proto->mutable_bucket_options()->mutable_explicit_(); + for (const auto &bound : dist_value.bucket_boundaries().lower_boundaries()) { + bucket_opt_proto->add_bounds(bound); + } + for (const auto &count : dist_value.bucket_counts()) { + distribution_proto->add_buckets()->set_count(count); + } + } + break; + default: + RAY_LOG(FATAL) << "Unknown view data type."; + break; + } + } + + client_->ReportOCMetrics( + request_proto, [](const Status &status, const rpc::ReportOCMetricsReply &reply) { + RAY_UNUSED(reply); + if (!status.ok()) { + RAY_LOG(WARNING) << "Export metrics to agent failed: " << status; + } + }); +} + } // namespace stats } // namespace ray diff --git a/src/ray/stats/metric_exporter.h b/src/ray/stats/metric_exporter.h index e3e44bae9..c3695f3cb 100644 --- a/src/ray/stats/metric_exporter.h +++ b/src/ray/stats/metric_exporter.h @@ -13,9 +13,11 @@ // limitations under the License. #pragma once +#include #include "absl/memory/memory.h" #include "opencensus/stats/stats.h" #include "opencensus/tags/tag_key.h" +#include "ray/rpc/client_call.h" #include "ray/stats/metric.h" #include "ray/stats/metric_exporter_client.h" #include "ray/util/logging.h" @@ -28,19 +30,21 @@ namespace stats { /// opencensus data view, and sends it to the remote (for example /// sends metrics to dashboard agents through RPC). How to use it? Register metrics /// exporter after a main thread launched. -class MetricExporter final : public opencensus::stats::StatsExporter::Handler { +class MetricPointExporter final : public opencensus::stats::StatsExporter::Handler { public: - explicit MetricExporter(std::shared_ptr metric_exporter_client, - size_t report_batch_size = kDefaultBatchSize) + explicit MetricPointExporter( + std::shared_ptr metric_exporter_client, + size_t report_batch_size = kDefaultBatchSize) : metric_exporter_client_(metric_exporter_client), report_batch_size_(report_batch_size) {} - ~MetricExporter() = default; + ~MetricPointExporter() = default; static void Register(std::shared_ptr metric_exporter_client, size_t report_batch_size) { opencensus::stats::StatsExporter::RegisterPushHandler( - absl::make_unique(metric_exporter_client, report_batch_size)); + absl::make_unique(metric_exporter_client, + report_batch_size)); } void ExportViewData( @@ -84,5 +88,29 @@ class MetricExporter final : public opencensus::stats::StatsExporter::Handler { size_t report_batch_size_; }; +class OpenCensusProtoExporter final : public opencensus::stats::StatsExporter::Handler { + public: + OpenCensusProtoExporter(const int port, boost::asio::io_service &io_service, + const std::string address); + + ~OpenCensusProtoExporter() = default; + + static void Register(const int port, boost::asio::io_service &io_service, + const std::string address) { + opencensus::stats::StatsExporter::RegisterPushHandler( + absl::make_unique(port, io_service, address)); + } + + void ExportViewData( + const std::vector> &data) override; + + private: + /// Call Manager for gRPC client. + rpc::ClientCallManager client_call_manager_; + /// Client to call a metrics agent gRPC server. + std::unique_ptr client_; +}; + } // namespace stats } // namespace ray diff --git a/src/ray/stats/metric_exporter_client.cc b/src/ray/stats/metric_exporter_client.cc index 365b610c4..eecb3cfb9 100644 --- a/src/ray/stats/metric_exporter_client.cc +++ b/src/ray/stats/metric_exporter_client.cc @@ -42,45 +42,11 @@ void MetricExporterDecorator::ReportMetrics(const std::vector &poin /// /// Metrics Agent Exporter /// -MetricsAgentExporter::MetricsAgentExporter(std::shared_ptr exporter, - const int port, - boost::asio::io_service &io_service, - const std::string address) - : MetricExporterDecorator(exporter), client_call_manager_(io_service) { - client_.reset(new rpc::MetricsAgentClient(address, port, client_call_manager_)); -} +MetricsAgentExporter::MetricsAgentExporter(std::shared_ptr exporter) + : MetricExporterDecorator(exporter) {} void MetricsAgentExporter::ReportMetrics(const std::vector &points) { MetricExporterDecorator::ReportMetrics(points); - rpc::ReportMetricsRequest request; - for (auto point : points) { - auto metric_point = request.add_metrics_points(); - metric_point->set_metric_name(point.metric_name); - metric_point->set_timestamp(point.timestamp); - metric_point->set_value(point.value); - auto mutable_tags = metric_point->mutable_tags(); - for (auto &tag : point.tags) { - (*mutable_tags)[tag.first] = tag.second; - } - // If description and units information is requested from - // the metrics agent, append the information. - // TODO(sang): It can be inefficient if there are lots of new registered metrics. - // We should make it more efficient if there's compelling use cases. - if (should_update_description_) { - metric_point->set_description(point.measure_descriptor.description()); - metric_point->set_units(point.measure_descriptor.units()); - } - } - should_update_description_ = false; - - // TODO(sang): Should retry metrics report if it fails. - client_->ReportMetrics( - request, [this](const Status &status, const rpc::ReportMetricsReply &reply) { - if (!status.ok()) { - RAY_LOG(WARNING) << "ReportMetrics failed with status " << status; - } - should_update_description_ = reply.metrcs_description_required(); - }); } } // namespace stats diff --git a/src/ray/stats/metric_exporter_client.h b/src/ray/stats/metric_exporter_client.h index be6e6bfec..743c4dbd5 100644 --- a/src/ray/stats/metric_exporter_client.h +++ b/src/ray/stats/metric_exporter_client.h @@ -14,9 +14,6 @@ #pragma once -#include - -#include "ray/rpc/client_call.h" #include "ray/rpc/metrics_agent_client.h" #include "ray/stats/metric.h" @@ -57,20 +54,11 @@ class MetricExporterDecorator : public MetricExporterClient { class MetricsAgentExporter : public MetricExporterDecorator { public: - MetricsAgentExporter(std::shared_ptr exporter, const int port, - boost::asio::io_service &io_service, const std::string address); + MetricsAgentExporter(std::shared_ptr exporter); ~MetricsAgentExporter() {} void ReportMetrics(const std::vector &points) override; - - private: - /// Client to call a metrics agent gRPC server. - std::unique_ptr client_; - /// Call Manager for gRPC client. - rpc::ClientCallManager client_call_manager_; - /// Whether or not description and units information for metrics should be updated. - bool should_update_description_ = true; }; } // namespace stats diff --git a/src/ray/stats/stats.h b/src/ray/stats/stats.h index c7bb283e0..eb93e6e2a 100644 --- a/src/ray/stats/stats.h +++ b/src/ray/stats/stats.h @@ -83,8 +83,7 @@ static inline void Init(const TagsType &global_tags, const int metrics_agent_por // Default exporter is a metrics agent exporter. if (exporter_to_use == nullptr) { std::shared_ptr stdout_exporter(new StdoutExporterClient()); - exporter.reset(new MetricsAgentExporter(stdout_exporter, metrics_agent_port, - (*metrics_io_service), "127.0.0.1")); + exporter.reset(new MetricsAgentExporter(stdout_exporter)); } else { exporter = exporter_to_use; } @@ -96,7 +95,9 @@ static inline void Init(const TagsType &global_tags, const int metrics_agent_por absl::Milliseconds(std::max(RayConfig::instance().metrics_report_interval_ms() / 2, static_cast(500)))); - MetricExporter::Register(exporter, metrics_report_batch_size); + MetricPointExporter::Register(exporter, metrics_report_batch_size); + OpenCensusProtoExporter::Register(metrics_agent_port, (*metrics_io_service), + "127.0.0.1"); opencensus::stats::StatsExporter::SetInterval( StatsConfig::instance().GetReportInterval()); opencensus::stats::DeltaProducer::Get()->SetHarvestInterval(