[cli] Remove the deprecated old_style logging calls (#10776)

Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
This commit is contained in:
Maksim Smolin
2020-11-02 23:40:18 -08:00
committed by GitHub
parent e7f7cb29c4
commit 0a6d24a727
12 changed files with 27 additions and 399 deletions
+2 -50
View File
@@ -217,9 +217,6 @@ def _configure_iam_role(config):
cli_logger.verbose(
"Creating new IAM instance profile {} for use as the default.",
cf.bold(DEFAULT_RAY_INSTANCE_PROFILE))
cli_logger.old_info(
logger, "_configure_iam_role: "
"Creating new instance profile {}", DEFAULT_RAY_INSTANCE_PROFILE)
client = _client("iam", config)
client.create_instance_profile(
InstanceProfileName=DEFAULT_RAY_INSTANCE_PROFILE)
@@ -237,8 +234,6 @@ def _configure_iam_role(config):
"Creating new IAM role {} for "
"use as the default instance role.",
cf.bold(DEFAULT_RAY_IAM_ROLE))
cli_logger.old_info(logger, "_configure_iam_role: "
"Creating new role {}", DEFAULT_RAY_IAM_ROLE)
iam = _resource("iam", config)
iam.create_role(
RoleName=DEFAULT_RAY_IAM_ROLE,
@@ -265,9 +260,6 @@ def _configure_iam_role(config):
profile.add_role(RoleName=role.name)
time.sleep(15) # wait for propagation
cli_logger.old_info(
logger, "_configure_iam_role: "
"Role not specified for head node, using {}", profile.arn)
config["head_node"]["IamInstanceProfile"] = {"Arn": profile.arn}
return config
@@ -323,9 +315,6 @@ def _configure_key_pair(config):
cli_logger.verbose(
"Creating new key pair {} for use as the default.",
cf.bold(key_name))
cli_logger.old_info(
logger, "_configure_key_pair: "
"Creating new key pair {}", key_name)
key = ec2.create_key_pair(KeyName=key_name)
# We need to make sure to _create_ the file with the right
@@ -352,10 +341,6 @@ def _configure_key_pair(config):
assert os.path.exists(key_path), \
"Private key file {} not found for {}".format(key_path, key_name)
cli_logger.old_info(
logger, "_configure_key_pair: "
"KeyName not specified for nodes, using {}", key_name)
config["auth"]["ssh_private_key"] = key_path
config["head_node"]["KeyName"] = key_name
config["worker_nodes"]["KeyName"] = key_name
@@ -409,23 +394,15 @@ def _configure_subnet(config):
format(config["provider"]["availability_zone"]))
subnet_ids = [s.subnet_id for s in subnets]
subnet_descr = [(s.subnet_id, s.availability_zone) for s in subnets]
if "SubnetIds" not in config["head_node"]:
_set_config_info(head_subnet_src="default")
config["head_node"]["SubnetIds"] = subnet_ids
cli_logger.old_info(
logger, "_configure_subnet: "
"SubnetIds not specified for head node, using {}", subnet_descr)
else:
_set_config_info(head_subnet_src="config")
if "SubnetIds" not in config["worker_nodes"]:
_set_config_info(workers_subnet_src="default")
config["worker_nodes"]["SubnetIds"] = subnet_ids
cli_logger.old_info(
logger, "_configure_subnet: "
"SubnetId not specified for workers,"
" using {}", subnet_descr)
else:
_set_config_info(workers_subnet_src="config")
@@ -449,20 +426,12 @@ def _configure_security_group(config):
head_sg = security_groups[NODE_KIND_HEAD]
_set_config_info(head_security_group_src="default")
cli_logger.old_info(
logger, "_configure_security_group: "
"SecurityGroupIds not specified for head node, using {} ({})",
head_sg.group_name, head_sg.id)
config["head_node"]["SecurityGroupIds"] = [head_sg.id]
if NODE_KIND_WORKER in node_types_to_configure:
workers_sg = security_groups[NODE_KIND_WORKER]
_set_config_info(workers_security_group_src="default")
cli_logger.old_info(
logger, "_configure_security_group: "
"SecurityGroupIds not specified for workers, using {} ({})",
workers_sg.group_name, workers_sg.id)
config["worker_nodes"]["SecurityGroupIds"] = [workers_sg.id]
return config
@@ -482,26 +451,10 @@ def _check_ami(config):
if config["head_node"].get("ImageId", "").lower() == "latest_dlami":
config["head_node"]["ImageId"] = default_ami
_set_config_info(head_ami_src="dlami")
cli_logger.old_info(
logger,
"_check_ami: head node ImageId is 'latest_dlami'. "
"Using '{ami_id}', which is the default {ami_name} "
"for your region ({region}).",
ami_id=default_ami,
ami_name=DEFAULT_AMI_NAME,
region=region)
if config["worker_nodes"].get("ImageId", "").lower() == "latest_dlami":
config["worker_nodes"]["ImageId"] = default_ami
_set_config_info(workers_ami_src="dlami")
cli_logger.old_info(
logger,
"_check_ami: worker nodes ImageId is 'latest_dlami'. "
"Using '{ami_id}', which is the default {ami_name} "
"for your region ({region}).",
ami_id=default_ami,
ami_name=DEFAULT_AMI_NAME,
region=region)
def _upsert_security_groups(config, node_types):
@@ -600,9 +553,8 @@ def _create_security_group(config, vpc_id, group_name):
"Created new security group {}",
cf.bold(security_group.group_name),
_tags=dict(id=security_group.id))
cli_logger.old_info(
logger, "_create_security_group: Created new security group {} ({})",
security_group.group_name, security_group.id)
cli_logger.doassert(security_group,
"Failed to create security group") # err msg
assert security_group, "Failed to create security group"
return security_group
@@ -248,11 +248,6 @@ class AWSNodeProvider(NodeProvider):
"To disable reuse, set `cache_stopped_nodes: False` "
"under `provider` in the cluster configuration.",
cli_logger.render_list(reuse_node_ids))
cli_logger.old_info(
logger, "AWSNodeProvider: reusing instances {}. "
"To disable reuse, set "
"'cache_stopped_nodes: False' in the provider "
"config.", reuse_node_ids)
# todo: timed?
with cli_logger.group("Stopping instances to reuse"):
@@ -263,10 +258,6 @@ class AWSNodeProvider(NodeProvider):
if node.state["Name"] == "stopping":
cli_logger.print("Waiting for instance {} to stop",
node.id)
cli_logger.old_info(
logger,
"AWSNodeProvider: waiting for instance "
"{} to fully stop...", node.id)
node.wait_until_stopped()
self.ec2.meta.client.start_instances(
@@ -328,10 +319,6 @@ class AWSNodeProvider(NodeProvider):
try:
subnet_id = subnet_ids[self.subnet_idx % len(subnet_ids)]
cli_logger.old_info(
logger, "NodeProvider: calling create_instances "
"with {} (count={}).", subnet_id, count)
self.subnet_idx += 1
conf.update({
"MinCount": 1,
@@ -366,26 +353,17 @@ class AWSNodeProvider(NodeProvider):
_tags=dict(
state=instance.state["Name"],
info=state_reason["Message"]))
cli_logger.old_info(
logger, "NodeProvider: Created instance "
"[id={}, name={}, info={}]", instance.instance_id,
instance.state["Name"], state_reason["Message"])
break
except botocore.exceptions.ClientError as exc:
if attempt == BOTO_CREATE_MAX_RETRIES:
# todo: err msg
cli_logger.abort(
"Failed to launch instances. Max attempts exceeded.")
cli_logger.old_error(
logger,
"create_instances: Max attempts ({}) exceeded.",
BOTO_CREATE_MAX_RETRIES)
raise exc
else:
cli_logger.print(
"create_instances: Attempt failed with {}, retrying.",
exc)
cli_logger.old_error(logger, exc)
def terminate_node(self, node_id):
node = self._get_cached_node(node_id)
@@ -395,11 +373,6 @@ class AWSNodeProvider(NodeProvider):
"Terminating instance {} " +
cf.dimmed("(cannot stop spot instances, only terminate)"),
node_id) # todo: show node name?
cli_logger.old_info(
logger,
"AWSNodeProvider: terminating node {} (spot nodes cannot "
"be stopped, only terminated)", node_id)
node.terminate()
else:
cli_logger.print("Stopping instance {} " + cf.dimmed(
@@ -407,12 +380,6 @@ class AWSNodeProvider(NodeProvider):
"set `cache_stopped_nodes: False` "
"under `provider` in the cluster configuration)"),
node_id) # todo: show node name?
cli_logger.old_info(
logger,
"AWSNodeProvider: stopping node {}. To terminate nodes "
"on stop, set 'cache_stopped_nodes: False' in the "
"provider config.".format(node_id))
node.stop()
else:
node.terminate()
@@ -441,11 +408,6 @@ class AWSNodeProvider(NodeProvider):
"set `cache_stopped_nodes: False` "
"under `provider` in the cluster configuration)"),
cli_logger.render_list(on_demand_ids))
cli_logger.old_info(
logger,
"AWSNodeProvider: stopping nodes {}. To terminate nodes "
"on stop, set 'cache_stopped_nodes: False' in the "
"provider config.", on_demand_ids)
self.ec2.meta.client.stop_instances(InstanceIds=on_demand_ids)
if spot_ids:
@@ -453,10 +415,6 @@ class AWSNodeProvider(NodeProvider):
"Terminating instances {} " +
cf.dimmed("(cannot stop spot instances, only terminate)"),
cli_logger.render_list(spot_ids))
cli_logger.old_info(
logger,
"AWSNodeProvider: terminating nodes {} (spot nodes cannot "
"be stopped, only terminated)", spot_ids)
self.ec2.meta.client.terminate_instances(InstanceIds=spot_ids)
else:
@@ -26,11 +26,6 @@ class LazyDefaultDict(defaultdict):
def handle_boto_error(exc, msg, *args, **kwargs):
if cli_logger.old_style:
# old-style logging doesn't do anything here
# so we exit early
return
error_code = None
error_info = None
# todo: not sure if these exceptions always have response
@@ -256,11 +256,6 @@ class _CliLogger():
to 'record' style logging.
Attributes:
old_style (bool):
If `old_style` is `True`, the old logging calls are used instead
of the new CLI UX. This is disabled by default and remains for
backwards compatibility. Currently can only be set via env var
RAY_LOG_NEWSTYLE="0".
color_mode (str):
Can be "true", "false", or "auto".
@@ -276,7 +271,6 @@ class _CliLogger():
Low verbosity will disable `verbose` and `very_verbose` messages.
"""
old_style: bool
color_mode: str
# color_mode: Union[Literal["auto"], Literal["false"], Literal["true"]]
indent_level: int
@@ -286,7 +280,6 @@ class _CliLogger():
_autodetected_cf_colormode: int
def __init__(self):
self.old_style = os.environ.get("RAY_LOG_NEWSTYLE", "1") == "0"
self.indent_level = 0
self._verbosity = 0
@@ -476,9 +469,6 @@ class _CliLogger():
For other arguments, see `_format_msg`.
"""
if self.old_style:
return
self._print(
cf.skyBlue(key) + ": " +
_format_msg(cf.bold(msg), *args, **kwargs))
@@ -567,9 +557,6 @@ class _CliLogger():
For arguments, see `_format_msg`.
"""
if self.old_style:
return
self._print(_format_msg(msg, *args, **kwargs), _level_str=_level_str)
def abort(self,
@@ -582,9 +569,6 @@ class _CliLogger():
Print an error and throw an exception to terminate the program
(the exception will not print a message).
"""
if self.old_style:
return
if msg is not None:
self._error(msg, *args, _level_str="PANIC", **kwargs)
@@ -604,9 +588,6 @@ class _CliLogger():
For other arguments, see `_format_msg`.
"""
if self.old_style:
return
if not val:
exc = None
if not self.pretty:
@@ -618,101 +599,6 @@ class _CliLogger():
# for AssertionError and raise them normally
self.abort(msg, *args, exc=exc, **kwargs)
def old_debug(self, logger: logging.Logger, msg: str, *args: Any,
              **kwargs: Any):
    """Forward a debug message to `logger` when old-style logging is on.

    Does nothing unless `self.old_style` is set. The message is rendered
    through `_format_msg`, so the new formatting features are supported.

    Args:
        logger (logging.Logger):
            Logger that receives the message in old-style mode.

    For other arguments, see `_format_msg`.
    """
    if not self.old_style:
        # New-style output is active; the legacy proxy stays silent.
        return

    rendered = _format_msg(msg, *args, **kwargs)
    caller_info = _external_caller_info()
    logger.debug(rendered, extra=caller_info)
def old_info(self, logger: logging.Logger, msg: str, *args: Any,
             **kwargs: Any):
    """Forward an info message to `logger` when old-style logging is on.

    Does nothing unless `self.old_style` is set. The message is rendered
    through `_format_msg`, so the new formatting features are supported.

    Args:
        logger (logging.Logger):
            Logger that receives the message in old-style mode.

    For other arguments, see `_format_msg`.
    """
    if not self.old_style:
        # New-style output is active; the legacy proxy stays silent.
        return

    rendered = _format_msg(msg, *args, **kwargs)
    caller_info = _external_caller_info()
    logger.info(rendered, extra=caller_info)
def old_warning(self, logger: logging.Logger, msg: str, *args: Any,
                **kwargs: Any):
    """Forward a warning message to `logger` when old-style logging is on.

    Does nothing unless `self.old_style` is set. The message is rendered
    through `_format_msg`, so the new formatting features are supported.

    Args:
        logger (logging.Logger):
            Logger that receives the message in old-style mode.

    For other arguments, see `_format_msg`.
    """
    if not self.old_style:
        # New-style output is active; the legacy proxy stays silent.
        return

    rendered = _format_msg(msg, *args, **kwargs)
    caller_info = _external_caller_info()
    logger.warning(rendered, extra=caller_info)
def old_error(self, logger: logging.Logger, msg: str, *args: Any,
              **kwargs: Any):
    """Forward an error message to `logger` when old-style logging is on.

    Does nothing unless `self.old_style` is set. The message is rendered
    through `_format_msg`, so the new formatting features are supported.

    Args:
        logger (logging.Logger):
            Logger that receives the message in old-style mode.

    For other arguments, see `_format_msg`.
    """
    if not self.old_style:
        # New-style output is active; the legacy proxy stays silent.
        return

    rendered = _format_msg(msg, *args, **kwargs)
    caller_info = _external_caller_info()
    logger.error(rendered, extra=caller_info)
def old_exception(self, logger: logging.Logger, msg: str, *args: Any,
                  **kwargs: Any):
    """Forward an exception message to `logger` when old-style logging is on.

    Does nothing unless `self.old_style` is set. The message is rendered
    through `_format_msg`, so the new formatting features are supported,
    and is emitted with `logger.exception`.

    Args:
        logger (logging.Logger):
            Logger that receives the message in old-style mode.

    For other arguments, see `_format_msg`.
    """
    if not self.old_style:
        # New-style output is active; the legacy proxy stays silent.
        return

    rendered = _format_msg(msg, *args, **kwargs)
    caller_info = _external_caller_info()
    logger.exception(rendered, extra=caller_info)
def render_list(self, xs: List[str], separator: str = cf.reset(", ")):
"""Render a list of bolded values using a non-bolded separator.
"""
@@ -739,9 +625,6 @@ class _CliLogger():
The default action to take if the user just presses enter
with no input.
"""
if self.old_style:
return
should_abort = _abort
default = _default
@@ -821,9 +704,6 @@ class _CliLogger():
Returns:
The string entered by the user.
"""
if self.old_style:
return
complete_str = cf.underlined(msg)
rendered_message = _format_msg(complete_str, *args, **kwargs)
# the rendered message ends with ascii coding
@@ -841,16 +721,6 @@ class _CliLogger():
return res
def old_confirm(self, msg: str, yes: bool):
    """Show the legacy `click` confirmation prompt in old-style mode.

    The prompt is only displayed when `self.old_style` is set and `yes`
    is falsy; in every other case nothing is shown and `None` is
    returned. The prompt is invoked with `abort=True`.
    """
    if self.old_style and not yes:
        return click.confirm(msg, abort=True)
    return None
class SilentClickException(click.ClickException):
"""`ClickException` that does not print a message.
@@ -326,9 +326,6 @@ class SSHCommandRunner(CommandRunnerInterface):
with cli_logger.group("Waiting for IP"):
while time.time() < deadline and \
not self.provider.is_terminated(self.node_id):
cli_logger.old_info(logger, "{}Waiting for IP...",
self.log_prefix)
ip = self._get_node_ip()
if ip is not None:
cli_logger.labeled_value("Received", ip)
@@ -362,7 +359,6 @@ class SSHCommandRunner(CommandRunnerInterface):
os.makedirs(self.ssh_control_path, mode=0o700, exist_ok=True)
except OSError as e:
cli_logger.warning("{}", str(e)) # todo: msg
cli_logger.old_warning(logger, "{}", str(e))
def _run_helper(self,
final_cmd,
@@ -391,7 +387,7 @@ class SSHCommandRunner(CommandRunnerInterface):
# For now, if the output is needed we just skip the new logic.
# In the future we could update the new logic to support
# capturing output, but it is probably not needed.
if not cli_logger.old_style and not with_output:
if not with_output:
return run_cmd_redirected(
final_cmd,
process_runner=self.process_runner,
@@ -403,7 +399,7 @@ class SSHCommandRunner(CommandRunnerInterface):
return self.process_runner.check_call(final_cmd)
except subprocess.CalledProcessError as e:
joined_cmd = " ".join(final_cmd)
if not cli_logger.old_style and not is_using_login_shells():
if not is_using_login_shells():
raise ProcessRunnerError(
"Command failed",
"ssh_command_failed",
@@ -458,9 +454,6 @@ class SSHCommandRunner(CommandRunnerInterface):
cli_logger.verbose(
"Forwarding port {} to port {} on localhost.",
cf.bold(local), cf.bold(remote)) # todo: msg
cli_logger.old_info(logger,
"{}Forwarding {} -> localhost:{}",
self.log_prefix, local, remote)
ssh += ["-L", "{}:localhost:{}".format(remote, local)]
final_cmd = ssh + ssh_options.to_ssh_options_list(timeout=timeout) + [
@@ -473,8 +466,6 @@ class SSHCommandRunner(CommandRunnerInterface):
final_cmd += _with_interactive(cmd)
else:
final_cmd += [cmd]
cli_logger.old_info(logger, "{}Running {}", self.log_prefix,
" ".join(final_cmd))
else:
# We do this because `-o ControlMaster` causes the `-N` flag to
# still create an interactive shell in some ssh versions.
+1 -71
View File
@@ -203,10 +203,6 @@ def create_or_update_cluster(config_file: str,
cli_logger.labeled_value("Cluster", config["cluster_name"])
# disable the cli_logger here if needed
# because it only supports aws
if config["provider"]["type"] != "aws":
cli_logger.old_style = True
cli_logger.newline()
config = _bootstrap_config(config, no_config_cache=no_config_cache)
@@ -228,8 +224,6 @@ def _bootstrap_config(config: Dict[str, Any],
"ray-config-{}".format(hasher.hexdigest()))
if os.path.exists(cache_key) and not no_config_cache:
cli_logger.old_info(logger, "Using cached config at {}", cache_key)
config_cache = json.loads(open(cache_key).read())
if config_cache.get("_version", -1) == CONFIG_CACHE_VERSION:
# todo: is it fine to re-resolve? afaik it should be.
@@ -294,7 +288,6 @@ def teardown_cluster(config_file: str, yes: bool, workers_only: bool,
validate_config(config)
cli_logger.confirm(yes, "Destroying cluster.", _abort=True)
cli_logger.old_confirm("This will destroy your cluster", yes)
if not workers_only:
try:
@@ -319,9 +312,6 @@ def teardown_cluster(config_file: str, yes: bool, workers_only: bool,
"Ignoring the exception and "
"attempting to shut down the cluster nodes anyway.")
cli_logger.old_exception(
logger, "Ignoring error attempting a clean shutdown.")
provider = _get_node_provider(config["provider"], config["cluster_name"])
def remaining_nodes():
@@ -331,13 +321,10 @@ def teardown_cluster(config_file: str, yes: bool, workers_only: bool,
if keep_min_workers:
min_workers = config.get("min_workers", 0)
cli_logger.print(
"{} random worker nodes will not be shut down. " +
cf.dimmed("(due to {})"), cf.bold(min_workers),
cf.bold("--keep-min-workers"))
cli_logger.old_info(
logger, "teardown_cluster: Keeping {} nodes...", min_workers)
workers = random.sample(workers, len(workers) - min_workers)
@@ -370,7 +357,6 @@ def teardown_cluster(config_file: str, yes: bool, workers_only: bool,
with_output=False)
except Exception:
cli_logger.warning(f"Docker stop failed on {node}")
cli_logger.old_warning(logger, f"Docker stop failed on {node}")
# Loop here to check that both the head and worker nodes are actually
# really gone
@@ -383,10 +369,6 @@ def teardown_cluster(config_file: str, yes: bool, workers_only: bool,
with LogTimer("teardown_cluster: done."):
while A:
cli_logger.old_info(
logger, "teardown_cluster: "
"Shutting down {} nodes...", len(A))
provider.terminate_nodes(A)
cli_logger.print(
@@ -411,7 +393,6 @@ def kill_node(config_file: str, yes: bool, hard: bool,
config = _bootstrap_config(config)
cli_logger.confirm(yes, "A random node will be killed.")
cli_logger.old_confirm("This will kill a node in your cluster", yes)
provider = _get_node_provider(config["provider"], config["cluster_name"])
nodes = provider.non_terminated_nodes({
@@ -419,7 +400,6 @@ def kill_node(config_file: str, yes: bool, hard: bool,
})
node = random.choice(nodes)
cli_logger.print("Shutdown " + cf.bold("{}"), node)
cli_logger.old_info(logger, "kill_node: Shutdown worker {}", node)
if hard:
provider.terminate_node(node)
else:
@@ -472,10 +452,6 @@ def warn_about_bad_start_command(start_commands: List[str]) -> None:
cli_logger.warning(
"Ray runtime will not be started because `{}` is not in `{}`.",
cf.bold("ray start"), cf.bold("head_start_ray_commands"))
cli_logger.old_warning(
logger,
"Ray start is not included in the head_start_ray_commands section."
)
if not any("autoscaling-config" in x for x in ray_start_cmd):
cli_logger.warning(
"The head node will not launch any workers because "
@@ -484,11 +460,6 @@ def warn_about_bad_start_command(start_commands: List[str]) -> None:
cf.bold("ray start"), cf.bold("--autoscaling-config"),
cf.bold("--autoscaling-config=~/ray_bootstrap_config.yaml"),
cf.bold("ray start"), cf.bold("head_start_ray_commands"))
cli_logger.old_warning(
logger, "Ray start on the head node does not have the flag"
"--autoscaling-config set. The head node will not launch"
"workers. Add --autoscaling-config=~/ray_bootstrap_config.yaml"
"to ray start in the head_start_ray_commands section.")
def get_or_create_head_node(config: Dict[str, Any],
@@ -520,9 +491,6 @@ def get_or_create_head_node(config: Dict[str, Any],
"No head node found. "
"Launching a new cluster.",
_abort=True)
cli_logger.old_confirm("This will create a new cluster", yes)
elif not no_restart:
cli_logger.old_confirm("This will restart cluster services", yes)
if head_node:
if restart_only:
@@ -549,6 +517,7 @@ def get_or_create_head_node(config: Dict[str, Any],
yes,
cf.bold("Cluster Ray runtime will be restarted."),
_abort=True)
cli_logger.newline()
# TODO(ekl) this logic is duplicated in node_launcher.py (keep in sync)
@@ -572,19 +541,10 @@ def get_or_create_head_node(config: Dict[str, Any],
provider.node_tags(head_node)
.get(TAG_RAY_LAUNCH_CONFIG)), cf.bold(launch_hash))
cli_logger.confirm(yes, "Relaunching it.", _abort=True)
cli_logger.old_confirm(
"Head node config out-of-date. It will be terminated", yes)
cli_logger.old_info(
logger, "get_or_create_head_node: "
"Shutting down outdated head node {}", head_node)
provider.terminate_node(head_node)
cli_logger.print("Terminated head node {}", head_node)
cli_logger.old_info(
logger, "get_or_create_head_node: Launching new head node...")
head_node_tags[TAG_RAY_LAUNCH_CONFIG] = launch_hash
head_node_tags[TAG_RAY_NODE_NAME] = "ray-{}-head".format(
config["cluster_name"])
@@ -620,9 +580,6 @@ def get_or_create_head_node(config: Dict[str, Any],
(runtime_hash, file_mounts_contents_hash) = hash_runtime_conf(
config["file_mounts"], None, config)
cli_logger.old_info(
logger, "get_or_create_head_node: Updating files on head node...")
# Rewrite the auth config so that the head
# node can update the workers
remote_config = copy.deepcopy(config)
@@ -697,40 +654,17 @@ def get_or_create_head_node(config: Dict[str, Any],
# Refresh the node cache so we see the external ip if available
provider.non_terminated_nodes(head_node_tags)
if config.get("provider", {}).get("use_internal_ips", False) is True:
head_node_ip = provider.internal_ip(head_node)
else:
head_node_ip = provider.external_ip(head_node)
if updater.exitcode != 0:
# todo: this does not follow the mockup and is not good enough
cli_logger.abort("Failed to setup head node.")
cli_logger.old_error(
logger, "get_or_create_head_node: "
"Updating {} failed", head_node_ip)
sys.exit(1)
cli_logger.old_info(
logger, "get_or_create_head_node: "
"Head node up-to-date, IP address is: {}", head_node_ip)
monitor_str = "tail -n 100 -f /tmp/ray/session_latest/logs/monitor*"
if override_cluster_name:
modifiers = " --cluster-name={}".format(quote(override_cluster_name))
else:
modifiers = ""
if cli_logger.old_style:
print("To monitor autoscaling activity, you can run:\n\n"
" ray exec {} {}{}\n".format(config_file, quote(monitor_str),
modifiers))
print("To open a console on the cluster:\n\n"
" ray attach {}{}\n".format(config_file, modifiers))
print("To get a remote shell to the cluster manually, run:\n\n"
" {}\n".format(updater.cmd_runner.remote_shell_command_str()))
cli_logger.newline()
with cli_logger.group("Useful commands"):
cli_logger.print("Monitor autoscaling with")
@@ -887,10 +821,6 @@ def exec_cluster(config_file: str,
attach_command = " ".join(attach_command_parts)
cli_logger.print("Run `{}` to check command status.",
cf.bold(attach_command))
attach_info = "Use `{}` to check on command status.".format(
attach_command)
cli_logger.old_info(logger, attach_info)
return result
+2 -2
View File
@@ -15,14 +15,14 @@ class LogTimer:
self._start_time = datetime.datetime.utcnow()
def __exit__(self, *error_vals):
if not cli_logger.old_style:
if cli_logger.log_style != "record":
return
td = datetime.datetime.utcnow() - self._start_time
status = ""
if self._show_status:
status = "failed" if any(error_vals) else "succeeded"
logger.info(" ".join([
cli_logger.print(" ".join([
self._message, status,
"[LogTimer={:.0f}ms]".format(td.total_seconds() * 1000)
]))
-30
View File
@@ -105,9 +105,6 @@ class NodeUpdater:
self.docker_config = docker_config
def run(self):
cli_logger.old_info(logger, "{}Updating to {}", self.log_prefix,
self.runtime_hash)
if cmd_output_util.does_allow_interactive(
) and cmd_output_util.is_output_redirected():
# this is most probably a bug since the user has no control
@@ -123,18 +120,10 @@ class NodeUpdater:
"Applied config {}".format(self.runtime_hash)):
self.do_update()
except Exception as e:
error_str = str(e)
if hasattr(e, "cmd"):
error_str = "(Exit Status {}) {}".format(
e.returncode, " ".join(e.cmd))
self.provider.set_node_tags(
self.node_id, {TAG_RAY_NODE_STATUS: STATUS_UPDATE_FAILED})
cli_logger.error("New status: {}", cf.bold(STATUS_UPDATE_FAILED))
cli_logger.old_error(logger, "{}Error executing: {}\n",
self.log_prefix, error_str)
cli_logger.error("!!!")
if hasattr(e, "cmd"):
cli_logger.error(
@@ -236,22 +225,15 @@ class NodeUpdater:
with cli_logger.group(
"Waiting for SSH to become available", _numbered=("[]", 1, 6)):
with LogTimer(self.log_prefix + "Got remote shell"):
cli_logger.old_info(logger, "{}Waiting for remote shell...",
self.log_prefix)
cli_logger.print("Running `{}` as a test.", cf.bold("uptime"))
first_conn_refused_time = None
while time.time() < deadline and \
not self.provider.is_terminated(self.node_id):
try:
cli_logger.old_debug(logger,
"{}Waiting for remote shell...",
self.log_prefix)
# Run outside of the container
self.cmd_runner.run(
"uptime", timeout=5, run_env="host")
cli_logger.old_debug(logger, "Uptime succeeded.")
cli_logger.success("Success.")
return True
except ProcessRunnerError as e:
@@ -277,9 +259,6 @@ class NodeUpdater:
"SSH still not available {}, "
"retrying in {} seconds.", cf.dimmed(retry_str),
cf.bold(str(READY_CHECK_INTERVAL)))
cli_logger.old_debug(logger,
"{}Node not up, retrying: {}",
self.log_prefix, retry_str)
time.sleep(READY_CHECK_INTERVAL)
@@ -314,9 +293,6 @@ class NodeUpdater:
"Configuration already up to date, "
"skipping file mounts, initalization and setup commands.",
_numbered=("[]", "2-5", 6))
cli_logger.old_info(logger,
"{}{} already up-to-date, skip to ray start",
self.log_prefix, self.node_id)
else:
cli_logger.print(
@@ -435,9 +411,6 @@ class NodeUpdater:
raise click.ClickException("Start command failed.")
def rsync_up(self, source, target, docker_mount_if_possible=False):
cli_logger.old_info(logger, "{}Syncing {} to {}...", self.log_prefix,
source, target)
options = {}
options["docker_mount_if_possible"] = docker_mount_if_possible
options["rsync_exclude"] = self.rsync_options.get("rsync_exclude")
@@ -447,9 +420,6 @@ class NodeUpdater:
cf.bold(source), cf.bold(target))
def rsync_down(self, source, target, docker_mount_if_possible=False):
cli_logger.old_info(logger, "{}Syncing {} from {}...", self.log_prefix,
source, target)
options = {}
options["docker_mount_if_possible"] = docker_mount_if_possible
options["rsync_exclude"] = self.rsync_options.get("rsync_exclude")
-45
View File
@@ -465,8 +465,6 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
# Get the node IP address if one is not provided.
ray_params.update_if_absent(node_ip_address=node_ip_address)
cli_logger.labeled_value("Local node IP", ray_params.node_ip_address)
cli_logger.old_info(logger, "Using IP address {} for this node.",
ray_params.node_ip_address)
ray_params.update_if_absent(
redis_port=port,
redis_shard_ports=redis_shard_ports,
@@ -525,24 +523,6 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
cli_logger.newline()
cli_logger.print("To terminate the Ray runtime, run")
cli_logger.print(cf.bold(" ray stop"))
cli_logger.old_info(
logger,
"\nStarted Ray on this node. You can add additional nodes to "
"the cluster by calling\n\n"
" ray start --address='{}'{}\n\n"
"from the node you wish to add. You can connect a driver to the "
"cluster from Python by running\n\n"
" import ray\n"
" ray.init(address='auto'{})\n\n"
"If you have trouble connecting from a different machine, check "
"that your firewall is configured properly. If you wish to "
"terminate the processes that have been started, run\n\n"
" ray stop".format(
redis_address, " --redis-password='" + redis_password + "'"
if redis_password else "",
", _redis_password='" + redis_password + "'"
if redis_password else ""))
else:
# Start Ray on a non-head node.
if not (port is None):
@@ -589,8 +569,6 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
node_ip_address=services.get_node_ip_address(redis_address))
cli_logger.labeled_value("Local node IP", ray_params.node_ip_address)
cli_logger.old_info(logger, "Using IP address {} for this node.",
ray_params.node_ip_address)
# Check that there aren't already Redis clients with the same IP
# address connected with this Redis instance. This raises an exception
@@ -610,11 +588,6 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
cli_logger.print("To terminate the Ray runtime, run")
cli_logger.print(cf.bold(" ray stop"))
cli_logger.old_info(
logger, "\nStarted Ray on this node. If you wish to terminate the "
"processes that have been started, run\n\n"
" ray stop")
if block:
cli_logger.newline()
with cli_logger.group(cf.bold("--block")):
@@ -630,8 +603,6 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
if len(deceased) > 0:
cli_logger.newline()
cli_logger.error("Some Ray subprcesses exited unexpectedly:")
cli_logger.old_error(logger,
"Ray processes died unexpectedly:")
with cli_logger.indented():
for process_type, process in deceased:
@@ -639,15 +610,10 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
"{}",
cf.bold(str(process_type)),
_tags={"exit code": str(process.returncode)})
cli_logger.old_error(
logger, "\t{} died with exit code {}".format(
process_type, process.returncode))
# shutdown_at_exit will handle cleanup.
cli_logger.newline()
cli_logger.error("Remaining processes will be killed.")
cli_logger.old_error(
logger, "Killing remaining processes and exiting...")
sys.exit(1)
@@ -719,10 +685,6 @@ def stop(force, verbose, log_style, log_color):
total_found += 1
proc_string = str(subprocess.list2cmdline(proc_args))
if verbose:
operation = "Terminating" if force else "Killing"
cli_logger.old_info(logger, "%s process %s: %s", operation,
proc.pid, proc_string)
try:
if force:
proc.kill()
@@ -749,7 +711,6 @@ def stop(force, verbose, log_style, log_color):
except (psutil.Error, OSError) as ex:
cli_logger.error("Could not terminate `{}` due to {}",
cf.bold(proc_string), str(ex))
cli_logger.old_error(logger, "Error: %s", ex)
if total_found == 0:
cli_logger.print("Did not find any active Ray processes.")
@@ -847,7 +808,6 @@ def up(cluster_config_file, min_workers, max_workers, no_restart, restart_only,
cli_logger.warning("{}", str(e))
cli_logger.warning(
"Could not download remote cluster configuration file.")
cli_logger.old_info(logger, "Error downloading file: ", e)
create_or_update_cluster(
config_file=cluster_config_file,
override_min_workers=min_workers,
@@ -1130,11 +1090,6 @@ def submit(cluster_config_file, screen, tmux, stop, start, cluster_name,
cf.bold("-- <args ...>"),
cf.bold("ray submit script.py -- --arg=123 --flag"))
cli_logger.newline()
cli_logger.old_warning(
logger,
"ray submit [yaml] [script.py] --args=... is deprecated and "
"will be removed in a future version of Ray. Use "
"`ray submit [yaml] script.py -- --arg1 --arg2` instead.")
if start:
create_or_update_cluster(
+1 -8
View File
@@ -5,7 +5,7 @@ import ray.tests.aws.utils.stubs as stubs
import ray.tests.aws.utils.helpers as helpers
from ray.tests.aws.utils.constants import AUX_SUBNET, DEFAULT_SUBNET, \
DEFAULT_SG_AUX_SUBNET, DEFAULT_SG, DEFAULT_SG_DUAL_GROUP_RULES, \
DEFAULT_SG_WITH_RULES_AUX_SUBNET, DEFAULT_SG_WITH_RULES, AUX_SG, \
DEFAULT_SG_WITH_RULES_AUX_SUBNET, AUX_SG, \
DEFAULT_SG_WITH_NAME, DEFAULT_SG_WITH_NAME_AND_RULES, CUSTOM_IN_BOUND_RULES
@@ -50,13 +50,6 @@ def test_create_sg_different_vpc_same_rules(iam_client_stub, ec2_client_stub):
DEFAULT_SG_WITH_RULES_AUX_SUBNET,
)
# given the prior modification to the head security group...
# expect the next read of a head security group property to reload it
stubs.describe_sg_echo(ec2_client_stub, DEFAULT_SG_WITH_RULES)
# given the prior modification to the worker security group...
# expect the next read of a worker security group property to reload it
stubs.describe_sg_echo(ec2_client_stub, DEFAULT_SG_WITH_RULES_AUX_SUBNET)
# given our mocks and an example config file as input...
# expect the config to be loaded, validated, and bootstrapped successfully
config = helpers.bootstrap_aws_example_config_file("example-subnets.yaml")
+4 -5
View File
@@ -103,8 +103,10 @@ def _debug_check_line_by_line(result, expected_lines):
exp = expected_lines[i]
matched = re.fullmatch(exp + r" *", out) is not None
if not matched:
print(f"!!! ERROR: Expected (regex): {repr(exp)}")
print(f"Got: {out}")
print(f"{i:>3}: {out}")
print(f"!!! ^ ERROR: Expected (regex): {repr(exp)}")
else:
print(f"{i:>3}: {out}")
i += 1
if i < len(expected_lines):
print("!!! ERROR: Expected extra lines (regex):")
@@ -134,9 +136,6 @@ def _load_output_pattern(name):
def _check_output_via_pattern(name, result):
expected_lines = _load_output_pattern(name)
print("---")
print(result.output)
print("---")
if result.exception is not None:
print(result.output)
@@ -18,14 +18,18 @@
.+\.py.*Fetching the new head node
.+\.py.*<1/1> Setting up head node
.+\.py.*Prepared bootstrap config
.+\.py.*AWSNodeProvider: Set tag ray-node-status=waiting-for-ssh on \['.+'\] \[LogTimer=.+\]
.+\.py.*New status: waiting-for-ssh
.+\.py.*\[1/6\] Waiting for SSH to become available
.+\.py.*Running `uptime` as a test\.
.+\.py.*Fetched IP: .+
.+\.py.*NodeUpdater: .+: Got IP \[LogTimer=.+\]
.+\.py.*Running `uptime`
.+\.py.*Full command is `ssh.+`
.+\.py.*Success\.
.+\.py.*NodeUpdater: .+: Got remote shell \[LogTimer=.+\]
.+\.py.*Updating cluster configuration\. \[hash=.+\]
.+\.py.*AWSNodeProvider: Set tag ray-node-status=syncing-files on \['.+'\] \[LogTimer=.+\]
.+\.py.*New status: syncing-files
.+\.py.*\[3/6\] Processing file mounts
.+\.py.*Running `mkdir -p ~/tests`
@@ -33,21 +37,26 @@
.+\.py.*Running `rsync.+`
.+\.py.*`rsync`ed \./ \(local\) to ~/tests/ \(remote\)
.+\.py.*~/tests/ from \./
.+\.py.+NodeUpdater: i-.+: Synced \./ to ~/tests/ \[LogTimer=.+\]
.+\.py.*Running `mkdir -p ~`
.+\.py.*Full command is `ssh.+`
.+\.py.*Running `rsync.+`
.+\.py.*`rsync`ed .+/ray-bootstrap-.+ \(local\) to ~/ray_bootstrap_config\.yaml \(remote\)
.+\.py.*~/ray_bootstrap_config\.yaml from .+ray-bootstrap-.+
.+\.py.*NodeUpdater: i-.+: Synced .+ray-bootstrap-.+ to ~/ray_bootstrap_config\.yaml \[LogTimer=.+\]
.+\.py.*Running `mkdir -p ~`
.+\.py.*Full command is `ssh.+`
.+\.py.*Running `rsync.+`
.+\.py.*`rsync`ed .+__test-cli_key-1\.pem \(local\) to ~/ray_bootstrap_key\.pem \(remote\)
.+\.py.*~/ray_bootstrap_key\.pem from .+__test-cli_key-1\.pem
.+\.py.*NodeUpdater: i-.+: Synced .+.pem to ~/ray_bootstrap_key\.pem \[LogTimer=.+\]
.+\.py.*\[4/6\] No worker file mounts to sync
.+\.py.*AWSNodeProvider: Set tag ray-node-status=setting-up on \['.+'\] \[LogTimer=.+\]
.+\.py.*New status: setting-up
.+\.py.*\[3/5\] Running initialization commands
.+\.py.*Running `echo init`
.+\.py.*Full command is `ssh.+`
.+\.py.*NodeUpdater: i-.+: Initialization commands succeeded \[LogTimer=.+\]
.+\.py.*\[4/6\] Running setup commands
.+\.py.*\(0/4\) echo a
.+\.py.*Running `echo a`
@@ -61,11 +70,17 @@
.+\.py.*\(3/4\) echo head
.+\.py.*Running `echo head`
.+\.py.*Full command is `ssh.+`
.+\.py.*NodeUpdater: i-.+: Setup commands succeeded \[LogTimer=.+\]
.+\.py.*\[6/6\] Starting the Ray runtime
.+\.py.*Running `ray stop`
.+\.py.*Full command is `ssh.+`
.+\.py.*Running `ray start --head --autoscaling-config=~/ray_bootstrap_config\.yaml`
.+\.py.*Full command is `ssh.+`
.+\.py.*NodeUpdater: i-.+: Ray start commands succeeded \[LogTimer=.+\]
.+\.py.*NodeUpdater: i-.+: Applied config .+ \[LogTimer=.+\]
.+\.py.*AWSNodeProvider: Set tag ray-node-status=up-to-date on \['.+'\] \[LogTimer=.+\]
.+\.py.*AWSNodeProvider: Set tag ray-runtime-config=.+ on \['.+'\] \[LogTimer=.+\]
.+\.py.*AWSNodeProvider: Set tag ray-file-mounts-contents=.+ on \['.+'\] \[LogTimer=.+\]
.+\.py.*New status: up-to-date
.+\.py.*Useful commands
.+\.py.*Monitor autoscaling with