diff --git a/python/ray/includes/ray_config.pxd b/python/ray/includes/ray_config.pxd
index 791ff9059..17264b854 100644
--- a/python/ray/includes/ray_config.pxd
+++ b/python/ray/includes/ray_config.pxd
@@ -90,3 +90,5 @@ cdef extern from "ray/common/ray_config.h" nogil:
         c_bool put_small_object_in_memory_store() const
 
         uint32_t max_tasks_in_flight_per_worker() const
+
+        uint64_t metrics_report_interval_ms() const
diff --git a/python/ray/includes/ray_config.pxi b/python/ray/includes/ray_config.pxi
index 725140389..3dd256010 100644
--- a/python/ray/includes/ray_config.pxi
+++ b/python/ray/includes/ray_config.pxi
@@ -165,3 +165,7 @@ cdef class Config:
     @staticmethod
     def max_tasks_in_flight_per_worker():
         return RayConfig.instance().max_tasks_in_flight_per_worker()
+
+    @staticmethod
+    def metrics_report_interval_ms():
+        return RayConfig.instance().metrics_report_interval_ms()
diff --git a/python/ray/tests/test_metrics_agent.py b/python/ray/tests/test_metrics_agent.py
index f88f36b26..c9254ce5a 100644
--- a/python/ray/tests/test_metrics_agent.py
+++ b/python/ray/tests/test_metrics_agent.py
@@ -14,6 +14,7 @@ from ray.core.generated.common_pb2 import MetricPoint
 from ray.dashboard.util import get_unused_port
 from ray.metrics_agent import (Gauge, MetricsAgent,
                                PrometheusServiceDiscoveryWriter)
+from ray.test_utils import wait_for_condition
 
 
 def generate_metrics_point(name: str,
@@ -216,6 +217,81 @@ def test_prometheus_file_based_service_discovery(ray_start_cluster):
             loaded_json_data["targets"]))
 
 
+def test_metrics_export_end_to_end(ray_start_cluster):
+    """Every node's Prometheus endpoint must expose raylet metrics."""
+    NUM_NODES = 2
+    cluster = ray_start_cluster
+    # Add a head node.
+    cluster.add_node(
+        _internal_config=json.dumps({
+            "metrics_report_interval_ms": 1000
+        }))
+    # Add worker nodes.
+    for _ in range(NUM_NODES - 1):
+        cluster.add_node()
+    cluster.wait_for_nodes()
+    ray.init(address=cluster.address)
+
+    # Generate some metrics around actor & tasks.
+    @ray.remote
+    def f():
+        return 3
+
+    @ray.remote
+    class A:
+        def ping(self):
+            return 3
+
+    ray.get([f.remote() for _ in range(30)])
+    a = A.remote()
+    ray.get(a.ping.remote())
+
+    node_info_list = ray.nodes()
+    prom_addresses = []
+    for node_info in node_info_list:
+        metrics_export_port = node_info["MetricsExportPort"]
+        addr = node_info["NodeManagerAddress"]
+        prom_addresses.append(f"{addr}:{metrics_export_port}")
+
+    # Make sure we can ping Prometheus endpoints.
+    def get_component_information(prom_addresses):
+        # TODO(sang): Add a core worker & gcs_server after adding metrics.
+        components_dict = {}
+        for address in prom_addresses:
+            if address not in components_dict:
+                components_dict[address] = set()
+            try:
+                # Query this node's own endpoint, not the last port seen above.
+                response = requests.get(f"http://{address}")
+            except requests.exceptions.ConnectionError:
+                continue
+
+            for line in response.text.split("\n"):
+                for family in text_string_to_metric_families(line):
+                    for sample in family.samples:
+                        if "Component" in sample.labels:
+                            components_dict[address].add(
+                                sample.labels["Component"])
+        return components_dict
+
+    def test_prometheus_endpoint():
+        # TODO(sang): Add a core worker & gcs_server after adding metrics.
+        components_dict = get_component_information(prom_addresses)
+        COMPONENTS_CANDIDATES = {"raylet"}
+        return all(
+            COMPONENTS_CANDIDATES.issubset(components)
+            for components in components_dict.values())
+
+    try:
+        wait_for_condition(test_prometheus_endpoint, timeout=3)
+    except RuntimeError:
+        # This is for debugging when test failed.
+        print(get_component_information(prom_addresses))
+        raise RuntimeError("All components were not visible to "
+                           "prometheus endpoints on time.")
+    ray.shutdown()
+
+
 if __name__ == "__main__":
     import sys
     sys.exit(pytest.main(["-v", __file__]))
diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h
index f3607caa5..ae7cdd502 100644
--- a/src/ray/common/ray_config_def.h
+++ b/src/ray/common/ray_config_def.h
@@ -346,3 +346,6 @@ RAY_CONFIG(bool, enable_multi_tenancy, false)
 
 /// Whether start the Plasma Store as a Raylet thread.
 RAY_CONFIG(bool, ownership_based_object_directory_enabled, false)
+
+/// The interval at which metrics are exported, in milliseconds.
+RAY_CONFIG(uint64_t, metrics_report_interval_ms, 10000)
diff --git a/src/ray/gcs/gcs_server/gcs_server_main.cc b/src/ray/gcs/gcs_server/gcs_server_main.cc
index a7eaba2b6..e4ee94fad 100644
--- a/src/ray/gcs/gcs_server/gcs_server_main.cc
+++ b/src/ray/gcs/gcs_server/gcs_server_main.cc
@@ -57,7 +57,7 @@ int main(int argc, char *argv[]) {
   }
   RayConfig::instance().initialize(config_map);
 
-  const ray::stats::TagsType global_tags = {{ray::stats::JobNameKey, "gcs_server"},
+  const ray::stats::TagsType global_tags = {{ray::stats::ComponentKey, "gcs_server"},
                                             {ray::stats::VersionKey, "0.9.0.dev0"}};
   ray::stats::Init(global_tags, metrics_agent_port);
 
diff --git a/src/ray/stats/metric.h b/src/ray/stats/metric.h
index 4a5b7ff70..06e8534c4 100644
--- a/src/ray/stats/metric.h
+++ b/src/ray/stats/metric.h
@@ -76,12 +76,12 @@ class StatsConfig final {
   /// If true, don't collect metrics in this process.
   bool is_stats_disabled_ = true;
   // Regular reporting interval for all reporters.
-  absl::Duration report_interval_ = absl::Seconds(10);
+  absl::Duration report_interval_ = absl::Milliseconds(10000);
   // Time interval for periodic aggregation.
   // Exporter may capture empty collection if harvest interval is longer than
   // report interval. So harvest interval is suggusted to be half of report
   // interval.
-  absl::Duration harvest_interval_ = absl::Seconds(5);
+  absl::Duration harvest_interval_ = absl::Milliseconds(5000);
   // Whether or not if the stats has been initialized.
   bool is_initialized_ = false;
 };
diff --git a/src/ray/stats/stats.h b/src/ray/stats/stats.h
index 2f85f2549..c7bb283e0 100644
--- a/src/ray/stats/stats.h
+++ b/src/ray/stats/stats.h
@@ -89,6 +89,13 @@ static inline void Init(const TagsType &global_tags, const int metrics_agent_por
     exporter = exporter_to_use;
   }
 
+  // Report interval is clamped to >= 1000 ms; harvest runs at half of it (>= 500 ms).
+  StatsConfig::instance().SetReportInterval(absl::Milliseconds(std::max(
+      RayConfig::instance().metrics_report_interval_ms(), static_cast<uint64_t>(1000))));
+  StatsConfig::instance().SetHarvestInterval(
+      absl::Milliseconds(std::max(RayConfig::instance().metrics_report_interval_ms() / 2,
+                                  static_cast<uint64_t>(500))));
+
   MetricExporter::Register(exporter, metrics_report_batch_size);
   opencensus::stats::StatsExporter::SetInterval(
       StatsConfig::instance().GetReportInterval());
@@ -101,7 +108,6 @@ static inline void Init(const TagsType &global_tags, const int metrics_agent_por
 /// Shutdown the initialized stats library.
 /// This cleans up various threads and metadata for stats library.
 static inline void Shutdown() {
-  // TODO(sang): Harvest thread is not currently cleaned up.
   absl::MutexLock lock(&stats_mutex);
   if (!StatsConfig::instance().IsInitialized()) {
     // Return if stats had never been initialized.