mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 20:22:39 +08:00
[Metrics] Record per node and raylet cpu / mem usage (#12982)
* Record per node and raylet cpu / mem usage
* Add comments.
* Addressed code review.
This commit is contained in:
@@ -4,9 +4,10 @@ import os
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from collections import namedtuple
|
||||
from typing import List
|
||||
|
||||
from opencensus.stats import aggregation
|
||||
from opencensus.stats import measure as measure_module
|
||||
from opencensus.stats import stats as stats_module
|
||||
from opencensus.stats.view import View
|
||||
@@ -15,6 +16,9 @@ from opencensus.stats.aggregation_data import (CountAggregationData,
|
||||
DistributionAggregationData,
|
||||
LastValueAggregationData)
|
||||
from opencensus.metrics.export.value import ValueDouble
|
||||
from opencensus.tags import tag_key as tag_key_module
|
||||
from opencensus.tags import tag_map as tag_map_module
|
||||
from opencensus.tags import tag_value as tag_value_module
|
||||
|
||||
import ray
|
||||
|
||||
@@ -24,11 +28,41 @@ from ray.core.generated.metrics_pb2 import Metric
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Gauge(View):
    """Gauge built on an opencensus view with last-value aggregation.

    The reporter agent uses this class to collect process metrics.
    Cpp metrics should be collected in a different way.
    """

    def __init__(self, name, description, unit, tags: List[str]):
        """Create the integer measure and the view that wraps it.

        Args:
            name: Name given to both the measure and the view.
            description: Description given to both the measure and the view.
            unit: Unit passed to the measure.
            tags: Tag key names; each one is converted to a ``TagKey``.
        """
        int_measure = measure_module.MeasureInt(name, description, unit)
        self._measure = int_measure
        tag_keys = list(map(tag_key_module.TagKey, tags))
        last_value = aggregation.LastValueAggregation()
        self._view = View(name, description, tag_keys, int_measure,
                          last_value)

    @property
    def measure(self):
        """The measure created for this gauge."""
        return self._measure

    @property
    def view(self):
        """The opencensus ``View`` created for this gauge."""
        return self._view

    @property
    def name(self):
        """The name of the underlying measure."""
        return self.measure.name
|
||||
|
||||
|
||||
# One sample to record: the gauge to record against, the value to record,
# and the tags (tag key -> tag value) attached to the measurement.
Record = namedtuple("Record", ["gauge", "value", "tags"])
|
||||
|
||||
|
||||
class MetricsAgent:
|
||||
def __init__(self, metrics_export_port):
|
||||
assert metrics_export_port is not None
|
||||
# OpenCensus classes.
|
||||
self.view_manager = stats_module.stats.view_manager
|
||||
self.stats_recorder = stats_module.stats.stats_recorder
|
||||
# Port where we will expose metrics.
|
||||
self.metrics_export_port = metrics_export_port
|
||||
# Lock required because gRPC server uses
|
||||
@@ -41,6 +75,31 @@ class MetricsAgent:
|
||||
prometheus_exporter.Options(
|
||||
namespace="ray", port=metrics_export_port)))
|
||||
|
||||
def record_reporter_stats(self, records: List[Record]):
    """Record every sample in ``records`` while holding ``self._lock``.

    Args:
        records: Samples to record; each one supplies the ``gauge``,
            ``value`` and ``tags`` handed to ``_record_gauge``.
    """
    with self._lock:
        for entry in records:
            self._record_gauge(entry.gauge, entry.value, entry.tags)
|
||||
|
||||
def _record_gauge(self, gauge: Gauge, value: float, tags: dict):
    """Record ``value`` for ``gauge`` with the given tags.

    If the view manager has no view under the gauge's name yet, the
    gauge's view is registered first.

    Args:
        gauge: Gauge whose view and measure are recorded against.
        value: Measurement to record.
        tags: Mapping of tag key to tag value attached to the measurement.
    """
    manager = self.view_manager
    if not manager.get_view(gauge.name):
        manager.register_view(gauge.view)
    # Always read the view back from the manager rather than using
    # gauge.view directly.
    registered_view = manager.get_view(gauge.name).view

    measurements = self.stats_recorder.new_measurement_map()
    tag_map = tag_map_module.TagMap()
    for key, tag_val in tags.items():
        tag_map.insert(
            tag_key_module.TagKey(key), tag_value_module.TagValue(tag_val))
    measurements.measure_float_put(registered_view.measure, value)
    # NOTE: When we record this metric, timestamp will be renewed.
    measurements.record(tag_map)
|
||||
|
||||
def record_metric_points_from_protobuf(self, metrics: List[Metric]):
|
||||
"""Record metrics from Opencensus Protobuf"""
|
||||
with self._lock:
|
||||
|
||||
@@ -14,6 +14,8 @@ from contextlib import redirect_stdout, redirect_stderr
|
||||
import ray
|
||||
import ray._private.services
|
||||
import ray.utils
|
||||
import requests
|
||||
from prometheus_client.parser import text_string_to_metric_families
|
||||
from ray.scripts.scripts import main as ray_main
|
||||
|
||||
import psutil # We must import psutil after ray because we bundle it with ray.
|
||||
@@ -447,3 +449,26 @@ def new_scheduler_enabled():
|
||||
|
||||
def client_test_enabled() -> bool:
    """Return True when the RAY_CLIENT_MODE environment variable is "1"."""
    mode = os.getenv("RAY_CLIENT_MODE")
    return mode == "1"
