mirror of
https://github.com/wassname/ray.git
synced 2026-07-02 15:23:01 +08:00
[Stats] Metrics Export User Interface Part 2 (Prometheus Service Discovery) (#9970)
* In progress. * In Progress. * Finish the working version. * Write a documentation. * Addressed code review. * Fix lint error. * Lint. * Addressed code review. Make test less flaky. * Use a random port for ray start. * Modify doc. * Make write atomic.
This commit is contained in:
@@ -36,6 +36,7 @@ from ray.dashboard.metrics_exporter.client import Exporter
|
||||
from ray.dashboard.metrics_exporter.client import MetricsExportClient
|
||||
from ray.dashboard.node_stats import NodeStats
|
||||
from ray.dashboard.util import to_unix_time, measures_to_dict, format_resource
|
||||
from ray.metrics_agent import PrometheusServiceDiscoveryWriter
|
||||
|
||||
try:
|
||||
from ray.tune import Analysis
|
||||
@@ -493,6 +494,8 @@ class Dashboard:
|
||||
self.dashboard_id = str(uuid.uuid4())
|
||||
self.dashboard_controller = DashboardController(
|
||||
redis_address, redis_password)
|
||||
self.service_discovery = PrometheusServiceDiscoveryWriter(
|
||||
redis_address, redis_password, temp_dir)
|
||||
|
||||
# Setting the environment variable RAY_DASHBOARD_DEV=1 disables some
|
||||
# security checks in the dashboard server to ease development while
|
||||
@@ -571,6 +574,7 @@ class Dashboard:
|
||||
def run(self):
|
||||
self.log_dashboard_url()
|
||||
self.dashboard_controller.start_collecting_metrics()
|
||||
self.service_discovery.start()
|
||||
if self.metrics_export_address:
|
||||
self._start_exporting_metrics()
|
||||
aiohttp.web.run_app(self.app, host=self.host, port=self.port)
|
||||
|
||||
@@ -1,5 +1,9 @@
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from collections import defaultdict
|
||||
from typing import List
|
||||
@@ -13,6 +17,8 @@ from opencensus.tags import tag_map as tag_map_module
|
||||
from opencensus.tags import tag_value as tag_value_module
|
||||
from opencensus.stats import view
|
||||
|
||||
import ray
|
||||
|
||||
from ray import prometheus_exporter
|
||||
from ray.core.generated.common_pb2 import MetricPoint
|
||||
|
||||
@@ -181,3 +187,73 @@ class MetricsAgent:
|
||||
metric_name].view.measure._description = metric_description
|
||||
self._registry[metric_name].view.measure._unit = metric_units
|
||||
self._missing_information = False
|
||||
|
||||
|
||||
class PrometheusServiceDiscoveryWriter(threading.Thread):
|
||||
"""A class to support Prometheus service discovery.
|
||||
|
||||
It supports file-based service discovery. Checkout
|
||||
https://prometheus.io/docs/guides/file-sd/ for more details.
|
||||
|
||||
Args:
|
||||
redis_address(str): Ray's redis address.
|
||||
redis_password(str): Ray's redis password.
|
||||
temp_dir(str): Temporary directory used by
|
||||
Ray to store logs and metadata.
|
||||
"""
|
||||
|
||||
def __init__(self, redis_address, redis_password, temp_dir):
|
||||
ray.state.state._initialize_global_state(
|
||||
redis_address=redis_address, redis_password=redis_password)
|
||||
self.temp_dir = temp_dir
|
||||
self.default_service_discovery_flush_period = 5
|
||||
super().__init__()
|
||||
|
||||
def get_file_discovery_content(self):
|
||||
"""Return the content for Prometheus serivce discovery."""
|
||||
nodes = ray.nodes()
|
||||
metrics_export_addresses = [
|
||||
"{}:{}".format(node["NodeManagerAddress"],
|
||||
node["MetricsExportPort"]) for node in nodes
|
||||
]
|
||||
return json.dumps([{
|
||||
"labels": {
|
||||
"job": "ray"
|
||||
},
|
||||
"targets": metrics_export_addresses
|
||||
}])
|
||||
|
||||
def write(self):
|
||||
# Write a file based on https://prometheus.io/docs/guides/file-sd/
|
||||
# Write should be atomic. Otherwise, Prometheus raises an error that
|
||||
# json file format is invalid because it reads a file when
|
||||
# file is re-written. Note that Prometheus still works although we
|
||||
# have this error.
|
||||
temp_file_name = self.get_temp_file_name()
|
||||
with open(temp_file_name, "w") as json_file:
|
||||
json_file.write(self.get_file_discovery_content())
|
||||
# NOTE: os.rename is atomic, so we won't have race condition reading
|
||||
# this file.
|
||||
os.rename(temp_file_name, self.get_target_file_name())
|
||||
|
||||
def get_target_file_name(self):
|
||||
return os.path.join(
|
||||
self.temp_dir, ray.ray_constants.PROMETHEUS_SERVICE_DISCOVERY_FILE)
|
||||
|
||||
def get_temp_file_name(self):
|
||||
return os.path.join(
|
||||
self.temp_dir, "{}_{}".format(
|
||||
"tmp", ray.ray_constants.PROMETHEUS_SERVICE_DISCOVERY_FILE))
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
# This thread won't be broken by exceptions.
|
||||
try:
|
||||
self.write()
|
||||
except Exception as e:
|
||||
logger.warning("Writing a service discovery file, {},"
|
||||
"failed."
|
||||
.format(self.writer.get_target_file_name()))
|
||||
logger.warning(traceback.format_exc())
|
||||
logger.warning("Error message: {}".format(e))
|
||||
time.sleep(self.default_service_discovery_flush_period)
|
||||
|
||||
@@ -37,6 +37,7 @@ REDIS_MINIMUM_MEMORY_BYTES = 10**7
|
||||
DEFAULT_PORT = 6379
|
||||
|
||||
DEFAULT_DASHBOARD_PORT = 8265
|
||||
PROMETHEUS_SERVICE_DISCOVERY_FILE = "prom_metrics_service_discovery.json"
|
||||
# Default resource requirements for actors when no resource requirements are
|
||||
# specified.
|
||||
DEFAULT_ACTOR_METHOD_CPU_SIMPLE = 1
|
||||
|
||||
@@ -350,7 +350,7 @@ def dashboard(cluster_config_file, cluster_name, port, remote_port):
|
||||
@click.option(
|
||||
"--metrics-export-port",
|
||||
type=int,
|
||||
default=8080,
|
||||
default=None,
|
||||
help="the port to use to expose Ray metrics through a "
|
||||
"Prometheus endpoint.")
|
||||
def start(node_ip_address, redis_address, address, redis_port, port,
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import json
|
||||
import pytest
|
||||
|
||||
from collections import defaultdict
|
||||
@@ -7,9 +8,12 @@ import requests
|
||||
from opencensus.tags import tag_key as tag_key_module
|
||||
from prometheus_client.parser import text_string_to_metric_families
|
||||
|
||||
import ray
|
||||
|
||||
from ray.core.generated.common_pb2 import MetricPoint
|
||||
from ray.dashboard.util import get_unused_port
|
||||
from ray.metrics_agent import Gauge, MetricsAgent
|
||||
from ray.metrics_agent import (Gauge, MetricsAgent,
|
||||
PrometheusServiceDiscoveryWriter)
|
||||
|
||||
|
||||
def generate_metrics_point(name: str,
|
||||
@@ -180,6 +184,38 @@ def test_multiple_record(cleanup_agent):
|
||||
assert sample_values == [point.value for point in points]
|
||||
|
||||
|
||||
def test_prometheus_file_based_service_discovery(ray_start_cluster):
|
||||
# Make sure Prometheus service discovery file is correctly written
|
||||
# when number of nodes are dynamically changed.
|
||||
NUM_NODES = 5
|
||||
cluster = ray_start_cluster
|
||||
nodes = [cluster.add_node() for _ in range(NUM_NODES)]
|
||||
cluster.wait_for_nodes()
|
||||
addr = ray.init(address=cluster.address)
|
||||
redis_address = addr["redis_address"]
|
||||
writer = PrometheusServiceDiscoveryWriter(
|
||||
redis_address, ray.ray_constants.REDIS_DEFAULT_PASSWORD, "/tmp/ray")
|
||||
|
||||
def get_metrics_export_address_from_node(nodes):
|
||||
return [
|
||||
"{}:{}".format(node.node_ip_address, node.metrics_export_port)
|
||||
for node in nodes
|
||||
]
|
||||
|
||||
loaded_json_data = json.loads(writer.get_file_discovery_content())[0]
|
||||
assert (set(get_metrics_export_address_from_node(nodes)) == set(
|
||||
loaded_json_data["targets"]))
|
||||
|
||||
# Let's update nodes.
|
||||
for _ in range(3):
|
||||
nodes.append(cluster.add_node())
|
||||
|
||||
# Make sure service discovery file content is correctly updated.
|
||||
loaded_json_data = json.loads(writer.get_file_discovery_content())[0]
|
||||
assert (set(get_metrics_export_address_from_node(nodes)) == set(
|
||||
loaded_json_data["targets"]))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
sys.exit(pytest.main(["-v", __file__]))
|
||||
|
||||
Reference in New Issue
Block a user