diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py index 9e61759c8..6e76c35d3 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/commands.py @@ -7,6 +7,7 @@ import json import tempfile import time import sys +import click import yaml try: # py3 @@ -24,7 +25,8 @@ from ray.autoscaler.updater import NodeUpdaterProcess def create_or_update_cluster( - config_file, override_min_workers, override_max_workers, no_restart): + config_file, override_min_workers, override_max_workers, + no_restart, yes): """Create or updates an autoscaling Ray cluster from a config json.""" config = yaml.load(open(config_file).read()) @@ -43,17 +45,17 @@ def create_or_update_cluster( bootstrap_config, _ = importer() config = bootstrap_config(config) - get_or_create_head_node(config, no_restart) + get_or_create_head_node(config, no_restart, yes) -def teardown_cluster(config_file): +def teardown_cluster(config_file, yes): """Destroys all nodes of a Ray cluster described by a config json.""" config = yaml.load(open(config_file).read()) validate_config(config) dockerize_if_needed(config) - confirm("This will destroy your cluster") + confirm("This will destroy your cluster", yes) provider = get_node_provider(config["provider"], config["cluster_name"]) head_node_tags = { @@ -71,7 +73,7 @@ def teardown_cluster(config_file): nodes = provider.nodes({}) -def get_or_create_head_node(config, no_restart): +def get_or_create_head_node(config, no_restart, yes): """Create the cluster head node, which in turn creates the workers.""" provider = get_node_provider(config["provider"], config["cluster_name"]) @@ -85,15 +87,15 @@ def get_or_create_head_node(config, no_restart): head_node = None if not head_node: - confirm("This will create a new cluster") + confirm("This will create a new cluster", yes) elif not no_restart: - confirm("This will restart cluster services") + confirm("This will restart cluster services", yes) launch_hash = hash_launch_conf(config["head_node"], config["auth"]) if head_node is None or provider.node_tags(head_node).get( TAG_RAY_LAUNCH_CONFIG) != launch_hash: if head_node is not None: - confirm("Head node config out-of-date. It will be terminated") + confirm("Head node config out-of-date. It will be terminated", yes) print("Terminating outdated head node {}".format(head_node)) provider.terminate_node(head_node) print("Launching new head node...") @@ -185,12 +187,23 @@ def get_or_create_head_node(config, no_restart): provider.external_ip(head_node))) -def confirm(msg): - print("{}. Do you want to continue [y/N]? ".format(msg), end="") - if sys.version_info >= (3, 0): - answer = input() +def get_head_node_ip(config_file): + """Returns head node IP for given configuration file if exists.""" + + config = yaml.load(open(config_file).read()) + provider = get_node_provider(config["provider"], config["cluster_name"]) + head_node_tags = { + TAG_RAY_NODE_TYPE: "Head", + } + nodes = provider.nodes(head_node_tags) + if len(nodes) > 0: + head_node = nodes[0] + return provider.external_ip(head_node) else: - answer = raw_input() # noqa: F821 - if answer.strip().lower() != "y": - print("Abort.") - exit(1) + print("Head node of cluster ({}) not found!".format( + config["cluster_name"])) + sys.exit(1) + + +def confirm(msg, yes): + return None if yes else click.confirm(msg, abort=True) diff --git a/python/ray/autoscaler/updater.py b/python/ray/autoscaler/updater.py index 5b8fbd066..d62bee51f 100644 --- a/python/ray/autoscaler/updater.py +++ b/python/ray/autoscaler/updater.py @@ -19,6 +19,10 @@ from ray.autoscaler.tags import TAG_RAY_NODE_STATUS, TAG_RAY_RUNTIME_CONFIG NODE_START_WAIT_S = 300 +def pretty_cmd(cmd_str): + return "\n\n\t{}\n\n".format(cmd_str) + + class NodeUpdater(object): """A process for syncing files and running init commands on a node.""" @@ -54,9 +58,13 @@ class NodeUpdater(object): try: self.do_update() except Exception as e: + error_str = str(e) + if hasattr(e, "cmd"): + error_str = "(Exit Status {}) {}".format( + e.returncode, pretty_cmd(" ".join(e.cmd))) print( - "NodeUpdater: Error updating {}, " - "see {} for remote logs".format(e, self.output_name), + "NodeUpdater: Error updating {}" + "See {} for remote logs.".format(error_str, self.output_name), file=self.stdout) self.provider.set_node_tags( self.node_id, {TAG_RAY_NODE_STATUS: "UpdateFailed"}) @@ -103,14 +111,18 @@ class NodeUpdater(object): self.node_id), file=self.stdout) if not self.provider.is_running(self.node_id): - raise Exception() + raise Exception("Node not running yet...") self.ssh_cmd( "uptime", connect_timeout=5, redirect=open("/dev/null", "w")) ssh_ok = True except Exception as e: + retry_str = str(e) + if hasattr(e, "cmd"): + retry_str = "(Exit Status {}): {}".format( + e.returncode, pretty_cmd(" ".join(e.cmd))) print( - "NodeUpdater: SSH not up, retrying: {}".format(e), + "NodeUpdater: SSH not up, retrying: {}".format(retry_str), file=self.stdout) time.sleep(5) else: @@ -150,7 +162,7 @@ class NodeUpdater(object): if verbose: print( "NodeUpdater: running {} on {}...".format( - cmd, self.ssh_ip), + pretty_cmd(cmd), self.ssh_ip), file=self.stdout) force_interactive = "set -i && source ~/.bashrc && " self.process_runner.check_call([ diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 221d04a0e..d6b1d6767 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -7,7 +7,8 @@ import json import subprocess import ray.services as services -from ray.autoscaler.commands import create_or_update_cluster, teardown_cluster +from ray.autoscaler.commands import ( + create_or_update_cluster, teardown_cluster, get_head_node_ip) def check_no_existing_redis_clients(node_ip_address, redis_client): @@ -260,22 +261,35 @@ def stop(): @click.option( "--max-workers", required=False, type=int, help=( "Override the configured max worker node count for the cluster.")) +@click.option( + "--yes", "-y", is_flag=True, default=False, help=( + "Don't ask for confirmation.")) def create_or_update( - cluster_config_file, min_workers, max_workers, no_restart): + cluster_config_file, min_workers, max_workers, no_restart, yes): create_or_update_cluster( - cluster_config_file, min_workers, max_workers, no_restart) + cluster_config_file, min_workers, max_workers, no_restart, yes) @click.command() @click.argument("cluster_config_file", required=True, type=str) -def teardown(cluster_config_file): - teardown_cluster(cluster_config_file) +@click.option( + "--yes", "-y", is_flag=True, default=False, help=( + "Don't ask for confirmation.")) +def teardown(cluster_config_file, yes): + teardown_cluster(cluster_config_file, yes) + + +@click.command() +@click.argument("cluster_config_file", required=True, type=str) +def get_head_ip(cluster_config_file): + click.echo(get_head_node_ip(cluster_config_file)) cli.add_command(start) cli.add_command(stop) cli.add_command(create_or_update) cli.add_command(teardown) +cli.add_command(get_head_ip) def main():