mirror of
https://github.com/wassname/ray.git
synced 2026-06-30 07:53:50 +08:00
[Autoscaler] Command Output Improvement (#9699)
* cross-platform prototype * checkpoint * Address PR comments * format * prepare to push * format * PR comments * fixes * fixtest * Revert "fixtest" This reverts commit d6f54353e1b891c784417bb8d0304c18cc5bcdd8. * return-result Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
This commit is contained in:
@@ -281,6 +281,14 @@ class _CliLogger():
|
||||
if self.verbosity > 0:
|
||||
self.print(msg, *args, **kwargs)
|
||||
|
||||
def verbose_warning(self, msg, *args, **kwargs):
    """Emit a formatted warning only when verbose output is enabled.

    The call is forwarded unchanged to `self.warning` when
    `self.verbosity` is greater than 0; otherwise nothing is printed.

    For arguments, see `_format_msg`.
    """
    if self.verbosity <= 0:
        # Quiet mode: drop the warning entirely.
        return

    self.warning(msg, *args, **kwargs)
|
||||
|
||||
def verbose_error(self, msg, *args, **kwargs):
|
||||
"""Logs an error if verbosity is not 0.
|
||||
|
||||
|
||||
@@ -12,6 +12,9 @@ import time
|
||||
from ray.autoscaler.docker import check_docker_running_cmd, with_docker_exec
|
||||
from ray.autoscaler.log_timer import LogTimer
|
||||
|
||||
from ray.autoscaler.subprocess_output_util import run_cmd_redirected,\
|
||||
ProcessRunnerError
|
||||
|
||||
from ray.autoscaler.cli_logger import cli_logger
|
||||
import colorful as cf
|
||||
|
||||
@@ -23,28 +26,37 @@ HASH_MAX_LENGTH = 10
|
||||
KUBECTL_RSYNC = os.path.join(
|
||||
os.path.dirname(os.path.abspath(__file__)), "kubernetes/kubectl-rsync.sh")
|
||||
|
||||
_config = {"use_login_shells": True}
|
||||
|
||||
class ProcessRunnerError(Exception):
|
||||
def __init__(self,
|
||||
msg,
|
||||
msg_type,
|
||||
code=None,
|
||||
command=None,
|
||||
message_discovered=None):
|
||||
super(ProcessRunnerError, self).__init__(
|
||||
"{} (discovered={}): type={}, code={}, command={}".format(
|
||||
msg, message_discovered, msg_type, code, command))
|
||||
|
||||
self.msg_type = msg_type
|
||||
self.code = code
|
||||
self.command = command
|
||||
def is_using_login_shells():
|
||||
return _config["use_login_shells"]
|
||||
|
||||
self.message_discovered = message_discovered
|
||||
|
||||
def set_using_login_shells(val):
    """Select whether commands are run under login shells.

    Trade-offs between the two modes:

    * Non-interactive shells receive less output from subcommands, because
      progress bars and TTY control codes are not printed. The difference
      can be large, e.g. `pip install` prints hundreds of progress bar
      lines while downloading.
    * Login shells behave very much like a proper bash session with
      regard to how scripts execute and how the environment is set up.
      This is also how all commands were run in the past. The only reason
      to prefer them over non-interactive shells is a tool that is not
      robust enough to work without one.

    Args:
        val (bool): If true, login shells will be used to run all commands.
    """
    _config.update(use_login_shells=val)
|
||||
|
||||
|
||||
def _with_interactive(cmd):
    """Wrap `cmd` so that it runs inside an interactive bash login shell.

    Before `cmd` itself runs, the generated script sources `~/.bashrc` and
    exports `OMP_NUM_THREADS=1` and `PYTHONWARNINGS=ignore`.

    Args:
        cmd (str): Shell command to run.

    Returns:
        List[str]: The argv fragment `bash --login -c -i <quoted script>`.
    """
    preamble = "".join([
        "true && source ~/.bashrc && ",
        "export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && ",
    ])
    script = quote(preamble + cmd)
    return ["bash", "--login", "-c", "-i", script]
|
||||
|
||||
|
||||
@@ -327,6 +339,56 @@ class SSHCommandRunner(CommandRunnerInterface):
|
||||
cli_logger.warning("{}", str(e)) # todo: msg
|
||||
cli_logger.old_warning(logger, "{}", str(e))
|
||||
|
||||
def _run_helper(self,
                final_cmd,
                with_output=False,
                exit_on_fail=False,
                silent=False):
    """Execute a fully assembled command and translate its failures.

    Args:
        final_cmd (List[str]):
            Complete command to execute. It should already include the
            SSH options and any other processing that we do.
        with_output (bool):
            When `True`, the command is run through
            `self.process_runner.check_output` and that call's result is
            returned.
        exit_on_fail (bool):
            Selects the `click.ClickException` message used when the
            command fails: with `True` the message contains the failed
            command, otherwise it refers to the output printed above.
        silent (bool):
            Forwarded to `run_cmd_redirected` when the new-style logging
            path is taken.

    Returns:
        Whatever the selected runner returns (`run_cmd_redirected`,
        `check_output` or `check_call`).

    Raises:
        ProcessRunnerError: The command raised `CalledProcessError` while
            new-style logging and non-login shells are both in use.
        click.ClickException: The command raised `CalledProcessError` in
            any other configuration.
    """
    try:
        # Capturing output is not supported by the redirected runner, so a
        # request for output always takes the plain process-runner path.
        # This could be added to the new logic later, but is probably not
        # needed.
        if not (cli_logger.old_style or with_output):
            return run_cmd_redirected(
                final_cmd,
                silent=silent,
                use_login_shells=is_using_login_shells())

        runner = self.process_runner
        launch = runner.check_output if with_output else runner.check_call
        return launch(final_cmd)
    except subprocess.CalledProcessError as failure:
        # Only the last argument (the remote command) is shell-quoted.
        printable_cmd = " ".join(final_cmd[:-1] + [quote(final_cmd[-1])])

        structured = \
            not cli_logger.old_style and not is_using_login_shells()
        if structured:
            raise ProcessRunnerError(
                "Command failed",
                "ssh_command_failed",
                code=failure.returncode,
                command=printable_cmd)

        if exit_on_fail:
            message = "Command failed: \n\n {}\n".format(printable_cmd)
        else:
            message = ("SSH command Failed. See above for the output from the"
                       " failure.")
        raise click.ClickException(message) from None
|
||||
|
||||
def run(self,
|
||||
cmd,
|
||||
timeout=120,
|
||||
@@ -344,7 +406,10 @@ class SSHCommandRunner(CommandRunnerInterface):
|
||||
|
||||
self._set_ssh_ip_if_required()
|
||||
|
||||
ssh = ["ssh", "-tt"]
|
||||
if is_using_login_shells():
|
||||
ssh = ["ssh", "-tt"]
|
||||
else:
|
||||
ssh = ["ssh"]
|
||||
|
||||
if port_forward:
|
||||
with cli_logger.group("Forwarding ports"):
|
||||
@@ -363,7 +428,10 @@ class SSHCommandRunner(CommandRunnerInterface):
|
||||
"{}@{}".format(self.ssh_user, self.ssh_ip)
|
||||
]
|
||||
if cmd:
|
||||
final_cmd += _with_interactive(cmd)
|
||||
if is_using_login_shells():
|
||||
final_cmd += _with_interactive(cmd)
|
||||
else:
|
||||
final_cmd += [cmd]
|
||||
cli_logger.old_info(logger, "{}Running {}", self.log_prefix,
|
||||
" ".join(final_cmd))
|
||||
else:
|
||||
@@ -371,42 +439,16 @@ class SSHCommandRunner(CommandRunnerInterface):
|
||||
# still create an interactive shell in some ssh versions.
|
||||
final_cmd.append(quote("while true; do sleep 86400; done"))
|
||||
|
||||
# todo: add a flag for this, we might
|
||||
# wanna log commands with print sometimes
|
||||
cli_logger.verbose("Running `{}`", cf.bold(cmd))
|
||||
with cli_logger.indented():
|
||||
cli_logger.very_verbose("Full command is `{}`",
|
||||
cf.bold(" ".join(final_cmd)))
|
||||
|
||||
def start_process():
|
||||
try:
|
||||
if with_output:
|
||||
return self.process_runner.check_output(final_cmd)
|
||||
else:
|
||||
self.process_runner.check_call(final_cmd)
|
||||
except subprocess.CalledProcessError as e:
|
||||
quoted_cmd = " ".join(final_cmd[:-1] + [quote(final_cmd[-1])])
|
||||
if not cli_logger.old_style:
|
||||
raise ProcessRunnerError(
|
||||
"Command failed",
|
||||
"ssh_command_failed",
|
||||
code=e.returncode,
|
||||
command=quoted_cmd)
|
||||
|
||||
if exit_on_fail:
|
||||
raise click.ClickException(
|
||||
"Command failed: \n\n {}\n".format(quoted_cmd)) \
|
||||
from None
|
||||
else:
|
||||
raise click.ClickException(
|
||||
"SSH command Failed. See above for the output from the"
|
||||
" failure.") from None
|
||||
|
||||
if cli_logger.verbosity > 0:
|
||||
with cli_logger.indented():
|
||||
return start_process()
|
||||
return self._run_helper(final_cmd, with_output, exit_on_fail)
|
||||
else:
|
||||
return start_process()
|
||||
return self._run_helper(final_cmd, with_output, exit_on_fail)
|
||||
|
||||
def run_rsync_up(self, source, target):
|
||||
self._set_ssh_ip_if_required()
|
||||
@@ -418,7 +460,7 @@ class SSHCommandRunner(CommandRunnerInterface):
|
||||
target)
|
||||
]
|
||||
cli_logger.verbose("Running `{}`", cf.bold(" ".join(command)))
|
||||
self.process_runner.check_call(command)
|
||||
self._run_helper(command, silent=True)
|
||||
|
||||
def run_rsync_down(self, source, target):
|
||||
self._set_ssh_ip_if_required()
|
||||
@@ -431,7 +473,7 @@ class SSHCommandRunner(CommandRunnerInterface):
|
||||
source), target
|
||||
]
|
||||
cli_logger.verbose("Running `{}`", cf.bold(" ".join(command)))
|
||||
self.process_runner.check_call(command)
|
||||
self._run_helper(command, silent=True)
|
||||
|
||||
def remote_shell_command_str(self):
|
||||
if self.ssh_private_key:
|
||||
|
||||
@@ -29,10 +29,13 @@ from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_LAUNCH_CONFIG, \
|
||||
|
||||
from ray.ray_constants import AUTOSCALER_RESOURCE_REQUEST_CHANNEL
|
||||
from ray.autoscaler.updater import NodeUpdaterThread
|
||||
from ray.autoscaler.command_runner import set_using_login_shells
|
||||
from ray.autoscaler.command_runner import DockerCommandRunner
|
||||
from ray.autoscaler.log_timer import LogTimer
|
||||
from ray.worker import global_worker
|
||||
|
||||
import ray.autoscaler.subprocess_output_util as cmd_output_util
|
||||
|
||||
from ray.autoscaler.cli_logger import cli_logger
|
||||
import colorful as cf
|
||||
|
||||
@@ -95,21 +98,33 @@ def create_or_update_cluster(
|
||||
override_max_workers: Optional[int], no_restart: bool,
|
||||
restart_only: bool, yes: bool, override_cluster_name: Optional[str],
|
||||
no_config_cache: bool, log_old_style: bool, log_color: str,
|
||||
dump_command_output: bool, use_login_shells: bool,
|
||||
verbose: int) -> None:
|
||||
"""Create or updates an autoscaling Ray cluster from a config json."""
|
||||
cli_logger.old_style = log_old_style
|
||||
cli_logger.color_mode = log_color
|
||||
cli_logger.verbosity = verbose
|
||||
|
||||
# todo: disable by default when the command output handling PR makes it in
|
||||
cli_logger.dump_command_output = True
|
||||
set_using_login_shells(use_login_shells)
|
||||
cmd_output_util.set_output_redirected(not dump_command_output)
|
||||
|
||||
if use_login_shells:
|
||||
cli_logger.warning(
|
||||
"Commands running under a login shell can produce more "
|
||||
"output than special processing can handle.")
|
||||
cli_logger.warning(
|
||||
"Thus, the output from subcommands will be logged as is.")
|
||||
cli_logger.warning(
|
||||
"Consider using {}, {}.", cf.bold("--use-normal-shells"),
|
||||
cf.underlined("if you tested your workflow and it is compatible"))
|
||||
cli_logger.newline()
|
||||
|
||||
cli_logger.detect_colors()
|
||||
|
||||
def handle_yaml_error(e):
|
||||
cli_logger.error(
|
||||
"Cluster config invalid.\n"
|
||||
"Failed to load YAML file " + cf.bold("{}"), config_file)
|
||||
cli_logger.error("Cluster config invalid\n")
|
||||
cli_logger.error("Failed to load YAML file " + cf.bold("{}"),
|
||||
config_file)
|
||||
cli_logger.newline()
|
||||
with cli_logger.verbatim_error_ctx("PyYAML error:"):
|
||||
cli_logger.error(e)
|
||||
@@ -119,7 +134,7 @@ def create_or_update_cluster(
|
||||
config = yaml.safe_load(open(config_file).read())
|
||||
except FileNotFoundError:
|
||||
cli_logger.abort(
|
||||
"Provided cluster configuration file ({}) does not exist.",
|
||||
"Provided cluster configuration file ({}) does not exist",
|
||||
cf.bold(config_file))
|
||||
except yaml.parser.ParserError as e:
|
||||
handle_yaml_error(e)
|
||||
@@ -140,7 +155,7 @@ def create_or_update_cluster(
|
||||
raise NotImplementedError("Unsupported provider {}".format(
|
||||
config["provider"]))
|
||||
|
||||
cli_logger.success("Cluster configuration valid.\n")
|
||||
cli_logger.success("Cluster configuration valid\n")
|
||||
|
||||
printed_overrides = False
|
||||
|
||||
@@ -201,8 +216,17 @@ def _bootstrap_config(config: Dict[str, Any],
|
||||
# relatively cheap
|
||||
try_reload_log_state(config_cache["config"]["provider"],
|
||||
config_cache.get("provider_log_info"))
|
||||
cli_logger.verbose("Loaded cached config from " + cf.bold("{}"),
|
||||
cache_key)
|
||||
|
||||
cli_logger.newline()
|
||||
cli_logger.verbose_warning(
|
||||
"Loaded cached provider configuration "
|
||||
"from " + cf.bold("{}"), cache_key)
|
||||
if cli_logger.verbosity == 0:
|
||||
cli_logger.warning("Loaded cached provider configuration")
|
||||
cli_logger.warning(
|
||||
"If you experience issues with "
|
||||
"the cloud provider, try re-running "
|
||||
"the command with {}.", cf.bold("--no-config-cache"))
|
||||
|
||||
return config_cache["config"]
|
||||
else:
|
||||
@@ -435,6 +459,8 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
|
||||
override_cluster_name):
|
||||
"""Create the cluster head node, which in turn creates the workers."""
|
||||
provider = get_node_provider(config["provider"], config["cluster_name"])
|
||||
|
||||
raw_config_file = config_file # used for printing to the user
|
||||
config_file = os.path.abspath(config_file)
|
||||
try:
|
||||
head_node_tags = {
|
||||
@@ -631,17 +657,20 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
|
||||
logger, "get_or_create_head_node: "
|
||||
"Updating {} failed", head_node_ip)
|
||||
sys.exit(1)
|
||||
logger.info(
|
||||
"get_or_create_head_node: "
|
||||
"Head node up-to-date, IP address is: {}".format(head_node_ip))
|
||||
|
||||
monitor_str = "tail -n 100 -f /tmp/ray/session_*/logs/monitor*"
|
||||
if override_cluster_name:
|
||||
modifiers = " --cluster-name={}".format(
|
||||
quote(override_cluster_name))
|
||||
else:
|
||||
modifiers = ""
|
||||
print("To monitor auto-scaling activity, you can run:\n\n"
|
||||
cli_logger.old_info(
|
||||
logger, "get_or_create_head_node: "
|
||||
"Head node up-to-date, IP address is: {}", head_node_ip)
|
||||
|
||||
monitor_str = "tail -n 100 -f /tmp/ray/session_*/logs/monitor*"
|
||||
if override_cluster_name:
|
||||
modifiers = " --cluster-name={}".format(
|
||||
quote(override_cluster_name))
|
||||
else:
|
||||
modifiers = ""
|
||||
|
||||
if cli_logger.old_style:
|
||||
print("To monitor autoscaling activity, you can run:\n\n"
|
||||
" ray exec {} {}{}\n".format(config_file,
|
||||
quote(monitor_str), modifiers))
|
||||
print("To open a console on the cluster:\n\n"
|
||||
@@ -650,6 +679,17 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
|
||||
print("To get a remote shell to the cluster manually, run:\n\n"
|
||||
" {}\n".format(
|
||||
updater.cmd_runner.remote_shell_command_str()))
|
||||
|
||||
cli_logger.newline()
|
||||
with cli_logger.group("Useful commands"):
|
||||
cli_logger.print("Monitor auto-scailng with")
|
||||
cli_logger.print(
|
||||
cf.bold(" ray exec {}{} {}"), raw_config_file, modifiers,
|
||||
quote(monitor_str))
|
||||
|
||||
cli_logger.print("Connect to a terminal on the cluster head")
|
||||
cli_logger.print(
|
||||
cf.bold(" ray attach {}{}"), raw_config_file, modifiers)
|
||||
finally:
|
||||
provider.cleanup()
|
||||
|
||||
|
||||
@@ -0,0 +1,371 @@
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import tempfile
|
||||
import time
|
||||
import sys
|
||||
|
||||
from ray.autoscaler.cli_logger import cli_logger
|
||||
import colorful as cf
|
||||
|
||||
CONN_REFUSED_PATIENCE = 30 # how long to wait for sshd to run
|
||||
|
||||
# Module-wide setting: whether subprocess output is redirected to a temporary
# file. The getter and the setter below must use exactly this key.
_config = {"redirect_output": True}


def is_output_redirected():
    """Return whether subprocess output is redirected to a temporary file.

    Returns:
        bool: The current setting; `True` until `set_output_redirected`
            changes it.
    """
    return _config["redirect_output"]


def set_output_redirected(val):
    """Choose between logging to a temporary file and to `sys.stdout`.

    The default is to log to a file.

    Args:
        val (bool): If true, subprocess output will be redirected to
            a temporary file.
    """
    _config["redirect_output"] = val
|
||||
|
||||
|
||||
class ProcessRunnerError(Exception):
    """Error raised when a command run as a subprocess fails.

    The exception text combines all constructor arguments into one line.

    Args:
        msg: Human-readable description of the failure.
        msg_type: Tag identifying the kind of failure.
        code: Exit code of the process, if any.
        command: The command that failed, if known.
        special_case: Name of a special condition detected in the
            command output, if any.
    """

    def __init__(self,
                 msg,
                 msg_type,
                 code=None,
                 command=None,
                 special_case=None):
        summary = "{} (discovered={}): type={}, code={}, command={}".format(
            msg, special_case, msg_type, code, command)
        super().__init__(summary)

        # Kept as attributes so that handlers can branch on them.
        self.msg_type = msg_type
        self.code = code
        self.command = command
        self.special_case = special_case
|
||||
|
||||
|
||||
# Patterns for the "system" messages that the SSH client writes to stderr.
# Each pattern is applied with `fullmatch` to one line of output (without the
# trailing newline); surrounding whitespace is tolerated.
_ssh_output_regexes = {
    "known_host_update": re.compile(
        r"\s*Warning: Permanently added '.+' \(.+\) "
        r"to the list of known hosts\.\s*"),
    "connection_closed": re.compile(
        r"\s*Shared connection to .+ closed\.\s*"),
    # The reason text comes from the platform's error string for a timed-out
    # connect: BSD/macOS say "Operation timed out", Linux says
    # "Connection timed out". Both must be recognised as a timeout.
    "timeout": re.compile(r"\s*ssh: connect to host .+ port .+: "
                          r"(?:Operation|Connection) timed out\s*"),
    "conn_refused": re.compile(
        r"\s*ssh: connect to host .+ port .+: Connection refused\s*")
    # todo: check for other connection failures for better error messages?
}
|
||||
|
||||
|
||||
def _read_subprocess_stream(f, output_file, is_stdout=False):
    """Consume one output stream of a subprocess, line by line.

    The purpose is to spot known SSH messages (CONN_REFUSED, TIMEOUT, etc.)
    and react to them sensibly, so that the user is not confused by them.

    One instance runs in its own thread for each of `stdout` and `stderr`,
    which gives cross-platform asynchronous IO.

    Note: `select`-based IO would be an alternative, but Windows cannot
    `select` on pipes and Linux support varies somewhat. Specifically,
    older *nix systems might also have quirks in how they handle `select`
    on pipes.

    Args:
        f: File object for the stream.
        output_file: File object that receives the filtered output, or
            `None` to discard it.
        is_stdout (bool):
            When `False`, the stream is treated as `stderr`: every line is
            checked against the SSH message patterns and is shown to the
            user unless it matches one of them. Handling of the detected
            special cases (e.g. SSH timeout) is left to the caller.

    Returns:
        The name of the special case that was detected
        (`"ssh_timeout"` or `"ssh_conn_refused"`), or `None`.

    Raises:
        ValueError: A second special case was detected on the same stream.
    """

    def matches(kind, text):
        return _ssh_output_regexes[kind].fullmatch(text) is not None

    special_case = None
    while True:
        # `readline` is essential here: a plain `read()` would block until
        # EOF instead of returning as soon as something is available.
        raw = f.readline()
        if raw is None or raw == "":
            break  # EOF

        text = raw[:-1] if raw[-1] == "\n" else raw

        if not is_stdout:
            if matches("connection_closed", text):
                # SSH reports "connection closed" on stderr although it is
                # never an error: the connection closes whether or not the
                # command succeeded. Do not log it.
                continue

            if matches("timeout", text):
                # A timeout is a special condition rather than an error.
                # The caller decides what to do, because network conditions
                # or nodes that are still booting are expected to cause
                # connection timeouts now and then.
                if special_case is not None:
                    raise ValueError("Bug: ssh_timeout conflicts with another "
                                     "special codition: " + special_case)
                special_case = "ssh_timeout"
                continue

            if matches("conn_refused", text):
                # Same reasoning as for timeouts: CONN_REFUSED is expected
                # while a node is in the early stages of boot, so it is
                # reported to the caller instead of being logged.
                if special_case is not None:
                    raise ValueError(
                        "Bug: ssh_conn_refused conflicts with another "
                        "special codition: " + special_case)
                special_case = "ssh_conn_refused"
                continue

            if matches("known_host_update", text):
                # Host control is ignored anyway
                # (-o UserKnownHostsFile=/dev/null), so the related
                # warnings are silenced as well.
                continue

            cli_logger.error(text)

        if output_file is not None:
            output_file.write(text + "\n")

    return special_case
|
||||
|
||||
|
||||
def _run_and_process_output(cmd,
                            stdout_file,
                            stderr_file=None,
                            use_login_shells=False):
    """Run a command and process its output for special cases.

    Specifically, run all command output through regex to detect
    error conditions and filter out non-error messages that went to stderr
    anyway (SSH writes ALL of its "system" messages to stderr even if they
    are not actually errors).

    Args:
        cmd (List[str]): Command to run.
        stdout_file: File to redirect stdout to, or `None` to discard it.
        stderr_file: File to redirect stderr to, or `None` to discard it.
        use_login_shells (bool): If true, the output is not processed at
            all (see implementation note #1).

    Returns:
        int: The exit code of the command, which is always 0 because every
            failure raises.

    Raises:
        subprocess.CalledProcessError: The command failed while
            `use_login_shells` is true.
        ProcessRunnerError: The command failed or was killed by a signal
            while `use_login_shells` is false.
        ValueError: A special case was reported for both streams.

    Implementation notes:
        1. `use_login_shells` disables special processing
            If we run interactive apps, output processing will likely get
            overwhelmed with the interactive output elements.
            Thus we disable output processing for login shells. This makes
            the logging experience considerably worse, but it only degrades
            to old-style logging.

            For example, `pip install` outputs HUNDREDS of progress-bar lines
            when downloading a package, and we have to
            read + regex + write all of them.

            After all, even just printing output to console can often slow
            down a fast-printing app, and we do more than just print, and
            all that from Python, which is much slower than C regarding
            stream processing.

        2. `stdin=PIPE` for subprocesses
            Do not inherit stdin as it messes with bash signals
            (ctrl-C for SIGINT) and these commands aren't supposed to
            take input anyway.

        3. `ThreadPoolExecutor` without the `Pool`
            We use `ThreadPoolExecutor` to create futures from threads.
            Threads are never reused.

            This approach allows us to have no custom synchronization by
            off-loading the return value and exception passing to the
            standard library (`ThreadPoolExecutor` internals).

            This instance will be `shutdown()` ASAP so it's fine to
            create one in such a weird place.

            The code is thus 100% thread-safe as long as the stream readers
            are read-only except for return values and possible exceptions.
    """
    from concurrent.futures import ThreadPoolExecutor

    if use_login_shells:
        # See implementation note #1
        if stdout_file is None:
            stdout_file = subprocess.DEVNULL
        if stderr_file is None:
            stderr_file = subprocess.DEVNULL

        return subprocess.check_call(
            cmd,
            # See implementation note #2
            stdin=subprocess.PIPE,
            stdout=stdout_file,
            stderr=stderr_file)

    with subprocess.Popen(
            cmd,
            # See implementation note #2
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            bufsize=1,  # line buffering
            universal_newlines=True  # text mode outputs
    ) as p:
        # Closing stdin might be necessary to signal EOF to some
        # apps (they might get stuck waiting for input forever otherwise).
        p.stdin.close()

        # See implementation note #3
        with ThreadPoolExecutor(max_workers=2) as executor:
            stdout_future = executor.submit(
                _read_subprocess_stream, p.stdout, stdout_file,
                is_stdout=True)
            stderr_future = executor.submit(
                _read_subprocess_stream, p.stderr, stderr_file,
                is_stdout=False)
            # Wait for completion.
            executor.shutdown()

        # Both pipes reached EOF, but the child may not have been reaped
        # yet. A non-blocking `poll()` could leave `returncode` as `None`
        # here, so block until the exit status is actually available.
        p.wait()

        detected_special_case = stdout_future.result()
        stderr_special_case = stderr_future.result()
        if stderr_special_case is not None:
            if detected_special_case is not None:
                # This might some day need to be changed.
                # We should probably make sure the two special cases
                # are compatible then and that we can handle both by
                # e.g. reporting both to the caller.
                raise ValueError(
                    "Bug: found a special case in both stdout and "
                    "stderr. This is not valid behavior at the time "
                    "of writing this code.")
            detected_special_case = stderr_special_case

        if p.returncode > 0:
            # Process failed, but not due to a signal, since signals
            # set the exit code to a negative value.
            raise ProcessRunnerError(
                "Command failed",
                "ssh_command_failed",
                code=p.returncode,
                command=cmd,
                special_case=detected_special_case)
        elif p.returncode < 0:
            # Process failed due to a signal, since signals
            # set the exit code to a negative value.
            raise ProcessRunnerError(
                "Command failed",
                "ssh_command_failed",
                code=p.returncode,
                command=cmd,
                special_case="died_to_signal")

        return p.returncode
|
||||
|
||||
|
||||
def run_cmd_redirected(cmd, silent=False, use_login_shells=False):
    """Run a command and optionally redirect output to a file.

    Args:
        cmd (List[str]): Command to run.
        silent (bool): If true, the command output will be silenced completely
                       (redirected to /dev/null), unless verbose logging
                       is enabled. Use this for running utility commands like
                       rsync.
        use_login_shells (bool): Passed through to
                       `_run_and_process_output`.

    Returns:
        The value returned by `_run_and_process_output`.
    """
    if silent and cli_logger.verbosity < 1:
        return _run_and_process_output(
            cmd, stdout_file=None, use_login_shells=use_login_shells)

    if not is_output_redirected():
        return _run_and_process_output(
            cmd, stdout_file=sys.stdout, use_login_shells=use_login_shells)

    # Only the executable's base name may go into the file name: `cmd[0]`
    # can be a path, and path separators would make the log file point into
    # a directory that does not exist.
    program = os.path.basename(cmd[0])
    tmpfile_path = os.path.join(
        tempfile.gettempdir(), "ray-up-{}-{}.txt".format(
            program, time.time()))
    with open(
            tmpfile_path,
            mode="w",
            # line buffering
            buffering=1) as tmp:
        cli_logger.verbose("Command stdout is redirected to {}",
                           cf.bold(tmp.name))
        cli_logger.verbose(
            cf.gray("Use --dump-command-output to "
                    "dump to terminal instead."))

        return _run_and_process_output(
            cmd,
            stdout_file=tmp,
            stderr_file=tmp,
            use_login_shells=use_login_shells)
|
||||
|
||||
|
||||
def handle_ssh_fails(e, first_conn_refused_time, retry_interval):
    """Handle SSH system failures coming from a subprocess.

    Args:
        e: The `ProcessRunnerError` to handle.
        first_conn_refused_time:
            The time (as reported by this function) at which the first
            CONN_REFUSED error was caught, or None if none was caught yet.

            After exceeding a patience value, the program will be aborted
            since SSH will likely never recover.
        retry_interval: The interval after which the command will be retried,
                        used here just to inform the user.

    Returns:
        The value to pass as `first_conn_refused_time` on the next call:
        the incoming value, or the current time if this is the first
        CONN_REFUSED error.

    Raises:
        The exception `e` itself, when it is an SSH command failure that is
        neither a timeout nor a refused connection.
    """
    if e.msg_type != "ssh_command_failed":
        # Not an SSH failure: nothing to report here. The timer must be
        # handed back unchanged, otherwise the caller would lose it.
        return first_conn_refused_time

    if e.special_case not in ("ssh_timeout", "ssh_conn_refused"):
        raise e

    if e.special_case == "ssh_conn_refused":
        now = time.time()
        if first_conn_refused_time is None:
            # Start the patience window at the first refusal only. Restarting
            # it on every refusal would mean the window never elapses.
            first_conn_refused_time = now
        elif now - first_conn_refused_time > CONN_REFUSED_PATIENCE:
            cli_logger.error(
                "SSH connection was being refused "
                "for {} seconds. Head node assumed "
                "unreachable.", cf.bold(str(CONN_REFUSED_PATIENCE)))
            cli_logger.abort("Check the node's firewall settings "
                             "and the cloud network configuration.")

        cli_logger.warning("SSH connection was refused.")
        cli_logger.warning("This might mean that the SSH daemon is "
                           "still setting up, or that "
                           "the host is inaccessable (e.g. due to "
                           "a firewall).")

    # Timeouts and refused connections are both retried by the caller.
    cli_logger.print("SSH still not available, "
                     "retrying in {} seconds.", cf.bold(
                         str(retry_interval)))

    return first_conn_refused_time
|
||||
@@ -10,9 +10,12 @@ from ray.autoscaler.tags import TAG_RAY_NODE_STATUS, TAG_RAY_RUNTIME_CONFIG, \
|
||||
TAG_RAY_FILE_MOUNTS_CONTENTS, \
|
||||
STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED, STATUS_WAITING_FOR_SSH, \
|
||||
STATUS_SETTING_UP, STATUS_SYNCING_FILES
|
||||
from ray.autoscaler.command_runner import NODE_START_WAIT_S, SSHOptions
|
||||
from ray.autoscaler.command_runner import NODE_START_WAIT_S, SSHOptions, \
|
||||
ProcessRunnerError
|
||||
from ray.autoscaler.log_timer import LogTimer
|
||||
|
||||
import ray.autoscaler.subprocess_output_util as cmd_output_util
|
||||
|
||||
from ray.autoscaler.cli_logger import cli_logger
|
||||
import colorful as cf
|
||||
|
||||
@@ -172,6 +175,7 @@ class NodeUpdater:
|
||||
self.log_prefix)
|
||||
|
||||
cli_logger.print("Running `{}` as a test.", cf.bold("uptime"))
|
||||
first_conn_refused_time = None
|
||||
while time.time() < deadline and \
|
||||
not self.provider.is_terminated(self.node_id):
|
||||
try:
|
||||
@@ -183,7 +187,20 @@ class NodeUpdater:
|
||||
cli_logger.old_debug(logger, "Uptime succeeded.")
|
||||
cli_logger.success("Success.")
|
||||
return True
|
||||
except ProcessRunnerError as e:
|
||||
first_conn_refused_time = \
|
||||
cmd_output_util.handle_ssh_fails(
|
||||
e, first_conn_refused_time,
|
||||
retry_interval=READY_CHECK_INTERVAL)
|
||||
time.sleep(READY_CHECK_INTERVAL)
|
||||
except Exception as e:
|
||||
# TODO(maximsmol): we should not be ignoring
|
||||
# exceptions if they get filtered properly
|
||||
# (new style log + non-interactive shells)
|
||||
#
|
||||
# however threading this configuration state
|
||||
# is a pain and I'm leaving it for later
|
||||
|
||||
retry_str = str(e)
|
||||
if hasattr(e, "cmd"):
|
||||
retry_str = "(Exit Status {}): {}".format(
|
||||
@@ -249,35 +266,42 @@ class NodeUpdater:
|
||||
if self.initialization_commands:
|
||||
with cli_logger.group(
|
||||
"Running initialization commands",
|
||||
_numbered=("[]", 4,
|
||||
6)): # todo: fix command numbering
|
||||
_numbered=("[]", 3, 5)):
|
||||
with LogTimer(
|
||||
self.log_prefix + "Initialization commands",
|
||||
show_status=True):
|
||||
|
||||
for cmd in self.initialization_commands:
|
||||
self.cmd_runner.run(
|
||||
cmd,
|
||||
ssh_options_override=SSHOptions(
|
||||
self.auth_config.get(
|
||||
"ssh_private_key")))
|
||||
try:
|
||||
self.cmd_runner.run(
|
||||
cmd,
|
||||
ssh_options_override=SSHOptions(
|
||||
self.auth_config.get(
|
||||
"ssh_private_key")))
|
||||
except ProcessRunnerError as e:
|
||||
if e.msg_type == "ssh_command_failed":
|
||||
cli_logger.error("Failed.")
|
||||
cli_logger.error(
|
||||
"See above for stderr.")
|
||||
|
||||
raise click.ClickException(
|
||||
"Initialization command failed.")
|
||||
else:
|
||||
cli_logger.print(
|
||||
"No initialization commands to run.",
|
||||
_numbered=("[]", 4, 6))
|
||||
_numbered=("[]", 3, 6))
|
||||
|
||||
if self.setup_commands:
|
||||
with cli_logger.group(
|
||||
"Running setup commands",
|
||||
_numbered=("[]", 5,
|
||||
6)): # todo: fix command numbering
|
||||
# todo: fix command numbering
|
||||
_numbered=("[]", 4, 6)):
|
||||
with LogTimer(
|
||||
self.log_prefix + "Setup commands",
|
||||
show_status=True):
|
||||
|
||||
total = len(self.setup_commands)
|
||||
for i, cmd in enumerate(self.setup_commands):
|
||||
if cli_logger.verbosity == 0:
|
||||
if cli_logger.verbosity == 0 and len(cmd) > 30:
|
||||
cmd_to_print = cf.bold(cmd[:30]) + "..."
|
||||
else:
|
||||
cmd_to_print = cf.bold(cmd)
|
||||
@@ -287,17 +311,33 @@ class NodeUpdater:
|
||||
cmd_to_print,
|
||||
_numbered=("()", i, total))
|
||||
|
||||
self.cmd_runner.run(cmd)
|
||||
try:
|
||||
self.cmd_runner.run(cmd)
|
||||
except ProcessRunnerError as e:
|
||||
if e.msg_type == "ssh_command_failed":
|
||||
cli_logger.error("Failed.")
|
||||
cli_logger.error(
|
||||
"See above for stderr.")
|
||||
|
||||
raise click.ClickException(
|
||||
"Setup command failed.")
|
||||
else:
|
||||
cli_logger.print(
|
||||
"No setup commands to run.", _numbered=("[]", 5, 6))
|
||||
"No setup commands to run.", _numbered=("[]", 4, 6))
|
||||
|
||||
with cli_logger.group(
|
||||
"Starting the Ray runtime", _numbered=("[]", 6, 6)):
|
||||
with LogTimer(
|
||||
self.log_prefix + "Ray start commands", show_status=True):
|
||||
for cmd in self.ray_start_commands:
|
||||
self.cmd_runner.run(cmd)
|
||||
try:
|
||||
self.cmd_runner.run(cmd)
|
||||
except ProcessRunnerError as e:
|
||||
if e.msg_type == "ssh_command_failed":
|
||||
cli_logger.error("Failed.")
|
||||
cli_logger.error("See above for stderr.")
|
||||
|
||||
raise click.ClickException("Start command failed.")
|
||||
|
||||
def rsync_up(self, source, target):
|
||||
cli_logger.old_info(logger, "{}Syncing {} to {}...", self.log_prefix,
|
||||
|
||||
@@ -704,9 +704,23 @@ def stop(force, verbose):
|
||||
default="auto",
|
||||
help=("Use color logging. "
|
||||
"Valid values are: auto (if stdout is a tty), true, false."))
|
||||
@click.option(
|
||||
"--dump-command-output",
|
||||
is_flag=True,
|
||||
default=False,
|
||||
help=("Print command output straight to "
|
||||
"the terminal instead of redirecting to a file."))
|
||||
@click.option(
|
||||
"--use-login-shells/--use-normal-shells",
|
||||
is_flag=True,
|
||||
default=True,
|
||||
help=("Ray uses login shells (bash --login -i) to run cluster commands "
|
||||
"by default. If your workflow is compatible with normal shells, "
|
||||
"this can be disabled for a better user experience."))
|
||||
@click.option("-v", "--verbose", count=True)
|
||||
def up(cluster_config_file, min_workers, max_workers, no_restart, restart_only,
|
||||
yes, cluster_name, no_config_cache, log_old_style, log_color, verbose):
|
||||
yes, cluster_name, no_config_cache, log_old_style, log_color,
|
||||
dump_command_output, use_login_shells, verbose):
|
||||
"""Create or update a Ray cluster."""
|
||||
if restart_only or no_restart:
|
||||
assert restart_only != no_restart, "Cannot set both 'restart_only' " \
|
||||
@@ -724,7 +738,7 @@ def up(cluster_config_file, min_workers, max_workers, no_restart, restart_only,
|
||||
create_or_update_cluster(cluster_config_file, min_workers, max_workers,
|
||||
no_restart, restart_only, yes, cluster_name,
|
||||
no_config_cache, log_old_style, log_color,
|
||||
verbose)
|
||||
dump_command_output, use_login_shells, verbose)
|
||||
|
||||
|
||||
@cli.command()
|
||||
|
||||
Reference in New Issue
Block a user