From 40b8e35d61982b149e88fc09effd001879649bff Mon Sep 17 00:00:00 2001 From: Maksim Smolin Date: Tue, 11 Aug 2020 09:58:23 -0700 Subject: [PATCH] [cli] New logging for the rest of the `ray` commands (#9984) Co-authored-by: Richard Liaw --- python/ray/autoscaler/cli_logger.py | 76 +++- python/ray/autoscaler/command_runner.py | 19 +- python/ray/autoscaler/commands.py | 43 +- .../ray/autoscaler/subprocess_output_util.py | 4 +- python/ray/autoscaler/updater.py | 21 +- python/ray/resource_spec.py | 31 +- python/ray/scripts/scripts.py | 411 +++++++++++++++--- python/ray/services.py | 19 +- 8 files changed, 508 insertions(+), 116 deletions(-) diff --git a/python/ray/autoscaler/cli_logger.py b/python/ray/autoscaler/cli_logger.py index 8fcd93cf9..5943eedcc 100644 --- a/python/ray/autoscaler/cli_logger.py +++ b/python/ray/autoscaler/cli_logger.py @@ -9,6 +9,9 @@ as well as indentation and other structured output. """ import sys +import logging +import inspect +import os import click @@ -18,6 +21,60 @@ import colorful as cf colorama.init() +def _patched_makeRecord(self, + name, + level, + fn, + lno, + msg, + args, + exc_info, + func=None, + extra=None, + sinfo=None): + """Monkey-patched version of logging.Logger.makeRecord + We have to patch default loggers so they use the proper frame for + line numbers and function names (otherwise everything shows up as + e.g. cli_logger:info() instead of as where it was called from). + + In Python 3.8 we could just use stacklevel=2, but we have to support + Python 3.6 and 3.7 as well. + + The solution is this Python magic superhack. + + The default makeRecord will deliberately check that we don't override + any existing property on the LogRecord using `extra`, + so we remove that check. + + This patched version is otherwise identical to the one in the standard + library. + """ + rv = logging.LogRecord(name, level, fn, lno, msg, args, exc_info, func, + sinfo) + if extra is not None: + rv.__dict__.update(extra) + return rv + + +logging.Logger.makeRecord = _patched_makeRecord + + +def _parent_frame_info(): + """Get the info from the caller frame. + + Used to override the logging function and line number with the correct + ones. See the comment on _patched_makeRecord for more info. + """ + + frame = inspect.currentframe() + # we are also in a function, so must go 2 levels up + caller = frame.f_back.f_back + return { + "lineno": caller.f_lineno, + "filename": os.path.basename(caller.f_code.co_filename), + } + + def _format_msg(msg, *args, _tags=None, @@ -93,7 +150,7 @@ def _format_msg(msg, if _no_format: # todo: throw if given args/kwargs? return numbering_str + msg + tags_str - return numbering_str + msg.format(*args, **kwargs) + tags_str + return numbering_str + cf.format(msg, *args, **kwargs) + tags_str if kwargs: raise ValueError("We do not support printing kwargs yet.") @@ -180,7 +237,7 @@ class _CliLogger(): def newline(self): """Print a line feed. """ - self._print("") + self.print("") def _print(self, msg, linefeed=True): """Proxy for printing messages. @@ -377,7 +434,8 @@ class _CliLogger(): For other arguments, see `_format_msg`. """ if self.old_style: - logger.debug(_format_msg(msg, *args, **kwargs)) + logger.debug( + _format_msg(msg, *args, **kwargs), extra=_parent_frame_info()) return def old_info(self, logger, msg, *args, **kwargs): @@ -393,7 +451,8 @@ class _CliLogger(): For other arguments, see `_format_msg`. """ if self.old_style: - logger.info(_format_msg(msg, *args, **kwargs)) + logger.info( + _format_msg(msg, *args, **kwargs), extra=_parent_frame_info()) return def old_warning(self, logger, msg, *args, **kwargs): @@ -409,7 +468,8 @@ class _CliLogger(): For other arguments, see `_format_msg`. """ if self.old_style: - logger.warning(_format_msg(msg, *args, **kwargs)) + logger.warning( + _format_msg(msg, *args, **kwargs), extra=_parent_frame_info()) return def old_error(self, logger, msg, *args, **kwargs): @@ -425,7 +485,8 @@ class _CliLogger(): For other arguments, see `_format_msg`. """ if self.old_style: - logger.error(_format_msg(msg, *args, **kwargs)) + logger.error( + _format_msg(msg, *args, **kwargs), extra=_parent_frame_info()) return def old_exception(self, logger, msg, *args, **kwargs): @@ -441,7 +502,8 @@ class _CliLogger(): For other arguments, see `_format_msg`. """ if self.old_style: - logger.exception(_format_msg(msg, *args, **kwargs)) + logger.exception( + _format_msg(msg, *args, **kwargs), extra=_parent_frame_info()) return def render_list(self, xs, separator=cf.reset(", ")): diff --git a/python/ray/autoscaler/command_runner.py b/python/ray/autoscaler/command_runner.py index bd4b064d9..d2b9ed76b 100644 --- a/python/ray/autoscaler/command_runner.py +++ b/python/ray/autoscaler/command_runner.py @@ -26,7 +26,20 @@ HASH_MAX_LENGTH = 10 KUBECTL_RSYNC = os.path.join( os.path.dirname(os.path.abspath(__file__)), "kubernetes/kubectl-rsync.sh") -_config = {"use_login_shells": True} +_config = {"use_login_shells": True, "silent_rsync": True} + + +def is_rsync_silent(): + return _config["silent_rsync"] + + +def set_rsync_silent(val): + """Choose whether to silence rsync output. + + Most commands will want to list rsync'd files themselves rather than + print the default rsync spew. + """ + _config["silent_rsync"] = val def is_using_login_shells(): @@ -460,7 +473,7 @@ class SSHCommandRunner(CommandRunnerInterface): target) ] cli_logger.verbose("Running `{}`", cf.bold(" ".join(command))) - self._run_helper(command, silent=True) + self._run_helper(command, silent=is_rsync_silent()) def run_rsync_down(self, source, target): self._set_ssh_ip_if_required() @@ -473,7 +486,7 @@ class SSHCommandRunner(CommandRunnerInterface): source), target ] cli_logger.verbose("Running `{}`", cf.bold(" ".join(command))) - self._run_helper(command, silent=True) + self._run_helper(command, silent=is_rsync_silent()) def remote_shell_command_str(self): if self.ssh_private_key: diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py index 5c4dd922e..4c09e8bf3 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/commands.py @@ -29,7 +29,8 @@ from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_LAUNCH_CONFIG, \ from ray.ray_constants import AUTOSCALER_RESOURCE_REQUEST_CHANNEL from ray.autoscaler.updater import NodeUpdaterThread -from ray.autoscaler.command_runner import set_using_login_shells +from ray.autoscaler.command_runner import set_using_login_shells, \ + set_rsync_silent from ray.autoscaler.command_runner import DockerCommandRunner from ray.autoscaler.log_timer import LogTimer from ray.worker import global_worker @@ -97,14 +98,9 @@ def create_or_update_cluster( config_file: str, override_min_workers: Optional[int], override_max_workers: Optional[int], no_restart: bool, restart_only: bool, yes: bool, override_cluster_name: Optional[str], - no_config_cache: bool, log_old_style: bool, log_color: str, - dump_command_output: bool, use_login_shells: bool, - verbose: int) -> None: + no_config_cache: bool, dump_command_output: bool, + use_login_shells: bool) -> None: """Create or updates an autoscaling Ray cluster from a config json.""" - cli_logger.old_style = log_old_style - cli_logger.color_mode = log_color - cli_logger.verbosity = verbose - set_using_login_shells(use_login_shells) cmd_output_util.set_output_redirected(not dump_command_output) @@ -184,6 +180,7 @@ def create_or_update_cluster( # because it only supports aws if config["provider"]["type"] != "aws": cli_logger.old_style = True + cli_logger.newline() config = _bootstrap_config(config, no_config_cache) if config["provider"]["type"] != "aws": cli_logger.old_style = False @@ -217,7 +214,6 @@ def _bootstrap_config(config: Dict[str, Any], try_reload_log_state(config_cache["config"]["provider"], config_cache.get("provider_log_info")) - cli_logger.newline() cli_logger.verbose_warning( "Loaded cached provider configuration " "from " + cf.bold("{}"), cache_key) @@ -264,14 +260,8 @@ def _bootstrap_config(config: Dict[str, Any], def teardown_cluster(config_file: str, yes: bool, workers_only: bool, override_cluster_name: Optional[str], - keep_min_workers: bool, log_old_style: bool, - log_color: str, verbose: int): + keep_min_workers: bool): """Destroys all nodes of a Ray cluster described by a config json.""" - cli_logger.old_style = log_old_style - cli_logger.color_mode = log_color - cli_logger.verbosity = verbose - cli_logger.dump_command_output = verbose == 3 # todo: add a separate flag? - config = yaml.safe_load(open(config_file).read()) if override_cluster_name is not None: config["cluster_name"] = override_cluster_name @@ -375,7 +365,8 @@ def kill_node(config_file, yes, hard, override_cluster_name): config["cluster_name"] = override_cluster_name config = _bootstrap_config(config) - confirm("This will kill a node in your cluster", yes) + cli_logger.confirm(yes, "A random node will be killed.") + cli_logger.old_confirm("This will kill a node in your cluster", yes) provider = get_node_provider(config["provider"], config["cluster_name"]) try: @@ -383,7 +374,8 @@ def kill_node(config_file, yes, hard, override_cluster_name): TAG_RAY_NODE_TYPE: NODE_TYPE_WORKER }) node = random.choice(nodes) - logger.info("kill_node: Shutdown worker {}".format(node)) + cli_logger.print("Shutdown " + cf.bold("{}"), node) + cli_logger.old_info(logger, "kill_node: Shutdown worker {}", node) if hard: provider.terminate_node(node) else: @@ -682,7 +674,7 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes, cli_logger.newline() with cli_logger.group("Useful commands"): - cli_logger.print("Monitor auto-scailng with") + cli_logger.print("Monitor autoscaling with") cli_logger.print( cf.bold(" ray exec {}{} {}"), raw_config_file, modifiers, quote(monitor_str)) @@ -820,9 +812,12 @@ def exec_cluster(config_file: str, attach_command_parts.append("--screen") attach_command = " ".join(attach_command_parts) + cli_logger.print("Run `{}` to check command status.", + cf.bold(attach_command)) + attach_info = "Use `{}` to check on command status.".format( attach_command) - logger.info(attach_info) + cli_logger.old_info(logger, attach_info) return result finally: provider.cleanup() @@ -873,6 +868,10 @@ def rsync(config_file: str, down: whether we're syncing remote -> local all_nodes: whether to sync worker nodes in addition to the head node """ + if bool(source) != bool(target): + cli_logger.abort( + "Expected either both a source and a target, or neither.") + assert bool(source) == bool(target), ( "Must either provide both or neither source and target.") @@ -918,6 +917,10 @@ def rsync(config_file: str, rsync = updater.rsync_up if source and target: + # print rsync progress for single file rsync + cmd_output_util.set_output_redirected(False) + set_rsync_silent(False) + rsync(source, target) else: updater.sync_file_mounts(rsync) diff --git a/python/ray/autoscaler/subprocess_output_util.py b/python/ray/autoscaler/subprocess_output_util.py index 96670bb6d..fbcf3f2fe 100644 --- a/python/ray/autoscaler/subprocess_output_util.py +++ b/python/ray/autoscaler/subprocess_output_util.py @@ -14,7 +14,7 @@ _config = {"redirect_output": True} def is_output_redirected(): - return _config["_redirect_output"] + return _config["redirect_output"] def set_output_redirected(val): @@ -26,7 +26,7 @@ def set_output_redirected(val): val (bool): If true, subprocess output will be redirected to a temporary file. """ - _config["_redirect_output"] = val + _config["redirect_output"] = val class ProcessRunnerError(Exception): diff --git a/python/ray/autoscaler/updater.py b/python/ray/autoscaler/updater.py index d1924b343..1b39e72e4 100644 --- a/python/ray/autoscaler/updater.py +++ b/python/ray/autoscaler/updater.py @@ -119,7 +119,10 @@ class NodeUpdater: self.exitcode = 0 - def sync_file_mounts(self, sync_cmd): + def sync_file_mounts(self, sync_cmd, step_numbers=(0, 2)): + # step_numbers is (# of previous steps, total steps) + previous_steps, total_steps = step_numbers + nolog_paths = [] if cli_logger.verbosity == 0: nolog_paths = [ @@ -154,18 +157,21 @@ class NodeUpdater: # Rsync file mounts with cli_logger.group( - "Processing file mounts", _numbered=("[]", 2, 6)): + "Processing file mounts", + _numbered=("[]", previous_steps + 1, total_steps)): for remote_path, local_path in self.file_mounts.items(): do_sync(remote_path, local_path) if self.cluster_synced_files: with cli_logger.group( - "Processing worker file mounts", _numbered=("[]", 3, 6)): + "Processing worker file mounts", + _numbered=("[]", previous_steps + 2, total_steps)): for path in self.cluster_synced_files: do_sync(path, path, allow_non_existing_paths=True) else: cli_logger.print( - "No worker file mounts to sync", _numbered=("[]", 3, 6)) + "No worker file mounts to sync", + _numbered=("[]", previous_steps + 2, total_steps)) def wait_ready(self, deadline): with cli_logger.group( @@ -239,7 +245,8 @@ class NodeUpdater: # full setup might be cancelled here cli_logger.print( "Configuration already up to date, " - "skipping file mounts, initalization and setup commands.") + "skipping file mounts, initalization and setup commands.", + _numbered=("[]", "2-5", 6)) cli_logger.old_info(logger, "{}{} already up-to-date, skip to ray start", self.log_prefix, self.node_id) @@ -252,7 +259,7 @@ class NodeUpdater: self.provider.set_node_tags( self.node_id, {TAG_RAY_NODE_STATUS: STATUS_SYNCING_FILES}) cli_logger.labeled_value("New status", STATUS_SYNCING_FILES) - self.sync_file_mounts(self.rsync_up) + self.sync_file_mounts(self.rsync_up, step_numbers=(2, 6)) # Only run setup commands if runtime_hash has changed because # we don't want to run setup_commands every time the head node @@ -331,7 +338,9 @@ class NodeUpdater: self.log_prefix + "Ray start commands", show_status=True): for cmd in self.ray_start_commands: try: + cmd_output_util.set_output_redirected(False) self.cmd_runner.run(cmd) + cmd_output_util.set_output_redirected(True) except ProcessRunnerError as e: if e.msg_type == "ssh_command_failed": cli_logger.error("Failed.") diff --git a/python/ray/resource_spec.py b/python/ray/resource_spec.py index 9779baa34..e196e3541 100644 --- a/python/ray/resource_spec.py +++ b/python/ray/resource_spec.py @@ -10,6 +10,9 @@ import sys import ray import ray.ray_constants as ray_constants +from ray.autoscaler.cli_logger import cli_logger +import colorful as cf + logger = logging.getLogger(__name__) # Prefix for the node id resource that is automatically added to each node. @@ -220,15 +223,31 @@ class ResourceSpec( round(memory / 1e9, 2), int(100 * (memory / system_memory)))) - logger.info( + rounded_memory = ray_constants.round_to_memory_units( + memory, round_up=False) + worker_ram = round(rounded_memory / (1024**3), 2) + object_ram = round(object_store_memory / (1024**3), 2) + + # TODO(maximsmol): this behavior is strange since we do not have a + # good grasp on when this will get called + # (you have to study node.py to make a guess) + with cli_logger.group("Available RAM"): + cli_logger.labeled_value("Workers", "{} GiB", str(worker_ram)) + cli_logger.labeled_value("Objects", "{} GiB", str(object_ram)) + cli_logger.newline() + cli_logger.print("To adjust these values, use") + with cf.with_style("monokai") as c: + cli_logger.print( + " ray{0}init(memory{1}{2}, " + "object_store_memory{1}{2})", c.magenta("."), + c.magenta("="), c.purple("")) + + cli_logger.old_info( + logger, "Starting Ray with {} GiB memory available for workers and up to " "{} GiB for objects. You can adjust these settings " "with ray.init(memory=, " - "object_store_memory=).".format( - round( - ray_constants.round_to_memory_units( - memory, round_up=False) / (1024**3), 2), - round(object_store_memory / (1024**3), 2))) + "object_store_memory=).", worker_ram, object_ram) spec = ResourceSpec(num_cpus, num_gpus, memory, object_store_memory, resources, redis_max_memory) diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 7def6117c..ad86a4bf4 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -21,6 +21,10 @@ import ray.ray_constants as ray_constants import ray.utils from ray.projects.scripts import project_cli, session_cli +from ray.autoscaler.subprocess_output_util import set_output_redirected +from ray.autoscaler.cli_logger import cli_logger +import colorful as cf + logger = logging.getLogger(__name__) @@ -48,6 +52,38 @@ def check_no_existing_redis_clients(node_ip_address, redis_client): "clients with this IP address.") +logging_options = [ + click.option( + "--log-new-style/--log-old-style", + is_flag=True, + default=False, + envvar="RAY_LOG_NEWSTYLE", + help=("Whether to use the old or the new CLI UX. " + "The new UX supports colored, formatted output and was " + "designed to display only the most important information for " + "human users. The old UX uses the standard `logging` module. " + "It is most suitable for writing to a file and will include " + "timestamps and message level (ERROR/WARNING/INFO).")), + click.option( + "--log-color", + required=False, + type=click.Choice(["auto", "false", "true"], case_sensitive=False), + default="auto", + help=("Use color logging. " + "Valid values are: auto (if stdout is a tty), true, false.")), + click.option("-v", "--verbose", count=True) +] + + +def add_click_options(options): + def wrapper(f): + for option in reversed(logging_options): + f = option(f) + return f + + return wrapper + + @click.group() @click.option( "--logging-level", @@ -353,6 +389,7 @@ def dashboard(cluster_config_file, cluster_name, port, remote_port): default=None, help="the port to use to expose Ray metrics through a " "Prometheus endpoint.") +@add_click_options(logging_options) def start(node_ip_address, redis_address, address, redis_port, port, num_redis_shards, redis_max_clients, redis_password, redis_shard_ports, object_manager_port, node_manager_port, @@ -363,32 +400,60 @@ def start(node_ip_address, redis_address, address, redis_port, port, autoscaling_config, no_redirect_worker_output, no_redirect_output, plasma_store_socket_name, raylet_socket_name, temp_dir, include_java, java_worker_options, load_code_from_local, internal_config, - lru_evict, enable_object_reconstruction, metrics_export_port): + lru_evict, enable_object_reconstruction, metrics_export_port, + log_new_style, log_color, verbose): """Start Ray processes manually on the local machine.""" + cli_logger.old_style = not log_new_style + cli_logger.color_mode = log_color + cli_logger.verbosity = verbose + if gcs_server_port and not head: raise ValueError( "gcs_server_port can be only assigned when you specify --head.") if redis_address is not None: + cli_logger.abort("{} is deprecated. Use {} instead.", + cf.bold("--redis-address"), cf.bold("--address")) + raise DeprecationWarning("The --redis-address argument is " "deprecated. Please use --address instead.") if redis_port is not None: - logger.warn("The --redis-port argument will be deprecated soon. " - "Please use --port instead.") + cli_logger.warning("{} is being deprecated. Use {} instead.", + cf.bold("--redis-port"), cf.bold("--port")) + cli_logger.old_warning( + logger, "The --redis-port argument will be deprecated soon. " + "Please use --port instead.") if port is not None and port != redis_port: + cli_logger.abort( + "Incompatible values for {} and {}. Use only {} instead.", + cf.bold("--port"), cf.bold("--redis-port"), cf.bold("--port")) + raise ValueError("Cannot specify both --port and --redis-port " "as port is a rename of deprecated redis-port") if include_webui is not None: - logger.warn("The --include-webui argument will be deprecated soon" - "Please use --include-dashboard instead.") + cli_logger.warning("{} is being deprecated. Use {} instead.", + cf.bold("--include-webui"), + cf.bold("--include-dashboard")) + cli_logger.old_warning( + logger, "The --include-webui argument will be deprecated soon" + "Please use --include-dashboard instead.") if include_dashboard is not None: include_dashboard = include_webui dashboard_host_default = "localhost" if webui_host != dashboard_host_default: - logger.warn("The --webui-host argument will be deprecated" - " soon. Please use --dashboard-host instead.") + cli_logger.warning("{} is being deprecated. Use {} instead.", + cf.bold("--webui-host"), + cf.bold("--dashboard-host")) + cli_logger.old_warning( + logger, "The --webui-host argument will be deprecated" + " soon. Please use --dashboard-host instead.") if webui_host != dashboard_host and dashboard_host != "localhost": + cli_logger.abort( + "Incompatible values for {} and {}. Use only {} instead.", + cf.bold("--dashboard-host"), cf.bold("--webui-host"), + cf.bold("--dashboard-host")) + raise ValueError( "Cannot specify both --webui-host and --dashboard-host," " please specify only the latter") @@ -407,6 +472,13 @@ def start(node_ip_address, redis_address, address, redis_port, port, try: resources = json.loads(resources) except Exception: + cli_logger.error("`{}` is not a valid JSON string.", + cf.bold("--resources")) + cli_logger.abort( + "Valid values look like this: `{}`", + cf.bold("--resources='\"CustomResource3\": 1, " + "\"CustomResource2\": 2}'")) + raise Exception("Unable to parse the --resources argument using " "json.loads. Try using a format like\n\n" " --resources='{\"CustomResource1\": 3, " @@ -454,6 +526,16 @@ def start(node_ip_address, redis_address, address, redis_port, port, num_redis_shards = len(redis_shard_ports) # Check that the arguments match. if len(redis_shard_ports) != num_redis_shards: + cli_logger.error( + "`{}` must be a comma-separated list of ports, " + "with length equal to `{}` (which defaults to {})", + cf.bold("--redis-shard-ports"), + cf.bold("--num-redis-shards"), cf.bold("1")) + cli_logger.abort( + "Example: `{}`", + cf.bold("--num-redis-shards 3 " + "--redis_shard_ports 6380,6381,6382")) + raise Exception("If --redis-shard-ports is provided, it must " "have the form '6380,6381,6382', and the " "number of ports provided must equal " @@ -461,6 +543,10 @@ def start(node_ip_address, redis_address, address, redis_port, port, "provided)") if redis_address is not None: + cli_logger.abort( + "`{}` starts a new Redis server, `{}` should not be set.", + cf.bold("--head"), cf.bold("--address")) + raise Exception("If --head is passed in, a Redis server will be " "started, so a Redis address should not be " "provided.") @@ -468,8 +554,9 @@ def start(node_ip_address, redis_address, address, redis_port, port, # Get the node IP address if one is not provided. ray_params.update_if_absent( node_ip_address=services.get_node_ip_address()) - logger.info("Using IP address {} for this node.".format( - ray_params.node_ip_address)) + cli_logger.labeled_value("Local node IP", ray_params.node_ip_address) + cli_logger.old_info(logger, "Using IP address {} for this node.", + ray_params.node_ip_address) ray_params.update_if_absent( redis_port=port or redis_port, redis_shard_ports=redis_shard_ports, @@ -484,7 +571,45 @@ def start(node_ip_address, redis_address, address, redis_port, port, ray_params, head=True, shutdown_at_exit=block, spawn_reaper=block) redis_address = node.redis_address - logger.info( + # new-style CLI UX (--log-new-style) + # this is a noop if that flag is not set, so the old logger calls + # are still in place + cli_logger.newline() + startup_msg = "Ray runtime started." + cli_logger.success("-" * len(startup_msg)) + cli_logger.success(startup_msg) + cli_logger.success("-" * len(startup_msg)) + cli_logger.newline() + with cli_logger.group("Next steps"): + cli_logger.print( + "To connect to this Ray runtime from another node, run") + cli_logger.print( + cf.bold(" ray start --address='{}'{}"), redis_address, + " --redis-password='{}'".format(redis_password) + if redis_password else "") + cli_logger.newline() + cli_logger.print("Alternatively, use the following Python code:") + with cli_logger.indented(): + with cf.with_style("monokai") as c: + cli_logger.print("{} ray", c.magenta("import")) + cli_logger.print( + "ray{}init(address{}{}{})", c.magenta("."), + c.magenta("="), c.yellow("'auto'"), + ", redis_password{}{}".format( + c.magenta("="), + c.yellow("'" + redis_password + "'")) + if redis_password else "") + cli_logger.newline() + cli_logger.print( + cf.underlined("If connection fails, check your " + "firewall settings other " + "network configuration.")) + cli_logger.newline() + cli_logger.print("To terminate the Ray runtime, run") + cli_logger.print(cf.bold(" ray stop")) + + cli_logger.old_info( + logger, "\nStarted Ray on this node. You can add additional nodes to " "the cluster by calling\n\n" " ray start --address='{}'{}\n\n" @@ -503,29 +628,54 @@ def start(node_ip_address, redis_address, address, redis_port, port, else: # Start Ray on a non-head node. if not (redis_port is None and port is None): + cli_logger.abort("`{}/{}` should not be specified without `{}`.", + cf.bold("--port"), cf.bold("--redis-port"), + cf.bold("--head")) + raise Exception( "If --head is not passed in, --port and --redis-port are not " "allowed.") if redis_shard_ports is not None: + cli_logger.abort("`{}` should not be specified without `{}`.", + cf.bold("--redis-shard-ports"), cf.bold("--head")) + raise Exception("If --head is not passed in, --redis-shard-ports " "is not allowed.") if redis_address is None: + cli_logger.abort("`{}` is required unless starting with `{}`.", + cf.bold("--address"), cf.bold("--head")) + raise Exception("If --head is not passed in, --address must " "be provided.") if num_redis_shards is not None: + cli_logger.abort("`{}` should not be specified without `{}`.", + cf.bold("--num-redis-shards"), cf.bold("--head")) + raise Exception("If --head is not passed in, --num-redis-shards " "must not be provided.") if redis_max_clients is not None: + cli_logger.abort("`{}` should not be specified without `{}`.", + cf.bold("--redis-max-clients"), cf.bold("--head")) + raise Exception("If --head is not passed in, --redis-max-clients " "must not be provided.") if include_webui: + cli_logger.abort("`{}` should not be specified without `{}`.", + cf.bold("--include-web-ui"), cf.bold("--head")) + raise Exception("If --head is not passed in, the --include-webui" "flag is not relevant.") if include_dashboard: + cli_logger.abort("`{}` should not be specified without `{}`.", + cf.bold("--include-dashboard"), cf.bold("--head")) + raise ValueError( "If --head is not passed in, the --include-dashboard" "flag is not relevant.") if include_java is not None: + cli_logger.abort("`{}` should not be specified without `{}`.", + cf.bold("--include-java"), cf.bold("--head")) + raise ValueError("--include-java should only be set for the head " "node.") @@ -545,8 +695,11 @@ def start(node_ip_address, redis_address, address, redis_port, port, # Get the node IP address if one is not provided. ray_params.update_if_absent( node_ip_address=services.get_node_ip_address(redis_address)) - logger.info("Using IP address {} for this node.".format( - ray_params.node_ip_address)) + + cli_logger.labeled_value("Local node IP", ray_params.node_ip_address) + cli_logger.old_info(logger, "Using IP address {} for this node.", + ray_params.node_ip_address) + # Check that there aren't already Redis clients with the same IP # address connected with this Redis instance. This raises an exception # if the Redis server already has clients on this node. @@ -555,21 +708,54 @@ def start(node_ip_address, redis_address, address, redis_port, port, ray_params.update(redis_address=redis_address) node = ray.node.Node( ray_params, head=False, shutdown_at_exit=block, spawn_reaper=block) - logger.info("\nStarted Ray on this node. If you wish to terminate the " - "processes that have been started, run\n\n" - " ray stop") + + cli_logger.newline() + startup_msg = "Ray runtime started." + cli_logger.success("-" * len(startup_msg)) + cli_logger.success(startup_msg) + cli_logger.success("-" * len(startup_msg)) + cli_logger.newline() + cli_logger.print("To terminate the Ray runtime, run") + cli_logger.print(cf.bold(" ray stop")) + + cli_logger.old_info( + logger, "\nStarted Ray on this node. If you wish to terminate the " + "processes that have been started, run\n\n" + " ray stop") if block: + cli_logger.newline() + with cli_logger.group(cf.bold("--block")): + cli_logger.print( + "This command will now block until terminated by a signal.") + cli_logger.print( + "Runing subprocesses are monitored and a message will be " + "printed if any of them terminate unexpectedly.") + while True: time.sleep(1) deceased = node.dead_processes() if len(deceased) > 0: - logger.error("Ray processes died unexpectedly:") - for process_type, process in deceased: - logger.error("\t{} died with exit code {}".format( - process_type, process.returncode)) + cli_logger.newline() + cli_logger.error("Some Ray subprcesses exited unexpectedly:") + cli_logger.old_error(logger, + "Ray processes died unexpectedly:") + + with cli_logger.indented(): + for process_type, process in deceased: + cli_logger.error( + "{}", + cf.bold(str(process_type)), + _tags={"exit code": str(process.returncode)}) + cli_logger.old_error( + logger, "\t{} died with exit code {}".format( + process_type, process.returncode)) + # shutdown_at_exit will handle cleanup. - logger.error("Killing remaining processes and exiting...") + cli_logger.newline() + cli_logger.error("Remaining processes will be killed.") + cli_logger.old_error( + logger, "Killing remaining processes and exiting...") sys.exit(1) @@ -579,13 +765,14 @@ def start(node_ip_address, redis_address, address, redis_port, port, "--force", is_flag=True, help="If set, ray will send SIGKILL instead of SIGTERM.") -@click.option( - "-v", - "--verbose", - is_flag=True, - help="If set, ray prints out more information about processes to kill.") -def stop(force, verbose): +@add_click_options(logging_options) +def stop(force, verbose, log_new_style, log_color): """Stop Ray processes manually on the local machine.""" + + cli_logger.old_style = not log_new_style + cli_logger.color_mode = log_color + cli_logger.verbosity = verbose + # Note that raylet needs to exit before object store, otherwise # it cannot exit gracefully. is_linux = sys.platform.startswith("linux") @@ -619,12 +806,17 @@ def stop(force, verbose): process_infos.append((proc, proc.name(), proc.cmdline())) except psutil.Error: pass + + total_found = 0 + total_stopped = 0 for keyword, filter_by_cmd in processes_to_kill: if filter_by_cmd and is_linux and len(keyword) > 15: + # getting here is an internal bug, so we do not use cli_logger msg = ("The filter string should not be more than {} " "characters. Actual length: {}. Filter: {}").format( 15, len(keyword), keyword) raise ValueError(msg) + found = [] for candidate in process_infos: proc, proc_cmd, proc_args = candidate @@ -632,11 +824,15 @@ def stop(force, verbose): if filter_by_cmd else subprocess.list2cmdline(proc_args)) if keyword in corpus: found.append(candidate) + for proc, proc_cmd, proc_args in found: + total_found += 1 + + proc_string = str(subprocess.list2cmdline(proc_args)) if verbose: operation = "Terminating" if force else "Killing" - logger.info("%s process %s: %s", operation, proc.pid, - subprocess.list2cmdline(proc_args)) + cli_logger.old_info(logger, "%s process %s: %s", operation, + proc.pid, proc_string) try: if force: proc.kill() @@ -645,10 +841,41 @@ def stop(force, verbose): # We don't want CTRL_BREAK_EVENT, because that would # terminate the entire process group. What to do? proc.terminate() + + if force: + cli_logger.verbose("Killed `{}` {} ", cf.bold(proc_string), + cf.gray("(via SIGKILL)")) + else: + cli_logger.verbose("Send termination request to `{}` {}", + cf.bold(proc_string), + cf.gray("(via SIGTERM)")) + + total_stopped += 1 except psutil.NoSuchProcess: + cli_logger.verbose( + "Attempted to stop `{}`, but process was already dead.", + cf.bold(proc_string)) pass except (psutil.Error, OSError) as ex: - logger.error("Error: %s", ex) + cli_logger.error("Could not terminate `{}` due to {}", + cf.bold(proc_string), str(ex)) + cli_logger.old_error(logger, "Error: %s", ex) + + if total_found == 0: + cli_logger.print("Did not find any active Ray processes.") + else: + if total_stopped == total_found: + cli_logger.success("Stopped all {} Ray processes.", total_stopped) + else: + cli_logger.warning( + "Stopped only {} out of {} Ray processes. " + "Set `{}` to see more details.", total_stopped, total_found, + cf.bold("-v")) + cli_logger.warning("Try running the command again, or use `{}`.", + cf.bold("--force")) + + # TODO(maximsmol): we should probably block until the processes actually + # all died somehow @cli.command() @@ -692,18 +919,6 @@ def stop(force, verbose): is_flag=True, default=False, help="Disable the local cluster config cache.") -@click.option( - "--log-old-style/--log-new-style", - is_flag=True, - default=True, - help=("Use old logging.")) -@click.option( - "--log-color", - required=False, - type=str, - default="auto", - help=("Use color logging. " - "Valid values are: auto (if stdout is a tty), true, false.")) @click.option( "--dump-command-output", is_flag=True, @@ -717,14 +932,22 @@ def stop(force, verbose): help=("Ray uses login shells (bash --login -i) to run cluster commands " "by default. If your workflow is compatible with normal shells, " "this can be disabled for a better user experience.")) -@click.option("-v", "--verbose", count=True) +@add_click_options(logging_options) def up(cluster_config_file, min_workers, max_workers, no_restart, restart_only, - yes, cluster_name, no_config_cache, log_old_style, log_color, - dump_command_output, use_login_shells, verbose): + yes, cluster_name, no_config_cache, dump_command_output, + use_login_shells, log_new_style, log_color, verbose): """Create or update a Ray cluster.""" + cli_logger.old_style = not log_new_style + cli_logger.color_mode = log_color + cli_logger.verbosity = verbose + if restart_only or no_restart: + cli_logger.doassert(restart_only != no_restart, + "`{}` is incompatible with `{}`.", + cf.bold("--restart-only"), cf.bold("--no-restart")) assert restart_only != no_restart, "Cannot set both 'restart_only' " \ "and 'no_restart' at the same time!" + if urllib.parse.urlparse(cluster_config_file).scheme in ("http", "https"): try: response = urllib.request.urlopen(cluster_config_file, timeout=5) @@ -734,11 +957,14 @@ def up(cluster_config_file, min_workers, max_workers, no_restart, restart_only, f.write(content) cluster_config_file = file_name except urllib.error.HTTPError as e: - logger.info("Error downloading file: ", e) + cli_logger.warning("{}", str(e)) + cli_logger.warning( + "Could not download remote cluster configuration file.") + cli_logger.old_info(logger, "Error downloading file: ", e) create_or_update_cluster(cluster_config_file, min_workers, max_workers, no_restart, restart_only, yes, cluster_name, - no_config_cache, log_old_style, log_color, - dump_command_output, use_login_shells, verbose) + no_config_cache, dump_command_output, + use_login_shells) @cli.command() @@ -765,24 +991,16 @@ def up(cluster_config_file, min_workers, max_workers, no_restart, restart_only, is_flag=True, default=False, help="Retain the minimal amount of workers specified in the config.") -@click.option( - "--log-old-style/--log-new-style", - is_flag=True, - default=True, - help=("Use old logging.")) -@click.option( - "--log-color", - required=False, - type=str, - default="auto", - help=("Use color logging. " - "Valid values are: auto (if stdout is a tty), true, false.")) -@click.option("-v", "--verbose", count=True) +@add_click_options(logging_options) def down(cluster_config_file, yes, workers_only, cluster_name, - keep_min_workers, log_old_style, log_color, verbose): + keep_min_workers, log_new_style, log_color, verbose): """Tear down a Ray cluster.""" + cli_logger.old_style = not log_new_style + cli_logger.color_mode = log_color + cli_logger.verbosity = verbose + teardown_cluster(cluster_config_file, yes, workers_only, cluster_name, - keep_min_workers, log_old_style, log_color, verbose) + keep_min_workers) @cli.command() @@ -825,8 +1043,14 @@ def kill_random_node(cluster_config_file, yes, hard, cluster_name): required=False, type=str, help="Override the configured cluster name.") -def monitor(cluster_config_file, lines, cluster_name): +@add_click_options(logging_options) +def monitor(cluster_config_file, lines, cluster_name, log_new_style, log_color, + verbose): """Tails the autoscaler logs of a Ray cluster.""" + cli_logger.old_style = not log_new_style + cli_logger.color_mode = log_color + cli_logger.verbosity = verbose + monitor_cluster(cluster_config_file, lines, cluster_name) @@ -856,9 +1080,14 @@ def monitor(cluster_config_file, lines, cluster_name): multiple=True, type=int, help="Port to forward. Use this multiple times to forward multiple ports.") +@add_click_options(logging_options) def attach(cluster_config_file, start, screen, tmux, cluster_name, new, - port_forward): + port_forward, log_new_style, log_color, verbose): """Create or attach to a SSH session to a Ray cluster.""" + cli_logger.old_style = not log_new_style + cli_logger.color_mode = log_color + cli_logger.verbosity = verbose + port_forward = [(port, port) for port in list(port_forward)] attach_cluster(cluster_config_file, start, screen, tmux, cluster_name, new, port_forward) @@ -874,8 +1103,14 @@ def attach(cluster_config_file, start, screen, tmux, cluster_name, new, required=False, type=str, help="Override the configured cluster name.") -def rsync_down(cluster_config_file, source, target, cluster_name): +@add_click_options(logging_options) +def rsync_down(cluster_config_file, source, target, cluster_name, + log_new_style, log_color, verbose): """Download specific files from a Ray cluster.""" + cli_logger.old_style = not log_new_style + cli_logger.color_mode = log_color + cli_logger.verbosity = verbose + rsync(cluster_config_file, source, target, cluster_name, down=True) @@ -895,8 +1130,14 @@ def rsync_down(cluster_config_file, source, target, cluster_name): is_flag=True, required=False, help="Upload to all nodes (workers and head).") -def rsync_up(cluster_config_file, source, target, cluster_name, all_nodes): +@add_click_options(logging_options) +def rsync_up(cluster_config_file, source, target, cluster_name, all_nodes, + log_new_style, log_color, verbose): """Upload specific files to a Ray cluster.""" + cli_logger.old_style = not log_new_style + cli_logger.color_mode = log_color + cli_logger.verbosity = verbose + rsync( cluster_config_file, source, @@ -945,8 +1186,10 @@ def rsync_up(cluster_config_file, source, target, cluster_name, all_nodes): type=str, help="(deprecated) Use '-- --arg1 --arg2' for script args.") @click.argument("script_args", nargs=-1) +@add_click_options(logging_options) def submit(cluster_config_file, screen, tmux, stop, start, cluster_name, - port_forward, script, args, script_args): + port_forward, script, args, script_args, log_new_style, log_color, + verbose): """Uploads and runs a script on the specified cluster. The script is automatically synced to the following location: @@ -956,11 +1199,34 @@ def submit(cluster_config_file, screen, tmux, stop, start, cluster_name, Example: >>> ray submit [CLUSTER.YAML] experiment.py -- --smoke-test """ + cli_logger.old_style = not log_new_style + cli_logger.color_mode = log_color + cli_logger.verbosity = verbose + + set_output_redirected(False) + + cli_logger.doassert(not (screen and tmux), + "`{}` and `{}` are incompatible.", cf.bold("--screen"), + cf.bold("--tmux")) + cli_logger.doassert( + not (script_args and args), + "`{0}` and `{1}` are incompatible. Use only `{1}`.\n" + "Example: `{2}`", cf.bold("--args"), cf.bold("-- "), + cf.bold("ray submit script.py -- --arg=123 --flag")) + assert not (screen and tmux), "Can specify only one of `screen` or `tmux`." assert not (script_args and args), "Use -- --arg1 --arg2 for script args." if args: - logger.warning( + cli_logger.warning( + "`{}` is deprecated and will be removed in the future.", + cf.bold("--args")) + cli_logger.warning("Use `{}` instead. Example: `{}`.", + cf.bold("-- "), + cf.bold("ray submit script.py -- --arg=123 --flag")) + cli_logger.newline() + cli_logger.old_warning( + logger, "ray submit [yaml] [script.py] --args=... is deprecated and " "will be removed in a future version of Ray. Use " "`ray submit [yaml] script.py -- --arg1 --arg2` instead.") @@ -1032,9 +1298,16 @@ def submit(cluster_config_file, screen, tmux, stop, start, cluster_name, multiple=True, type=int, help="Port to forward. Use this multiple times to forward multiple ports.") +@add_click_options(logging_options) def exec(cluster_config_file, cmd, run_env, screen, tmux, stop, start, - cluster_name, port_forward): + cluster_name, port_forward, log_new_style, log_color, verbose): """Execute a command via SSH on a Ray cluster.""" + cli_logger.old_style = not log_new_style + cli_logger.color_mode = log_color + cli_logger.verbosity = verbose + + set_output_redirected(False) + port_forward = [(port, port) for port in list(port_forward)] exec_cluster( diff --git a/python/ray/services.py b/python/ray/services.py index 3a819f273..2089ba447 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -19,6 +19,9 @@ import ray import ray.ray_constants as ray_constants import psutil +from ray.autoscaler.cli_logger import cli_logger +import colorful as cf + resource = None if sys.platform != "win32": import resource @@ -579,6 +582,12 @@ def wait_for_redis_to_start(redis_ip_address, redis_port, password=None): else: break else: + cli_logger.error( + "Unable to connect to Redis at " + "`{c.underlined}{}:{}{c.no_underlined}` after {} retries.", + redis_ip_address, redis_port, num_retries) + cli_logger.abort("Check your firewall and network settings.") + raise RuntimeError("Unable to connect to Redis. If the Redis instance " "is on a different machine, check that your " "firewall is configured properly.") @@ -1191,9 +1200,13 @@ def start_dashboard(require_dashboard, dashboard_url = "{}:{}".format( host if host != "0.0.0.0" else get_node_ip_address(), port) - logger.info("View the Ray dashboard at {}{}{}{}{}".format( - colorama.Style.BRIGHT, colorama.Fore.GREEN, dashboard_url, - colorama.Fore.RESET, colorama.Style.NORMAL)) + + cli_logger.labeled_value("Dashboard URL", cf.underlined("http://{}"), + dashboard_url) + cli_logger.old_info(logger, "View the Ray dashboard at {}{}{}{}{}", + colorama.Style.BRIGHT, colorama.Fore.GREEN, + dashboard_url, colorama.Fore.RESET, + colorama.Style.NORMAL) return dashboard_url, process_info else: