From 0ff24ec8dcc4b744981bfd544e70bc748d34c4e4 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Wed, 24 Jun 2020 18:22:03 -0700 Subject: [PATCH] Add "ray status" debug tool for autoscaler. (#9091) --- python/ray/autoscaler/autoscaler.py | 24 ++++++++++++----- python/ray/autoscaler/commands.py | 38 ++++++++++++++++++++++++--- python/ray/autoscaler/load_metrics.py | 5 ++-- python/ray/autoscaler/updater.py | 3 +-- python/ray/autoscaler/util.py | 4 +++ python/ray/monitor.py | 4 +++ python/ray/scripts/scripts.py | 23 +++++++++++++--- python/ray/tests/test_autoscaler.py | 6 ++--- 8 files changed, 87 insertions(+), 20 deletions(-) diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index cf43c5972..f401e7885 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -10,6 +10,8 @@ import threading import time import yaml +from ray.experimental.internal_kv import _internal_kv_put, \ + _internal_kv_initialized from ray.autoscaler.node_provider import get_node_provider from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG, TAG_RAY_NODE_STATUS, TAG_RAY_NODE_TYPE, @@ -17,7 +19,8 @@ from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG, from ray.autoscaler.updater import NodeUpdaterThread from ray.autoscaler.node_launcher import NodeLauncher from ray.autoscaler.util import ConcurrentCounter, validate_config, \ - with_head_node_ip, hash_launch_conf, hash_runtime_conf + with_head_node_ip, hash_launch_conf, hash_runtime_conf, \ + DEBUG_AUTOSCALING_STATUS, DEBUG_AUTOSCALING_ERROR from ray.ray_constants import AUTOSCALER_MAX_NUM_FAILURES, \ AUTOSCALER_MAX_LAUNCH_BATCH, AUTOSCALER_MAX_CONCURRENT_LAUNCHES, \ AUTOSCALER_UPDATE_INTERVAL_S, AUTOSCALER_HEARTBEAT_TIMEOUT_S, \ @@ -111,6 +114,9 @@ class StandardAutoscaler: except Exception as e: logger.exception("StandardAutoscaler: " "Error during autoscaling.") + if _internal_kv_initialized(): + _internal_kv_put( + DEBUG_AUTOSCALING_ERROR, str(e), overwrite=True) self.num_failures += 1 if self.num_failures > self.max_failures: logger.critical("StandardAutoscaler: " @@ -184,7 +190,6 @@ class StandardAutoscaler: nodes = self.workers() self.log_info_string(nodes, target_workers) elif self.load_metrics.num_workers_connected() >= target_workers: - logger.info("Ending bringup phase") self.bringup = False self.log_info_string(nodes, target_workers) @@ -387,9 +392,14 @@ class StandardAutoscaler: tag_filters={TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER}) def log_info_string(self, nodes, target): - logger.info("StandardAutoscaler: {}".format( - self.info_string(nodes, target))) - logger.info("LoadMetrics: {}".format(self.load_metrics.info_string())) + tmp = "Cluster status: " + tmp += self.info_string(nodes, target) + tmp += "\n" + tmp += self.load_metrics.info_string() + tmp += "\n" + if _internal_kv_initialized(): + _internal_kv_put(DEBUG_AUTOSCALING_STATUS, tmp, overwrite=True) + logger.info(tmp) def info_string(self, nodes, target): suffix = "" @@ -410,8 +420,8 @@ class StandardAutoscaler: self.resource_requests[resource] = max( self.resource_requests[resource], count) - logger.info("StandardAutoscaler: resource_requests={}".format( - self.resource_requests)) + logger.info( + "StandardAutoscaler: resource_requests={}".format(resources)) def kill_workers(self): logger.error("StandardAutoscaler: kill_workers triggered") diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py index 77ce7f95a..c491392cd 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/commands.py @@ -15,17 +15,46 @@ try: # py3 except ImportError: # py2 from pipes import quote +from ray.experimental.internal_kv import _internal_kv_get +import ray.services as services from ray.autoscaler.util import validate_config, hash_runtime_conf, \ - hash_launch_conf, prepare_config + hash_launch_conf, prepare_config, DEBUG_AUTOSCALING_ERROR, \ + DEBUG_AUTOSCALING_STATUS from ray.autoscaler.node_provider import get_node_provider, NODE_PROVIDERS from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_LAUNCH_CONFIG, \ TAG_RAY_NODE_NAME, NODE_TYPE_WORKER, NODE_TYPE_HEAD from ray.autoscaler.updater import NodeUpdaterThread from ray.autoscaler.log_timer import LogTimer from ray.autoscaler.docker import with_docker_exec +from ray.worker import global_worker logger = logging.getLogger(__name__) +redis_client = None + + +def _redis(): + global redis_client + if redis_client is None: + redis_client = services.create_redis_client( + global_worker.node.redis_address, + password=global_worker.node.redis_password) + return redis_client + + +def debug_status(): + """Return a debug string for the autoscaler.""" + status = _internal_kv_get(DEBUG_AUTOSCALING_STATUS) + error = _internal_kv_get(DEBUG_AUTOSCALING_ERROR) + if not status: + status = "No cluster status." + else: + status = status.decode("utf-8") + if error: + status += "\n" + status += error.decode("utf-8") + return status + def create_or_update_cluster(config_file, override_min_workers, override_max_workers, no_restart, restart_only, @@ -80,8 +109,11 @@ def teardown_cluster(config_file, yes, workers_only, override_cluster_name, confirm("This will destroy your cluster", yes) if not workers_only: - exec_cluster(config_file, "ray stop", False, False, False, False, - False, override_cluster_name, None, False) + try: + exec_cluster(config_file, "ray stop", False, False, False, False, + False, override_cluster_name, None, False) + except Exception: + logger.exception("Ignoring error attempting a clean shutdown.") provider = get_node_provider(config["provider"], config["cluster_name"]) try: diff --git a/python/ray/autoscaler/load_metrics.py b/python/ray/autoscaler/load_metrics.py index 23c96f9fd..cabbc88fe 100644 --- a/python/ray/autoscaler/load_metrics.py +++ b/python/ray/autoscaler/load_metrics.py @@ -128,8 +128,8 @@ class LoadMetrics: return nodes_used, resources_used, resources_total def info_string(self): - return ", ".join( - ["{}={}".format(k, v) for k, v in sorted(self._info().items())]) + return " - " + "\n - ".join( + ["{}: {}".format(k, v) for k, v in sorted(self._info().items())]) def _info(self): nodes_used, resources_used, resources_total = self.get_resource_usage() @@ -160,6 +160,7 @@ class LoadMetrics: format_resource(rid, resources_used[rid]), format_resource(rid, resources_total[rid]), rid) for rid in sorted(resources_used) + if not rid.startswith("node:") ]), "NumNodesConnected": len(self.static_resources_by_ip), "NumNodesUsed": round(nodes_used, 2), diff --git a/python/ray/autoscaler/updater.py b/python/ray/autoscaler/updater.py index 07e36ca37..dd6b58ea4 100644 --- a/python/ray/autoscaler/updater.py +++ b/python/ray/autoscaler/updater.py @@ -251,8 +251,7 @@ class SSHCommandRunner: ] if cmd: logger.info(self.log_prefix + - "Running {} on {}...".format(cmd, self.ssh_ip)) - logger.info("Begin remote output from {}".format(self.ssh_ip)) + "Running {}".format(" ".join(final_cmd))) final_cmd += with_interactive(cmd) else: # We do this because `-o ControlMaster` causes the `-N` flag to diff --git a/python/ray/autoscaler/util.py b/python/ray/autoscaler/util.py index 1febddbb1..13ae5262f 100644 --- a/python/ray/autoscaler/util.py +++ b/python/ray/autoscaler/util.py @@ -14,6 +14,10 @@ REQUIRED, OPTIONAL = True, False RAY_SCHEMA_PATH = os.path.join( os.path.dirname(ray.autoscaler.__file__), "ray-schema.json") +# Internal kv keys for storing debug status. +DEBUG_AUTOSCALING_ERROR = "__autoscaling_error" +DEBUG_AUTOSCALING_STATUS = "__autoscaling_status" + class ConcurrentCounter: def __init__(self): diff --git a/python/ray/monitor.py b/python/ray/monitor.py index 8f18250d2..472f9e496 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -37,6 +37,10 @@ class Monitor: redis_address, redis_password=redis_password) self.redis = ray.services.create_redis_client( redis_address, password=redis_password) + # Set the redis client and mode so _internal_kv works for autoscaler. + worker = ray.worker.global_worker + worker.redis_client = self.redis + worker.mode = 0 # Setup subscriptions to the primary Redis server and the Redis shards. self.primary_subscribe_client = self.redis.pubsub( ignore_subscribe_messages=True) diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 332510a2d..99ce78807 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -15,7 +15,8 @@ import psutil import ray.services as services from ray.autoscaler.commands import ( attach_cluster, exec_cluster, create_or_update_cluster, monitor_cluster, - rsync, teardown_cluster, get_head_node_ip, kill_node, get_worker_node_ips) + rsync, teardown_cluster, get_head_node_ip, kill_node, get_worker_node_ips, + debug_status) import ray.ray_constants as ray_constants import ray.utils from ray.projects.scripts import project_cli, session_cli @@ -1055,7 +1056,7 @@ def timeline(address): required=False, type=str, help="Override the address to connect to.") -def stat(address): +def statistics(address): """Get the current metrics protobuf from a Ray cluster (developer tool).""" if not address: address = services.find_redis_address_or_die() @@ -1094,6 +1095,21 @@ def memory(address): print(ray.internal.internal_api.memory_summary()) +@cli.command() +@click.option( + "--address", + required=False, + type=str, + help="Override the address to connect to.") +def status(address): + """Print cluster status, including autoscaling info.""" + if not address: + address = services.find_redis_address_or_die() + logger.info("Connecting to Ray instance at {}.".format(address)) + ray.init(address=address) + print(debug_status()) + + @cli.command() @click.option( "--address", @@ -1132,7 +1148,8 @@ add_command_alias(get_head_ip, name="get_head_ip", hidden=True) cli.add_command(get_worker_ips) cli.add_command(microbenchmark) cli.add_command(stack) -cli.add_command(stat) +cli.add_command(statistics) +cli.add_command(status) cli.add_command(memory) cli.add_command(globalgc) cli.add_command(timeline) diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index 8d7046d6f..5cca474af 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -260,11 +260,11 @@ class LoadMetricsTest(unittest.TestCase): "object_store_memory": 20 }, {}) debug = lm.info_string() - assert ("ResourceUsage=2.0/4.0 CPU, 14.0/16.0 GPU, " + assert ("ResourceUsage: 2.0/4.0 CPU, 14.0/16.0 GPU, " "1.05 GiB/1.05 GiB memory, " "1.05 GiB/2.1 GiB object_store_memory") in debug - assert "NumNodesConnected=3" in debug - assert "NumNodesUsed=2.88" in debug + assert "NumNodesConnected: 3" in debug + assert "NumNodesUsed: 2.88" in debug class AutoscalingTest(unittest.TestCase):