|
||||
|
||||
|
||||
def fetch_prometheus(prom_addresses, timeout=10):
    """Scrape ``http://{address}/metrics`` for each address and collect samples.

    Endpoints that cannot be reached, or that do not answer within
    ``timeout`` seconds, are skipped; they still get an (empty) entry in
    the returned components dict.

    Args:
        prom_addresses: Iterable of addresses to scrape.
        timeout: Seconds passed to ``requests.get`` for each endpoint, so
            an unresponsive endpoint cannot block the caller forever.

    Returns:
        A tuple ``(components_dict, metric_names, metric_samples)``:
        ``components_dict`` maps each address to the set of ``Component``
        label values seen on its samples, ``metric_names`` is the set of
        all sample names, and ``metric_samples`` is the list of all
        parsed samples.
    """
    components_dict = {}
    metric_names = set()
    metric_samples = []
    for address in prom_addresses:
        components = components_dict.setdefault(address, set())
        try:
            response = requests.get(
                f"http://{address}/metrics", timeout=timeout)
        except (requests.exceptions.ConnectionError,
                requests.exceptions.Timeout):
            # Best effort: an unreachable or unresponsive endpoint is skipped.
            continue

        # Each line of the response is parsed on its own.
        for line in response.text.split("\n"):
            for family in text_string_to_metric_families(line):
                for sample in family.samples:
                    metric_names.add(sample.name)
                    metric_samples.append(sample)
                    if "Component" in sample.labels:
                        components.add(sample.labels["Component"])
    return components_dict, metric_names, metric_samples
|
||||
|
||||
@@ -5,15 +5,13 @@ from pprint import pformat
|
||||
import time
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import requests
|
||||
import pytest
|
||||
from prometheus_client.parser import text_string_to_metric_families
|
||||
|
||||
import ray
|
||||
from ray.ray_constants import PROMETHEUS_SERVICE_DISCOVERY_FILE
|
||||
from ray.metrics_agent import PrometheusServiceDiscoveryWriter
|
||||
from ray.util.metrics import Count, Histogram, Gauge
|
||||
from ray.test_utils import wait_for_condition, SignalActor
|
||||
from ray.test_utils import wait_for_condition, SignalActor, fetch_prometheus
|
||||
|
||||
|
||||
def test_prometheus_file_based_service_discovery(ray_start_cluster):
|
||||
@@ -115,29 +113,6 @@ def test_metrics_export_end_to_end(_setup_cluster_for_test):
|
||||
|
||||
prom_addresses = _setup_cluster_for_test
|
||||
|
||||
# Make sure we can ping Prometheus endpoints.
|
||||
def fetch_prometheus(prom_addresses, timeout=10):
    """Scrape ``http://{address}/metrics`` for each address and collect samples.

    Endpoints that cannot be reached, or that do not answer within
    ``timeout`` seconds, are skipped; they still get an (empty) entry in
    the returned components dict.

    Args:
        prom_addresses: Iterable of addresses to scrape.
        timeout: Seconds passed to ``requests.get`` for each endpoint, so
            an unresponsive endpoint cannot block the caller forever.

    Returns:
        A tuple ``(components_dict, metric_names, metric_samples)``:
        ``components_dict`` maps each address to the set of ``Component``
        label values seen on its samples, ``metric_names`` is the set of
        all sample names, and ``metric_samples`` is the list of all
        parsed samples.
    """
    components_dict = {}
    metric_names = set()
    metric_samples = []
    for address in prom_addresses:
        components = components_dict.setdefault(address, set())
        try:
            response = requests.get(
                f"http://{address}/metrics", timeout=timeout)
        except (requests.exceptions.ConnectionError,
                requests.exceptions.Timeout):
            # Best effort: an unreachable or unresponsive endpoint is skipped.
            continue

        # Each line of the response is parsed on its own.
        for line in response.text.split("\n"):
            for family in text_string_to_metric_families(line):
                for sample in family.samples:
                    metric_names.add(sample.name)
                    metric_samples.append(sample)
                    if "Component" in sample.labels:
                        components.add(sample.labels["Component"])
    return components_dict, metric_names, metric_samples
|
||||
|
||||
def test_cases():
|
||||
components_dict, metric_names, metric_samples = fetch_prometheus(
|
||||
prom_addresses)
|
||||
@@ -158,6 +133,9 @@ def test_metrics_export_end_to_end(_setup_cluster_for_test):
|
||||
for metric_name in ["test_counter", "test_histogram"]:
|
||||
assert any(metric_name in full_name for full_name in metric_names)
|
||||
|
||||
# Make sure GCS server metrics are recorded.
|
||||
assert "ray_outbound_heartbeat_size_kb_sum" in metric_names
|
||||
|
||||
# Make sure the numeric value is correct
|
||||
test_counter_sample = [
|
||||
m for m in metric_samples if "test_counter" in m.name
|
||||
|
||||
Reference in New Issue
Block a user