Add "ray status" debug tool for autoscaler. (#9091)

This commit is contained in:
Eric Liang
2020-06-24 18:22:03 -07:00
committed by GitHub
parent 80bcbe20c7
commit 0ff24ec8dc
8 changed files with 87 additions and 20 deletions
+17 -7
View File
@@ -10,6 +10,8 @@ import threading
import time
import yaml
from ray.experimental.internal_kv import _internal_kv_put, \
_internal_kv_initialized
from ray.autoscaler.node_provider import get_node_provider
from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG,
TAG_RAY_NODE_STATUS, TAG_RAY_NODE_TYPE,
@@ -17,7 +19,8 @@ from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG,
from ray.autoscaler.updater import NodeUpdaterThread
from ray.autoscaler.node_launcher import NodeLauncher
from ray.autoscaler.util import ConcurrentCounter, validate_config, \
with_head_node_ip, hash_launch_conf, hash_runtime_conf
with_head_node_ip, hash_launch_conf, hash_runtime_conf, \
DEBUG_AUTOSCALING_STATUS, DEBUG_AUTOSCALING_ERROR
from ray.ray_constants import AUTOSCALER_MAX_NUM_FAILURES, \
AUTOSCALER_MAX_LAUNCH_BATCH, AUTOSCALER_MAX_CONCURRENT_LAUNCHES, \
AUTOSCALER_UPDATE_INTERVAL_S, AUTOSCALER_HEARTBEAT_TIMEOUT_S, \
@@ -111,6 +114,9 @@ class StandardAutoscaler:
except Exception as e:
logger.exception("StandardAutoscaler: "
"Error during autoscaling.")
if _internal_kv_initialized():
_internal_kv_put(
DEBUG_AUTOSCALING_ERROR, str(e), overwrite=True)
self.num_failures += 1
if self.num_failures > self.max_failures:
logger.critical("StandardAutoscaler: "
@@ -184,7 +190,6 @@ class StandardAutoscaler:
nodes = self.workers()
self.log_info_string(nodes, target_workers)
elif self.load_metrics.num_workers_connected() >= target_workers:
logger.info("Ending bringup phase")
self.bringup = False
self.log_info_string(nodes, target_workers)
@@ -387,9 +392,14 @@ class StandardAutoscaler:
tag_filters={TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER})
def log_info_string(self, nodes, target):
logger.info("StandardAutoscaler: {}".format(
self.info_string(nodes, target)))
logger.info("LoadMetrics: {}".format(self.load_metrics.info_string()))
tmp = "Cluster status: "
tmp += self.info_string(nodes, target)
tmp += "\n"
tmp += self.load_metrics.info_string()
tmp += "\n"
if _internal_kv_initialized():
_internal_kv_put(DEBUG_AUTOSCALING_STATUS, tmp, overwrite=True)
logger.info(tmp)
def info_string(self, nodes, target):
suffix = ""
@@ -410,8 +420,8 @@ class StandardAutoscaler:
self.resource_requests[resource] = max(
self.resource_requests[resource], count)
logger.info("StandardAutoscaler: resource_requests={}".format(
self.resource_requests))
logger.info(
"StandardAutoscaler: resource_requests={}".format(resources))
def kill_workers(self):
logger.error("StandardAutoscaler: kill_workers triggered")
+35 -3
View File
@@ -15,17 +15,46 @@ try: # py3
except ImportError: # py2
from pipes import quote
from ray.experimental.internal_kv import _internal_kv_get
import ray.services as services
from ray.autoscaler.util import validate_config, hash_runtime_conf, \
hash_launch_conf, prepare_config
hash_launch_conf, prepare_config, DEBUG_AUTOSCALING_ERROR, \
DEBUG_AUTOSCALING_STATUS
from ray.autoscaler.node_provider import get_node_provider, NODE_PROVIDERS
from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_LAUNCH_CONFIG, \
TAG_RAY_NODE_NAME, NODE_TYPE_WORKER, NODE_TYPE_HEAD
from ray.autoscaler.updater import NodeUpdaterThread
from ray.autoscaler.log_timer import LogTimer
from ray.autoscaler.docker import with_docker_exec
from ray.worker import global_worker
logger = logging.getLogger(__name__)
redis_client = None
def _redis():
global redis_client
if redis_client is None:
redis_client = services.create_redis_client(
global_worker.node.redis_address,
password=global_worker.node.redis_password)
return redis_client
def debug_status():
"""Return a debug string for the autoscaler."""
status = _internal_kv_get(DEBUG_AUTOSCALING_STATUS)
error = _internal_kv_get(DEBUG_AUTOSCALING_ERROR)
if not status:
status = "No cluster status."
else:
status = status.decode("utf-8")
if error:
status += "\n"
status += error.decode("utf-8")
return status
def create_or_update_cluster(config_file, override_min_workers,
override_max_workers, no_restart, restart_only,
@@ -80,8 +109,11 @@ def teardown_cluster(config_file, yes, workers_only, override_cluster_name,
confirm("This will destroy your cluster", yes)
if not workers_only:
exec_cluster(config_file, "ray stop", False, False, False, False,
False, override_cluster_name, None, False)
try:
exec_cluster(config_file, "ray stop", False, False, False, False,
False, override_cluster_name, None, False)
except Exception:
logger.exception("Ignoring error attempting a clean shutdown.")
provider = get_node_provider(config["provider"], config["cluster_name"])
try:
+3 -2
View File
@@ -128,8 +128,8 @@ class LoadMetrics:
return nodes_used, resources_used, resources_total
def info_string(self):
return ", ".join(
["{}={}".format(k, v) for k, v in sorted(self._info().items())])
return " - " + "\n - ".join(
["{}: {}".format(k, v) for k, v in sorted(self._info().items())])
def _info(self):
nodes_used, resources_used, resources_total = self.get_resource_usage()
@@ -160,6 +160,7 @@ class LoadMetrics:
format_resource(rid, resources_used[rid]),
format_resource(rid, resources_total[rid]), rid)
for rid in sorted(resources_used)
if not rid.startswith("node:")
]),
"NumNodesConnected": len(self.static_resources_by_ip),
"NumNodesUsed": round(nodes_used, 2),
+1 -2
View File
@@ -251,8 +251,7 @@ class SSHCommandRunner:
]
if cmd:
logger.info(self.log_prefix +
"Running {} on {}...".format(cmd, self.ssh_ip))
logger.info("Begin remote output from {}".format(self.ssh_ip))
"Running {}".format(" ".join(final_cmd)))
final_cmd += with_interactive(cmd)
else:
# We do this because `-o ControlMaster` causes the `-N` flag to
+4
View File
@@ -14,6 +14,10 @@ REQUIRED, OPTIONAL = True, False
RAY_SCHEMA_PATH = os.path.join(
os.path.dirname(ray.autoscaler.__file__), "ray-schema.json")
# Internal kv keys for storing debug status.
DEBUG_AUTOSCALING_ERROR = "__autoscaling_error"
DEBUG_AUTOSCALING_STATUS = "__autoscaling_status"
class ConcurrentCounter:
def __init__(self):
+4
View File
@@ -37,6 +37,10 @@ class Monitor:
redis_address, redis_password=redis_password)
self.redis = ray.services.create_redis_client(
redis_address, password=redis_password)
# Set the redis client and mode so _internal_kv works for autoscaler.
worker = ray.worker.global_worker
worker.redis_client = self.redis
worker.mode = 0
# Setup subscriptions to the primary Redis server and the Redis shards.
self.primary_subscribe_client = self.redis.pubsub(
ignore_subscribe_messages=True)
+20 -3
View File
@@ -15,7 +15,8 @@ import psutil
import ray.services as services
from ray.autoscaler.commands import (
attach_cluster, exec_cluster, create_or_update_cluster, monitor_cluster,
rsync, teardown_cluster, get_head_node_ip, kill_node, get_worker_node_ips)
rsync, teardown_cluster, get_head_node_ip, kill_node, get_worker_node_ips,
debug_status)
import ray.ray_constants as ray_constants
import ray.utils
from ray.projects.scripts import project_cli, session_cli
@@ -1055,7 +1056,7 @@ def timeline(address):
required=False,
type=str,
help="Override the address to connect to.")
def stat(address):
def statistics(address):
"""Get the current metrics protobuf from a Ray cluster (developer tool)."""
if not address:
address = services.find_redis_address_or_die()
@@ -1094,6 +1095,21 @@ def memory(address):
print(ray.internal.internal_api.memory_summary())
@cli.command()
@click.option(
"--address",
required=False,
type=str,
help="Override the address to connect to.")
def status(address):
"""Print cluster status, including autoscaling info."""
if not address:
address = services.find_redis_address_or_die()
logger.info("Connecting to Ray instance at {}.".format(address))
ray.init(address=address)
print(debug_status())
@cli.command()
@click.option(
"--address",
@@ -1132,7 +1148,8 @@ add_command_alias(get_head_ip, name="get_head_ip", hidden=True)
cli.add_command(get_worker_ips)
cli.add_command(microbenchmark)
cli.add_command(stack)
cli.add_command(stat)
cli.add_command(statistics)
cli.add_command(status)
cli.add_command(memory)
cli.add_command(globalgc)
cli.add_command(timeline)
+3 -3
View File
@@ -260,11 +260,11 @@ class LoadMetricsTest(unittest.TestCase):
"object_store_memory": 20
}, {})
debug = lm.info_string()
assert ("ResourceUsage=2.0/4.0 CPU, 14.0/16.0 GPU, "
assert ("ResourceUsage: 2.0/4.0 CPU, 14.0/16.0 GPU, "
"1.05 GiB/1.05 GiB memory, "
"1.05 GiB/2.1 GiB object_store_memory") in debug
assert "NumNodesConnected=3" in debug
assert "NumNodesUsed=2.88" in debug
assert "NumNodesConnected: 3" in debug
assert "NumNodesUsed: 2.88" in debug
class AutoscalingTest(unittest.TestCase):