diff --git a/dashboard/modules/reporter/reporter_agent.py b/dashboard/modules/reporter/reporter_agent.py index 01d17a801..3d9472a3d 100644 --- a/dashboard/modules/reporter/reporter_agent.py +++ b/dashboard/modules/reporter/reporter_agent.py @@ -18,7 +18,7 @@ import ray._private.services import ray.utils from ray.core.generated import reporter_pb2 from ray.core.generated import reporter_pb2_grpc -from ray.metrics_agent import MetricsAgent +from ray.metrics_agent import MetricsAgent, Gauge, Record import psutil logger = logging.getLogger(__name__) @@ -72,6 +72,19 @@ class ReporterAgent(dashboard_utils.DashboardAgentModule, self._metrics_agent = MetricsAgent(dashboard_agent.metrics_export_port) self._key = f"{reporter_consts.REPORTER_PREFIX}" \ f"{self._dashboard_agent.node_id}" + # A list of gauges to record and export metrics. + self._gauges = { + "node_cpu": Gauge("node_cpu", "Total CPU usage on a ray node", + "percentage", ["ip"]), + "node_mem": Gauge("node_mem", "Total memory usage on a ray node", + "mb", ["ip"]), + "raylet_cpu": Gauge("raylet_cpu", + "CPU usage of the raylet on a node.", + "percentage", ["ip", "pid"]), + "raylet_mem": Gauge("raylet_mem", + "Memory usage of the raylet on a node", "mb", + ["ip", "pid"]) + } async def GetProfilingStats(self, request, context): pid = request.pid @@ -177,6 +190,25 @@ class ReporterAgent(dashboard_utils.DashboardAgentModule, ]) for w in self._workers if w.status() != psutil.STATUS_ZOMBIE ] + def _get_raylet_stats(self): + curr_proc = psutil.Process() + # Here, parent is always raylet because the + # dashboard agent is a child of the raylet process. + parent = curr_proc.parent() + if parent is None or parent.pid == 1: + return [] + if parent.status() == psutil.STATUS_ZOMBIE: + return [] + + return parent.as_dict(attrs=[ + "pid", + "create_time", + "cpu_percent", + "cpu_times", + "cmdline", + "memory_info", + ]) + @staticmethod def _get_raylet_cmdline(): try: @@ -207,7 +239,6 @@ class ReporterAgent(dashboard_utils.DashboardAgentModule, then, prev_network_stats = self._network_stats_hist[0] netstats = ((network_stats[0] - prev_network_stats[0]) / (now - then), (network_stats[1] - prev_network_stats[1]) / (now - then)) - return { "now": now, "hostname": self._hostname, @@ -224,11 +255,50 @@ class ReporterAgent(dashboard_utils.DashboardAgentModule, "cmdline": self._get_raylet_cmdline(), } + def _record_stats(self, stats): + ip = stats["ip"] + # -- CPU per node -- + cpu_usage = float(stats["cpu"]) + cpu_record = Record( + gauge=self._gauges["node_cpu"], value=cpu_usage, tags={"ip": ip}) + + # -- Mem per node -- + total, avail, _ = stats["mem"] + mem_usage = float(total - avail) / 1e6 + mem_record = Record( + gauge=self._gauges["node_mem"], value=mem_usage, tags={"ip": ip}) + + raylet_stats = self._get_raylet_stats() + raylet_pid = str(raylet_stats["pid"]) + # -- raylet CPU -- + raylet_cpu_usage = float(raylet_stats["cpu_percent"]) * 100 + raylet_cpu_record = Record( + gauge=self._gauges["raylet_cpu"], + value=raylet_cpu_usage, + tags={ + "ip": ip, + "pid": raylet_pid + }) + + # -- raylet mem -- + raylet_mem_usage = float(raylet_stats["memory_info"].rss) / 1e6 + raylet_mem_record = Record( + gauge=self._gauges["raylet_mem"], + value=raylet_mem_usage, + tags={ + "ip": ip, + "pid": raylet_pid + }) + + self._metrics_agent.record_reporter_stats( + [cpu_record, mem_record, raylet_cpu_record, raylet_mem_record]) + async def _perform_iteration(self, aioredis_client): """Get any changes to the log files and push updates to Redis.""" while True: try: stats = self._get_all_stats() + self._record_stats(stats) await aioredis_client.publish(self._key, jsonify_asdict(stats)) except Exception: logger.exception("Error publishing node physical stats.") diff --git a/dashboard/modules/reporter/tests/test_reporter.py b/dashboard/modules/reporter/tests/test_reporter.py index 62cb43c41..001ea42a5 100644 --- a/dashboard/modules/reporter/tests/test_reporter.py +++ b/dashboard/modules/reporter/tests/test_reporter.py @@ -6,13 +6,11 @@ import time import pytest import ray +from ray import ray_constants from ray.new_dashboard.tests.conftest import * # noqa -from ray.test_utils import ( - format_web_url, - RayTestTimeoutException, - wait_until_server_available, - wait_for_condition, -) +from ray.test_utils import (format_web_url, RayTestTimeoutException, + wait_until_server_available, wait_for_condition, + fetch_prometheus) logger = logging.getLogger(__name__) @@ -96,5 +94,36 @@ def test_node_physical_stats(enable_test_module, shutdown_only): wait_for_condition(_check_workers, timeout=10) +def test_prometheus_physical_stats_record(enable_test_module, shutdown_only): + addresses = ray.init(include_dashboard=True, num_cpus=1) + metrics_export_port = addresses["metrics_export_port"] + addr = addresses["raylet_ip_address"] + prom_addresses = [f"{addr}:{metrics_export_port}"] + + def test_case_stats_exist(): + components_dict, metric_names, metric_samples = fetch_prometheus( + prom_addresses) + return all([ + "ray_node_cpu" in metric_names, "ray_node_mem" in metric_names, + "ray_raylet_cpu" in metric_names, "ray_raylet_mem" in metric_names + ]) + + def test_case_ip_correct(): + components_dict, metric_names, metric_samples = fetch_prometheus( + prom_addresses) + raylet_proc = ray.worker._global_node.all_processes[ + ray_constants.PROCESS_TYPE_RAYLET][0] + raylet_pid = None + # Find the raylet pid recorded in the tag. + for sample in metric_samples: + if sample.name == "ray_raylet_cpu": + raylet_pid = sample.labels["pid"] + break + return str(raylet_proc.process.pid) == str(raylet_pid) + + wait_for_condition(test_case_stats_exist, retry_interval_ms=1000) + wait_for_condition(test_case_ip_correct, retry_interval_ms=1000) + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/metrics_agent.py b/python/ray/metrics_agent.py index 300a32529..21a2aa2f7 100644 --- a/python/ray/metrics_agent.py +++ b/python/ray/metrics_agent.py @@ -4,9 +4,10 @@ import os import threading import time import traceback - +from collections import namedtuple from typing import List +from opencensus.stats import aggregation from opencensus.stats import measure as measure_module from opencensus.stats import stats as stats_module from opencensus.stats.view import View @@ -15,6 +16,9 @@ from opencensus.stats.aggregation_data import (CountAggregationData, DistributionAggregationData, LastValueAggregationData) from opencensus.metrics.export.value import ValueDouble +from opencensus.tags import tag_key as tag_key_module +from opencensus.tags import tag_map as tag_map_module +from opencensus.tags import tag_value as tag_value_module import ray @@ -24,11 +28,41 @@ from ray.core.generated.metrics_pb2 import Metric logger = logging.getLogger(__name__) +class Gauge(View): + """Gauge representation of opencensus view. + + This class is used to collect process metrics from the reporter agent. + Cpp metrics should be collected in a different way. + """ + + def __init__(self, name, description, unit, tags: List[str]): + self._measure = measure_module.MeasureInt(name, description, unit) + tags = [tag_key_module.TagKey(tag) for tag in tags] + self._view = View(name, description, tags, self.measure, + aggregation.LastValueAggregation()) + + @property + def measure(self): + return self._measure + + @property + def view(self): + return self._view + + @property + def name(self): + return self.measure.name + + +Record = namedtuple("Record", ["gauge", "value", "tags"]) + + class MetricsAgent: def __init__(self, metrics_export_port): assert metrics_export_port is not None # OpenCensus classes. self.view_manager = stats_module.stats.view_manager + self.stats_recorder = stats_module.stats.stats_recorder # Port where we will expose metrics. self.metrics_export_port = metrics_export_port # Lock required because gRPC server uses @@ -41,6 +75,31 @@ class MetricsAgent: prometheus_exporter.Options( namespace="ray", port=metrics_export_port))) + def record_reporter_stats(self, records: List[Record]): + with self._lock: + for record in records: + gauge = record.gauge + value = record.value + tags = record.tags + self._record_gauge(gauge, value, tags) + + def _record_gauge(self, gauge: Gauge, value: float, tags: dict): + view_data = self.view_manager.get_view(gauge.name) + if not view_data: + self.view_manager.register_view(gauge.view) + # Reobtain the view. + view = self.view_manager.get_view(gauge.name).view + + measurement_map = self.stats_recorder.new_measurement_map() + tag_map = tag_map_module.TagMap() + for key, tag_val in tags.items(): + tag_key = tag_key_module.TagKey(key) + tag_value = tag_value_module.TagValue(tag_val) + tag_map.insert(tag_key, tag_value) + measurement_map.measure_float_put(view.measure, value) + # NOTE: When we record this metric, timestamp will be renewed. + measurement_map.record(tag_map) + def record_metric_points_from_protobuf(self, metrics: List[Metric]): """Record metrics from Opencensus Protobuf""" with self._lock: diff --git a/python/ray/test_utils.py b/python/ray/test_utils.py index 4185d3f0c..3751a89b6 100644 --- a/python/ray/test_utils.py +++ b/python/ray/test_utils.py @@ -14,6 +14,8 @@ from contextlib import redirect_stdout, redirect_stderr import ray import ray._private.services import ray.utils +import requests +from prometheus_client.parser import text_string_to_metric_families from ray.scripts.scripts import main as ray_main import psutil # We must import psutil after ray because we bundle it with ray. @@ -447,3 +449,26 @@ def new_scheduler_enabled(): def client_test_enabled() -> bool: return os.environ.get("RAY_CLIENT_MODE") == "1" + + +def fetch_prometheus(prom_addresses): + components_dict = {} + metric_names = set() + metric_samples = [] + for address in prom_addresses: + if address not in components_dict: + components_dict[address] = set() + try: + response = requests.get(f"http://{address}/metrics") + except requests.exceptions.ConnectionError: + continue + + for line in response.text.split("\n"): + for family in text_string_to_metric_families(line): + for sample in family.samples: + metric_names.add(sample.name) + metric_samples.append(sample) + if "Component" in sample.labels: + components_dict[address].add( + sample.labels["Component"]) + return components_dict, metric_names, metric_samples diff --git a/python/ray/tests/test_metrics_agent.py b/python/ray/tests/test_metrics_agent.py index 12490dd44..6f6c719c7 100644 --- a/python/ray/tests/test_metrics_agent.py +++ b/python/ray/tests/test_metrics_agent.py @@ -5,15 +5,13 @@ from pprint import pformat import time from unittest.mock import MagicMock -import requests import pytest -from prometheus_client.parser import text_string_to_metric_families import ray from ray.ray_constants import PROMETHEUS_SERVICE_DISCOVERY_FILE from ray.metrics_agent import PrometheusServiceDiscoveryWriter from ray.util.metrics import Count, Histogram, Gauge -from ray.test_utils import wait_for_condition, SignalActor +from ray.test_utils import wait_for_condition, SignalActor, fetch_prometheus def test_prometheus_file_based_service_discovery(ray_start_cluster): @@ -115,29 +113,6 @@ def test_metrics_export_end_to_end(_setup_cluster_for_test): prom_addresses = _setup_cluster_for_test - # Make sure we can ping Prometheus endpoints. - def fetch_prometheus(prom_addresses): - components_dict = {} - metric_names = set() - metric_samples = [] - for address in prom_addresses: - if address not in components_dict: - components_dict[address] = set() - try: - response = requests.get(f"http://{address}/metrics") - except requests.exceptions.ConnectionError: - continue - - for line in response.text.split("\n"): - for family in text_string_to_metric_families(line): - for sample in family.samples: - metric_names.add(sample.name) - metric_samples.append(sample) - if "Component" in sample.labels: - components_dict[address].add( - sample.labels["Component"]) - return components_dict, metric_names, metric_samples - def test_cases(): components_dict, metric_names, metric_samples = fetch_prometheus( prom_addresses) @@ -158,6 +133,9 @@ def test_metrics_export_end_to_end(_setup_cluster_for_test): for metric_name in ["test_counter", "test_histogram"]: assert any(metric_name in full_name for full_name in metric_names) + # Make sure GCS server metrics are recorded. + assert "ray_outbound_heartbeat_size_kb_sum" in metric_names + # Make sure the numeric value is correct test_counter_sample = [ m for m in metric_samples if "test_counter" in m.name diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index aacbe2926..c6ff254fb 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -114,6 +114,8 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) { // Print debug info periodically. PrintDebugInfo(); + CollectStats(); + is_started_ = true; }