diff --git a/python/ray/autoscaler/cli_logger.py b/python/ray/autoscaler/cli_logger.py index eb5cad224..8fcd93cf9 100644 --- a/python/ray/autoscaler/cli_logger.py +++ b/python/ray/autoscaler/cli_logger.py @@ -281,6 +281,14 @@ class _CliLogger(): if self.verbosity > 0: self.print(msg, *args, **kwargs) + def verbose_warning(self, msg, *args, **kwargs): + """Prints a formatted warning if verbosity is not 0. + + For arguments, see `_format_msg`. + """ + if self.verbosity > 0: + self.warning(msg, *args, **kwargs) + def verbose_error(self, msg, *args, **kwargs): """Logs an error if verbosity is not 0. diff --git a/python/ray/autoscaler/command_runner.py b/python/ray/autoscaler/command_runner.py index 21952491f..bd4b064d9 100644 --- a/python/ray/autoscaler/command_runner.py +++ b/python/ray/autoscaler/command_runner.py @@ -12,6 +12,9 @@ import time from ray.autoscaler.docker import check_docker_running_cmd, with_docker_exec from ray.autoscaler.log_timer import LogTimer +from ray.autoscaler.subprocess_output_util import run_cmd_redirected,\ + ProcessRunnerError + from ray.autoscaler.cli_logger import cli_logger import colorful as cf @@ -23,28 +26,37 @@ HASH_MAX_LENGTH = 10 KUBECTL_RSYNC = os.path.join( os.path.dirname(os.path.abspath(__file__)), "kubernetes/kubectl-rsync.sh") +_config = {"use_login_shells": True} -class ProcessRunnerError(Exception): - def __init__(self, - msg, - msg_type, - code=None, - command=None, - message_discovered=None): - super(ProcessRunnerError, self).__init__( - "{} (discovered={}): type={}, code={}, command={}".format( - msg, message_discovered, msg_type, code, command)) - self.msg_type = msg_type - self.code = code - self.command = command +def is_using_login_shells(): + return _config["use_login_shells"] - self.message_discovered = message_discovered + +def set_using_login_shells(val): + """Choose between login and non-interactive shells. + + Non-interactive shells have the benefit of receiving less output from + subcommands (since progress bars and TTY control codes are not printed). + Sometimes this can be significant since e.g. `pip install` prints + hundreds of progress bar lines when downloading. + + Login shells have the benefit of working very close to how a proper bash + session does, regarding how scripts execute and how the environment is + setup. This is also how all commands were ran in the past. The only reason + to use login shells over non-interactive shells is if you need some weird + and non-robust tool to work. + + Args: + val (bool): If true, login shells will be used to run all commands. + """ + _config["use_login_shells"] = val def _with_interactive(cmd): force_interactive = ("true && source ~/.bashrc && " "export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && ") + return ["bash", "--login", "-c", "-i", quote(force_interactive + cmd)] @@ -327,6 +339,56 @@ class SSHCommandRunner(CommandRunnerInterface): cli_logger.warning("{}", str(e)) # todo: msg cli_logger.old_warning(logger, "{}", str(e)) + def _run_helper(self, + final_cmd, + with_output=False, + exit_on_fail=False, + silent=False): + """Run a command that was already setup with SSH and `bash` settings. + + Args: + cmd (List[str]): + Full command to run. Should include SSH options and other + processing that we do. + with_output (bool): + If `with_output` is `True`, command stdout and stderr + will be captured and returned. + exit_on_fail (bool): + If `exit_on_fail` is `True`, the process will exit + if the command fails (exits with a code other than 0). + """ + + try: + # For now, if the output is needed we just skip the new logic. + # In the future we could update the new logic to support + # capturing output, but it is probably not needed. + if not cli_logger.old_style and not with_output: + return run_cmd_redirected( + final_cmd, + silent=silent, + use_login_shells=is_using_login_shells()) + if with_output: + return self.process_runner.check_output(final_cmd) + else: + return self.process_runner.check_call(final_cmd) + except subprocess.CalledProcessError as e: + quoted_cmd = " ".join(final_cmd[:-1] + [quote(final_cmd[-1])]) + if not cli_logger.old_style and not is_using_login_shells(): + raise ProcessRunnerError( + "Command failed", + "ssh_command_failed", + code=e.returncode, + command=quoted_cmd) + + if exit_on_fail: + raise click.ClickException( + "Command failed: \n\n {}\n".format(quoted_cmd)) \ + from None + else: + raise click.ClickException( + "SSH command Failed. See above for the output from the" + " failure.") from None + def run(self, cmd, timeout=120, @@ -344,7 +406,10 @@ class SSHCommandRunner(CommandRunnerInterface): self._set_ssh_ip_if_required() - ssh = ["ssh", "-tt"] + if is_using_login_shells(): + ssh = ["ssh", "-tt"] + else: + ssh = ["ssh"] if port_forward: with cli_logger.group("Forwarding ports"): @@ -363,7 +428,10 @@ class SSHCommandRunner(CommandRunnerInterface): "{}@{}".format(self.ssh_user, self.ssh_ip) ] if cmd: - final_cmd += _with_interactive(cmd) + if is_using_login_shells(): + final_cmd += _with_interactive(cmd) + else: + final_cmd += [cmd] cli_logger.old_info(logger, "{}Running {}", self.log_prefix, " ".join(final_cmd)) else: @@ -371,42 +439,16 @@ class SSHCommandRunner(CommandRunnerInterface): # still create an interactive shell in some ssh versions. final_cmd.append(quote("while true; do sleep 86400; done")) - # todo: add a flag for this, we might - # wanna log commands with print sometimes cli_logger.verbose("Running `{}`", cf.bold(cmd)) with cli_logger.indented(): cli_logger.very_verbose("Full command is `{}`", cf.bold(" ".join(final_cmd))) - def start_process(): - try: - if with_output: - return self.process_runner.check_output(final_cmd) - else: - self.process_runner.check_call(final_cmd) - except subprocess.CalledProcessError as e: - quoted_cmd = " ".join(final_cmd[:-1] + [quote(final_cmd[-1])]) - if not cli_logger.old_style: - raise ProcessRunnerError( - "Command failed", - "ssh_command_failed", - code=e.returncode, - command=quoted_cmd) - - if exit_on_fail: - raise click.ClickException( - "Command failed: \n\n {}\n".format(quoted_cmd)) \ - from None - else: - raise click.ClickException( - "SSH command Failed. See above for the output from the" - " failure.") from None - if cli_logger.verbosity > 0: with cli_logger.indented(): - return start_process() + return self._run_helper(final_cmd, with_output, exit_on_fail) else: - return start_process() + return self._run_helper(final_cmd, with_output, exit_on_fail) def run_rsync_up(self, source, target): self._set_ssh_ip_if_required() @@ -418,7 +460,7 @@ class SSHCommandRunner(CommandRunnerInterface): target) ] cli_logger.verbose("Running `{}`", cf.bold(" ".join(command))) - self.process_runner.check_call(command) + self._run_helper(command, silent=True) def run_rsync_down(self, source, target): self._set_ssh_ip_if_required() @@ -431,7 +473,7 @@ class SSHCommandRunner(CommandRunnerInterface): source), target ] cli_logger.verbose("Running `{}`", cf.bold(" ".join(command))) - self.process_runner.check_call(command) + self._run_helper(command, silent=True) def remote_shell_command_str(self): if self.ssh_private_key: diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py index 19ad48d08..5c4dd922e 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/commands.py @@ -29,10 +29,13 @@ from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_LAUNCH_CONFIG, \ from ray.ray_constants import AUTOSCALER_RESOURCE_REQUEST_CHANNEL from ray.autoscaler.updater import NodeUpdaterThread +from ray.autoscaler.command_runner import set_using_login_shells from ray.autoscaler.command_runner import DockerCommandRunner from ray.autoscaler.log_timer import LogTimer from ray.worker import global_worker +import ray.autoscaler.subprocess_output_util as cmd_output_util + from ray.autoscaler.cli_logger import cli_logger import colorful as cf @@ -95,21 +98,33 @@ def create_or_update_cluster( override_max_workers: Optional[int], no_restart: bool, restart_only: bool, yes: bool, override_cluster_name: Optional[str], no_config_cache: bool, log_old_style: bool, log_color: str, + dump_command_output: bool, use_login_shells: bool, verbose: int) -> None: """Create or updates an autoscaling Ray cluster from a config json.""" cli_logger.old_style = log_old_style cli_logger.color_mode = log_color cli_logger.verbosity = verbose - # todo: disable by default when the command output handling PR makes it in - cli_logger.dump_command_output = True + set_using_login_shells(use_login_shells) + cmd_output_util.set_output_redirected(not dump_command_output) + + if use_login_shells: + cli_logger.warning( + "Commands running under a login shell can produce more " + "output than special processing can handle.") + cli_logger.warning( + "Thus, the output from subcommands will be logged as is.") + cli_logger.warning( + "Consider using {}, {}.", cf.bold("--use-normal-shells"), + cf.underlined("if you tested your workflow and it is compatible")) + cli_logger.newline() cli_logger.detect_colors() def handle_yaml_error(e): - cli_logger.error( - "Cluster config invalid.\n" - "Failed to load YAML file " + cf.bold("{}"), config_file) + cli_logger.error("Cluster config invalid\n") + cli_logger.error("Failed to load YAML file " + cf.bold("{}"), + config_file) cli_logger.newline() with cli_logger.verbatim_error_ctx("PyYAML error:"): cli_logger.error(e) @@ -119,7 +134,7 @@ def create_or_update_cluster( config = yaml.safe_load(open(config_file).read()) except FileNotFoundError: cli_logger.abort( - "Provided cluster configuration file ({}) does not exist.", + "Provided cluster configuration file ({}) does not exist", cf.bold(config_file)) except yaml.parser.ParserError as e: handle_yaml_error(e) @@ -140,7 +155,7 @@ def create_or_update_cluster( raise NotImplementedError("Unsupported provider {}".format( config["provider"])) - cli_logger.success("Cluster configuration valid.\n") + cli_logger.success("Cluster configuration valid\n") printed_overrides = False @@ -201,8 +216,17 @@ def _bootstrap_config(config: Dict[str, Any], # relatively cheap try_reload_log_state(config_cache["config"]["provider"], config_cache.get("provider_log_info")) - cli_logger.verbose("Loaded cached config from " + cf.bold("{}"), - cache_key) + + cli_logger.newline() + cli_logger.verbose_warning( + "Loaded cached provider configuration " + "from " + cf.bold("{}"), cache_key) + if cli_logger.verbosity == 0: + cli_logger.warning("Loaded cached provider configuration") + cli_logger.warning( + "If you experience issues with " + "the cloud provider, try re-running " + "the command with {}.", cf.bold("--no-config-cache")) return config_cache["config"] else: @@ -435,6 +459,8 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes, override_cluster_name): """Create the cluster head node, which in turn creates the workers.""" provider = get_node_provider(config["provider"], config["cluster_name"]) + + raw_config_file = config_file # used for printing to the user config_file = os.path.abspath(config_file) try: head_node_tags = { @@ -631,17 +657,20 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes, logger, "get_or_create_head_node: " "Updating {} failed", head_node_ip) sys.exit(1) - logger.info( - "get_or_create_head_node: " - "Head node up-to-date, IP address is: {}".format(head_node_ip)) - monitor_str = "tail -n 100 -f /tmp/ray/session_*/logs/monitor*" - if override_cluster_name: - modifiers = " --cluster-name={}".format( - quote(override_cluster_name)) - else: - modifiers = "" - print("To monitor auto-scaling activity, you can run:\n\n" + cli_logger.old_info( + logger, "get_or_create_head_node: " + "Head node up-to-date, IP address is: {}", head_node_ip) + + monitor_str = "tail -n 100 -f /tmp/ray/session_*/logs/monitor*" + if override_cluster_name: + modifiers = " --cluster-name={}".format( + quote(override_cluster_name)) + else: + modifiers = "" + + if cli_logger.old_style: + print("To monitor autoscaling activity, you can run:\n\n" " ray exec {} {}{}\n".format(config_file, quote(monitor_str), modifiers)) print("To open a console on the cluster:\n\n" @@ -650,6 +679,17 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes, print("To get a remote shell to the cluster manually, run:\n\n" " {}\n".format( updater.cmd_runner.remote_shell_command_str())) + + cli_logger.newline() + with cli_logger.group("Useful commands"): + cli_logger.print("Monitor auto-scailng with") + cli_logger.print( + cf.bold(" ray exec {}{} {}"), raw_config_file, modifiers, + quote(monitor_str)) + + cli_logger.print("Connect to a terminal on the cluster head") + cli_logger.print( + cf.bold(" ray attach {}{}"), raw_config_file, modifiers) finally: provider.cleanup() diff --git a/python/ray/autoscaler/subprocess_output_util.py b/python/ray/autoscaler/subprocess_output_util.py new file mode 100644 index 000000000..96670bb6d --- /dev/null +++ b/python/ray/autoscaler/subprocess_output_util.py @@ -0,0 +1,371 @@ +import os +import re +import subprocess +import tempfile +import time +import sys + +from ray.autoscaler.cli_logger import cli_logger +import colorful as cf + +CONN_REFUSED_PATIENCE = 30 # how long to wait for sshd to run + +_config = {"redirect_output": True} + + +def is_output_redirected(): + return _config["_redirect_output"] + + +def set_output_redirected(val): + """Choose between logging to a temporary file and to `sys.stdout`. + + The default is to log to a file. + + Args: + val (bool): If true, subprocess output will be redirected to + a temporary file. + """ + _config["_redirect_output"] = val + + +class ProcessRunnerError(Exception): + def __init__(self, + msg, + msg_type, + code=None, + command=None, + special_case=None): + super(ProcessRunnerError, self).__init__( + "{} (discovered={}): type={}, code={}, command={}".format( + msg, special_case, msg_type, code, command)) + + self.msg_type = msg_type + self.code = code + self.command = command + + self.special_case = special_case + + +_ssh_output_regexes = { + "known_host_update": re.compile( + r"\s*Warning: Permanently added '.+' \(.+\) " + r"to the list of known hosts.\s*"), + "connection_closed": re.compile(r"\s*Shared connection to .+ closed.\s*"), + "timeout": re.compile(r"\s*ssh: connect to host .+ port .+: " + r"Operation timed out\s*"), + "conn_refused": re.compile( + r"\s*ssh: connect to host .+ port .+: Connection refused\s*") + # todo: check for other connection failures for better error messages? +} + + +def _read_subprocess_stream(f, output_file, is_stdout=False): + """Read and process a subprocess output stream. + + The goal is to find error messages and respond to them in a clever way. + Currently just used for SSH messages (CONN_REFUSED, TIMEOUT, etc.), so + the user does not get confused by these. + + Ran in a thread each for both `stdout` and `stderr` to + allow for cross-platform asynchronous IO. + + Note: `select`-based IO is another option, but Windows has + no support for `select`ing pipes, and Linux support varies somewhat. + Spefically, Older *nix systems might also have quirks in how they + handle `select` on pipes. + + Args: + f: File object for the stream. + output_file: File object to which filtered output is written. + is_stdout (bool): + When `is_stdout` is `False`, the stream is assumed to + be `stderr`. Different error message detectors are used, + and the output is displayed to the user unless it matches + a special case (e.g. SSH timeout), in which case this is + left up to the caller. + """ + + detected_special_case = None + while True: + # ! Readline here is crucial. + # ! Normal `read()` will block until EOF instead of until + # something is available. + line = f.readline() + + if line is None or line == "": + # EOF + break + + if line[-1] == "\n": + line = line[:-1] + + if not is_stdout: + if _ssh_output_regexes["connection_closed"]\ + .fullmatch(line) is not None: + # Do not log "connection closed" messages which SSH + # puts in stderr for no reason. + # + # They are never errors since the connection will + # close no matter whether the command succeeds or not. + continue + + if _ssh_output_regexes["timeout"].fullmatch(line) is not None: + # Timeout is not really an error but rather a special + # condition. It should be handled by the caller, since + # network conditions/nodes in the early stages of boot + # are expected to sometimes cause connection timeouts. + if detected_special_case is not None: + raise ValueError("Bug: ssh_timeout conflicts with another " + "special codition: " + + detected_special_case) + + detected_special_case = "ssh_timeout" + continue + + if _ssh_output_regexes["conn_refused"]\ + .fullmatch(line) is not None: + # Connection refused is not really an error but + # rather a special condition. It should be handled by + # the caller, since network conditions/nodes in the + # early stages of boot are expected to sometimes cause + # CONN_REFUSED. + if detected_special_case is not None: + raise ValueError( + "Bug: ssh_conn_refused conflicts with another " + "special codition: " + detected_special_case) + + detected_special_case = "ssh_conn_refused" + continue + + if _ssh_output_regexes["known_host_update"]\ + .fullmatch(line) is not None: + # Since we ignore SSH host control anyway + # (-o UserKnownHostsFile=/dev/null), + # we should silence the host control warnings. + continue + + cli_logger.error(line) + + if output_file is not None: + output_file.write(line + "\n") + + return detected_special_case + + +def _run_and_process_output(cmd, + stdout_file, + stderr_file=None, + use_login_shells=False): + """Run a command and process its output for special cases. + + Specifically, run all command output through regex to detect + error conditions and filter out non-error messages that went to stderr + anyway (SSH writes ALL of its "system" messages to stderr even if they + are not actually errors). + + Args: + cmd (List[str]): Command to run. + stdout_file: File to redirect stdout to. + stdout_file: File to redirect stderr to. + + Implementation notes: + 1. `use_login_shells` disables special processing + If we run interactive apps, output processing will likely get + overwhelemed with the interactive output elements. + Thus we disable output processing for login shells. This makes + the logging experience considerably worse, but it only degrades + to old-style logging. + + For example, `pip install` outputs HUNDREDS of progress-bar lines + when downloading a package, and we have to + read + regex + write all of them. + + After all, even just printing output to console can often slow + down a fast-printing app, and we do more than just print, and + all that from Python, which is much slower than C regarding + stream processing. + + 2. `stdin=PIPE` for subprocesses + Do not inherit stdin as it messes with bash signals + (ctrl-C for SIGINT) and these commands aren't supposed to + take input anyway. + + 3. `ThreadPoolExecutor` without the `Pool` + We use `ThreadPoolExecutor` to create futures from threads. + Threads are never reused. + + This approach allows us to have no custom synchronization by + off-loading the return value and exception passing to the + standard library (`ThreadPoolExecutor` internals). + + This instance will be `shutdown()` ASAP so it's fine to + create one in such a weird place. + + The code is thus 100% thread-safe as long as the stream readers + are read-only except for return values and possible exceptions. + """ + + if use_login_shells: + # See implementation note #1 + if stdout_file is None: + stdout_file = subprocess.DEVNULL + if stderr_file is None: + stderr_file = subprocess.DEVNULL + + return subprocess.check_call( + cmd, + # See implementation note #2 + stdin=subprocess.PIPE, + stdout=stdout_file, + stderr=stderr_file) + + with subprocess.Popen( + cmd, + # See implementation note #2 + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + bufsize=1, # line buffering + universal_newlines=True # text mode outputs + ) as p: + from concurrent.futures import ThreadPoolExecutor + + # Closing stdin might be necessary to signal EOF to some + # apps (they might get stuck waiting for input forever otherwise). + p.stdin.close() + + # See implementation note #3 + with ThreadPoolExecutor(max_workers=2) as executor: + stdout_future = \ + executor.submit(_read_subprocess_stream, + p.stdout, stdout_file, + is_stdout=True) + stderr_future = \ + executor.submit(_read_subprocess_stream, + p.stderr, stderr_file, + is_stdout=False) + # Wait for completion. + executor.shutdown() + + # Update `p.returncode` + p.poll() + + detected_special_case = stdout_future.result() + if stderr_future.result() is not None: + if detected_special_case is not None: + # This might some day need to be changed. + # We should probably make sure the two special cases + # are compatible then and that we can handle both by + # e.g. reporting both to the caller. + raise ValueError( + "Bug: found a special case in both stdout and " + "stderr. This is not valid behavior at the time " + "of writing this code.") + detected_special_case = stderr_future.result() + + if p.returncode > 0: + # Process failed, but not due to a signal, since signals + # set the exit code to a negative value. + raise ProcessRunnerError( + "Command failed", + "ssh_command_failed", + code=p.returncode, + command=cmd, + special_case=detected_special_case) + elif p.returncode < 0: + # Process failed due to a signal, since signals + # set the exit code to a negative value. + raise ProcessRunnerError( + "Command failed", + "ssh_command_failed", + code=p.returncode, + command=cmd, + special_case="died_to_signal") + + return p.returncode + + +def run_cmd_redirected(cmd, silent=False, use_login_shells=False): + """Run a command and optionally redirect output to a file. + + Args: + cmd (List[str]): Command to run. + silent (bool): If true, the command output will be silenced completely + (redirected to /dev/null), unless verbose logging + is enabled. Use this for runnign utility commands like + rsync. + """ + if silent and cli_logger.verbosity < 1: + return _run_and_process_output( + cmd, stdout_file=None, use_login_shells=use_login_shells) + + if not is_output_redirected(): + return _run_and_process_output( + cmd, stdout_file=sys.stdout, use_login_shells=use_login_shells) + else: + tmpfile_path = os.path.join( + tempfile.gettempdir(), "ray-up-{}-{}.txt".format( + cmd[0], time.time())) + with open( + tmpfile_path, + mode="w", + # line buffering + buffering=1) as tmp: + cli_logger.verbose("Command stdout is redirected to {}", + cf.bold(tmp.name)) + cli_logger.verbose( + cf.gray("Use --dump-command-output to " + "dump to terminal instead.")) + + return _run_and_process_output( + cmd, + stdout_file=tmp, + stderr_file=tmp, + use_login_shells=use_login_shells) + + +def handle_ssh_fails(e, first_conn_refused_time, retry_interval): + """Handle SSH system failures coming from a subprocess. + + Args: + e: The `ProcessRunnerException` to handle. + first_conn_refused_time: + The time (as reported by this function) or None, + indicating the last time a CONN_REFUSED error was caught. + + After exceeding a patience value, the program will be aborted + since SSH will likely never recover. + retry_interval: The interval after which the command will be retried, + used here just to inform the user. + """ + if e.msg_type != "ssh_command_failed": + return + + if e.special_case == "ssh_conn_refused": + if first_conn_refused_time is not None and \ + time.time() - first_conn_refused_time > \ + CONN_REFUSED_PATIENCE: + cli_logger.error( + "SSH connection was being refused " + "for {} seconds. Head node assumed " + "unreachable.", cf.bold(str(CONN_REFUSED_PATIENCE))) + cli_logger.abort("Check the node's firewall settings " + "and the cloud network configuration.") + + cli_logger.warning("SSH connection was refused.") + cli_logger.warning("This might mean that the SSH daemon is " + "still setting up, or that " + "the host is inaccessable (e.g. due to " + "a firewall).") + + return time.time() + + if e.special_case in ["ssh_timeout", "ssh_conn_refused"]: + cli_logger.print("SSH still not available, " + "retrying in {} seconds.", cf.bold( + str(retry_interval))) + else: + raise e + + return first_conn_refused_time diff --git a/python/ray/autoscaler/updater.py b/python/ray/autoscaler/updater.py index b40678810..d1924b343 100644 --- a/python/ray/autoscaler/updater.py +++ b/python/ray/autoscaler/updater.py @@ -10,9 +10,12 @@ from ray.autoscaler.tags import TAG_RAY_NODE_STATUS, TAG_RAY_RUNTIME_CONFIG, \ TAG_RAY_FILE_MOUNTS_CONTENTS, \ STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED, STATUS_WAITING_FOR_SSH, \ STATUS_SETTING_UP, STATUS_SYNCING_FILES -from ray.autoscaler.command_runner import NODE_START_WAIT_S, SSHOptions +from ray.autoscaler.command_runner import NODE_START_WAIT_S, SSHOptions, \ + ProcessRunnerError from ray.autoscaler.log_timer import LogTimer +import ray.autoscaler.subprocess_output_util as cmd_output_util + from ray.autoscaler.cli_logger import cli_logger import colorful as cf @@ -172,6 +175,7 @@ class NodeUpdater: self.log_prefix) cli_logger.print("Running `{}` as a test.", cf.bold("uptime")) + first_conn_refused_time = None while time.time() < deadline and \ not self.provider.is_terminated(self.node_id): try: @@ -183,7 +187,20 @@ class NodeUpdater: cli_logger.old_debug(logger, "Uptime succeeded.") cli_logger.success("Success.") return True + except ProcessRunnerError as e: + first_conn_refused_time = \ + cmd_output_util.handle_ssh_fails( + e, first_conn_refused_time, + retry_interval=READY_CHECK_INTERVAL) + time.sleep(READY_CHECK_INTERVAL) except Exception as e: + # TODO(maximsmol): we should not be ignoring + # exceptions if they get filtered properly + # (new style log + non-interactive shells) + # + # however threading this configuration state + # is a pain and I'm leaving it for later + retry_str = str(e) if hasattr(e, "cmd"): retry_str = "(Exit Status {}): {}".format( @@ -249,35 +266,42 @@ class NodeUpdater: if self.initialization_commands: with cli_logger.group( "Running initialization commands", - _numbered=("[]", 4, - 6)): # todo: fix command numbering + _numbered=("[]", 3, 5)): with LogTimer( self.log_prefix + "Initialization commands", show_status=True): - for cmd in self.initialization_commands: - self.cmd_runner.run( - cmd, - ssh_options_override=SSHOptions( - self.auth_config.get( - "ssh_private_key"))) + try: + self.cmd_runner.run( + cmd, + ssh_options_override=SSHOptions( + self.auth_config.get( + "ssh_private_key"))) + except ProcessRunnerError as e: + if e.msg_type == "ssh_command_failed": + cli_logger.error("Failed.") + cli_logger.error( + "See above for stderr.") + + raise click.ClickException( + "Initialization command failed.") else: cli_logger.print( "No initialization commands to run.", - _numbered=("[]", 4, 6)) + _numbered=("[]", 3, 6)) if self.setup_commands: with cli_logger.group( "Running setup commands", - _numbered=("[]", 5, - 6)): # todo: fix command numbering + # todo: fix command numbering + _numbered=("[]", 4, 6)): with LogTimer( self.log_prefix + "Setup commands", show_status=True): total = len(self.setup_commands) for i, cmd in enumerate(self.setup_commands): - if cli_logger.verbosity == 0: + if cli_logger.verbosity == 0 and len(cmd) > 30: cmd_to_print = cf.bold(cmd[:30]) + "..." else: cmd_to_print = cf.bold(cmd) @@ -287,17 +311,33 @@ class NodeUpdater: cmd_to_print, _numbered=("()", i, total)) - self.cmd_runner.run(cmd) + try: + self.cmd_runner.run(cmd) + except ProcessRunnerError as e: + if e.msg_type == "ssh_command_failed": + cli_logger.error("Failed.") + cli_logger.error( + "See above for stderr.") + + raise click.ClickException( + "Setup command failed.") else: cli_logger.print( - "No setup commands to run.", _numbered=("[]", 5, 6)) + "No setup commands to run.", _numbered=("[]", 4, 6)) with cli_logger.group( "Starting the Ray runtime", _numbered=("[]", 6, 6)): with LogTimer( self.log_prefix + "Ray start commands", show_status=True): for cmd in self.ray_start_commands: - self.cmd_runner.run(cmd) + try: + self.cmd_runner.run(cmd) + except ProcessRunnerError as e: + if e.msg_type == "ssh_command_failed": + cli_logger.error("Failed.") + cli_logger.error("See above for stderr.") + + raise click.ClickException("Start command failed.") def rsync_up(self, source, target): cli_logger.old_info(logger, "{}Syncing {} to {}...", self.log_prefix, diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 88da974fc..7def6117c 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -704,9 +704,23 @@ def stop(force, verbose): default="auto", help=("Use color logging. " "Valid values are: auto (if stdout is a tty), true, false.")) +@click.option( + "--dump-command-output", + is_flag=True, + default=False, + help=("Print command output straight to " + "the terminal instead of redirecting to a file.")) +@click.option( + "--use-login-shells/--use-normal-shells", + is_flag=True, + default=True, + help=("Ray uses login shells (bash --login -i) to run cluster commands " + "by default. If your workflow is compatible with normal shells, " + "this can be disabled for a better user experience.")) @click.option("-v", "--verbose", count=True) def up(cluster_config_file, min_workers, max_workers, no_restart, restart_only, - yes, cluster_name, no_config_cache, log_old_style, log_color, verbose): + yes, cluster_name, no_config_cache, log_old_style, log_color, + dump_command_output, use_login_shells, verbose): """Create or update a Ray cluster.""" if restart_only or no_restart: assert restart_only != no_restart, "Cannot set both 'restart_only' " \ @@ -724,7 +738,7 @@ def up(cluster_config_file, min_workers, max_workers, no_restart, restart_only, create_or_update_cluster(cluster_config_file, min_workers, max_workers, no_restart, restart_only, yes, cluster_name, no_config_cache, log_old_style, log_color, - verbose) + dump_command_output, use_login_shells, verbose) @cli.command()