From 01f38bc5d134cbda728105f1592ddeebcf6f85cb Mon Sep 17 00:00:00 2001 From: Simon Mo Date: Thu, 13 Aug 2020 16:44:53 -0700 Subject: [PATCH] CoreWorker correctly push metrics to agent (#10031) --- python/ray/includes/metric.pxi | 1 - python/ray/node.py | 3 +- python/ray/services.py | 4 +- python/ray/tests/test_metrics_agent.py | 82 ++++++++++++++++++------- python/ray/workers/default_worker.py | 6 ++ src/ray/stats/metric_exporter_client.cc | 3 + 6 files changed, 72 insertions(+), 27 deletions(-) diff --git a/python/ray/includes/metric.pxi b/python/ray/includes/metric.pxi index 94c78f502..443a476ad 100644 --- a/python/ray/includes/metric.pxi +++ b/python/ray/includes/metric.pxi @@ -74,7 +74,6 @@ cdef class Gauge(Metric): value = 5 key1= "key1" key2 = "key2" -s gauge.record(value, {"tagk1": key1, "tagk2": key2}) """ def __init__(self, name, description, unit, tag_keys): diff --git a/python/ray/node.py b/python/ray/node.py index 1be8393a6..6149a7ae8 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -104,7 +104,8 @@ class Node: head), "LRU Evict can only be passed into the head node." self._raylet_ip_address = raylet_ip_address - self.metrics_agent_port = self._get_unused_port()[0] + self.metrics_agent_port = (ray_params.metrics_agent_port + or self._get_unused_port()[0]) self._metrics_export_port = ray_params.metrics_export_port if self._metrics_export_port is None: self._metrics_export_port = self._get_unused_port()[0] diff --git a/python/ray/services.py b/python/ray/services.py index 4d1a2f925..a8c60116f 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -1377,8 +1377,7 @@ def start_raylet(redis_address, # Create the command that the Raylet will use to start workers. start_worker_command = [ - sys.executable, - worker_path, + sys.executable, worker_path, "--node-ip-address={}".format(node_ip_address), "--node-manager-port={}".format(node_manager_port), "--object-store-name={}".format(plasma_store_name), @@ -1386,6 +1385,7 @@ def start_raylet(redis_address, "--redis-address={}".format(redis_address), "--config-list={}".format(config_str), "--temp-dir={}".format(temp_dir), + f"--metrics-agent-port={metrics_agent_port}" ] if redis_password: start_worker_command += ["--redis-password={}".format(redis_password)] diff --git a/python/ray/tests/test_metrics_agent.py b/python/ray/tests/test_metrics_agent.py index c9254ce5a..9012da973 100644 --- a/python/ray/tests/test_metrics_agent.py +++ b/python/ray/tests/test_metrics_agent.py @@ -1,20 +1,20 @@ +import asyncio import json -import pytest - +import time from collections import defaultdict import requests - +import pytest from opencensus.tags import tag_key as tag_key_module from prometheus_client.parser import text_string_to_metric_families import ray - from ray.core.generated.common_pb2 import MetricPoint from ray.dashboard.util import get_unused_port from ray.metrics_agent import (Gauge, MetricsAgent, PrometheusServiceDiscoveryWriter) -from ray.test_utils import wait_for_condition +from ray.experimental.metrics import Count, Histogram +from ray.test_utils import wait_for_condition, SignalActor def generate_metrics_point(name: str, @@ -217,6 +217,7 @@ def test_prometheus_file_based_service_discovery(ray_start_cluster): loaded_json_data["targets"])) +@pytest.mark.skip("This test is flaky right now. Will be fixed in #10080") def test_metrics_export_end_to_end(ray_start_cluster): NUM_NODES = 2 cluster = ray_start_cluster @@ -230,19 +231,36 @@ def test_metrics_export_end_to_end(ray_start_cluster): cluster.wait_for_nodes() ray.init(address=cluster.address) + signal = SignalActor.remote() + # Generate some metrics around actor & tasks. @ray.remote def f(): - return 3 + counter = Count("test_counter", "desc", "unit", []) + ray.get(signal.send.remote()) + while True: + counter.record(1, {}) + time.sleep(0.1) @ray.remote class A: - def ping(self): - return 3 + async def ready(self): + pass - ray.get([f.remote() for _ in range(30)]) + async def ping(self): + histogram = Histogram("test_histogram", "desc", "unit", [0, 1, 2], + []) + while True: + histogram.record(1, {}) + await asyncio.sleep(0.1) + + obj_refs = [f.remote() for _ in range(30)] a = A.remote() - ray.get(a.ping.remote()) + obj_refs.append(a.ping.remote()) + + # Make sure both histogram and counter are created + ray.get(a.ready.remote()) + ray.get(signal.wait.remote()) node_info_list = ray.nodes() prom_addresses = [] @@ -252,9 +270,9 @@ def test_metrics_export_end_to_end(ray_start_cluster): prom_addresses.append(f"{addr}:{metrics_export_port}") # Make sure we can ping Prometheus endpoints. - def get_component_information(prom_addresses): - # TODO(sang): Add a core worker & gcs_server after adding metrics. + def fetch_prometheus(prom_addresses): components_dict = {} + metric_names = set() for address in prom_addresses: if address not in components_dict: components_dict[address] = set() @@ -262,32 +280,50 @@ def test_metrics_export_end_to_end(ray_start_cluster): response = requests.get( "http://localhost:{}".format(metrics_export_port)) except requests.exceptions.ConnectionError: - return components_dict + return components_dict, metric_names for line in response.text.split("\n"): for family in text_string_to_metric_families(line): for sample in family.samples: # print(sample) + metric_names.add(sample.name) if "Component" in sample.labels: components_dict[address].add( sample.labels["Component"]) - return components_dict + return components_dict, metric_names def test_prometheus_endpoint(): - # TODO(sang): Add a core worker & gcs_server after adding metrics. - components_dict = get_component_information(prom_addresses) - COMPONENTS_CANDIDATES = {"raylet"} - return all( - COMPONENTS_CANDIDATES.issubset(components) + # TODO(Simon): Add a gcs_server after fixing metrics. + components_dict, metric_names = fetch_prometheus(prom_addresses) + + # Raylet should be on every node + expected_components = {"raylet"} + components_found = all( + expected_components.issubset(components) for components in components_dict.values()) + # Core worker should be on at least one node + components_found = components_found and any( + "core_worker" in components + for components in components_dict.values()) + + expected_metric_names = {"ray_test_counter", "ray_test_histogram_max"} + metric_names_found = expected_metric_names.issubset(metric_names) + + return components_found and metric_names_found + try: - wait_for_condition(test_prometheus_endpoint, timeout=3) + wait_for_condition( + test_prometheus_endpoint, + timeout=20, + retry_interval_ms=1000, # Yield resource for other processes + ) except RuntimeError: # This is for debugging when test failed. - print(get_component_information(prom_addresses)) - raise RuntimeError("All components were not visible to " - "prometheus endpoints on time.") + raise RuntimeError( + "All components were not visible to " + "prometheus endpoints on time. " + f"The compoenents are {fetch_prometheus(prom_addresses)}") ray.shutdown() diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py index 4a8cfff3c..c489d48a5 100644 --- a/python/ray/workers/default_worker.py +++ b/python/ray/workers/default_worker.py @@ -80,6 +80,11 @@ parser.add_argument( default=False, action="store_true", help="True if cloudpickle should be used for serialization.") +parser.add_argument( + "--metrics-agent-port", + required=True, + type=int, + help="the port of the node's metric agent.") if __name__ == "__main__": args = parser.parse_args() @@ -109,6 +114,7 @@ if __name__ == "__main__": raylet_socket_name=args.raylet_name, temp_dir=args.temp_dir, load_code_from_local=args.load_code_from_local, + metrics_agent_port=args.metrics_agent_port, _internal_config=json.dumps(internal_config), ) diff --git a/src/ray/stats/metric_exporter_client.cc b/src/ray/stats/metric_exporter_client.cc index 4272b4fdb..365b610c4 100644 --- a/src/ray/stats/metric_exporter_client.cc +++ b/src/ray/stats/metric_exporter_client.cc @@ -76,6 +76,9 @@ void MetricsAgentExporter::ReportMetrics(const std::vector &points) // TODO(sang): Should retry metrics report if it fails. client_->ReportMetrics( request, [this](const Status &status, const rpc::ReportMetricsReply &reply) { + if (!status.ok()) { + RAY_LOG(WARNING) << "ReportMetrics failed with status " << status; + } should_update_description_ = reply.metrcs_description_required(); }); }