Add 'ray dashboard' command (#6959)

This commit is contained in:
Alex Wu
2020-02-10 12:55:21 -08:00
committed by GitHub
parent 72c31e3e19
commit 3f99be8dad
5 changed files with 145 additions and 68 deletions
+20 -15
View File
@@ -342,8 +342,15 @@ def attach_cluster(config_file, start, use_screen, use_tmux,
override_cluster_name, None)
def exec_cluster(config_file, cmd, docker, screen, tmux, stop, start,
override_cluster_name, port_forward):
def exec_cluster(config_file,
cmd=None,
docker=False,
screen=False,
tmux=False,
stop=False,
start=False,
override_cluster_name=None,
port_forward=None):
"""Runs a command on the specified cluster.
Arguments:
@@ -389,15 +396,16 @@ def exec_cluster(config_file, cmd, docker, screen, tmux, stop, start,
return with_docker_exec(
[command], container_name=container_name)[0]
cmd = wrap_docker(cmd) if docker else cmd
if cmd:
cmd = wrap_docker(cmd) if docker else cmd
if stop:
shutdown_cmd = (
"ray stop; ray teardown ~/ray_bootstrap_config.yaml "
"--yes --workers-only")
if docker:
shutdown_cmd = wrap_docker(shutdown_cmd)
cmd += ("; {}; sudo shutdown -h now".format(shutdown_cmd))
if stop:
shutdown_cmd = (
"ray stop; ray teardown ~/ray_bootstrap_config.yaml "
"--yes --workers-only")
if docker:
shutdown_cmd = wrap_docker(shutdown_cmd)
cmd += ("; {}; sudo shutdown -h now".format(shutdown_cmd))
_exec(updater, cmd, screen, tmux, port_forward=port_forward)
@@ -434,11 +442,8 @@ def _exec(updater, cmd, screen, tmux, port_forward=None):
quote(cmd + "; exec bash")
]
cmd = " ".join(cmd)
updater.cmd_runner.run(
cmd,
allocate_tty=True,
exit_on_fail=True,
port_forward=port_forward)
updater.cmd_runner.run(
cmd, allocate_tty=True, exit_on_fail=True, port_forward=port_forward)
def rsync(config_file, source, target, override_cluster_name, down):
@@ -235,9 +235,10 @@ head_setup_commands: []
worker_setup_commands: []
# Command to start ray on the head node. You don't need to change this.
# Note webui-host is set to 0.0.0.0 so that kubernetes can port forward.
head_start_ray_commands:
- ray stop
- ulimit -n 65536; ray start --head --num-cpus=$MY_CPU_REQUEST --redis-port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml
- ulimit -n 65536; ray start --head --num-cpus=$MY_CPU_REQUEST --redis-port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml --webui-host 0.0.0.0
# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
+55 -48
View File
@@ -44,13 +44,15 @@ class KubernetesCommandRunner:
self.kubectl = ["kubectl", "-n", self.namespace]
def run(self,
cmd,
cmd=None,
timeout=120,
allocate_tty=False,
exit_on_fail=False,
port_forward=None):
logger.info(self.log_prefix + "Running {}...".format(cmd))
if cmd and port_forward:
raise Exception(
"exec with Kubernetes can't forward ports and execute"
"commands together.")
if port_forward:
if not isinstance(port_forward, list):
@@ -58,47 +60,39 @@ class KubernetesCommandRunner:
port_forward_cmd = self.kubectl + [
"port-forward",
self.node_id,
] + [str(fwd) for fwd in port_forward]
] + [
"{}:{}".format(local, remote) for local, remote in port_forward
]
logger.info("Port forwarding with: {}".format(
" ".join(port_forward_cmd)))
port_forward_process = subprocess.Popen(port_forward_cmd)
# Give port-forward a grace period to run and print output before
# running the actual command. This is a little ugly, but it should
# work in most scenarios and nothing should go very wrong if the
# command starts running before the port forward starts.
time.sleep(1)
final_cmd = self.kubectl + [
"exec",
"-it" if allocate_tty else "-i",
self.node_id,
"--",
] + with_interactive(cmd)
try:
self.process_runner.check_call(" ".join(final_cmd), shell=True)
except subprocess.CalledProcessError:
if exit_on_fail:
quoted_cmd = " ".join(final_cmd[:-1] + [quote(final_cmd[-1])])
logger.error(self.log_prefix +
"Command failed: \n\n {}\n".format(quoted_cmd))
sys.exit(1)
else:
raise
finally:
# Clean up the port forward process. First, try to let it exit
# gracefull with SIGTERM. If that doesn't work after 1s, send
# SIGKILL.
if port_forward:
port_forward_process.terminate()
for _ in range(10):
time.sleep(0.1)
port_forward_process.poll()
if port_forward_process.returncode:
break
logger.info(self.log_prefix +
"Waiting for port forward to die...")
port_forward_process.wait()
# We should never get here, this indicates that port forwarding
# failed, likely because we couldn't bind to a port.
pout, perr = port_forward_process.communicate()
exception_str = " ".join(
port_forward_cmd) + " failed with error: " + perr
raise Exception(exception_str)
else:
logger.info(self.log_prefix + "Running {}...".format(cmd))
final_cmd = self.kubectl + [
"exec",
"-it" if allocate_tty else "-i",
self.node_id,
"--",
] + with_interactive(cmd)
try:
self.process_runner.check_call(" ".join(final_cmd), shell=True)
except subprocess.CalledProcessError:
if exit_on_fail:
quoted_cmd = " ".join(final_cmd[:-1] +
[quote(final_cmd[-1])])
logger.error(
self.log_prefix +
"Command failed: \n\n {}\n".format(quoted_cmd))
sys.exit(1)
else:
logger.warning(self.log_prefix +
"Killing port forward with SIGKILL.")
port_forward_process.kill()
raise
def run_rsync_up(self, source, target):
if target.startswith("~"):
@@ -172,7 +166,15 @@ class SSHCommandRunner:
("ControlMaster", "auto"),
("ControlPath", "{}/%C".format(self.ssh_control_path)),
("ControlPersist", "10s"),
# Try fewer extraneous key pairs.
("IdentitiesOnly", "yes"),
# Abort if port forwarding fails (instead of just printing to
# stderr).
("ExitOnForwardFailure", "yes"),
# Quickly kill the connection if network connection breaks (as
# opposed to hanging/blocking).
("ServerAliveInterval", 5),
("ServerAliveCountMax", 3),
]
return ["-i", self.ssh_private_key] + [
@@ -243,20 +245,25 @@ class SSHCommandRunner:
if port_forward:
if not isinstance(port_forward, list):
port_forward = [port_forward]
for fwd in port_forward:
ssh += ["-L", "{}:localhost:{}".format(fwd, fwd)]
for local, remote in port_forward:
ssh += ["-L", "{}:localhost:{}".format(local, remote)]
final_cmd = ssh + self.get_default_ssh_options(timeout) + [
"{}@{}".format(self.ssh_user, self.ssh_ip)
] + with_interactive(cmd)
]
if cmd:
final_cmd += with_interactive(cmd)
else:
# We do this because `-o ControlMaster` causes the `-N` flag to
# still create an interactive shell in some ssh versions.
final_cmd.append("while true; do sleep 86400; done")
try:
self.process_runner.check_call(final_cmd)
except subprocess.CalledProcessError:
if exit_on_fail:
quoted_cmd = " ".join(final_cmd[:-1] + [quote(final_cmd[-1])])
logger.error(self.log_prefix +
"Command failed: \n\n {}\n".format(quoted_cmd))
sys.exit(1)
raise Exception(
"Command failed: \n\n {}\n".format(quoted_cmd))
else:
raise
+64 -4
View File
@@ -60,6 +60,55 @@ def cli(logging_level, logging_format):
ray.utils.setup_logger(level, logging_format)
@click.command()
@click.argument("cluster_config_file", required=True, type=str)
@click.option(
"--cluster-name",
"-n",
required=False,
type=str,
help="Override the configured cluster name.")
@click.option(
"--port",
"-p",
required=False,
type=int,
default=8265,
help="The local port to forward to the dashboard")
def dashboard(cluster_config_file, cluster_name, port):
# Sleeping in a loop is preferable to `sleep infinity` because the latter
# only works on linux.
remote_port = 8265
if port:
dashboard_port = port
else:
dashboard_port = remote_port
port_taken = True
# Find the first open port sequentially from `remote_port`.
while port_taken:
try:
port_forward = [
(dashboard_port, remote_port),
]
click.echo(
"Attempting to establish dashboard at localhost:{}".format(
port_forward[0][0]))
# We want to probe with a no-op that returns quickly to avoid
# exceptions caused by network errors.
exec_cluster(
cluster_config_file,
override_cluster_name=cluster_name,
port_forward=port_forward)
port_taken = False
except Exception:
click.echo("Failed to forward dashboard, trying a new port...")
port_taken = True
dashboard_port += 1
pass
@cli.command()
@click.option(
"--node-ip-address",
@@ -287,7 +336,6 @@ def start(node_ip_address, redis_address, address, redis_port,
load_code_from_local=load_code_from_local,
use_pickle=use_pickle,
_internal_config=internal_config)
if head:
# Start Ray on the head node.
if redis_shard_ports is not None:
@@ -713,9 +761,19 @@ def submit(cluster_config_file, docker, screen, tmux, stop, start,
command_parts = ["python", target]
if args is not None:
command_parts += [args]
port_forward = [(port, port) for port in list(port_forward)]
cmd = " ".join(command_parts)
exec_cluster(cluster_config_file, cmd, docker, screen, tmux, stop, False,
cluster_name, list(port_forward))
exec_cluster(
cluster_config_file,
cmd,
docker,
screen,
tmux,
stop,
start=False,
override_cluster_name=cluster_name,
port_forward=port_forward)
@cli.command()
@@ -757,8 +815,9 @@ def submit(cluster_config_file, docker, screen, tmux, stop, start,
help="Port to forward. Use this multiple times to forward multiple ports.")
def exec_cmd(cluster_config_file, cmd, docker, screen, tmux, stop, start,
cluster_name, port_forward):
port_forward = [(port, port) for port in list(port_forward)]
exec_cluster(cluster_config_file, cmd, docker, screen, tmux, stop, start,
cluster_name, list(port_forward))
cluster_name, port_forward)
@cli.command()
@@ -872,6 +931,7 @@ def stat(address):
print(reply)
cli.add_command(dashboard)
cli.add_command(start)
cli.add_command(stop)
cli.add_command(create_or_update, name="up")