mirror of
https://github.com/wassname/ray.git
synced 2026-06-29 07:23:55 +08:00
[autoscaler] Improve UX for Autoscaler (#1558)
This commit is contained in:
@@ -7,6 +7,7 @@ import json
|
||||
import tempfile
|
||||
import time
|
||||
import sys
|
||||
import click
|
||||
|
||||
import yaml
|
||||
try: # py3
|
||||
@@ -24,7 +25,8 @@ from ray.autoscaler.updater import NodeUpdaterProcess
|
||||
|
||||
|
||||
def create_or_update_cluster(
|
||||
config_file, override_min_workers, override_max_workers, no_restart):
|
||||
config_file, override_min_workers, override_max_workers,
|
||||
no_restart, yes):
|
||||
"""Create or updates an autoscaling Ray cluster from a config json."""
|
||||
|
||||
config = yaml.load(open(config_file).read())
|
||||
@@ -43,17 +45,17 @@ def create_or_update_cluster(
|
||||
|
||||
bootstrap_config, _ = importer()
|
||||
config = bootstrap_config(config)
|
||||
get_or_create_head_node(config, no_restart)
|
||||
get_or_create_head_node(config, no_restart, yes)
|
||||
|
||||
|
||||
def teardown_cluster(config_file):
|
||||
def teardown_cluster(config_file, yes):
|
||||
"""Destroys all nodes of a Ray cluster described by a config json."""
|
||||
|
||||
config = yaml.load(open(config_file).read())
|
||||
validate_config(config)
|
||||
dockerize_if_needed(config)
|
||||
|
||||
confirm("This will destroy your cluster")
|
||||
confirm("This will destroy your cluster", yes)
|
||||
|
||||
provider = get_node_provider(config["provider"], config["cluster_name"])
|
||||
head_node_tags = {
|
||||
@@ -71,7 +73,7 @@ def teardown_cluster(config_file):
|
||||
nodes = provider.nodes({})
|
||||
|
||||
|
||||
def get_or_create_head_node(config, no_restart):
|
||||
def get_or_create_head_node(config, no_restart, yes):
|
||||
"""Create the cluster head node, which in turn creates the workers."""
|
||||
|
||||
provider = get_node_provider(config["provider"], config["cluster_name"])
|
||||
@@ -85,15 +87,15 @@ def get_or_create_head_node(config, no_restart):
|
||||
head_node = None
|
||||
|
||||
if not head_node:
|
||||
confirm("This will create a new cluster")
|
||||
confirm("This will create a new cluster", yes)
|
||||
elif not no_restart:
|
||||
confirm("This will restart cluster services")
|
||||
confirm("This will restart cluster services", yes)
|
||||
|
||||
launch_hash = hash_launch_conf(config["head_node"], config["auth"])
|
||||
if head_node is None or provider.node_tags(head_node).get(
|
||||
TAG_RAY_LAUNCH_CONFIG) != launch_hash:
|
||||
if head_node is not None:
|
||||
confirm("Head node config out-of-date. It will be terminated")
|
||||
confirm("Head node config out-of-date. It will be terminated", yes)
|
||||
print("Terminating outdated head node {}".format(head_node))
|
||||
provider.terminate_node(head_node)
|
||||
print("Launching new head node...")
|
||||
@@ -185,12 +187,23 @@ def get_or_create_head_node(config, no_restart):
|
||||
provider.external_ip(head_node)))
|
||||
|
||||
|
||||
def confirm(msg):
|
||||
print("{}. Do you want to continue [y/N]? ".format(msg), end="")
|
||||
if sys.version_info >= (3, 0):
|
||||
answer = input()
|
||||
def get_head_node_ip(config_file):
|
||||
"""Returns head node IP for given configuration file if exists."""
|
||||
|
||||
config = yaml.load(open(config_file).read())
|
||||
provider = get_node_provider(config["provider"], config["cluster_name"])
|
||||
head_node_tags = {
|
||||
TAG_RAY_NODE_TYPE: "Head",
|
||||
}
|
||||
nodes = provider.nodes(head_node_tags)
|
||||
if len(nodes) > 0:
|
||||
head_node = nodes[0]
|
||||
return provider.external_ip(head_node)
|
||||
else:
|
||||
answer = raw_input() # noqa: F821
|
||||
if answer.strip().lower() != "y":
|
||||
print("Abort.")
|
||||
exit(1)
|
||||
print("Head node of cluster ({}) not found!".format(
|
||||
config["cluster_name"]))
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def confirm(msg, yes):
|
||||
return None if yes else click.confirm(msg, abort=True)
|
||||
|
||||
@@ -19,6 +19,10 @@ from ray.autoscaler.tags import TAG_RAY_NODE_STATUS, TAG_RAY_RUNTIME_CONFIG
|
||||
NODE_START_WAIT_S = 300
|
||||
|
||||
|
||||
def pretty_cmd(cmd_str):
|
||||
return "\n\n\t{}\n\n".format(cmd_str)
|
||||
|
||||
|
||||
class NodeUpdater(object):
|
||||
"""A process for syncing files and running init commands on a node."""
|
||||
|
||||
@@ -54,9 +58,13 @@ class NodeUpdater(object):
|
||||
try:
|
||||
self.do_update()
|
||||
except Exception as e:
|
||||
error_str = str(e)
|
||||
if hasattr(e, "cmd"):
|
||||
error_str = "(Exit Status {}) {}".format(
|
||||
e.returncode, pretty_cmd(" ".join(e.cmd)))
|
||||
print(
|
||||
"NodeUpdater: Error updating {}, "
|
||||
"see {} for remote logs".format(e, self.output_name),
|
||||
"NodeUpdater: Error updating {}"
|
||||
"See {} for remote logs.".format(error_str, self.output_name),
|
||||
file=self.stdout)
|
||||
self.provider.set_node_tags(
|
||||
self.node_id, {TAG_RAY_NODE_STATUS: "UpdateFailed"})
|
||||
@@ -103,14 +111,18 @@ class NodeUpdater(object):
|
||||
self.node_id),
|
||||
file=self.stdout)
|
||||
if not self.provider.is_running(self.node_id):
|
||||
raise Exception()
|
||||
raise Exception("Node not running yet...")
|
||||
self.ssh_cmd(
|
||||
"uptime",
|
||||
connect_timeout=5, redirect=open("/dev/null", "w"))
|
||||
ssh_ok = True
|
||||
except Exception as e:
|
||||
retry_str = str(e)
|
||||
if hasattr(e, "cmd"):
|
||||
retry_str = "(Exit Status {}): {}".format(
|
||||
e.returncode, pretty_cmd(" ".join(e.cmd)))
|
||||
print(
|
||||
"NodeUpdater: SSH not up, retrying: {}".format(e),
|
||||
"NodeUpdater: SSH not up, retrying: {}".format(retry_str),
|
||||
file=self.stdout)
|
||||
time.sleep(5)
|
||||
else:
|
||||
@@ -150,7 +162,7 @@ class NodeUpdater(object):
|
||||
if verbose:
|
||||
print(
|
||||
"NodeUpdater: running {} on {}...".format(
|
||||
cmd, self.ssh_ip),
|
||||
pretty_cmd(cmd), self.ssh_ip),
|
||||
file=self.stdout)
|
||||
force_interactive = "set -i && source ~/.bashrc && "
|
||||
self.process_runner.check_call([
|
||||
|
||||
@@ -7,7 +7,8 @@ import json
|
||||
import subprocess
|
||||
|
||||
import ray.services as services
|
||||
from ray.autoscaler.commands import create_or_update_cluster, teardown_cluster
|
||||
from ray.autoscaler.commands import (
|
||||
create_or_update_cluster, teardown_cluster, get_head_node_ip)
|
||||
|
||||
|
||||
def check_no_existing_redis_clients(node_ip_address, redis_client):
|
||||
@@ -260,22 +261,35 @@ def stop():
|
||||
@click.option(
|
||||
"--max-workers", required=False, type=int, help=(
|
||||
"Override the configured max worker node count for the cluster."))
|
||||
@click.option(
|
||||
"--yes", "-y", is_flag=True, default=False, help=(
|
||||
"Don't ask for confirmation."))
|
||||
def create_or_update(
|
||||
cluster_config_file, min_workers, max_workers, no_restart):
|
||||
cluster_config_file, min_workers, max_workers, no_restart, yes):
|
||||
create_or_update_cluster(
|
||||
cluster_config_file, min_workers, max_workers, no_restart)
|
||||
cluster_config_file, min_workers, max_workers, no_restart, yes)
|
||||
|
||||
|
||||
@click.command()
|
||||
@click.argument("cluster_config_file", required=True, type=str)
|
||||
def teardown(cluster_config_file):
|
||||
teardown_cluster(cluster_config_file)
|
||||
@click.option(
|
||||
"--yes", "-y", is_flag=True, default=False, help=(
|
||||
"Don't ask for confirmation."))
|
||||
def teardown(cluster_config_file, yes):
|
||||
teardown_cluster(cluster_config_file, yes)
|
||||
|
||||
|
||||
@click.command()
|
||||
@click.argument("cluster_config_file", required=True, type=str)
|
||||
def get_head_ip(cluster_config_file):
|
||||
click.echo(get_head_node_ip(cluster_config_file))
|
||||
|
||||
|
||||
cli.add_command(start)
|
||||
cli.add_command(stop)
|
||||
cli.add_command(create_or_update)
|
||||
cli.add_command(teardown)
|
||||
cli.add_command(get_head_ip)
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
Reference in New Issue
Block a user