[Stats] Make metrics report time configurable (#10036)

* Done.

* Lint.

* Address code review.

* Address code review.

* Remove wrong commit.

* Fix a test error.
This commit is contained in:
SangBin Cho
2020-08-13 00:30:24 -07:00
committed by GitHub
parent 739933e5b8
commit 86b1db3f11
7 changed files with 94 additions and 4 deletions
+2
View File
@@ -90,3 +90,5 @@ cdef extern from "ray/common/ray_config.h" nogil:
c_bool put_small_object_in_memory_store() const
uint32_t max_tasks_in_flight_per_worker() const
uint64_t metrics_report_interval_ms() const
+4
View File
@@ -165,3 +165,7 @@ cdef class Config:
# Returns the max_tasks_in_flight_per_worker value read from
# RayConfig.instance().
@staticmethod
def max_tasks_in_flight_per_worker():
return RayConfig.instance().max_tasks_in_flight_per_worker()
# Returns the metrics_report_interval_ms value (metrics report interval,
# in milliseconds) read from RayConfig.instance().
@staticmethod
def metrics_report_interval_ms():
return RayConfig.instance().metrics_report_interval_ms()
+75
View File
@@ -14,6 +14,7 @@ from ray.core.generated.common_pb2 import MetricPoint
from ray.dashboard.util import get_unused_port
from ray.metrics_agent import (Gauge, MetricsAgent,
PrometheusServiceDiscoveryWriter)
from ray.test_utils import wait_for_condition
def generate_metrics_point(name: str,
@@ -216,6 +217,80 @@ def test_prometheus_file_based_service_discovery(ray_start_cluster):
loaded_json_data["targets"]))
def test_metrics_export_end_to_end(ray_start_cluster):
    """Check that every node's metrics export endpoint reports raylet metrics.

    Starts a two-node cluster whose head node is configured with
    ``metrics_report_interval_ms`` = 1000, runs a few tasks and one actor
    call to generate metrics, then polls each node's metrics export port
    until samples labelled with ``Component == "raylet"`` show up.

    Raises:
        RuntimeError: if the expected components are not visible on every
            endpoint before ``wait_for_condition`` times out.
    """
    NUM_NODES = 2
    cluster = ray_start_cluster
    # Add a head node.
    cluster.add_node(
        _internal_config=json.dumps({
            "metrics_report_interval_ms": 1000
        }))
    # Add worker nodes.
    for _ in range(NUM_NODES - 1):
        cluster.add_node()
    cluster.wait_for_nodes()
    ray.init(address=cluster.address)

    # Generate some metrics around actor & tasks.
    @ray.remote
    def f():
        return 3

    @ray.remote
    class A:
        def ping(self):
            return 3

    ray.get([f.remote() for _ in range(30)])
    a = A.remote()
    ray.get(a.ping.remote())

    # Collect one "<node address>:<metrics export port>" entry per node.
    node_info_list = ray.nodes()
    prom_addresses = []
    for node_info in node_info_list:
        metrics_export_port = node_info["MetricsExportPort"]
        addr = node_info["NodeManagerAddress"]
        prom_addresses.append(f"{addr}:{metrics_export_port}")

    # Make sure we can ping Prometheus endpoints.
    def get_component_information(prom_addresses):
        """Map each address to the set of "Component" labels it exports.

        Returns the partially filled dict as soon as one endpoint refuses
        the connection, so the caller's check fails and can be retried.
        """
        # TODO(sang): Add a core worker & gcs_server after adding metrics.
        components_dict = {}
        for address in prom_addresses:
            if address not in components_dict:
                components_dict[address] = set()
            # Query the port that belongs to *this* address. Reusing the
            # `metrics_export_port` variable left over from the loop above
            # would scrape the last node's endpoint for every address.
            port = address.rsplit(":", 1)[-1]
            try:
                response = requests.get("http://localhost:{}".format(port))
            except requests.exceptions.ConnectionError:
                return components_dict

            for line in response.text.split("\n"):
                for family in text_string_to_metric_families(line):
                    for sample in family.samples:
                        if "Component" in sample.labels:
                            components_dict[address].add(
                                sample.labels["Component"])
        return components_dict

    def test_prometheus_endpoint():
        """Return True once every endpoint exports all expected components."""
        # TODO(sang): Add a core worker & gcs_server after adding metrics.
        components_dict = get_component_information(prom_addresses)
        COMPONENTS_CANDIDATES = {"raylet"}
        return all(
            COMPONENTS_CANDIDATES.issubset(components)
            for components in components_dict.values())

    try:
        wait_for_condition(test_prometheus_endpoint, timeout=3)
    except RuntimeError:
        # This is for debugging when test failed.
        print(get_component_information(prom_addresses))
        raise RuntimeError("All components were not visible to "
                           "prometheus endpoints on time.")
    ray.shutdown()
