diff --git a/python/ray/autoscaler/_private/aws/config.py b/python/ray/autoscaler/_private/aws/config.py index 89ebca7f1..960e2f9b4 100644 --- a/python/ray/autoscaler/_private/aws/config.py +++ b/python/ray/autoscaler/_private/aws/config.py @@ -217,9 +217,6 @@ def _configure_iam_role(config): cli_logger.verbose( "Creating new IAM instance profile {} for use as the default.", cf.bold(DEFAULT_RAY_INSTANCE_PROFILE)) - cli_logger.old_info( - logger, "_configure_iam_role: " - "Creating new instance profile {}", DEFAULT_RAY_INSTANCE_PROFILE) client = _client("iam", config) client.create_instance_profile( InstanceProfileName=DEFAULT_RAY_INSTANCE_PROFILE) @@ -237,8 +234,6 @@ def _configure_iam_role(config): "Creating new IAM role {} for " "use as the default instance role.", cf.bold(DEFAULT_RAY_IAM_ROLE)) - cli_logger.old_info(logger, "_configure_iam_role: " - "Creating new role {}", DEFAULT_RAY_IAM_ROLE) iam = _resource("iam", config) iam.create_role( RoleName=DEFAULT_RAY_IAM_ROLE, @@ -265,9 +260,6 @@ def _configure_iam_role(config): profile.add_role(RoleName=role.name) time.sleep(15) # wait for propagation - cli_logger.old_info( - logger, "_configure_iam_role: " - "Role not specified for head node, using {}", profile.arn) config["head_node"]["IamInstanceProfile"] = {"Arn": profile.arn} return config @@ -323,9 +315,6 @@ def _configure_key_pair(config): cli_logger.verbose( "Creating new key pair {} for use as the default.", cf.bold(key_name)) - cli_logger.old_info( - logger, "_configure_key_pair: " - "Creating new key pair {}", key_name) key = ec2.create_key_pair(KeyName=key_name) # We need to make sure to _create_ the file with the right @@ -352,10 +341,6 @@ def _configure_key_pair(config): assert os.path.exists(key_path), \ "Private key file {} not found for {}".format(key_path, key_name) - cli_logger.old_info( - logger, "_configure_key_pair: " - "KeyName not specified for nodes, using {}", key_name) - config["auth"]["ssh_private_key"] = key_path config["head_node"]["KeyName"] = key_name config["worker_nodes"]["KeyName"] = key_name @@ -409,23 +394,15 @@ def _configure_subnet(config): format(config["provider"]["availability_zone"])) subnet_ids = [s.subnet_id for s in subnets] - subnet_descr = [(s.subnet_id, s.availability_zone) for s in subnets] if "SubnetIds" not in config["head_node"]: _set_config_info(head_subnet_src="default") config["head_node"]["SubnetIds"] = subnet_ids - cli_logger.old_info( - logger, "_configure_subnet: " - "SubnetIds not specified for head node, using {}", subnet_descr) else: _set_config_info(head_subnet_src="config") if "SubnetIds" not in config["worker_nodes"]: _set_config_info(workers_subnet_src="default") config["worker_nodes"]["SubnetIds"] = subnet_ids - cli_logger.old_info( - logger, "_configure_subnet: " - "SubnetId not specified for workers," - " using {}", subnet_descr) else: _set_config_info(workers_subnet_src="config") @@ -449,20 +426,12 @@ def _configure_security_group(config): head_sg = security_groups[NODE_KIND_HEAD] _set_config_info(head_security_group_src="default") - cli_logger.old_info( - logger, "_configure_security_group: " - "SecurityGroupIds not specified for head node, using {} ({})", - head_sg.group_name, head_sg.id) config["head_node"]["SecurityGroupIds"] = [head_sg.id] if NODE_KIND_WORKER in node_types_to_configure: workers_sg = security_groups[NODE_KIND_WORKER] _set_config_info(workers_security_group_src="default") - cli_logger.old_info( - logger, "_configure_security_group: " - "SecurityGroupIds not specified for workers, using {} ({})", - workers_sg.group_name, workers_sg.id) config["worker_nodes"]["SecurityGroupIds"] = [workers_sg.id] return config @@ -482,26 +451,10 @@ def _check_ami(config): if config["head_node"].get("ImageId", "").lower() == "latest_dlami": config["head_node"]["ImageId"] = default_ami _set_config_info(head_ami_src="dlami") - cli_logger.old_info( - logger, - "_check_ami: head node ImageId is 'latest_dlami'. " - "Using '{ami_id}', which is the default {ami_name} " - "for your region ({region}).", - ami_id=default_ami, - ami_name=DEFAULT_AMI_NAME, - region=region) if config["worker_nodes"].get("ImageId", "").lower() == "latest_dlami": config["worker_nodes"]["ImageId"] = default_ami _set_config_info(workers_ami_src="dlami") - cli_logger.old_info( - logger, - "_check_ami: worker nodes ImageId is 'latest_dlami'. " - "Using '{ami_id}', which is the default {ami_name} " - "for your region ({region}).", - ami_id=default_ami, - ami_name=DEFAULT_AMI_NAME, - region=region) def _upsert_security_groups(config, node_types): @@ -600,9 +553,8 @@ def _create_security_group(config, vpc_id, group_name): "Created new security group {}", cf.bold(security_group.group_name), _tags=dict(id=security_group.id)) - cli_logger.old_info( - logger, "_create_security_group: Created new security group {} ({})", - security_group.group_name, security_group.id) + cli_logger.doassert(security_group, + "Failed to create security group") # err msg assert security_group, "Failed to create security group" return security_group diff --git a/python/ray/autoscaler/_private/aws/node_provider.py b/python/ray/autoscaler/_private/aws/node_provider.py index e7e318b40..7288dc2d4 100644 --- a/python/ray/autoscaler/_private/aws/node_provider.py +++ b/python/ray/autoscaler/_private/aws/node_provider.py @@ -248,11 +248,6 @@ class AWSNodeProvider(NodeProvider): "To disable reuse, set `cache_stopped_nodes: False` " "under `provider` in the cluster configuration.", cli_logger.render_list(reuse_node_ids)) - cli_logger.old_info( - logger, "AWSNodeProvider: reusing instances {}. " - "To disable reuse, set " - "'cache_stopped_nodes: False' in the provider " - "config.", reuse_node_ids) # todo: timed? with cli_logger.group("Stopping instances to reuse"): @@ -263,10 +258,6 @@ class AWSNodeProvider(NodeProvider): if node.state["Name"] == "stopping": cli_logger.print("Waiting for instance {} to stop", node.id) - cli_logger.old_info( - logger, - "AWSNodeProvider: waiting for instance " - "{} to fully stop...", node.id) node.wait_until_stopped() self.ec2.meta.client.start_instances( @@ -328,10 +319,6 @@ class AWSNodeProvider(NodeProvider): try: subnet_id = subnet_ids[self.subnet_idx % len(subnet_ids)] - cli_logger.old_info( - logger, "NodeProvider: calling create_instances " - "with {} (count={}).", subnet_id, count) - self.subnet_idx += 1 conf.update({ "MinCount": 1, @@ -366,26 +353,17 @@ class AWSNodeProvider(NodeProvider): _tags=dict( state=instance.state["Name"], info=state_reason["Message"])) - cli_logger.old_info( - logger, "NodeProvider: Created instance " - "[id={}, name={}, info={}]", instance.instance_id, - instance.state["Name"], state_reason["Message"]) break except botocore.exceptions.ClientError as exc: if attempt == BOTO_CREATE_MAX_RETRIES: # todo: err msg cli_logger.abort( "Failed to launch instances. Max attempts exceeded.") - cli_logger.old_error( - logger, - "create_instances: Max attempts ({}) exceeded.", - BOTO_CREATE_MAX_RETRIES) raise exc else: cli_logger.print( "create_instances: Attempt failed with {}, retrying.", exc) - cli_logger.old_error(logger, exc) def terminate_node(self, node_id): node = self._get_cached_node(node_id) @@ -395,11 +373,6 @@ class AWSNodeProvider(NodeProvider): "Terminating instance {} " + cf.dimmed("(cannot stop spot instances, only terminate)"), node_id) # todo: show node name? - - cli_logger.old_info( - logger, - "AWSNodeProvider: terminating node {} (spot nodes cannot " - "be stopped, only terminated)", node_id) node.terminate() else: cli_logger.print("Stopping instance {} " + cf.dimmed( @@ -407,12 +380,6 @@ class AWSNodeProvider(NodeProvider): "set `cache_stopped_nodes: False` " "under `provider` in the cluster configuration)"), node_id) # todo: show node name? - - cli_logger.old_info( - logger, - "AWSNodeProvider: stopping node {}. To terminate nodes " - "on stop, set 'cache_stopped_nodes: False' in the " - "provider config.".format(node_id)) node.stop() else: node.terminate() @@ -441,11 +408,6 @@ class AWSNodeProvider(NodeProvider): "set `cache_stopped_nodes: False` " "under `provider` in the cluster configuration)"), cli_logger.render_list(on_demand_ids)) - cli_logger.old_info( - logger, - "AWSNodeProvider: stopping nodes {}. To terminate nodes " - "on stop, set 'cache_stopped_nodes: False' in the " - "provider config.", on_demand_ids) self.ec2.meta.client.stop_instances(InstanceIds=on_demand_ids) if spot_ids: @@ -453,10 +415,6 @@ class AWSNodeProvider(NodeProvider): "Terminating instances {} " + cf.dimmed("(cannot stop spot instances, only terminate)"), cli_logger.render_list(spot_ids)) - cli_logger.old_info( - logger, - "AWSNodeProvider: terminating nodes {} (spot nodes cannot " - "be stopped, only terminated)", spot_ids) self.ec2.meta.client.terminate_instances(InstanceIds=spot_ids) else: diff --git a/python/ray/autoscaler/_private/aws/utils.py b/python/ray/autoscaler/_private/aws/utils.py index d97d0afa5..979444347 100644 --- a/python/ray/autoscaler/_private/aws/utils.py +++ b/python/ray/autoscaler/_private/aws/utils.py @@ -26,11 +26,6 @@ class LazyDefaultDict(defaultdict): def handle_boto_error(exc, msg, *args, **kwargs): - if cli_logger.old_style: - # old-style logging doesn't do anything here - # so we exit early - return - error_code = None error_info = None # todo: not sure if these exceptions always have response diff --git a/python/ray/autoscaler/_private/cli_logger.py b/python/ray/autoscaler/_private/cli_logger.py index 3bdf73d28..fa7577419 100644 --- a/python/ray/autoscaler/_private/cli_logger.py +++ b/python/ray/autoscaler/_private/cli_logger.py @@ -256,11 +256,6 @@ class _CliLogger(): to 'record' style logging. Attributes: - old_style (bool): - If `old_style` is `True`, the old logging calls are used instead - of the new CLI UX. This is disabled by default and remains for - backwards compatibility. Currently can only be set via env var - RAY_LOG_NEWSTYLE="0". color_mode (str): Can be "true", "false", or "auto". @@ -276,7 +271,6 @@ class _CliLogger(): Low verbosity will disable `verbose` and `very_verbose` messages. """ - old_style: bool color_mode: str # color_mode: Union[Literal["auto"], Literal["false"], Literal["true"]] indent_level: int @@ -286,7 +280,6 @@ class _CliLogger(): _autodetected_cf_colormode: int def __init__(self): - self.old_style = os.environ.get("RAY_LOG_NEWSTYLE", "1") == "0" self.indent_level = 0 self._verbosity = 0 @@ -476,9 +469,6 @@ class _CliLogger(): For other arguments, see `_format_msg`. """ - if self.old_style: - return - self._print( cf.skyBlue(key) + ": " + _format_msg(cf.bold(msg), *args, **kwargs)) @@ -567,9 +557,6 @@ class _CliLogger(): For arguments, see `_format_msg`. """ - if self.old_style: - return - self._print(_format_msg(msg, *args, **kwargs), _level_str=_level_str) def abort(self, @@ -582,9 +569,6 @@ class _CliLogger(): Print an error and throw an exception to terminate the program (the exception will not print a message). """ - if self.old_style: - return - if msg is not None: self._error(msg, *args, _level_str="PANIC", **kwargs) @@ -604,9 +588,6 @@ class _CliLogger(): For other arguments, see `_format_msg`. """ - if self.old_style: - return - if not val: exc = None if not self.pretty: @@ -618,101 +599,6 @@ class _CliLogger(): # for AssertionError and raise them normally self.abort(msg, *args, exc=exc, **kwargs) - def old_debug(self, logger: logging.Logger, msg: str, *args: Any, - **kwargs: Any): - """Old debug logging proxy. - - Pass along an old debug log iff new logging is disabled. - Supports the new formatting features. - - Args: - logger (logging.Logger): - Logger to use if old logging behavior is selected. - - For other arguments, see `_format_msg`. - """ - if self.old_style: - logger.debug( - _format_msg(msg, *args, **kwargs), - extra=_external_caller_info()) - return - - def old_info(self, logger: logging.Logger, msg: str, *args: Any, - **kwargs: Any): - """Old info logging proxy. - - Pass along an old info log iff new logging is disabled. - Supports the new formatting features. - - Args: - logger (logging.Logger): - Logger to use if old logging behavior is selected. - - For other arguments, see `_format_msg`. - """ - if self.old_style: - logger.info( - _format_msg(msg, *args, **kwargs), - extra=_external_caller_info()) - return - - def old_warning(self, logger: logging.Logger, msg: str, *args: Any, - **kwargs: Any): - """Old warning logging proxy. - - Pass along an old warning log iff new logging is disabled. - Supports the new formatting features. - - Args: - logger (logging.Logger): - Logger to use if old logging behavior is selected. - - For other arguments, see `_format_msg`. - """ - if self.old_style: - logger.warning( - _format_msg(msg, *args, **kwargs), - extra=_external_caller_info()) - return - - def old_error(self, logger: logging.Logger, msg: str, *args: Any, - **kwargs: Any): - """Old error logging proxy. - - Pass along an old error log iff new logging is disabled. - Supports the new formatting features. - - Args: - logger (logging.Logger): - Logger to use if old logging behavior is selected. - - For other arguments, see `_format_msg`. - """ - if self.old_style: - logger.error( - _format_msg(msg, *args, **kwargs), - extra=_external_caller_info()) - return - - def old_exception(self, logger: logging.Logger, msg: str, *args: Any, - **kwargs: Any): - """Old exception logging proxy. - - Pass along an old exception log iff new logging is disabled. - Supports the new formatting features. - - Args: - logger (logging.Logger): - Logger to use if old logging behavior is selected. - - For other arguments, see `_format_msg`. - """ - if self.old_style: - logger.exception( - _format_msg(msg, *args, **kwargs), - extra=_external_caller_info()) - return - def render_list(self, xs: List[str], separator: str = cf.reset(", ")): """Render a list of bolded values using a non-bolded separator. """ @@ -739,9 +625,6 @@ class _CliLogger(): The default action to take if the user just presses enter with no input. """ - if self.old_style: - return - should_abort = _abort default = _default @@ -821,9 +704,6 @@ class _CliLogger(): Returns: The string entered by the user. """ - if self.old_style: - return - complete_str = cf.underlined(msg) rendered_message = _format_msg(complete_str, *args, **kwargs) # the rendered message ends with ascii coding @@ -841,16 +721,6 @@ class _CliLogger(): return res - def old_confirm(self, msg: str, yes: bool): - """Old confirm dialog proxy. - - Let `click` display a confirm dialog iff new logging is disabled. - """ - if not self.old_style: - return - - return None if yes else click.confirm(msg, abort=True) - class SilentClickException(click.ClickException): """`ClickException` that does not print a message. diff --git a/python/ray/autoscaler/_private/command_runner.py b/python/ray/autoscaler/_private/command_runner.py index 4256ebd70..d1f1850aa 100644 --- a/python/ray/autoscaler/_private/command_runner.py +++ b/python/ray/autoscaler/_private/command_runner.py @@ -326,9 +326,6 @@ class SSHCommandRunner(CommandRunnerInterface): with cli_logger.group("Waiting for IP"): while time.time() < deadline and \ not self.provider.is_terminated(self.node_id): - cli_logger.old_info(logger, "{}Waiting for IP...", - self.log_prefix) - ip = self._get_node_ip() if ip is not None: cli_logger.labeled_value("Received", ip) @@ -362,7 +359,6 @@ class SSHCommandRunner(CommandRunnerInterface): os.makedirs(self.ssh_control_path, mode=0o700, exist_ok=True) except OSError as e: cli_logger.warning("{}", str(e)) # todo: msg - cli_logger.old_warning(logger, "{}", str(e)) def _run_helper(self, final_cmd, @@ -391,7 +387,7 @@ class SSHCommandRunner(CommandRunnerInterface): # For now, if the output is needed we just skip the new logic. # In the future we could update the new logic to support # capturing output, but it is probably not needed. - if not cli_logger.old_style and not with_output: + if not with_output: return run_cmd_redirected( final_cmd, process_runner=self.process_runner, @@ -403,7 +399,7 @@ class SSHCommandRunner(CommandRunnerInterface): return self.process_runner.check_call(final_cmd) except subprocess.CalledProcessError as e: joined_cmd = " ".join(final_cmd) - if not cli_logger.old_style and not is_using_login_shells(): + if not is_using_login_shells(): raise ProcessRunnerError( "Command failed", "ssh_command_failed", @@ -458,9 +454,6 @@ class SSHCommandRunner(CommandRunnerInterface): cli_logger.verbose( "Forwarding port {} to port {} on localhost.", cf.bold(local), cf.bold(remote)) # todo: msg - cli_logger.old_info(logger, - "{}Forwarding {} -> localhost:{}", - self.log_prefix, local, remote) ssh += ["-L", "{}:localhost:{}".format(remote, local)] final_cmd = ssh + ssh_options.to_ssh_options_list(timeout=timeout) + [ @@ -473,8 +466,6 @@ class SSHCommandRunner(CommandRunnerInterface): final_cmd += _with_interactive(cmd) else: final_cmd += [cmd] - cli_logger.old_info(logger, "{}Running {}", self.log_prefix, - " ".join(final_cmd)) else: # We do this because `-o ControlMaster` causes the `-N` flag to # still create an interactive shell in some ssh versions. diff --git a/python/ray/autoscaler/_private/commands.py b/python/ray/autoscaler/_private/commands.py index f58e5063d..dd42009f8 100644 --- a/python/ray/autoscaler/_private/commands.py +++ b/python/ray/autoscaler/_private/commands.py @@ -203,10 +203,6 @@ def create_or_update_cluster(config_file: str, cli_logger.labeled_value("Cluster", config["cluster_name"]) - # disable the cli_logger here if needed - # because it only supports aws - if config["provider"]["type"] != "aws": - cli_logger.old_style = True cli_logger.newline() config = _bootstrap_config(config, no_config_cache=no_config_cache) @@ -228,8 +224,6 @@ def _bootstrap_config(config: Dict[str, Any], "ray-config-{}".format(hasher.hexdigest())) if os.path.exists(cache_key) and not no_config_cache: - cli_logger.old_info(logger, "Using cached config at {}", cache_key) - config_cache = json.loads(open(cache_key).read()) if config_cache.get("_version", -1) == CONFIG_CACHE_VERSION: # todo: is it fine to re-resolve? afaik it should be. @@ -294,7 +288,6 @@ def teardown_cluster(config_file: str, yes: bool, workers_only: bool, validate_config(config) cli_logger.confirm(yes, "Destroying cluster.", _abort=True) - cli_logger.old_confirm("This will destroy your cluster", yes) if not workers_only: try: @@ -319,9 +312,6 @@ def teardown_cluster(config_file: str, yes: bool, workers_only: bool, "Ignoring the exception and " "attempting to shut down the cluster nodes anyway.") - cli_logger.old_exception( - logger, "Ignoring error attempting a clean shutdown.") - provider = _get_node_provider(config["provider"], config["cluster_name"]) def remaining_nodes(): @@ -331,13 +321,10 @@ def teardown_cluster(config_file: str, yes: bool, workers_only: bool, if keep_min_workers: min_workers = config.get("min_workers", 0) - cli_logger.print( "{} random worker nodes will not be shut down. " + cf.dimmed("(due to {})"), cf.bold(min_workers), cf.bold("--keep-min-workers")) - cli_logger.old_info( - logger, "teardown_cluster: Keeping {} nodes...", min_workers) workers = random.sample(workers, len(workers) - min_workers) @@ -370,7 +357,6 @@ def teardown_cluster(config_file: str, yes: bool, workers_only: bool, with_output=False) except Exception: cli_logger.warning(f"Docker stop failed on {node}") - cli_logger.old_warning(logger, f"Docker stop failed on {node}") # Loop here to check that both the head and worker nodes are actually # really gone @@ -383,10 +369,6 @@ def teardown_cluster(config_file: str, yes: bool, workers_only: bool, with LogTimer("teardown_cluster: done."): while A: - cli_logger.old_info( - logger, "teardown_cluster: " - "Shutting down {} nodes...", len(A)) - provider.terminate_nodes(A) cli_logger.print( @@ -411,7 +393,6 @@ def kill_node(config_file: str, yes: bool, hard: bool, config = _bootstrap_config(config) cli_logger.confirm(yes, "A random node will be killed.") - cli_logger.old_confirm("This will kill a node in your cluster", yes) provider = _get_node_provider(config["provider"], config["cluster_name"]) nodes = provider.non_terminated_nodes({ @@ -419,7 +400,6 @@ def kill_node(config_file: str, yes: bool, hard: bool, }) node = random.choice(nodes) cli_logger.print("Shutdown " + cf.bold("{}"), node) - cli_logger.old_info(logger, "kill_node: Shutdown worker {}", node) if hard: provider.terminate_node(node) else: @@ -472,10 +452,6 @@ def warn_about_bad_start_command(start_commands: List[str]) -> None: cli_logger.warning( "Ray runtime will not be started because `{}` is not in `{}`.", cf.bold("ray start"), cf.bold("head_start_ray_commands")) - cli_logger.old_warning( - logger, - "Ray start is not included in the head_start_ray_commands section." - ) if not any("autoscaling-config" in x for x in ray_start_cmd): cli_logger.warning( "The head node will not launch any workers because " @@ -484,11 +460,6 @@ def warn_about_bad_start_command(start_commands: List[str]) -> None: cf.bold("ray start"), cf.bold("--autoscaling-config"), cf.bold("--autoscaling-config=~/ray_bootstrap_config.yaml"), cf.bold("ray start"), cf.bold("head_start_ray_commands")) - cli_logger.old_warning( - logger, "Ray start on the head node does not have the flag" - "--autoscaling-config set. The head node will not launch" - "workers. Add --autoscaling-config=~/ray_bootstrap_config.yaml" - "to ray start in the head_start_ray_commands section.") def get_or_create_head_node(config: Dict[str, Any], @@ -520,9 +491,6 @@ def get_or_create_head_node(config: Dict[str, Any], "No head node found. " "Launching a new cluster.", _abort=True) - cli_logger.old_confirm("This will create a new cluster", yes) - elif not no_restart: - cli_logger.old_confirm("This will restart cluster services", yes) if head_node: if restart_only: @@ -549,6 +517,7 @@ def get_or_create_head_node(config: Dict[str, Any], yes, cf.bold("Cluster Ray runtime will be restarted."), _abort=True) + cli_logger.newline() # TODO(ekl) this logic is duplicated in node_launcher.py (keep in sync) @@ -572,19 +541,10 @@ def get_or_create_head_node(config: Dict[str, Any], provider.node_tags(head_node) .get(TAG_RAY_LAUNCH_CONFIG)), cf.bold(launch_hash)) cli_logger.confirm(yes, "Relaunching it.", _abort=True) - cli_logger.old_confirm( - "Head node config out-of-date. It will be terminated", yes) - - cli_logger.old_info( - logger, "get_or_create_head_node: " - "Shutting down outdated head node {}", head_node) provider.terminate_node(head_node) cli_logger.print("Terminated head node {}", head_node) - cli_logger.old_info( - logger, "get_or_create_head_node: Launching new head node...") - head_node_tags[TAG_RAY_LAUNCH_CONFIG] = launch_hash head_node_tags[TAG_RAY_NODE_NAME] = "ray-{}-head".format( config["cluster_name"]) @@ -620,9 +580,6 @@ def get_or_create_head_node(config: Dict[str, Any], (runtime_hash, file_mounts_contents_hash) = hash_runtime_conf( config["file_mounts"], None, config) - cli_logger.old_info( - logger, "get_or_create_head_node: Updating files on head node...") - # Rewrite the auth config so that the head # node can update the workers remote_config = copy.deepcopy(config) @@ -697,40 +654,17 @@ def get_or_create_head_node(config: Dict[str, Any], # Refresh the node cache so we see the external ip if available provider.non_terminated_nodes(head_node_tags) - if config.get("provider", {}).get("use_internal_ips", False) is True: - head_node_ip = provider.internal_ip(head_node) - else: - head_node_ip = provider.external_ip(head_node) - if updater.exitcode != 0: # todo: this does not follow the mockup and is not good enough cli_logger.abort("Failed to setup head node.") - - cli_logger.old_error( - logger, "get_or_create_head_node: " - "Updating {} failed", head_node_ip) sys.exit(1) - cli_logger.old_info( - logger, "get_or_create_head_node: " - "Head node up-to-date, IP address is: {}", head_node_ip) - monitor_str = "tail -n 100 -f /tmp/ray/session_latest/logs/monitor*" if override_cluster_name: modifiers = " --cluster-name={}".format(quote(override_cluster_name)) else: modifiers = "" - if cli_logger.old_style: - print("To monitor autoscaling activity, you can run:\n\n" - " ray exec {} {}{}\n".format(config_file, quote(monitor_str), - modifiers)) - print("To open a console on the cluster:\n\n" - " ray attach {}{}\n".format(config_file, modifiers)) - - print("To get a remote shell to the cluster manually, run:\n\n" - " {}\n".format(updater.cmd_runner.remote_shell_command_str())) - cli_logger.newline() with cli_logger.group("Useful commands"): cli_logger.print("Monitor autoscaling with") @@ -887,10 +821,6 @@ def exec_cluster(config_file: str, attach_command = " ".join(attach_command_parts) cli_logger.print("Run `{}` to check command status.", cf.bold(attach_command)) - - attach_info = "Use `{}` to check on command status.".format( - attach_command) - cli_logger.old_info(logger, attach_info) return result diff --git a/python/ray/autoscaler/_private/log_timer.py b/python/ray/autoscaler/_private/log_timer.py index e8fa6edac..e12a7c4d3 100644 --- a/python/ray/autoscaler/_private/log_timer.py +++ b/python/ray/autoscaler/_private/log_timer.py @@ -15,14 +15,14 @@ class LogTimer: self._start_time = datetime.datetime.utcnow() def __exit__(self, *error_vals): - if not cli_logger.old_style: + if cli_logger.log_style != "record": return td = datetime.datetime.utcnow() - self._start_time status = "" if self._show_status: status = "failed" if any(error_vals) else "succeeded" - logger.info(" ".join([ + cli_logger.print(" ".join([ self._message, status, "[LogTimer={:.0f}ms]".format(td.total_seconds() * 1000) ])) diff --git a/python/ray/autoscaler/_private/updater.py b/python/ray/autoscaler/_private/updater.py index 04989bb85..942635426 100644 --- a/python/ray/autoscaler/_private/updater.py +++ b/python/ray/autoscaler/_private/updater.py @@ -105,9 +105,6 @@ class NodeUpdater: self.docker_config = docker_config def run(self): - cli_logger.old_info(logger, "{}Updating to {}", self.log_prefix, - self.runtime_hash) - if cmd_output_util.does_allow_interactive( ) and cmd_output_util.is_output_redirected(): # this is most probably a bug since the user has no control @@ -123,18 +120,10 @@ class NodeUpdater: "Applied config {}".format(self.runtime_hash)): self.do_update() except Exception as e: - error_str = str(e) - if hasattr(e, "cmd"): - error_str = "(Exit Status {}) {}".format( - e.returncode, " ".join(e.cmd)) - self.provider.set_node_tags( self.node_id, {TAG_RAY_NODE_STATUS: STATUS_UPDATE_FAILED}) cli_logger.error("New status: {}", cf.bold(STATUS_UPDATE_FAILED)) - cli_logger.old_error(logger, "{}Error executing: {}\n", - self.log_prefix, error_str) - cli_logger.error("!!!") if hasattr(e, "cmd"): cli_logger.error( @@ -236,22 +225,15 @@ class NodeUpdater: with cli_logger.group( "Waiting for SSH to become available", _numbered=("[]", 1, 6)): with LogTimer(self.log_prefix + "Got remote shell"): - cli_logger.old_info(logger, "{}Waiting for remote shell...", - self.log_prefix) cli_logger.print("Running `{}` as a test.", cf.bold("uptime")) first_conn_refused_time = None while time.time() < deadline and \ not self.provider.is_terminated(self.node_id): try: - cli_logger.old_debug(logger, - "{}Waiting for remote shell...", - self.log_prefix) - # Run outside of the container self.cmd_runner.run( "uptime", timeout=5, run_env="host") - cli_logger.old_debug(logger, "Uptime succeeded.") cli_logger.success("Success.") return True except ProcessRunnerError as e: @@ -277,9 +259,6 @@ class NodeUpdater: "SSH still not available {}, " "retrying in {} seconds.", cf.dimmed(retry_str), cf.bold(str(READY_CHECK_INTERVAL))) - cli_logger.old_debug(logger, - "{}Node not up, retrying: {}", - self.log_prefix, retry_str) time.sleep(READY_CHECK_INTERVAL) @@ -314,9 +293,6 @@ class NodeUpdater: "Configuration already up to date, " "skipping file mounts, initalization and setup commands.", _numbered=("[]", "2-5", 6)) - cli_logger.old_info(logger, - "{}{} already up-to-date, skip to ray start", - self.log_prefix, self.node_id) else: cli_logger.print( @@ -435,9 +411,6 @@ class NodeUpdater: raise click.ClickException("Start command failed.") def rsync_up(self, source, target, docker_mount_if_possible=False): - cli_logger.old_info(logger, "{}Syncing {} to {}...", self.log_prefix, - source, target) - options = {} options["docker_mount_if_possible"] = docker_mount_if_possible options["rsync_exclude"] = self.rsync_options.get("rsync_exclude") @@ -447,9 +420,6 @@ class NodeUpdater: cf.bold(source), cf.bold(target)) def rsync_down(self, source, target, docker_mount_if_possible=False): - cli_logger.old_info(logger, "{}Syncing {} from {}...", self.log_prefix, - source, target) - options = {} options["docker_mount_if_possible"] = docker_mount_if_possible options["rsync_exclude"] = self.rsync_options.get("rsync_exclude") diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 5b6e69bcf..d8558ba37 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -465,8 +465,6 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports, # Get the node IP address if one is not provided. ray_params.update_if_absent(node_ip_address=node_ip_address) cli_logger.labeled_value("Local node IP", ray_params.node_ip_address) - cli_logger.old_info(logger, "Using IP address {} for this node.", - ray_params.node_ip_address) ray_params.update_if_absent( redis_port=port, redis_shard_ports=redis_shard_ports, @@ -525,24 +523,6 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports, cli_logger.newline() cli_logger.print("To terminate the Ray runtime, run") cli_logger.print(cf.bold(" ray stop")) - - cli_logger.old_info( - logger, - "\nStarted Ray on this node. You can add additional nodes to " - "the cluster by calling\n\n" - " ray start --address='{}'{}\n\n" - "from the node you wish to add. You can connect a driver to the " - "cluster from Python by running\n\n" - " import ray\n" - " ray.init(address='auto'{})\n\n" - "If you have trouble connecting from a different machine, check " - "that your firewall is configured properly. If you wish to " - "terminate the processes that have been started, run\n\n" - " ray stop".format( - redis_address, " --redis-password='" + redis_password + "'" - if redis_password else "", - ", _redis_password='" + redis_password + "'" - if redis_password else "")) else: # Start Ray on a non-head node. if not (port is None): @@ -589,8 +569,6 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports, node_ip_address=services.get_node_ip_address(redis_address)) cli_logger.labeled_value("Local node IP", ray_params.node_ip_address) - cli_logger.old_info(logger, "Using IP address {} for this node.", - ray_params.node_ip_address) # Check that there aren't already Redis clients with the same IP # address connected with this Redis instance. This raises an exception @@ -610,11 +588,6 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports, cli_logger.print("To terminate the Ray runtime, run") cli_logger.print(cf.bold(" ray stop")) - cli_logger.old_info( - logger, "\nStarted Ray on this node. If you wish to terminate the " - "processes that have been started, run\n\n" - " ray stop") - if block: cli_logger.newline() with cli_logger.group(cf.bold("--block")): @@ -630,8 +603,6 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports, if len(deceased) > 0: cli_logger.newline() cli_logger.error("Some Ray subprcesses exited unexpectedly:") - cli_logger.old_error(logger, - "Ray processes died unexpectedly:") with cli_logger.indented(): for process_type, process in deceased: @@ -639,15 +610,10 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports, "{}", cf.bold(str(process_type)), _tags={"exit code": str(process.returncode)}) - cli_logger.old_error( - logger, "\t{} died with exit code {}".format( - process_type, process.returncode)) # shutdown_at_exit will handle cleanup. cli_logger.newline() cli_logger.error("Remaining processes will be killed.") - cli_logger.old_error( - logger, "Killing remaining processes and exiting...") sys.exit(1) @@ -719,10 +685,6 @@ def stop(force, verbose, log_style, log_color): total_found += 1 proc_string = str(subprocess.list2cmdline(proc_args)) - if verbose: - operation = "Terminating" if force else "Killing" - cli_logger.old_info(logger, "%s process %s: %s", operation, - proc.pid, proc_string) try: if force: proc.kill() @@ -749,7 +711,6 @@ def stop(force, verbose, log_style, log_color): except (psutil.Error, OSError) as ex: cli_logger.error("Could not terminate `{}` due to {}", cf.bold(proc_string), str(ex)) - cli_logger.old_error(logger, "Error: %s", ex) if total_found == 0: cli_logger.print("Did not find any active Ray processes.") @@ -847,7 +808,6 @@ def up(cluster_config_file, min_workers, max_workers, no_restart, restart_only, cli_logger.warning("{}", str(e)) cli_logger.warning( "Could not download remote cluster configuration file.") - cli_logger.old_info(logger, "Error downloading file: ", e) create_or_update_cluster( config_file=cluster_config_file, override_min_workers=min_workers, @@ -1130,11 +1090,6 @@ def submit(cluster_config_file, screen, tmux, stop, start, cluster_name, cf.bold("-- "), cf.bold("ray submit script.py -- --arg=123 --flag")) cli_logger.newline() - cli_logger.old_warning( - logger, - "ray submit [yaml] [script.py] --args=... is deprecated and " - "will be removed in a future version of Ray. Use " - "`ray submit [yaml] script.py -- --arg1 --arg2` instead.") if start: create_or_update_cluster( diff --git a/python/ray/tests/aws/test_autoscaler_aws.py b/python/ray/tests/aws/test_autoscaler_aws.py index 992b90f09..697c9efb1 100644 --- a/python/ray/tests/aws/test_autoscaler_aws.py +++ b/python/ray/tests/aws/test_autoscaler_aws.py @@ -5,7 +5,7 @@ import ray.tests.aws.utils.stubs as stubs import ray.tests.aws.utils.helpers as helpers from ray.tests.aws.utils.constants import AUX_SUBNET, DEFAULT_SUBNET, \ DEFAULT_SG_AUX_SUBNET, DEFAULT_SG, DEFAULT_SG_DUAL_GROUP_RULES, \ - DEFAULT_SG_WITH_RULES_AUX_SUBNET, DEFAULT_SG_WITH_RULES, AUX_SG, \ + DEFAULT_SG_WITH_RULES_AUX_SUBNET, AUX_SG, \ DEFAULT_SG_WITH_NAME, DEFAULT_SG_WITH_NAME_AND_RULES, CUSTOM_IN_BOUND_RULES @@ -50,13 +50,6 @@ def test_create_sg_different_vpc_same_rules(iam_client_stub, ec2_client_stub): DEFAULT_SG_WITH_RULES_AUX_SUBNET, ) - # given the prior modification to the head security group... - # expect the next read of a head security group property to reload it - stubs.describe_sg_echo(ec2_client_stub, DEFAULT_SG_WITH_RULES) - # given the prior modification to the worker security group... - # expect the next read of a worker security group property to reload it - stubs.describe_sg_echo(ec2_client_stub, DEFAULT_SG_WITH_RULES_AUX_SUBNET) - # given our mocks and an example config file as input... # expect the config to be loaded, validated, and bootstrapped successfully config = helpers.bootstrap_aws_example_config_file("example-subnets.yaml") diff --git a/python/ray/tests/test_cli.py b/python/ray/tests/test_cli.py index 4f128f91e..e4427a00a 100644 --- a/python/ray/tests/test_cli.py +++ b/python/ray/tests/test_cli.py @@ -103,8 +103,10 @@ def _debug_check_line_by_line(result, expected_lines): exp = expected_lines[i] matched = re.fullmatch(exp + r" *", out) is not None if not matched: - print(f"!!! ERROR: Expected (regex): {repr(exp)}") - print(f"Got: {out}") + print(f"{i:>3}: {out}") + print(f"!!! ^ ERROR: Expected (regex): {repr(exp)}") + else: + print(f"{i:>3}: {out}") i += 1 if i < len(expected_lines): print("!!! ERROR: Expected extra lines (regex):") @@ -134,9 +136,6 @@ def _load_output_pattern(name): def _check_output_via_pattern(name, result): expected_lines = _load_output_pattern(name) - print("---") - print(result.output) - print("---") if result.exception is not None: print(result.output) diff --git a/python/ray/tests/test_cli_patterns/test_ray_up_record.txt b/python/ray/tests/test_cli_patterns/test_ray_up_record.txt index 3c940fefd..2e70f7aa6 100644 --- a/python/ray/tests/test_cli_patterns/test_ray_up_record.txt +++ b/python/ray/tests/test_cli_patterns/test_ray_up_record.txt @@ -18,14 +18,18 @@ .+\.py.*Fetching the new head node .+\.py.*<1/1> Setting up head node .+\.py.*Prepared bootstrap config +.+\.py.*AWSNodeProvider: Set tag ray-node-status=waiting-for-ssh on \['.+'\] \[LogTimer=.+\] .+\.py.*New status: waiting-for-ssh .+\.py.*\[1/6\] Waiting for SSH to become available .+\.py.*Running `uptime` as a test\. .+\.py.*Fetched IP: .+ +.+\.py.*NodeUpdater: .+: Got IP \[LogTimer=.+\] .+\.py.*Running `uptime` .+\.py.*Full command is `ssh.+` .+\.py.*Success\. +.+\.py.*NodeUpdater: .+: Got remote shell \[LogTimer=.+\] .+\.py.*Updating cluster configuration\. \[hash=.+\] +.+\.py.*AWSNodeProvider: Set tag ray-node-status=syncing-files on \['.+'\] \[LogTimer=.+\] .+\.py.*New status: syncing-files .+\.py.*\[3/6\] Processing file mounts .+\.py.*Running `mkdir -p ~/tests` @@ -33,21 +37,26 @@ .+\.py.*Running `rsync.+` .+\.py.*`rsync`ed \./ \(local\) to ~/tests/ \(remote\) .+\.py.*~/tests/ from \./ +.+\.py.+NodeUpdater: i-.+: Synced \./ to ~/tests/ \[LogTimer=.+\] .+\.py.*Running `mkdir -p ~` .+\.py.*Full command is `ssh.+` .+\.py.*Running `rsync.+` .+\.py.*`rsync`ed .+/ray-bootstrap-.+ \(local\) to ~/ray_bootstrap_config\.yaml \(remote\) .+\.py.*~/ray_bootstrap_config\.yaml from .+ray-bootstrap-.+ +.+\.py.*NodeUpdater: i-.+: Synced .+ray-bootstrap-.+ to ~/ray_bootstrap_config\.yaml \[LogTimer=.+\] .+\.py.*Running `mkdir -p ~` .+\.py.*Full command is `ssh.+` .+\.py.*Running `rsync.+` .+\.py.*`rsync`ed .+__test-cli_key-1\.pem \(local\) to ~/ray_bootstrap_key\.pem \(remote\) .+\.py.*~/ray_bootstrap_key\.pem from .+__test-cli_key-1\.pem +.+\.py.*NodeUpdater: i-.+: Synced .+.pem to ~/ray_bootstrap_key\.pem \[LogTimer=.+\] .+\.py.*\[4/6\] No worker file mounts to sync +.+\.py.*AWSNodeProvider: Set tag ray-node-status=setting-up on \['.+'\] \[LogTimer=.+\] .+\.py.*New status: setting-up .+\.py.*\[3/5\] Running initialization commands .+\.py.*Running `echo init` .+\.py.*Full command is `ssh.+` +.+\.py.*NodeUpdater: i-.+: Initialization commands succeeded \[LogTimer=.+\] .+\.py.*\[4/6\] Running setup commands .+\.py.*\(0/4\) echo a .+\.py.*Running `echo a` @@ -61,11 +70,17 @@ .+\.py.*\(3/4\) echo head .+\.py.*Running `echo head` .+\.py.*Full command is `ssh.+` +.+\.py.*NodeUpdater: i-.+: Setup commands succeeded \[LogTimer=.+\] .+\.py.*\[6/6\] Starting the Ray runtime .+\.py.*Running `ray stop` .+\.py.*Full command is `ssh.+` .+\.py.*Running `ray start --head --autoscaling-config=~/ray_bootstrap_config\.yaml` .+\.py.*Full command is `ssh.+` +.+\.py.*NodeUpdater: i-.+: Ray start commands succeeded \[LogTimer=.+\] +.+\.py.*NodeUpdater: i-.+: Applied config .+ \[LogTimer=.+\] +.+\.py.*AWSNodeProvider: Set tag ray-node-status=up-to-date on \['.+'\] \[LogTimer=.+\] +.+\.py.*AWSNodeProvider: Set tag ray-runtime-config=.+ on \['.+'\] \[LogTimer=.+\] +.+\.py.*AWSNodeProvider: Set tag ray-file-mounts-contents=.+ on \['.+'\] \[LogTimer=.+\] .+\.py.*New status: up-to-date .+\.py.*Useful commands .+\.py.*Monitor autoscaling with