[cli] New logging for the rest of the ray commands (#9984)

Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
This commit is contained in:
Maksim Smolin
2020-08-11 09:58:23 -07:00
committed by GitHub
parent 4f8fef134e
commit 40b8e35d61
8 changed files with 508 additions and 116 deletions
+342 -69
View File
@@ -21,6 +21,10 @@ import ray.ray_constants as ray_constants
import ray.utils
from ray.projects.scripts import project_cli, session_cli
from ray.autoscaler.subprocess_output_util import set_output_redirected
from ray.autoscaler.cli_logger import cli_logger
import colorful as cf
logger = logging.getLogger(__name__)
@@ -48,6 +52,38 @@ def check_no_existing_redis_clients(node_ip_address, redis_client):
"clients with this IP address.")
logging_options = [
click.option(
"--log-new-style/--log-old-style",
is_flag=True,
default=False,
envvar="RAY_LOG_NEWSTYLE",
help=("Whether to use the old or the new CLI UX. "
"The new UX supports colored, formatted output and was "
"designed to display only the most important information for "
"human users. The old UX uses the standard `logging` module. "
"It is most suitable for writing to a file and will include "
"timestamps and message level (ERROR/WARNING/INFO).")),
click.option(
"--log-color",
required=False,
type=click.Choice(["auto", "false", "true"], case_sensitive=False),
default="auto",
help=("Use color logging. "
"Valid values are: auto (if stdout is a tty), true, false.")),
click.option("-v", "--verbose", count=True)
]
def add_click_options(options):
def wrapper(f):
for option in reversed(logging_options):
f = option(f)
return f
return wrapper
@click.group()
@click.option(
"--logging-level",
@@ -353,6 +389,7 @@ def dashboard(cluster_config_file, cluster_name, port, remote_port):
default=None,
help="the port to use to expose Ray metrics through a "
"Prometheus endpoint.")
@add_click_options(logging_options)
def start(node_ip_address, redis_address, address, redis_port, port,
num_redis_shards, redis_max_clients, redis_password,
redis_shard_ports, object_manager_port, node_manager_port,
@@ -363,32 +400,60 @@ def start(node_ip_address, redis_address, address, redis_port, port,
autoscaling_config, no_redirect_worker_output, no_redirect_output,
plasma_store_socket_name, raylet_socket_name, temp_dir, include_java,
java_worker_options, load_code_from_local, internal_config,
lru_evict, enable_object_reconstruction, metrics_export_port):
lru_evict, enable_object_reconstruction, metrics_export_port,
log_new_style, log_color, verbose):
"""Start Ray processes manually on the local machine."""
cli_logger.old_style = not log_new_style
cli_logger.color_mode = log_color
cli_logger.verbosity = verbose
if gcs_server_port and not head:
raise ValueError(
"gcs_server_port can be only assigned when you specify --head.")
if redis_address is not None:
cli_logger.abort("{} is deprecated. Use {} instead.",
cf.bold("--redis-address"), cf.bold("--address"))
raise DeprecationWarning("The --redis-address argument is "
"deprecated. Please use --address instead.")
if redis_port is not None:
logger.warn("The --redis-port argument will be deprecated soon. "
"Please use --port instead.")
cli_logger.warning("{} is being deprecated. Use {} instead.",
cf.bold("--redis-port"), cf.bold("--port"))
cli_logger.old_warning(
logger, "The --redis-port argument will be deprecated soon. "
"Please use --port instead.")
if port is not None and port != redis_port:
cli_logger.abort(
"Incompatible values for {} and {}. Use only {} instead.",
cf.bold("--port"), cf.bold("--redis-port"), cf.bold("--port"))
raise ValueError("Cannot specify both --port and --redis-port "
"as port is a rename of deprecated redis-port")
if include_webui is not None:
logger.warn("The --include-webui argument will be deprecated soon"
"Please use --include-dashboard instead.")
cli_logger.warning("{} is being deprecated. Use {} instead.",
cf.bold("--include-webui"),
cf.bold("--include-dashboard"))
cli_logger.old_warning(
logger, "The --include-webui argument will be deprecated soon"
"Please use --include-dashboard instead.")
if include_dashboard is not None:
include_dashboard = include_webui
dashboard_host_default = "localhost"
if webui_host != dashboard_host_default:
logger.warn("The --webui-host argument will be deprecated"
" soon. Please use --dashboard-host instead.")
cli_logger.warning("{} is being deprecated. Use {} instead.",
cf.bold("--webui-host"),
cf.bold("--dashboard-host"))
cli_logger.old_warning(
logger, "The --webui-host argument will be deprecated"
" soon. Please use --dashboard-host instead.")
if webui_host != dashboard_host and dashboard_host != "localhost":
cli_logger.abort(
"Incompatible values for {} and {}. Use only {} instead.",
cf.bold("--dashboard-host"), cf.bold("--webui-host"),
cf.bold("--dashboard-host"))
raise ValueError(
"Cannot specify both --webui-host and --dashboard-host,"
" please specify only the latter")
@@ -407,6 +472,13 @@ def start(node_ip_address, redis_address, address, redis_port, port,
try:
resources = json.loads(resources)
except Exception:
cli_logger.error("`{}` is not a valid JSON string.",
cf.bold("--resources"))
cli_logger.abort(
"Valid values look like this: `{}`",
cf.bold("--resources='\"CustomResource3\": 1, "
"\"CustomResource2\": 2}'"))
raise Exception("Unable to parse the --resources argument using "
"json.loads. Try using a format like\n\n"
" --resources='{\"CustomResource1\": 3, "
@@ -454,6 +526,16 @@ def start(node_ip_address, redis_address, address, redis_port, port,
num_redis_shards = len(redis_shard_ports)
# Check that the arguments match.
if len(redis_shard_ports) != num_redis_shards:
cli_logger.error(
"`{}` must be a comma-separated list of ports, "
"with length equal to `{}` (which defaults to {})",
cf.bold("--redis-shard-ports"),
cf.bold("--num-redis-shards"), cf.bold("1"))
cli_logger.abort(
"Example: `{}`",
cf.bold("--num-redis-shards 3 "
"--redis_shard_ports 6380,6381,6382"))
raise Exception("If --redis-shard-ports is provided, it must "
"have the form '6380,6381,6382', and the "
"number of ports provided must equal "
@@ -461,6 +543,10 @@ def start(node_ip_address, redis_address, address, redis_port, port,
"provided)")
if redis_address is not None:
cli_logger.abort(
"`{}` starts a new Redis server, `{}` should not be set.",
cf.bold("--head"), cf.bold("--address"))
raise Exception("If --head is passed in, a Redis server will be "
"started, so a Redis address should not be "
"provided.")
@@ -468,8 +554,9 @@ def start(node_ip_address, redis_address, address, redis_port, port,
# Get the node IP address if one is not provided.
ray_params.update_if_absent(
node_ip_address=services.get_node_ip_address())
logger.info("Using IP address {} for this node.".format(
ray_params.node_ip_address))
cli_logger.labeled_value("Local node IP", ray_params.node_ip_address)
cli_logger.old_info(logger, "Using IP address {} for this node.",
ray_params.node_ip_address)
ray_params.update_if_absent(
redis_port=port or redis_port,
redis_shard_ports=redis_shard_ports,
@@ -484,7 +571,45 @@ def start(node_ip_address, redis_address, address, redis_port, port,
ray_params, head=True, shutdown_at_exit=block, spawn_reaper=block)
redis_address = node.redis_address
logger.info(
# new-style CLI UX (--log-new-style)
# this is a noop if that flag is not set, so the old logger calls
# are still in place
cli_logger.newline()
startup_msg = "Ray runtime started."
cli_logger.success("-" * len(startup_msg))
cli_logger.success(startup_msg)
cli_logger.success("-" * len(startup_msg))
cli_logger.newline()
with cli_logger.group("Next steps"):
cli_logger.print(
"To connect to this Ray runtime from another node, run")
cli_logger.print(
cf.bold(" ray start --address='{}'{}"), redis_address,
" --redis-password='{}'".format(redis_password)
if redis_password else "")
cli_logger.newline()
cli_logger.print("Alternatively, use the following Python code:")
with cli_logger.indented():
with cf.with_style("monokai") as c:
cli_logger.print("{} ray", c.magenta("import"))
cli_logger.print(
"ray{}init(address{}{}{})", c.magenta("."),
c.magenta("="), c.yellow("'auto'"),
", redis_password{}{}".format(
c.magenta("="),
c.yellow("'" + redis_password + "'"))
if redis_password else "")
cli_logger.newline()
cli_logger.print(
cf.underlined("If connection fails, check your "
"firewall settings other "
"network configuration."))
cli_logger.newline()
cli_logger.print("To terminate the Ray runtime, run")
cli_logger.print(cf.bold(" ray stop"))
cli_logger.old_info(
logger,
"\nStarted Ray on this node. You can add additional nodes to "
"the cluster by calling\n\n"
" ray start --address='{}'{}\n\n"
@@ -503,29 +628,54 @@ def start(node_ip_address, redis_address, address, redis_port, port,
else:
# Start Ray on a non-head node.
if not (redis_port is None and port is None):
cli_logger.abort("`{}/{}` should not be specified without `{}`.",
cf.bold("--port"), cf.bold("--redis-port"),
cf.bold("--head"))
raise Exception(
"If --head is not passed in, --port and --redis-port are not "
"allowed.")
if redis_shard_ports is not None:
cli_logger.abort("`{}` should not be specified without `{}`.",
cf.bold("--redis-shard-ports"), cf.bold("--head"))
raise Exception("If --head is not passed in, --redis-shard-ports "
"is not allowed.")
if redis_address is None:
cli_logger.abort("`{}` is required unless starting with `{}`.",
cf.bold("--address"), cf.bold("--head"))
raise Exception("If --head is not passed in, --address must "
"be provided.")
if num_redis_shards is not None:
cli_logger.abort("`{}` should not be specified without `{}`.",
cf.bold("--num-redis-shards"), cf.bold("--head"))
raise Exception("If --head is not passed in, --num-redis-shards "
"must not be provided.")
if redis_max_clients is not None:
cli_logger.abort("`{}` should not be specified without `{}`.",
cf.bold("--redis-max-clients"), cf.bold("--head"))
raise Exception("If --head is not passed in, --redis-max-clients "
"must not be provided.")
if include_webui:
cli_logger.abort("`{}` should not be specified without `{}`.",
cf.bold("--include-web-ui"), cf.bold("--head"))
raise Exception("If --head is not passed in, the --include-webui"
"flag is not relevant.")
if include_dashboard:
cli_logger.abort("`{}` should not be specified without `{}`.",
cf.bold("--include-dashboard"), cf.bold("--head"))
raise ValueError(
"If --head is not passed in, the --include-dashboard"
"flag is not relevant.")
if include_java is not None:
cli_logger.abort("`{}` should not be specified without `{}`.",
cf.bold("--include-java"), cf.bold("--head"))
raise ValueError("--include-java should only be set for the head "
"node.")
@@ -545,8 +695,11 @@ def start(node_ip_address, redis_address, address, redis_port, port,
# Get the node IP address if one is not provided.
ray_params.update_if_absent(
node_ip_address=services.get_node_ip_address(redis_address))
logger.info("Using IP address {} for this node.".format(
ray_params.node_ip_address))
cli_logger.labeled_value("Local node IP", ray_params.node_ip_address)
cli_logger.old_info(logger, "Using IP address {} for this node.",
ray_params.node_ip_address)
# Check that there aren't already Redis clients with the same IP
# address connected with this Redis instance. This raises an exception
# if the Redis server already has clients on this node.
@@ -555,21 +708,54 @@ def start(node_ip_address, redis_address, address, redis_port, port,
ray_params.update(redis_address=redis_address)
node = ray.node.Node(
ray_params, head=False, shutdown_at_exit=block, spawn_reaper=block)
logger.info("\nStarted Ray on this node. If you wish to terminate the "
"processes that have been started, run\n\n"
" ray stop")
cli_logger.newline()
startup_msg = "Ray runtime started."
cli_logger.success("-" * len(startup_msg))
cli_logger.success(startup_msg)
cli_logger.success("-" * len(startup_msg))
cli_logger.newline()
cli_logger.print("To terminate the Ray runtime, run")
cli_logger.print(cf.bold(" ray stop"))
cli_logger.old_info(
logger, "\nStarted Ray on this node. If you wish to terminate the "
"processes that have been started, run\n\n"
" ray stop")
if block:
cli_logger.newline()
with cli_logger.group(cf.bold("--block")):
cli_logger.print(
"This command will now block until terminated by a signal.")
cli_logger.print(
"Runing subprocesses are monitored and a message will be "
"printed if any of them terminate unexpectedly.")
while True:
time.sleep(1)
deceased = node.dead_processes()
if len(deceased) > 0:
logger.error("Ray processes died unexpectedly:")
for process_type, process in deceased:
logger.error("\t{} died with exit code {}".format(
process_type, process.returncode))
cli_logger.newline()
cli_logger.error("Some Ray subprcesses exited unexpectedly:")
cli_logger.old_error(logger,
"Ray processes died unexpectedly:")
with cli_logger.indented():
for process_type, process in deceased:
cli_logger.error(
"{}",
cf.bold(str(process_type)),
_tags={"exit code": str(process.returncode)})
cli_logger.old_error(
logger, "\t{} died with exit code {}".format(
process_type, process.returncode))
# shutdown_at_exit will handle cleanup.
logger.error("Killing remaining processes and exiting...")
cli_logger.newline()
cli_logger.error("Remaining processes will be killed.")
cli_logger.old_error(
logger, "Killing remaining processes and exiting...")
sys.exit(1)
@@ -579,13 +765,14 @@ def start(node_ip_address, redis_address, address, redis_port, port,
"--force",
is_flag=True,
help="If set, ray will send SIGKILL instead of SIGTERM.")
@click.option(
"-v",
"--verbose",
is_flag=True,
help="If set, ray prints out more information about processes to kill.")
def stop(force, verbose):
@add_click_options(logging_options)
def stop(force, verbose, log_new_style, log_color):
"""Stop Ray processes manually on the local machine."""
cli_logger.old_style = not log_new_style
cli_logger.color_mode = log_color
cli_logger.verbosity = verbose
# Note that raylet needs to exit before object store, otherwise
# it cannot exit gracefully.
is_linux = sys.platform.startswith("linux")
@@ -619,12 +806,17 @@ def stop(force, verbose):
process_infos.append((proc, proc.name(), proc.cmdline()))
except psutil.Error:
pass
total_found = 0
total_stopped = 0
for keyword, filter_by_cmd in processes_to_kill:
if filter_by_cmd and is_linux and len(keyword) > 15:
# getting here is an internal bug, so we do not use cli_logger
msg = ("The filter string should not be more than {} "
"characters. Actual length: {}. Filter: {}").format(
15, len(keyword), keyword)
raise ValueError(msg)
found = []
for candidate in process_infos:
proc, proc_cmd, proc_args = candidate
@@ -632,11 +824,15 @@ def stop(force, verbose):
if filter_by_cmd else subprocess.list2cmdline(proc_args))
if keyword in corpus:
found.append(candidate)
for proc, proc_cmd, proc_args in found:
total_found += 1
proc_string = str(subprocess.list2cmdline(proc_args))
if verbose:
operation = "Terminating" if force else "Killing"
logger.info("%s process %s: %s", operation, proc.pid,
subprocess.list2cmdline(proc_args))
cli_logger.old_info(logger, "%s process %s: %s", operation,
proc.pid, proc_string)
try:
if force:
proc.kill()
@@ -645,10 +841,41 @@ def stop(force, verbose):
# We don't want CTRL_BREAK_EVENT, because that would
# terminate the entire process group. What to do?
proc.terminate()
if force:
cli_logger.verbose("Killed `{}` {} ", cf.bold(proc_string),
cf.gray("(via SIGKILL)"))
else:
cli_logger.verbose("Send termination request to `{}` {}",
cf.bold(proc_string),
cf.gray("(via SIGTERM)"))
total_stopped += 1
except psutil.NoSuchProcess:
cli_logger.verbose(
"Attempted to stop `{}`, but process was already dead.",
cf.bold(proc_string))
pass
except (psutil.Error, OSError) as ex:
logger.error("Error: %s", ex)
cli_logger.error("Could not terminate `{}` due to {}",
cf.bold(proc_string), str(ex))
cli_logger.old_error(logger, "Error: %s", ex)
if total_found == 0:
cli_logger.print("Did not find any active Ray processes.")
else:
if total_stopped == total_found:
cli_logger.success("Stopped all {} Ray processes.", total_stopped)
else:
cli_logger.warning(
"Stopped only {} out of {} Ray processes. "
"Set `{}` to see more details.", total_stopped, total_found,
cf.bold("-v"))
cli_logger.warning("Try running the command again, or use `{}`.",
cf.bold("--force"))
# TODO(maximsmol): we should probably block until the processes actually
# all died somehow
@cli.command()
@@ -692,18 +919,6 @@ def stop(force, verbose):
is_flag=True,
default=False,
help="Disable the local cluster config cache.")
@click.option(
"--log-old-style/--log-new-style",
is_flag=True,
default=True,
help=("Use old logging."))
@click.option(
"--log-color",
required=False,
type=str,
default="auto",
help=("Use color logging. "
"Valid values are: auto (if stdout is a tty), true, false."))
@click.option(
"--dump-command-output",
is_flag=True,
@@ -717,14 +932,22 @@ def stop(force, verbose):
help=("Ray uses login shells (bash --login -i) to run cluster commands "
"by default. If your workflow is compatible with normal shells, "
"this can be disabled for a better user experience."))
@click.option("-v", "--verbose", count=True)
@add_click_options(logging_options)
def up(cluster_config_file, min_workers, max_workers, no_restart, restart_only,
yes, cluster_name, no_config_cache, log_old_style, log_color,
dump_command_output, use_login_shells, verbose):
yes, cluster_name, no_config_cache, dump_command_output,
use_login_shells, log_new_style, log_color, verbose):
"""Create or update a Ray cluster."""
cli_logger.old_style = not log_new_style
cli_logger.color_mode = log_color
cli_logger.verbosity = verbose
if restart_only or no_restart:
cli_logger.doassert(restart_only != no_restart,
"`{}` is incompatible with `{}`.",
cf.bold("--restart-only"), cf.bold("--no-restart"))
assert restart_only != no_restart, "Cannot set both 'restart_only' " \
"and 'no_restart' at the same time!"
if urllib.parse.urlparse(cluster_config_file).scheme in ("http", "https"):
try:
response = urllib.request.urlopen(cluster_config_file, timeout=5)
@@ -734,11 +957,14 @@ def up(cluster_config_file, min_workers, max_workers, no_restart, restart_only,
f.write(content)
cluster_config_file = file_name
except urllib.error.HTTPError as e:
logger.info("Error downloading file: ", e)
cli_logger.warning("{}", str(e))
cli_logger.warning(
"Could not download remote cluster configuration file.")
cli_logger.old_info(logger, "Error downloading file: ", e)
create_or_update_cluster(cluster_config_file, min_workers, max_workers,
no_restart, restart_only, yes, cluster_name,
no_config_cache, log_old_style, log_color,
dump_command_output, use_login_shells, verbose)
no_config_cache, dump_command_output,
use_login_shells)
@cli.command()
@@ -765,24 +991,16 @@ def up(cluster_config_file, min_workers, max_workers, no_restart, restart_only,
is_flag=True,
default=False,
help="Retain the minimal amount of workers specified in the config.")
@click.option(
"--log-old-style/--log-new-style",
is_flag=True,
default=True,
help=("Use old logging."))
@click.option(
"--log-color",
required=False,
type=str,
default="auto",
help=("Use color logging. "
"Valid values are: auto (if stdout is a tty), true, false."))
@click.option("-v", "--verbose", count=True)
@add_click_options(logging_options)
def down(cluster_config_file, yes, workers_only, cluster_name,
keep_min_workers, log_old_style, log_color, verbose):
keep_min_workers, log_new_style, log_color, verbose):
"""Tear down a Ray cluster."""
cli_logger.old_style = not log_new_style
cli_logger.color_mode = log_color
cli_logger.verbosity = verbose
teardown_cluster(cluster_config_file, yes, workers_only, cluster_name,
keep_min_workers, log_old_style, log_color, verbose)
keep_min_workers)
@cli.command()
@@ -825,8 +1043,14 @@ def kill_random_node(cluster_config_file, yes, hard, cluster_name):
required=False,
type=str,
help="Override the configured cluster name.")
def monitor(cluster_config_file, lines, cluster_name):
@add_click_options(logging_options)
def monitor(cluster_config_file, lines, cluster_name, log_new_style, log_color,
verbose):
"""Tails the autoscaler logs of a Ray cluster."""
cli_logger.old_style = not log_new_style
cli_logger.color_mode = log_color
cli_logger.verbosity = verbose
monitor_cluster(cluster_config_file, lines, cluster_name)
@@ -856,9 +1080,14 @@ def monitor(cluster_config_file, lines, cluster_name):
multiple=True,
type=int,
help="Port to forward. Use this multiple times to forward multiple ports.")
@add_click_options(logging_options)
def attach(cluster_config_file, start, screen, tmux, cluster_name, new,
port_forward):
port_forward, log_new_style, log_color, verbose):
"""Create or attach to a SSH session to a Ray cluster."""
cli_logger.old_style = not log_new_style
cli_logger.color_mode = log_color
cli_logger.verbosity = verbose
port_forward = [(port, port) for port in list(port_forward)]
attach_cluster(cluster_config_file, start, screen, tmux, cluster_name, new,
port_forward)
@@ -874,8 +1103,14 @@ def attach(cluster_config_file, start, screen, tmux, cluster_name, new,
required=False,
type=str,
help="Override the configured cluster name.")
def rsync_down(cluster_config_file, source, target, cluster_name):
@add_click_options(logging_options)
def rsync_down(cluster_config_file, source, target, cluster_name,
log_new_style, log_color, verbose):
"""Download specific files from a Ray cluster."""
cli_logger.old_style = not log_new_style
cli_logger.color_mode = log_color
cli_logger.verbosity = verbose
rsync(cluster_config_file, source, target, cluster_name, down=True)
@@ -895,8 +1130,14 @@ def rsync_down(cluster_config_file, source, target, cluster_name):
is_flag=True,
required=False,
help="Upload to all nodes (workers and head).")
def rsync_up(cluster_config_file, source, target, cluster_name, all_nodes):
@add_click_options(logging_options)
def rsync_up(cluster_config_file, source, target, cluster_name, all_nodes,
log_new_style, log_color, verbose):
"""Upload specific files to a Ray cluster."""
cli_logger.old_style = not log_new_style
cli_logger.color_mode = log_color
cli_logger.verbosity = verbose
rsync(
cluster_config_file,
source,
@@ -945,8 +1186,10 @@ def rsync_up(cluster_config_file, source, target, cluster_name, all_nodes):
type=str,
help="(deprecated) Use '-- --arg1 --arg2' for script args.")
@click.argument("script_args", nargs=-1)
@add_click_options(logging_options)
def submit(cluster_config_file, screen, tmux, stop, start, cluster_name,
port_forward, script, args, script_args):
port_forward, script, args, script_args, log_new_style, log_color,
verbose):
"""Uploads and runs a script on the specified cluster.
The script is automatically synced to the following location:
@@ -956,11 +1199,34 @@ def submit(cluster_config_file, screen, tmux, stop, start, cluster_name,
Example:
>>> ray submit [CLUSTER.YAML] experiment.py -- --smoke-test
"""
cli_logger.old_style = not log_new_style
cli_logger.color_mode = log_color
cli_logger.verbosity = verbose
set_output_redirected(False)
cli_logger.doassert(not (screen and tmux),
"`{}` and `{}` are incompatible.", cf.bold("--screen"),
cf.bold("--tmux"))
cli_logger.doassert(
not (script_args and args),
"`{0}` and `{1}` are incompatible. Use only `{1}`.\n"
"Example: `{2}`", cf.bold("--args"), cf.bold("-- <args ...>"),
cf.bold("ray submit script.py -- --arg=123 --flag"))
assert not (screen and tmux), "Can specify only one of `screen` or `tmux`."
assert not (script_args and args), "Use -- --arg1 --arg2 for script args."
if args:
logger.warning(
cli_logger.warning(
"`{}` is deprecated and will be removed in the future.",
cf.bold("--args"))
cli_logger.warning("Use `{}` instead. Example: `{}`.",
cf.bold("-- <args ...>"),
cf.bold("ray submit script.py -- --arg=123 --flag"))
cli_logger.newline()
cli_logger.old_warning(
logger,
"ray submit [yaml] [script.py] --args=... is deprecated and "
"will be removed in a future version of Ray. Use "
"`ray submit [yaml] script.py -- --arg1 --arg2` instead.")
@@ -1032,9 +1298,16 @@ def submit(cluster_config_file, screen, tmux, stop, start, cluster_name,
multiple=True,
type=int,
help="Port to forward. Use this multiple times to forward multiple ports.")
@add_click_options(logging_options)
def exec(cluster_config_file, cmd, run_env, screen, tmux, stop, start,
cluster_name, port_forward):
cluster_name, port_forward, log_new_style, log_color, verbose):
"""Execute a command via SSH on a Ray cluster."""
cli_logger.old_style = not log_new_style
cli_logger.color_mode = log_color
cli_logger.verbosity = verbose
set_output_redirected(False)
port_forward = [(port, port) for port in list(port_forward)]
exec_cluster(