diff --git a/doc/source/ray-metrics.rst b/doc/source/ray-metrics.rst index be4e71518..c6137ac38 100644 --- a/doc/source/ray-metrics.rst +++ b/doc/source/ray-metrics.rst @@ -120,3 +120,34 @@ Getting Started (Cluster Launcher) ---------------------------------- When you use a Ray cluster launcher, it is common node IP addresses are changing because cluster is scaling up and down. In this case, you can use Prometheus' `file based service discovery `_. + +Prometheus Service Discovery Support +------------------------------------ +Ray auto-generates a Prometheus `service discovery file `_ in a head node to help metrics agents' service discovery. +This allows you to easily scrape all metrics at each node in autoscaling clusters. Let's walkthrough how to acheive this. + +The service discovery file is generated in a head node. Note that head node is a node where you started by `ray start --head` or ran `ray.init()`. + +Inside a head node, check out a `temp_dir` of Ray. By default, it is `/tmp/ray` (in both Linux and MacOS). You should be able to find a file `prom_metrics_service_discovery.json`. +Ray periodically updates the addresses of all metrics agents in a cluster to this file. + +Now, modify a Prometheus config to scrape the file for service discovery. + +.. code-block:: yaml + + # Prometheus config file + + # my global config + global: + scrape_interval: 2s + evaluation_interval: 2s + + # A scrape configuration containing exactly one endpoint to scrape: + # Here it's Prometheus itself. + scrape_configs: + - job_name: 'ray' + file_sd_configs: + - files: + - '/tmp/ray/prom_metrics_service_discovery.json' + +Prometheus will automatically detect that the file contents are changing and update addresses it scrapes to based on the service discovery file generated by Ray. diff --git a/python/ray/dashboard/dashboard.py b/python/ray/dashboard/dashboard.py index c9a22bd6e..e8d996028 100644 --- a/python/ray/dashboard/dashboard.py +++ b/python/ray/dashboard/dashboard.py @@ -36,6 +36,7 @@ from ray.dashboard.metrics_exporter.client import Exporter from ray.dashboard.metrics_exporter.client import MetricsExportClient from ray.dashboard.node_stats import NodeStats from ray.dashboard.util import to_unix_time, measures_to_dict, format_resource +from ray.metrics_agent import PrometheusServiceDiscoveryWriter try: from ray.tune import Analysis @@ -493,6 +494,8 @@ class Dashboard: self.dashboard_id = str(uuid.uuid4()) self.dashboard_controller = DashboardController( redis_address, redis_password) + self.service_discovery = PrometheusServiceDiscoveryWriter( + redis_address, redis_password, temp_dir) # Setting the environment variable RAY_DASHBOARD_DEV=1 disables some # security checks in the dashboard server to ease development while @@ -571,6 +574,7 @@ class Dashboard: def run(self): self.log_dashboard_url() self.dashboard_controller.start_collecting_metrics() + self.service_discovery.start() if self.metrics_export_address: self._start_exporting_metrics() aiohttp.web.run_app(self.app, host=self.host, port=self.port) diff --git a/python/ray/metrics_agent.py b/python/ray/metrics_agent.py index 4d84ea6d5..1d64ee2fa 100644 --- a/python/ray/metrics_agent.py +++ b/python/ray/metrics_agent.py @@ -1,5 +1,9 @@ +import json import logging +import os import threading +import time +import traceback from collections import defaultdict from typing import List @@ -13,6 +17,8 @@ from opencensus.tags import tag_map as tag_map_module from opencensus.tags import tag_value as tag_value_module from opencensus.stats import view +import ray + from ray import prometheus_exporter from ray.core.generated.common_pb2 import MetricPoint @@ -181,3 +187,73 @@ class MetricsAgent: metric_name].view.measure._description = metric_description self._registry[metric_name].view.measure._unit = metric_units self._missing_information = False + + +class PrometheusServiceDiscoveryWriter(threading.Thread): + """A class to support Prometheus service discovery. + + It supports file-based service discovery. Checkout + https://prometheus.io/docs/guides/file-sd/ for more details. + + Args: + redis_address(str): Ray's redis address. + redis_password(str): Ray's redis password. + temp_dir(str): Temporary directory used by + Ray to store logs and metadata. + """ + + def __init__(self, redis_address, redis_password, temp_dir): + ray.state.state._initialize_global_state( + redis_address=redis_address, redis_password=redis_password) + self.temp_dir = temp_dir + self.default_service_discovery_flush_period = 5 + super().__init__() + + def get_file_discovery_content(self): + """Return the content for Prometheus serivce discovery.""" + nodes = ray.nodes() + metrics_export_addresses = [ + "{}:{}".format(node["NodeManagerAddress"], + node["MetricsExportPort"]) for node in nodes + ] + return json.dumps([{ + "labels": { + "job": "ray" + }, + "targets": metrics_export_addresses + }]) + + def write(self): + # Write a file based on https://prometheus.io/docs/guides/file-sd/ + # Write should be atomic. Otherwise, Prometheus raises an error that + # json file format is invalid because it reads a file when + # file is re-written. Note that Prometheus still works although we + # have this error. + temp_file_name = self.get_temp_file_name() + with open(temp_file_name, "w") as json_file: + json_file.write(self.get_file_discovery_content()) + # NOTE: os.rename is atomic, so we won't have race condition reading + # this file. + os.rename(temp_file_name, self.get_target_file_name()) + + def get_target_file_name(self): + return os.path.join( + self.temp_dir, ray.ray_constants.PROMETHEUS_SERVICE_DISCOVERY_FILE) + + def get_temp_file_name(self): + return os.path.join( + self.temp_dir, "{}_{}".format( + "tmp", ray.ray_constants.PROMETHEUS_SERVICE_DISCOVERY_FILE)) + + def run(self): + while True: + # This thread won't be broken by exceptions. + try: + self.write() + except Exception as e: + logger.warning("Writing a service discovery file, {}," + "failed." + .format(self.writer.get_target_file_name())) + logger.warning(traceback.format_exc()) + logger.warning("Error message: {}".format(e)) + time.sleep(self.default_service_discovery_flush_period) diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index d17eb8f19..c65d4a5fa 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -37,6 +37,7 @@ REDIS_MINIMUM_MEMORY_BYTES = 10**7 DEFAULT_PORT = 6379 DEFAULT_DASHBOARD_PORT = 8265 +PROMETHEUS_SERVICE_DISCOVERY_FILE = "prom_metrics_service_discovery.json" # Default resource requirements for actors when no resource requirements are # specified. DEFAULT_ACTOR_METHOD_CPU_SIMPLE = 1 diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index c0edd4419..88da974fc 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -350,7 +350,7 @@ def dashboard(cluster_config_file, cluster_name, port, remote_port): @click.option( "--metrics-export-port", type=int, - default=8080, + default=None, help="the port to use to expose Ray metrics through a " "Prometheus endpoint.") def start(node_ip_address, redis_address, address, redis_port, port, diff --git a/python/ray/tests/test_metrics_agent.py b/python/ray/tests/test_metrics_agent.py index 98e0cc896..f88f36b26 100644 --- a/python/ray/tests/test_metrics_agent.py +++ b/python/ray/tests/test_metrics_agent.py @@ -1,3 +1,4 @@ +import json import pytest from collections import defaultdict @@ -7,9 +8,12 @@ import requests from opencensus.tags import tag_key as tag_key_module from prometheus_client.parser import text_string_to_metric_families +import ray + from ray.core.generated.common_pb2 import MetricPoint from ray.dashboard.util import get_unused_port -from ray.metrics_agent import Gauge, MetricsAgent +from ray.metrics_agent import (Gauge, MetricsAgent, + PrometheusServiceDiscoveryWriter) def generate_metrics_point(name: str, @@ -180,6 +184,38 @@ def test_multiple_record(cleanup_agent): assert sample_values == [point.value for point in points] +def test_prometheus_file_based_service_discovery(ray_start_cluster): + # Make sure Prometheus service discovery file is correctly written + # when number of nodes are dynamically changed. + NUM_NODES = 5 + cluster = ray_start_cluster + nodes = [cluster.add_node() for _ in range(NUM_NODES)] + cluster.wait_for_nodes() + addr = ray.init(address=cluster.address) + redis_address = addr["redis_address"] + writer = PrometheusServiceDiscoveryWriter( + redis_address, ray.ray_constants.REDIS_DEFAULT_PASSWORD, "/tmp/ray") + + def get_metrics_export_address_from_node(nodes): + return [ + "{}:{}".format(node.node_ip_address, node.metrics_export_port) + for node in nodes + ] + + loaded_json_data = json.loads(writer.get_file_discovery_content())[0] + assert (set(get_metrics_export_address_from_node(nodes)) == set( + loaded_json_data["targets"])) + + # Let's update nodes. + for _ in range(3): + nodes.append(cluster.add_node()) + + # Make sure service discovery file content is correctly updated. + loaded_json_data = json.loads(writer.get_file_discovery_content())[0] + assert (set(get_metrics_export_address_from_node(nodes)) == set( + loaded_json_data["targets"])) + + if __name__ == "__main__": import sys sys.exit(pytest.main(["-v", __file__]))