diff --git a/doc/source/advanced.rst b/doc/source/advanced.rst index 0ca1e9a4e..8c2c0df8b 100644 --- a/doc/source/advanced.rst +++ b/doc/source/advanced.rst @@ -205,23 +205,26 @@ To get information about the current nodes in your cluster, you can use ``ray.no print(ray.nodes()) """ - [{'ClientID': 'a9e430719685f3862ed7ba411259d4138f8afb1e', - 'IsInsertion': True, - 'NodeManagerAddress': '192.168.19.108', - 'NodeManagerPort': 37428, - 'ObjectManagerPort': 43415, - 'ObjectStoreSocketName': '/tmp/ray/session_2019-07-28_17-03-53_955034_24883/sockets/plasma_store', - 'RayletSocketName': '/tmp/ray/session_2019-07-28_17-03-53_955034_24883/sockets/raylet', - 'Resources': {'CPU': 4.0}, - 'alive': True}] + [{'NodeID': '2691a0c1aed6f45e262b2372baf58871734332d7', + 'Alive': True, + 'NodeManagerAddress': '192.168.1.82', + 'NodeManagerHostname': 'host-MBP.attlocal.net', + 'NodeManagerPort': 58472, + 'ObjectManagerPort': 52383, + 'ObjectStoreSocketName': '/tmp/ray/session_2020-08-04_11-00-17_114725_17883/sockets/plasma_store', + 'RayletSocketName': '/tmp/ray/session_2020-08-04_11-00-17_114725_17883/sockets/raylet', + 'MetricsExportPort': 64860, + 'alive': True, + 'Resources': {'CPU': 16.0, 'memory': 100.0, 'object_store_memory': 34.0, 'node:192.168.1.82': 1.0}}] """ The above information includes: - - `ClientID`: A unique identifier for the raylet. + - `NodeID`: A unique identifier for the raylet. - `alive`: Whether the node is still alive. - `NodeManagerAddress`: PrivateIP of the node that the raylet is on. - `Resources`: The total resource capacity on the node. + - `MetricsExportPort`: The port number at which metrics are exposed to through a `Prometheus endpoint `_. Resource Information ~~~~~~~~~~~~~~~~~~~~ diff --git a/doc/source/index.rst b/doc/source/index.rst index d932cfa7b..cf5a1bb8d 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -131,7 +131,6 @@ Academic Papers walkthrough.rst using-ray.rst configure.rst - ray-dashboard.rst Tutorial and Examples package-ref.rst @@ -208,6 +207,14 @@ Academic Papers pandas_on_ray.rst projects.rst +.. toctree:: + :hidden: + :maxdepth: -1 + :caption: Ray Observability + + ray-dashboard.rst + ray-metrics.rst + .. toctree:: :hidden: :maxdepth: -1 diff --git a/doc/source/ray-metrics.rst b/doc/source/ray-metrics.rst new file mode 100644 index 000000000..be4e71518 --- /dev/null +++ b/doc/source/ray-metrics.rst @@ -0,0 +1,122 @@ +Ray Monitoring with Prometheus +============================== +To help monitoring Ray applications, Ray + +- Collects Ray's pre-selected system level metrics. +- Exposes metrics in a Prometheus format. We'll call the endpoint to access these metrics a Prometheus endpoint. + +This page describes how to acces these metrics using Prometheus. + +.. note:: + + It is currently an experimental feature and under active development. APIs are subject to change. + +Getting Started (Single Node) +----------------------------- +Ray exposes its metrics in Prometheus format. This allows us to easily scrape them using Prometheus. + +Let's expose metrics through `ray start`. + +.. code-block:: bash + + ray start --head --metrics-export-port=8080 # Assign metrics export port on a head node. + +Now, you can scrape Ray's metrics using Prometheus. + +First, download Prometheus. `Download Link `_ + +.. code-block:: bash + + tar xvfz prometheus-*.tar.gz + cd prometheus-* + +Let's modify Prometheus's config file to scrape metrics from Prometheus endpoints. + +.. code-block:: yaml + + # prometheus.yml + global: + scrape_interval: 5s + evaluation_interval: 5s + + scrape_configs: + - job_name: prometheus + static_configs: + - targets: ['localhost:8080'] # This must be same as metrics_export_port + +Next, let's start Prometheus. + +.. code-block:: shell + + ./prometheus --config.file=./prometheus.yml + +Now, you can access Ray metrics from the default Prometheus url, `http://localhost:9090`. + +Getting Started (Multi-nodes) +----------------------------- +Let's now walk through how to import metrics from a Ray cluster. + +Ray runs a metrics agent per node. Each metrics agent collects metrics from a local node and exposes in a Prometheus format. +You can then scrape each endpoint to access Ray's metrics. + +At a head node, + +.. code-block:: bash + + ray start --head --metrics-export-port=8080 # Assign metrics export port on a head node. + +At a worker node, + +.. code-block:: bash + + ray start --address=[head_node_address] --metrics-export-port=8080 + +You can now get the url of metrics agents using `ray.nodes()` + +.. code-block:: python + + # In a head node, + import ray + ray.init(address='auto') + from pprint import pprint + pprint(ray.nodes()) + + """ + [{'Alive': True, + 'MetricsExportPort': 8080, + 'NodeID': '2f480984702a22556b90566bdac818a4a771e69a', + 'NodeManagerAddress': '192.168.1.82', + 'NodeManagerHostname': 'host2.attlocal.net', + 'NodeManagerPort': 61760, + 'ObjectManagerPort': 61454, + 'ObjectStoreSocketName': '/tmp/ray/session_2020-08-04_18-18-16_481195_34255/sockets/plasma_store', + 'RayletSocketName': '/tmp/ray/session_2020-08-04_18-18-16_481195_34255/sockets/raylet', + 'Resources': {'CPU': 1.0, + 'memory': 123.0, + 'node:192.168.1.82': 1.0, + 'object_store_memory': 2.0}, + 'alive': True}, + {'Alive': True, + 'MetricsExportPort': 8080, + 'NodeID': 'ce6f30a7e2ef58c8a6893b3df171bcd464b33c77', + 'NodeManagerAddress': '192.168.1.82', + 'NodeManagerHostname': 'host1.attlocal.net', + 'NodeManagerPort': 62052, + 'ObjectManagerPort': 61468, + 'ObjectStoreSocketName': '/tmp/ray/session_2020-08-04_18-18-16_481195_34255/sockets/plasma_store.1', + 'RayletSocketName': '/tmp/ray/session_2020-08-04_18-18-16_481195_34255/sockets/raylet.1', + 'Resources': {'CPU': 1.0, + 'memory': 134.0, + 'node:192.168.1.82': 1.0, + 'object_store_memory': 2.0}, + 'alive': True}] + """ + +Now, setup your prometheus to read metrics from `[NodeManagerAddress]:[MetricsExportPort]` from all nodes in the cluster. +If you'd like to make this process automated, you can also use `file based service discovery `_. +This will allow Prometheus to dynamically find endpoints it should scrape (service discovery). You can easily get all endpoints using `ray.nodes()` + +Getting Started (Cluster Launcher) +---------------------------------- +When you use a Ray cluster launcher, it is common node IP addresses are changing because cluster is scaling up and down. +In this case, you can use Prometheus' `file based service discovery `_. diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index b4cf57eb0..9ff67222b 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -659,7 +659,7 @@ cdef class CoreWorker: JobID job_id, GcsClientOptions gcs_options, log_dir, node_ip_address, node_manager_port, raylet_ip_address, local_mode, driver_name, stdout_file, stderr_file, - serialized_job_config): + serialized_job_config, metrics_agent_port): self.is_driver = is_driver self.is_local_mode = local_mode @@ -690,6 +690,7 @@ cdef class CoreWorker: options.kill_main = kill_main_task options.terminate_asyncio_thread = terminate_asyncio_thread options.serialized_job_config = serialized_job_config + options.metrics_agent_port = metrics_agent_port CCoreWorkerProcess.Initialize(options) diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index c495bd0e7..a52f3f044 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -226,6 +226,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: (c_bool() nogil) kill_main CCoreWorkerOptions() (void() nogil) terminate_asyncio_thread + int metrics_agent_port c_string serialized_job_config cdef cppclass CCoreWorkerProcess "ray::CoreWorkerProcess": diff --git a/python/ray/includes/metric.pxi b/python/ray/includes/metric.pxi index dec481197..94c78f502 100644 --- a/python/ray/includes/metric.pxi +++ b/python/ray/includes/metric.pxi @@ -62,7 +62,20 @@ cdef class Metric: cdef class Gauge(Metric): """Cython wrapper class of C++ `ray::stats::Gauge`. - Gauge: Keeps the last recorded value, drops everything before. + Gauge: Keeps the last recorded value, drops everything before. + + Example: + + >>> gauge = Gauge( + "ray.worker.metric", + "description", + "unit", + ["tagk1", "tagk2"]). + value = 5 + key1= "key1" + key2 = "key2" +s + gauge.record(value, {"tagk1": key1, "tagk2": key2}) """ def __init__(self, name, description, unit, tag_keys): """Create a gauge metric @@ -88,6 +101,19 @@ cdef class Gauge(Metric): cdef class Count(Metric): """Cython wrapper class of C++ `ray::stats::Count`. + Example: + + >>> count = Count( + "ray.worker.metric", + "description", + "unit", + ["tagk1", "tagk2"]). + value = 5 + key1= "key1" + key2 = "key2" + + count.record(value, {"tagk1": key1, "tagk2": key2}) + Count: The count of the number of metric points. """ def __init__(self, name, description, unit, tag_keys): @@ -114,6 +140,19 @@ cdef class Count(Metric): cdef class Sum(Metric): """Cython wrapper class of C++ `ray::stats::Sum`. + Example: + + >>> metric_sum = Sum( + "ray.worker.metric", + "description", + "unit", + ["tagk1", "tagk2"]). + value = 5 + key1= "key1" + key2 = "key2" + + metric_sum.record(value, {"tagk1": key1, "tagk2": key2}) + Sum: A sum up of the metric points. """ def __init__(self, name, description, unit, tag_keys): @@ -141,6 +180,19 @@ cdef class Sum(Metric): cdef class Histogram(Metric): """Cython wrapper class of C++ `ray::stats::Histogram`. + Example: + + >>> histogram = Histogram( + "ray.worker.histogram1", + "desciprtion", + "unit", + [1.0, 2.0], # boundaries. + ["tagk1"]) + value = 5 + key1= "key1" + + histogram.record(value, {"tagk1": key1}) + Histogram: Histogram distribution of metric points. """ def __init__(self, name, description, unit, boundaries, tag_keys): diff --git a/python/ray/node.py b/python/ray/node.py index f35e2e243..880cdbeca 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -103,12 +103,17 @@ class Node: head), "LRU Evict can only be passed into the head node." self._raylet_ip_address = raylet_ip_address + self.metrics_agent_port = self._get_unused_port()[0] + self._metrics_export_port = ray_params.metrics_export_port + if self._metrics_export_port is None: + self._metrics_export_port = self._get_unused_port()[0] ray_params.update_if_absent( include_log_monitor=True, resources={}, temp_dir=ray.utils.get_ray_temp_dir(), - metrics_agent_port=self._get_unused_port()[0], + metrics_agent_port=self.metrics_agent_port, + metrics_export_port=self._metrics_export_port, worker_path=os.path.join( os.path.dirname(os.path.abspath(__file__)), "workers/default_worker.py")) @@ -318,6 +323,11 @@ class Node: """Get the node manager's port.""" return self._ray_params.node_manager_port + @property + def metrics_export_port(self): + """Get the port that exposes metrics""" + return self._metrics_export_port + @property def socket(self): """Get the socket reserving the node manager's port""" @@ -337,6 +347,7 @@ class Node: "raylet_socket_name": self._raylet_socket_name, "webui_url": self._webui_url, "session_dir": self._session_dir, + "metrics_export_port": self._metrics_export_port } def create_redis_client(self): @@ -561,9 +572,11 @@ class Node: """Start the reporter.""" stdout_file, stderr_file = self.get_log_file_handles( "reporter", unique=True) + process_info = ray.services.start_reporter( self.redis_address, self._ray_params.metrics_agent_port, + self._metrics_export_port, stdout_file=stdout_file, stderr_file=stderr_file, redis_password=self._ray_params.redis_password, @@ -667,6 +680,7 @@ class Node: self._ray_params.object_manager_port, self._ray_params.redis_password, self._ray_params.metrics_agent_port, + self._metrics_export_port, use_valgrind=use_valgrind, use_profiler=use_profiler, stdout_file=stdout_file, diff --git a/python/ray/parameter.py b/python/ray/parameter.py index 7cc55f56b..07e36db26 100644 --- a/python/ray/parameter.py +++ b/python/ray/parameter.py @@ -89,6 +89,8 @@ class RayParams: java_worker_options (list): The command options for Java worker. load_code_from_local: Whether load code from local file or from GCS. metrics_agent_port(int): The port to bind metrics agent. + metrics_export_port(int): The port at which metrics are exposed + through a Prometheus endpoint. _internal_config (str): JSON configuration for overriding RayConfig defaults. For testing purposes ONLY. lru_evict (bool): Enable LRU eviction if space is needed. @@ -142,6 +144,7 @@ class RayParams: _internal_config=None, enable_object_reconstruction=False, metrics_agent_port=None, + metrics_export_port=None, lru_evict=False): self.object_ref_seed = object_ref_seed self.redis_address = redis_address @@ -181,6 +184,7 @@ class RayParams: self.java_worker_options = java_worker_options self.load_code_from_local = load_code_from_local self.metrics_agent_port = metrics_agent_port + self.metrics_export_port = metrics_export_port self.start_initial_python_workers_for_first_job = ( start_initial_python_workers_for_first_job) self._internal_config = _internal_config diff --git a/python/ray/reporter.py b/python/ray/reporter.py index 544d2cc8a..0ef81d10d 100644 --- a/python/ray/reporter.py +++ b/python/ray/reporter.py @@ -19,7 +19,6 @@ import ray.services import ray.utils from ray.core.generated import reporter_pb2 from ray.core.generated import reporter_pb2_grpc -from ray.dashboard.util import get_unused_port from ray.metrics_agent import MetricsAgent # Logger for this module. It should be configured at the entry point @@ -116,16 +115,17 @@ class Reporter: redis_client: A client used to communicate with the Redis server. """ - def __init__(self, redis_address, port, redis_password=None): + def __init__(self, + redis_address, + port, + metrics_export_port, + redis_password=None): """Initialize the reporter object.""" self.cpu_counts = (psutil.cpu_count(), psutil.cpu_count(logical=False)) self.ip = ray.services.get_node_ip_address() self.hostname = platform.node() self.port = port - metrics_agent_port = os.getenv("METRICS_AGENT_PORT") - if not metrics_agent_port: - metrics_agent_port = get_unused_port() - self.metrics_agent = MetricsAgent(metrics_agent_port) + self.metrics_agent = MetricsAgent(metrics_export_port) self.reporter_grpc_server = ReporterServer(self.metrics_agent) _ = psutil.cpu_percent() # For initialization @@ -282,6 +282,11 @@ if __name__ == "__main__": required=True, type=int, help="The port to bind the reporter process.") + parser.add_argument( + "--metrics-export-port", + required=True, + type=int, + help="The port to expose metrics through Prometheus.") parser.add_argument( "--redis-password", required=False, @@ -305,7 +310,10 @@ if __name__ == "__main__": ray.utils.setup_logger(args.logging_level, args.logging_format) reporter = Reporter( - args.redis_address, args.port, redis_password=args.redis_password) + args.redis_address, + args.port, + args.metrics_export_port, + redis_password=args.redis_password) try: reporter.run() diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index b8f37688d..c0edd4419 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -347,6 +347,12 @@ def dashboard(cluster_config_file, cluster_name, port, remote_port): default=False, help="Specify whether object reconstruction will be used for this cluster." ) +@click.option( + "--metrics-export-port", + type=int, + default=8080, + help="the port to use to expose Ray metrics through a " + "Prometheus endpoint.") def start(node_ip_address, redis_address, address, redis_port, port, num_redis_shards, redis_max_clients, redis_password, redis_shard_ports, object_manager_port, node_manager_port, @@ -357,7 +363,7 @@ def start(node_ip_address, redis_address, address, redis_port, port, autoscaling_config, no_redirect_worker_output, no_redirect_output, plasma_store_socket_name, raylet_socket_name, temp_dir, include_java, java_worker_options, load_code_from_local, internal_config, - lru_evict, enable_object_reconstruction): + lru_evict, enable_object_reconstruction, metrics_export_port): """Start Ray processes manually on the local machine.""" if gcs_server_port and not head: raise ValueError( @@ -436,7 +442,8 @@ def start(node_ip_address, redis_address, address, redis_port, port, load_code_from_local=load_code_from_local, _internal_config=internal_config, lru_evict=lru_evict, - enable_object_reconstruction=enable_object_reconstruction) + enable_object_reconstruction=enable_object_reconstruction, + metrics_export_port=metrics_export_port) if head: # Start Ray on the head node. if redis_shard_ports is not None: diff --git a/python/ray/services.py b/python/ray/services.py index 4b40bd282..8d9b5afee 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -1066,6 +1066,7 @@ def start_log_monitor(redis_address, def start_reporter(redis_address, port, + metrics_export_port, stdout_file=None, stderr_file=None, redis_password=None, @@ -1075,6 +1076,7 @@ def start_reporter(redis_address, Args: redis_address (str): The address of the Redis instance. port(int): The port to bind the reporter process. + metrics_export_port(int): The port at which metrics are exposed to. stdout_file: A file handle opened for writing to redirect stdout to. If no redirection should happen, then this should be None. stderr_file: A file handle opened for writing to redirect stderr to. If @@ -1088,7 +1090,8 @@ def start_reporter(redis_address, os.path.dirname(os.path.abspath(__file__)), "reporter.py") command = [ sys.executable, "-u", reporter_filepath, - "--redis-address={}".format(redis_address), "--port={}".format(port) + "--redis-address={}".format(redis_address), "--port={}".format(port), + "--metrics-export-port={}".format(metrics_export_port) ] if redis_password: command += ["--redis-password", redis_password] @@ -1259,6 +1262,7 @@ def start_raylet(redis_address, object_manager_port=None, redis_password=None, metrics_agent_port=None, + metrics_export_port=None, use_valgrind=False, use_profiler=False, stdout_file=None, @@ -1296,6 +1300,7 @@ def start_raylet(redis_address, on. If set, min_worker_port must also be set. redis_password: The password to use when connecting to Redis. metrics_agent_port(int): The port where metrics agent is bound to. + metrics_export_port(int): The port at which metrics are exposed to. use_valgrind (bool): True if the raylet should be started inside of valgrind. If this is True, use_profiler must be False. use_profiler (bool): True if the raylet should be started inside @@ -1403,6 +1408,7 @@ def start_raylet(redis_address, "--temp_dir={}".format(temp_dir), "--session_dir={}".format(session_dir), "--metrics-agent-port={}".format(metrics_agent_port), + "--metrics_export_port={}".format(metrics_export_port), ] if start_initial_python_workers_for_first_job: command.append("--num_initial_python_workers_for_first_job={}".format( diff --git a/python/ray/state.py b/python/ray/state.py index f9a1ddaef..5ddf8af81 100644 --- a/python/ray/state.py +++ b/python/ray/state.py @@ -306,7 +306,8 @@ class GlobalState: "NodeManagerPort": item.node_manager_port, "ObjectManagerPort": item.object_manager_port, "ObjectStoreSocketName": item.object_store_socket_name, - "RayletSocketName": item.raylet_socket_name + "RayletSocketName": item.raylet_socket_name, + "MetricsExportPort": item.metrics_export_port, } node_info["alive"] = node_info["Alive"] node_info["Resources"] = self.node_resource_table( diff --git a/python/ray/tests/test_metrics.py b/python/ray/tests/test_metrics.py index 81b4d5363..c7ffe745c 100644 --- a/python/ray/tests/test_metrics.py +++ b/python/ray/tests/test_metrics.py @@ -387,6 +387,34 @@ def test_profiling_info_endpoint(shutdown_only): assert profiling_stats is not None +def test_multi_node_metrics_export_port_discovery(ray_start_cluster): + NUM_NODES = 3 + cluster = ray_start_cluster + nodes = [cluster.add_node() for _ in range(NUM_NODES)] + nodes = { + node.address_info["metrics_export_port"]: node.address_info + for node in nodes + } + cluster.wait_for_nodes() + ray.init(address=cluster.address) + node_info_list = ray.nodes() + + for node_info in node_info_list: + metrics_export_port = node_info["MetricsExportPort"] + address_info = nodes[metrics_export_port] + assert (address_info["raylet_socket_name"] == node_info[ + "RayletSocketName"]) + + # Make sure we can ping Prometheus endpoints. + def test_prometheus_endpoint(): + response = requests.get( + "http://localhost:{}".format(metrics_export_port)) + return response.status_code == 200 + + wait_until_succeeded_without_exception( + test_prometheus_endpoint, (requests.exceptions.ConnectionError, )) + + # This variable is used inside test_memory_dashboard. # It is defined as a global variable to be used across all nested test # functions. We use it because memory table is updated every one second, diff --git a/python/ray/worker.py b/python/ray/worker.py index 03f6222f8..6861de455 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -499,7 +499,8 @@ def init(address=None, use_pickle=True, _internal_config=None, lru_evict=False, - enable_object_reconstruction=False): + enable_object_reconstruction=False, + _metrics_export_port=None): """ Connect to an existing Ray cluster or start one and connect to it. @@ -625,6 +626,9 @@ def init(address=None, created the object. Arguments to the task will be recursively reconstructed. If False, then ray.UnreconstructableError will be thrown. + _metrics_export_port(int): Port number Ray exposes system metrics + through a Prometheus endpoint. It is currently under active + development, and the API is subject to change. Returns: Address information about the started processes. @@ -719,7 +723,8 @@ def init(address=None, start_initial_python_workers_for_first_job=True, _internal_config=_internal_config, lru_evict=lru_evict, - enable_object_reconstruction=enable_object_reconstruction) + enable_object_reconstruction=enable_object_reconstruction, + metrics_export_port=_metrics_export_port) # Start the Ray processes. We set shutdown_at_exit=False because we # shutdown the node in the ray.shutdown call that happens in the atexit # handler. We still spawn a reaper process in case the atexit handler @@ -793,7 +798,8 @@ def init(address=None, load_code_from_local=load_code_from_local, _internal_config=_internal_config, lru_evict=lru_evict, - enable_object_reconstruction=enable_object_reconstruction) + enable_object_reconstruction=enable_object_reconstruction, + metrics_export_port=_metrics_export_port) _global_node = ray.node.Node( ray_params, head=False, @@ -1277,20 +1283,11 @@ def connect(node, serialized_job_config = job_config.serialize() worker.core_worker = ray._raylet.CoreWorker( (mode == SCRIPT_MODE or mode == LOCAL_MODE), - node.plasma_store_socket_name, - node.raylet_socket_name, - job_id, - gcs_options, - node.get_logs_dir_path(), - node.node_ip_address, - node.node_manager_port, - node.raylet_ip_address, - (mode == LOCAL_MODE), - driver_name, - log_stdout_file_path, - log_stderr_file_path, - serialized_job_config, - ) + node.plasma_store_socket_name, node.raylet_socket_name, job_id, + gcs_options, node.get_logs_dir_path(), node.node_ip_address, + node.node_manager_port, node.raylet_ip_address, (mode == LOCAL_MODE), + driver_name, log_stdout_file_path, log_stderr_file_path, + serialized_job_config, node.metrics_agent_port) # Create an object for interfacing with the global state. # Note, global state should be intialized after `CoreWorker`, because it diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index ebb4c6657..13bdf4cc8 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -325,10 +325,6 @@ RAY_CONFIG(int64_t, enable_metrics_collection, true) /// Whether start the Plasma Store as a Raylet thread. RAY_CONFIG(bool, put_small_object_in_memory_store, false) -/// Metric agent port for reporting, default -1 means no such agent will be -/// listening. -RAY_CONFIG(int, metrics_agent_port, -1) - /// Maximum number of tasks that can be in flight between an owner and a worker for which /// the owner has been granted a lease. A value >1 is used when we want to enable /// pipelining task submission. diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 638cfdc98..6066582ae 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -141,7 +141,7 @@ CoreWorkerProcess::CoreWorkerProcess(const CoreWorkerOptions &options) // NOTE(lingxuan.zlx): We assume RayConfig is initialized before it's used. // RayConfig is generated in Java_io_ray_runtime_RayNativeRuntime_nativeInitialize // for java worker or in constructor of CoreWorker for python worker. - ray::stats::Init(global_tags, RayConfig::instance().metrics_agent_port()); + ray::stats::Init(global_tags, options_.metrics_agent_port); } CoreWorkerProcess::~CoreWorkerProcess() { diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 929064e70..f4d13ddd8 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -121,6 +121,9 @@ struct CoreWorkerOptions { std::function terminate_asyncio_thread; /// Serialized representation of JobConfig. std::string serialized_job_config; + /// The port number of a metrics agent that imports metrics from core workers. + /// -1 means there's no such agent. + int metrics_agent_port; }; /// Lifecycle management of one or more `CoreWorker` instances in a process. diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc index 602913fbd..26a2f7dba 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc @@ -198,7 +198,7 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeInitialize( RAY_LOG(INFO) << "Calling System.gc() ..."; env->CallStaticObjectMethod(java_system_class, java_system_gc); last_gc_time_ms = current_time_ms(); - RAY_LOG(INFO) << "GC finished in " << (double) (last_gc_time_ms - start) / 1000 + RAY_LOG(INFO) << "GC finished in " << (double)(last_gc_time_ms - start) / 1000 << " seconds."; } }; @@ -232,6 +232,7 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeInitialize( static_cast(numWorkersPerProcess), // num_workers nullptr, // terminate_asyncio_thread serialized_job_config, // serialized_job_config + -1, // metrics_agent_port }; ray::CoreWorkerProcess::Initialize(options); diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 4f3b0a7d7..0c2e2a942 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -163,6 +163,9 @@ class CoreWorkerTest : public ::testing::Test { true, // ref_counting_enabled false, // is_local_mode 1, // num_workers + nullptr, // terminate_asyncio_thread + "", // serialized_job_config + -1, // metrics_agent_port }; CoreWorkerProcess::Initialize(options); } diff --git a/src/ray/core_worker/test/mock_worker.cc b/src/ray/core_worker/test/mock_worker.cc index def1838b0..cf4cc26c2 100644 --- a/src/ray/core_worker/test/mock_worker.cc +++ b/src/ray/core_worker/test/mock_worker.cc @@ -59,6 +59,9 @@ class MockWorker { true, // ref_counting_enabled false, // is_local_mode 1, // num_workers + nullptr, // terminate_asyncio_thread + "", // serialized_job_config + -1, // metrics_agent_port }; CoreWorkerProcess::Initialize(options); } diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 35d1fb37a..12eb6cd79 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -249,6 +249,9 @@ message GcsNodeInfo { // The Hostname address of the node manager. string node_manager_hostname = 8; + + // The port at which the node will expose metrics to. + int32 metrics_export_port = 9; } // Represents the demand for a particular resource shape. diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 0054b2a86..ac66bf2c4 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -28,6 +28,7 @@ DEFINE_string(store_socket_name, "", "The socket name of object store."); DEFINE_int32(object_manager_port, -1, "The port of object manager."); DEFINE_int32(node_manager_port, -1, "The port of node manager."); DEFINE_int32(metrics_agent_port, -1, "The port of metrics agent."); +DEFINE_int32(metrics_export_port, 1, "Maximum startup concurrency"); DEFINE_string(node_ip_address, "", "The ip address of this node."); DEFINE_string(redis_address, "", "The ip address of redis server."); DEFINE_int32(redis_port, -1, "The port of redis server."); @@ -88,6 +89,7 @@ int main(int argc, char *argv[]) { const int64_t object_store_memory = FLAGS_object_store_memory; const std::string plasma_directory = FLAGS_plasma_directory; const bool huge_pages = FLAGS_huge_pages; + const int metrics_export_port = FLAGS_metrics_export_port; gflags::ShutDownCommandLineFlags(); // Configuration for the node manager. @@ -231,7 +233,8 @@ int main(int argc, char *argv[]) { // Initialize the node manager. server.reset(new ray::raylet::Raylet( main_service, raylet_socket_name, node_ip_address, redis_address, redis_port, - redis_password, node_manager_config, object_manager_config, gcs_client)); + redis_password, node_manager_config, object_manager_config, gcs_client, + metrics_export_port)); server->Start(); })); diff --git a/src/ray/raylet/raylet.cc b/src/ray/raylet/raylet.cc index 34995575a..b0977c4f7 100644 --- a/src/ray/raylet/raylet.cc +++ b/src/ray/raylet/raylet.cc @@ -59,7 +59,7 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_ int redis_port, const std::string &redis_password, const NodeManagerConfig &node_manager_config, const ObjectManagerConfig &object_manager_config, - std::shared_ptr gcs_client) + std::shared_ptr gcs_client, int metrics_export_port) : self_node_id_(ClientID::FromRandom()), gcs_client_(gcs_client), object_directory_(std::make_shared(main_service, gcs_client_)), @@ -78,6 +78,7 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_ self_node_info_.set_object_manager_port(object_manager_.GetServerPort()); self_node_info_.set_node_manager_port(node_manager_.GetServerPort()); self_node_info_.set_node_manager_hostname(boost::asio::ip::host_name()); + self_node_info_.set_metrics_export_port(metrics_export_port); } Raylet::~Raylet() {} diff --git a/src/ray/raylet/raylet.h b/src/ray/raylet/raylet.h index 3f4140ce2..938b73c8c 100644 --- a/src/ray/raylet/raylet.h +++ b/src/ray/raylet/raylet.h @@ -49,12 +49,13 @@ class Raylet { /// \param object_manager_config Configuration to initialize the object /// manager. /// \param gcs_client A client connection to the GCS. + /// \param metrics_export_port A port at which metrics are exposed to. Raylet(boost::asio::io_service &main_service, const std::string &socket_name, const std::string &node_ip_address, const std::string &redis_address, int redis_port, const std::string &redis_password, const NodeManagerConfig &node_manager_config, const ObjectManagerConfig &object_manager_config, - std::shared_ptr gcs_client); + std::shared_ptr gcs_client, int metrics_export_port); /// Start this raylet. void Start(); diff --git a/streaming/src/test/mock_actor.cc b/streaming/src/test/mock_actor.cc index ce3e2abc5..4055bb20f 100644 --- a/streaming/src/test/mock_actor.cc +++ b/streaming/src/test/mock_actor.cc @@ -318,6 +318,9 @@ class StreamingWorker { true, // ref_counting_enabled false, // is_local_mode 1, // num_workers + nullptr, // terminate_asyncio_thread + "", // serialized_job_config + -1, // metrics_agent_port }; CoreWorkerProcess::Initialize(options); diff --git a/streaming/src/test/queue_tests_base.h b/streaming/src/test/queue_tests_base.h index beec76064..920e24d55 100644 --- a/streaming/src/test/queue_tests_base.h +++ b/streaming/src/test/queue_tests_base.h @@ -248,6 +248,9 @@ class StreamingQueueTestBase : public ::testing::TestWithParam { true, // ref_counting_enabled false, // is_local_mode 1, // num_workers + nullptr, // terminate_asyncio_thread + "", // serialized_job_config + -1, // metrics_agent_port }; InitShutdownRAII core_worker_raii(CoreWorkerProcess::Initialize, CoreWorkerProcess::Shutdown, options);