CoreWorker correctly push metrics to agent (#10031)

This commit is contained in:
Simon Mo
2020-08-13 16:44:53 -07:00
committed by GitHub
parent b77d6bf87d
commit 01f38bc5d1
6 changed files with 72 additions and 27 deletions
-1
View File
@@ -74,7 +74,6 @@ cdef class Gauge(Metric):
value = 5
key1= "key1"
key2 = "key2"
s
gauge.record(value, {"tagk1": key1, "tagk2": key2})
"""
def __init__(self, name, description, unit, tag_keys):
+2 -1
View File
@@ -104,7 +104,8 @@ class Node:
head), "LRU Evict can only be passed into the head node."
self._raylet_ip_address = raylet_ip_address
self.metrics_agent_port = self._get_unused_port()[0]
self.metrics_agent_port = (ray_params.metrics_agent_port
or self._get_unused_port()[0])
self._metrics_export_port = ray_params.metrics_export_port
if self._metrics_export_port is None:
self._metrics_export_port = self._get_unused_port()[0]
+2 -2
View File
@@ -1377,8 +1377,7 @@ def start_raylet(redis_address,
# Create the command that the Raylet will use to start workers.
start_worker_command = [
sys.executable,
worker_path,
sys.executable, worker_path,
"--node-ip-address={}".format(node_ip_address),
"--node-manager-port={}".format(node_manager_port),
"--object-store-name={}".format(plasma_store_name),
@@ -1386,6 +1385,7 @@ def start_raylet(redis_address,
"--redis-address={}".format(redis_address),
"--config-list={}".format(config_str),
"--temp-dir={}".format(temp_dir),
f"--metrics-agent-port={metrics_agent_port}"
]
if redis_password:
start_worker_command += ["--redis-password={}".format(redis_password)]
+59 -23
View File
@@ -1,20 +1,20 @@
import asyncio
import json
import pytest
import time
from collections import defaultdict
import requests
import pytest
from opencensus.tags import tag_key as tag_key_module
from prometheus_client.parser import text_string_to_metric_families
import ray
from ray.core.generated.common_pb2 import MetricPoint
from ray.dashboard.util import get_unused_port
from ray.metrics_agent import (Gauge, MetricsAgent,
PrometheusServiceDiscoveryWriter)
from ray.test_utils import wait_for_condition
from ray.experimental.metrics import Count, Histogram
from ray.test_utils import wait_for_condition, SignalActor
def generate_metrics_point(name: str,
@@ -217,6 +217,7 @@ def test_prometheus_file_based_service_discovery(ray_start_cluster):
loaded_json_data["targets"]))
@pytest.mark.skip("This test is flaky right now. Will be fixed in #10080")
def test_metrics_export_end_to_end(ray_start_cluster):
NUM_NODES = 2
cluster = ray_start_cluster
@@ -230,19 +231,36 @@ def test_metrics_export_end_to_end(ray_start_cluster):
cluster.wait_for_nodes()
ray.init(address=cluster.address)
signal = SignalActor.remote()
# Generate some metrics around actor & tasks.
@ray.remote
def f():
return 3
counter = Count("test_counter", "desc", "unit", [])
ray.get(signal.send.remote())
while True:
counter.record(1, {})
time.sleep(0.1)
@ray.remote
class A:
def ping(self):
return 3
async def ready(self):
pass
ray.get([f.remote() for _ in range(30)])
async def ping(self):
histogram = Histogram("test_histogram", "desc", "unit", [0, 1, 2],
[])
while True:
histogram.record(1, {})
await asyncio.sleep(0.1)
obj_refs = [f.remote() for _ in range(30)]
a = A.remote()
ray.get(a.ping.remote())
obj_refs.append(a.ping.remote())
# Make sure both histogram and counter are created
ray.get(a.ready.remote())
ray.get(signal.wait.remote())
node_info_list = ray.nodes()
prom_addresses = []
@@ -252,9 +270,9 @@ def test_metrics_export_end_to_end(ray_start_cluster):
prom_addresses.append(f"{addr}:{metrics_export_port}")
# Make sure we can ping Prometheus endpoints.
def get_component_information(prom_addresses):
# TODO(sang): Add a core worker & gcs_server after adding metrics.
def fetch_prometheus(prom_addresses):
components_dict = {}
metric_names = set()
for address in prom_addresses:
if address not in components_dict:
components_dict[address] = set()
@@ -262,32 +280,50 @@ def test_metrics_export_end_to_end(ray_start_cluster):
response = requests.get(
"http://localhost:{}".format(metrics_export_port))
except requests.exceptions.ConnectionError:
return components_dict
return components_dict, metric_names
for line in response.text.split("\n"):
for family in text_string_to_metric_families(line):
for sample in family.samples:
# print(sample)
metric_names.add(sample.name)
if "Component" in sample.labels:
components_dict[address].add(
sample.labels["Component"])
return components_dict
return components_dict, metric_names
def test_prometheus_endpoint():
# TODO(sang): Add a core worker & gcs_server after adding metrics.
components_dict = get_component_information(prom_addresses)
COMPONENTS_CANDIDATES = {"raylet"}
return all(
COMPONENTS_CANDIDATES.issubset(components)
# TODO(Simon): Add a gcs_server after fixing metrics.
components_dict, metric_names = fetch_prometheus(prom_addresses)
# Raylet should be on every node
expected_components = {"raylet"}
components_found = all(
expected_components.issubset(components)
for components in components_dict.values())
# Core worker should be on at least one node
components_found = components_found and any(
"core_worker" in components
for components in components_dict.values())
expected_metric_names = {"ray_test_counter", "ray_test_histogram_max"}
metric_names_found = expected_metric_names.issubset(metric_names)
return components_found and metric_names_found
try:
wait_for_condition(test_prometheus_endpoint, timeout=3)
wait_for_condition(
test_prometheus_endpoint,
timeout=20,
retry_interval_ms=1000, # Yield resource for other processes
)
except RuntimeError:
# This is for debugging when test failed.
print(get_component_information(prom_addresses))
raise RuntimeError("All components were not visible to "
"prometheus endpoints on time.")
raise RuntimeError(
"All components were not visible to "
"prometheus endpoints on time. "
f"The compoenents are {fetch_prometheus(prom_addresses)}")
ray.shutdown()
+6
View File
@@ -80,6 +80,11 @@ parser.add_argument(
default=False,
action="store_true",
help="True if cloudpickle should be used for serialization.")
parser.add_argument(
"--metrics-agent-port",
required=True,
type=int,
help="the port of the node's metric agent.")
if __name__ == "__main__":
args = parser.parse_args()
@@ -109,6 +114,7 @@ if __name__ == "__main__":
raylet_socket_name=args.raylet_name,
temp_dir=args.temp_dir,
load_code_from_local=args.load_code_from_local,
metrics_agent_port=args.metrics_agent_port,
_internal_config=json.dumps(internal_config),
)