[Stats] Metrics Export User Interface Part 1 (#9913)

* Metrics export port expose done.

* Support exposing metrics port + metrics agent service discovery through ray.nodes()

* Formatting.

* Added a doc.

* Linting.

* Change the location of metrics agent port.

* Addressed code review.

* Addressed code review.
This commit is contained in:
SangBin Cho
2020-08-06 16:16:29 -07:00
committed by GitHub
parent eace94d2dc
commit ec2f1a225e
26 changed files with 322 additions and 51 deletions
+2 -1
View File
@@ -659,7 +659,7 @@ cdef class CoreWorker:
JobID job_id, GcsClientOptions gcs_options, log_dir,
node_ip_address, node_manager_port, raylet_ip_address,
local_mode, driver_name, stdout_file, stderr_file,
serialized_job_config):
serialized_job_config, metrics_agent_port):
self.is_driver = is_driver
self.is_local_mode = local_mode
@@ -690,6 +690,7 @@ cdef class CoreWorker:
options.kill_main = kill_main_task
options.terminate_asyncio_thread = terminate_asyncio_thread
options.serialized_job_config = serialized_job_config
options.metrics_agent_port = metrics_agent_port
CCoreWorkerProcess.Initialize(options)
+1
View File
@@ -226,6 +226,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
(c_bool() nogil) kill_main
CCoreWorkerOptions()
(void() nogil) terminate_asyncio_thread
int metrics_agent_port
c_string serialized_job_config
cdef cppclass CCoreWorkerProcess "ray::CoreWorkerProcess":
+53 -1
View File
@@ -62,7 +62,20 @@ cdef class Metric:
cdef class Gauge(Metric):
"""Cython wrapper class of C++ `ray::stats::Gauge`.
Gauge: Keeps the last recorded value, drops everything before.
Gauge: Keeps the last recorded value, drops everything before.
Example:
>>> gauge = Gauge(
"ray.worker.metric",
"description",
"unit",
["tagk1", "tagk2"]).
value = 5
key1= "key1"
key2 = "key2"
s
gauge.record(value, {"tagk1": key1, "tagk2": key2})
"""
def __init__(self, name, description, unit, tag_keys):
"""Create a gauge metric
@@ -88,6 +101,19 @@ cdef class Gauge(Metric):
cdef class Count(Metric):
"""Cython wrapper class of C++ `ray::stats::Count`.
Example:
>>> count = Count(
"ray.worker.metric",
"description",
"unit",
["tagk1", "tagk2"]).
value = 5
key1= "key1"
key2 = "key2"
count.record(value, {"tagk1": key1, "tagk2": key2})
Count: The count of the number of metric points.
"""
def __init__(self, name, description, unit, tag_keys):
@@ -114,6 +140,19 @@ cdef class Count(Metric):
cdef class Sum(Metric):
"""Cython wrapper class of C++ `ray::stats::Sum`.
Example:
>>> metric_sum = Sum(
"ray.worker.metric",
"description",
"unit",
["tagk1", "tagk2"]).
value = 5
key1= "key1"
key2 = "key2"
metric_sum.record(value, {"tagk1": key1, "tagk2": key2})
Sum: A sum up of the metric points.
"""
def __init__(self, name, description, unit, tag_keys):
@@ -141,6 +180,19 @@ cdef class Sum(Metric):
cdef class Histogram(Metric):
"""Cython wrapper class of C++ `ray::stats::Histogram`.
Example:
>>> histogram = Histogram(
"ray.worker.histogram1",
"desciprtion",
"unit",
[1.0, 2.0], # boundaries.
["tagk1"])
value = 5
key1= "key1"
histogram.record(value, {"tagk1": key1})
Histogram: Histogram distribution of metric points.
"""
def __init__(self, name, description, unit, boundaries, tag_keys):
+15 -1
View File
@@ -103,12 +103,17 @@ class Node:
head), "LRU Evict can only be passed into the head node."
self._raylet_ip_address = raylet_ip_address
self.metrics_agent_port = self._get_unused_port()[0]
self._metrics_export_port = ray_params.metrics_export_port
if self._metrics_export_port is None:
self._metrics_export_port = self._get_unused_port()[0]
ray_params.update_if_absent(
include_log_monitor=True,
resources={},
temp_dir=ray.utils.get_ray_temp_dir(),
metrics_agent_port=self._get_unused_port()[0],
metrics_agent_port=self.metrics_agent_port,
metrics_export_port=self._metrics_export_port,
worker_path=os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"workers/default_worker.py"))
@@ -318,6 +323,11 @@ class Node:
"""Get the node manager's port."""
return self._ray_params.node_manager_port
@property
def metrics_export_port(self):
"""Get the port that exposes metrics"""
return self._metrics_export_port
@property
def socket(self):
"""Get the socket reserving the node manager's port"""
@@ -337,6 +347,7 @@ class Node:
"raylet_socket_name": self._raylet_socket_name,
"webui_url": self._webui_url,
"session_dir": self._session_dir,
"metrics_export_port": self._metrics_export_port
}
def create_redis_client(self):
@@ -561,9 +572,11 @@ class Node:
"""Start the reporter."""
stdout_file, stderr_file = self.get_log_file_handles(
"reporter", unique=True)
process_info = ray.services.start_reporter(
self.redis_address,
self._ray_params.metrics_agent_port,
self._metrics_export_port,
stdout_file=stdout_file,
stderr_file=stderr_file,
redis_password=self._ray_params.redis_password,
@@ -667,6 +680,7 @@ class Node:
self._ray_params.object_manager_port,
self._ray_params.redis_password,
self._ray_params.metrics_agent_port,
self._metrics_export_port,
use_valgrind=use_valgrind,
use_profiler=use_profiler,
stdout_file=stdout_file,
+4
View File
@@ -89,6 +89,8 @@ class RayParams:
java_worker_options (list): The command options for Java worker.
load_code_from_local: Whether load code from local file or from GCS.
metrics_agent_port(int): The port to bind metrics agent.
metrics_export_port(int): The port at which metrics are exposed
through a Prometheus endpoint.
_internal_config (str): JSON configuration for overriding
RayConfig defaults. For testing purposes ONLY.
lru_evict (bool): Enable LRU eviction if space is needed.
@@ -142,6 +144,7 @@ class RayParams:
_internal_config=None,
enable_object_reconstruction=False,
metrics_agent_port=None,
metrics_export_port=None,
lru_evict=False):
self.object_ref_seed = object_ref_seed
self.redis_address = redis_address
@@ -181,6 +184,7 @@ class RayParams:
self.java_worker_options = java_worker_options
self.load_code_from_local = load_code_from_local
self.metrics_agent_port = metrics_agent_port
self.metrics_export_port = metrics_export_port
self.start_initial_python_workers_for_first_job = (
start_initial_python_workers_for_first_job)
self._internal_config = _internal_config
+15 -7
View File
@@ -19,7 +19,6 @@ import ray.services
import ray.utils
from ray.core.generated import reporter_pb2
from ray.core.generated import reporter_pb2_grpc
from ray.dashboard.util import get_unused_port
from ray.metrics_agent import MetricsAgent
# Logger for this module. It should be configured at the entry point
@@ -116,16 +115,17 @@ class Reporter:
redis_client: A client used to communicate with the Redis server.
"""
def __init__(self, redis_address, port, redis_password=None):
def __init__(self,
redis_address,
port,
metrics_export_port,
redis_password=None):
"""Initialize the reporter object."""
self.cpu_counts = (psutil.cpu_count(), psutil.cpu_count(logical=False))
self.ip = ray.services.get_node_ip_address()
self.hostname = platform.node()
self.port = port
metrics_agent_port = os.getenv("METRICS_AGENT_PORT")
if not metrics_agent_port:
metrics_agent_port = get_unused_port()
self.metrics_agent = MetricsAgent(metrics_agent_port)
self.metrics_agent = MetricsAgent(metrics_export_port)
self.reporter_grpc_server = ReporterServer(self.metrics_agent)
_ = psutil.cpu_percent() # For initialization
@@ -282,6 +282,11 @@ if __name__ == "__main__":
required=True,
type=int,
help="The port to bind the reporter process.")
parser.add_argument(
"--metrics-export-port",
required=True,
type=int,
help="The port to expose metrics through Prometheus.")
parser.add_argument(
"--redis-password",
required=False,
@@ -305,7 +310,10 @@ if __name__ == "__main__":
ray.utils.setup_logger(args.logging_level, args.logging_format)
reporter = Reporter(
args.redis_address, args.port, redis_password=args.redis_password)
args.redis_address,
args.port,
args.metrics_export_port,
redis_password=args.redis_password)
try:
reporter.run()
+9 -2
View File
@@ -347,6 +347,12 @@ def dashboard(cluster_config_file, cluster_name, port, remote_port):
default=False,
help="Specify whether object reconstruction will be used for this cluster."
)
@click.option(
"--metrics-export-port",
type=int,
default=8080,
help="the port to use to expose Ray metrics through a "
"Prometheus endpoint.")
def start(node_ip_address, redis_address, address, redis_port, port,
num_redis_shards, redis_max_clients, redis_password,
redis_shard_ports, object_manager_port, node_manager_port,
@@ -357,7 +363,7 @@ def start(node_ip_address, redis_address, address, redis_port, port,
autoscaling_config, no_redirect_worker_output, no_redirect_output,
plasma_store_socket_name, raylet_socket_name, temp_dir, include_java,
java_worker_options, load_code_from_local, internal_config,
lru_evict, enable_object_reconstruction):
lru_evict, enable_object_reconstruction, metrics_export_port):
"""Start Ray processes manually on the local machine."""
if gcs_server_port and not head:
raise ValueError(
@@ -436,7 +442,8 @@ def start(node_ip_address, redis_address, address, redis_port, port,
load_code_from_local=load_code_from_local,
_internal_config=internal_config,
lru_evict=lru_evict,
enable_object_reconstruction=enable_object_reconstruction)
enable_object_reconstruction=enable_object_reconstruction,
metrics_export_port=metrics_export_port)
if head:
# Start Ray on the head node.
if redis_shard_ports is not None:
+7 -1
View File
@@ -1066,6 +1066,7 @@ def start_log_monitor(redis_address,
def start_reporter(redis_address,
port,
metrics_export_port,
stdout_file=None,
stderr_file=None,
redis_password=None,
@@ -1075,6 +1076,7 @@ def start_reporter(redis_address,
Args:
redis_address (str): The address of the Redis instance.
port(int): The port to bind the reporter process.
metrics_export_port(int): The port at which metrics are exposed to.
stdout_file: A file handle opened for writing to redirect stdout to. If
no redirection should happen, then this should be None.
stderr_file: A file handle opened for writing to redirect stderr to. If
@@ -1088,7 +1090,8 @@ def start_reporter(redis_address,
os.path.dirname(os.path.abspath(__file__)), "reporter.py")
command = [
sys.executable, "-u", reporter_filepath,
"--redis-address={}".format(redis_address), "--port={}".format(port)
"--redis-address={}".format(redis_address), "--port={}".format(port),
"--metrics-export-port={}".format(metrics_export_port)
]
if redis_password:
command += ["--redis-password", redis_password]
@@ -1259,6 +1262,7 @@ def start_raylet(redis_address,
object_manager_port=None,
redis_password=None,
metrics_agent_port=None,
metrics_export_port=None,
use_valgrind=False,
use_profiler=False,
stdout_file=None,
@@ -1296,6 +1300,7 @@ def start_raylet(redis_address,
on. If set, min_worker_port must also be set.
redis_password: The password to use when connecting to Redis.
metrics_agent_port(int): The port where metrics agent is bound to.
metrics_export_port(int): The port at which metrics are exposed to.
use_valgrind (bool): True if the raylet should be started inside
of valgrind. If this is True, use_profiler must be False.
use_profiler (bool): True if the raylet should be started inside
@@ -1403,6 +1408,7 @@ def start_raylet(redis_address,
"--temp_dir={}".format(temp_dir),
"--session_dir={}".format(session_dir),
"--metrics-agent-port={}".format(metrics_agent_port),
"--metrics_export_port={}".format(metrics_export_port),
]
if start_initial_python_workers_for_first_job:
command.append("--num_initial_python_workers_for_first_job={}".format(
+2 -1
View File
@@ -306,7 +306,8 @@ class GlobalState:
"NodeManagerPort": item.node_manager_port,
"ObjectManagerPort": item.object_manager_port,
"ObjectStoreSocketName": item.object_store_socket_name,
"RayletSocketName": item.raylet_socket_name
"RayletSocketName": item.raylet_socket_name,
"MetricsExportPort": item.metrics_export_port,
}
node_info["alive"] = node_info["Alive"]
node_info["Resources"] = self.node_resource_table(
+28
View File
@@ -387,6 +387,34 @@ def test_profiling_info_endpoint(shutdown_only):
assert profiling_stats is not None
def test_multi_node_metrics_export_port_discovery(ray_start_cluster):
NUM_NODES = 3
cluster = ray_start_cluster
nodes = [cluster.add_node() for _ in range(NUM_NODES)]
nodes = {
node.address_info["metrics_export_port"]: node.address_info
for node in nodes
}
cluster.wait_for_nodes()
ray.init(address=cluster.address)
node_info_list = ray.nodes()
for node_info in node_info_list:
metrics_export_port = node_info["MetricsExportPort"]
address_info = nodes[metrics_export_port]
assert (address_info["raylet_socket_name"] == node_info[
"RayletSocketName"])
# Make sure we can ping Prometheus endpoints.
def test_prometheus_endpoint():
response = requests.get(
"http://localhost:{}".format(metrics_export_port))
return response.status_code == 200
wait_until_succeeded_without_exception(
test_prometheus_endpoint, (requests.exceptions.ConnectionError, ))
# This variable is used inside test_memory_dashboard.
# It is defined as a global variable to be used across all nested test
# functions. We use it because memory table is updated every one second,
+14 -17
View File
@@ -499,7 +499,8 @@ def init(address=None,
use_pickle=True,
_internal_config=None,
lru_evict=False,
enable_object_reconstruction=False):
enable_object_reconstruction=False,
_metrics_export_port=None):
"""
Connect to an existing Ray cluster or start one and connect to it.
@@ -625,6 +626,9 @@ def init(address=None,
created the object. Arguments to the task will be recursively
reconstructed. If False, then ray.UnreconstructableError will be
thrown.
_metrics_export_port(int): Port number Ray exposes system metrics
through a Prometheus endpoint. It is currently under active
development, and the API is subject to change.
Returns:
Address information about the started processes.
@@ -719,7 +723,8 @@ def init(address=None,
start_initial_python_workers_for_first_job=True,
_internal_config=_internal_config,
lru_evict=lru_evict,
enable_object_reconstruction=enable_object_reconstruction)
enable_object_reconstruction=enable_object_reconstruction,
metrics_export_port=_metrics_export_port)
# Start the Ray processes. We set shutdown_at_exit=False because we
# shutdown the node in the ray.shutdown call that happens in the atexit
# handler. We still spawn a reaper process in case the atexit handler
@@ -793,7 +798,8 @@ def init(address=None,
load_code_from_local=load_code_from_local,
_internal_config=_internal_config,
lru_evict=lru_evict,
enable_object_reconstruction=enable_object_reconstruction)
enable_object_reconstruction=enable_object_reconstruction,
metrics_export_port=_metrics_export_port)
_global_node = ray.node.Node(
ray_params,
head=False,
@@ -1277,20 +1283,11 @@ def connect(node,
serialized_job_config = job_config.serialize()
worker.core_worker = ray._raylet.CoreWorker(
(mode == SCRIPT_MODE or mode == LOCAL_MODE),
node.plasma_store_socket_name,
node.raylet_socket_name,
job_id,
gcs_options,
node.get_logs_dir_path(),
node.node_ip_address,
node.node_manager_port,
node.raylet_ip_address,
(mode == LOCAL_MODE),
driver_name,
log_stdout_file_path,
log_stderr_file_path,
serialized_job_config,
)
node.plasma_store_socket_name, node.raylet_socket_name, job_id,
gcs_options, node.get_logs_dir_path(), node.node_ip_address,
node.node_manager_port, node.raylet_ip_address, (mode == LOCAL_MODE),
driver_name, log_stdout_file_path, log_stderr_file_path,
serialized_job_config, node.metrics_agent_port)
# Create an object for interfacing with the global state.
# Note, global state should be intialized after `CoreWorker`, because it