# When executed as a script, run this file's tests through pytest in verbose
# mode and use pytest's return value as the process exit code.
if __name__ == "__main__":
import sys
sys.exit(pytest.main(["-v", __file__]))
+3
View File
@@ -346,3 +346,6 @@ RAY_CONFIG(bool, enable_multi_tenancy, false)
/// Whether start the Plasma Store as a Raylet thread.
RAY_CONFIG(bool, ownership_based_object_directory_enabled, false)
/// The interval at which metrics are exported, in milliseconds.
/// ray::stats::Init raises the effective report interval to at least 1000 ms
/// and uses half of this value (at least 500 ms) as the harvest interval.
RAY_CONFIG(uint64_t, metrics_report_interval_ms, 10000)
+1 -1
View File
@@ -57,7 +57,7 @@ int main(int argc, char *argv[]) {
}
RayConfig::instance().initialize(config_map);
const ray::stats::TagsType global_tags = {{ray::stats::JobNameKey, "gcs_server"},
const ray::stats::TagsType global_tags = {{ray::stats::ComponentKey, "gcs_server"},
{ray::stats::VersionKey, "0.9.0.dev0"}};
ray::stats::Init(global_tags, metrics_agent_port);
+2 -2
View File
@@ -76,12 +76,12 @@ class StatsConfig final {
/// If true, don't collect metrics in this process.
bool is_stats_disabled_ = true;
// Regular reporting interval for all reporters.
absl::Duration report_interval_ = absl::Seconds(10);
absl::Duration report_interval_ = absl::Milliseconds(10000);
// Time interval for periodic aggregation.
// Exporter may capture empty collection if harvest interval is longer than
// report interval. So harvest interval is suggested to be half of report
// interval.
absl::Duration harvest_interval_ = absl::Seconds(5);
absl::Duration harvest_interval_ = absl::Milliseconds(5000);
// Whether or not if the stats has been initialized.
bool is_initialized_ = false;
};
+7 -1
View File
@@ -89,6 +89,13 @@ static inline void Init(const TagsType &global_tags, const int metrics_agent_por
exporter = exporter_to_use;
}
// Set interval.
StatsConfig::instance().SetReportInterval(absl::Milliseconds(std::max(
RayConfig::instance().metrics_report_interval_ms(), static_cast<uint64_t>(1000))));
StatsConfig::instance().SetHarvestInterval(
absl::Milliseconds(std::max(RayConfig::instance().metrics_report_interval_ms() / 2,
static_cast<uint64_t>(500))));
MetricExporter::Register(exporter, metrics_report_batch_size);
opencensus::stats::StatsExporter::SetInterval(
StatsConfig::instance().GetReportInterval());
@@ -101,7 +108,6 @@ static inline void Init(const TagsType &global_tags, const int metrics_agent_por
/// Shutdown the initialized stats library.
/// This cleans up various threads and metadata for stats library.
static inline void Shutdown() {
// TODO(sang): Harvest thread is not currently cleaned up.
absl::MutexLock lock(&stats_mutex);
if (!StatsConfig::instance().IsInitialized()) {
// Return if stats had never been initialized.