diff --git a/doc/source/autoscaling.rst b/doc/source/autoscaling.rst index cd6a8e82a..4f5dcd4a0 100644 --- a/doc/source/autoscaling.rst +++ b/doc/source/autoscaling.rst @@ -211,6 +211,8 @@ Port-forwarding applications If you want to run applications on the cluster that are accessible from a web browser (e.g., Jupyter notebook), you can use the ``--port-forward`` option for ``ray exec``. The local port opened is the same as the remote port. +Note: For Kubernetes clusters, the ``port-forward`` option cannot be used while executing a command. To port forward and run a command you need to call ``ray exec`` twice separately. + .. code-block:: bash $ ray exec cluster.yaml --port-forward=8899 'source ~/anaconda3/bin/activate tensorflow_p36 && jupyter notebook --port=8899' @@ -240,6 +242,8 @@ The default idle timeout is 5 minutes. This is to prevent excessive node churn w Monitoring cluster status ~~~~~~~~~~~~~~~~~~~~~~~~~ +The ray also comes with an online dashboard. The dashboard is accessible via HTTP on the head node (by default it listens on ``localhost:8265``). To access it locally, you'll need to forward the port to your local machine. You can also use the built-in ``ray dashboard`` to do this automatically. + You can monitor cluster usage and auto-scaling status by tailing the autoscaling logs in ``/tmp/ray/session_*/logs/monitor*``. diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py index e368c8b1e..807851b0f 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/commands.py @@ -342,8 +342,15 @@ def attach_cluster(config_file, start, use_screen, use_tmux, override_cluster_name, None) -def exec_cluster(config_file, cmd, docker, screen, tmux, stop, start, - override_cluster_name, port_forward): +def exec_cluster(config_file, + cmd=None, + docker=False, + screen=False, + tmux=False, + stop=False, + start=False, + override_cluster_name=None, + port_forward=None): """Runs a command on the specified cluster. Arguments: @@ -389,15 +396,16 @@ def exec_cluster(config_file, cmd, docker, screen, tmux, stop, start, return with_docker_exec( [command], container_name=container_name)[0] - cmd = wrap_docker(cmd) if docker else cmd + if cmd: + cmd = wrap_docker(cmd) if docker else cmd - if stop: - shutdown_cmd = ( - "ray stop; ray teardown ~/ray_bootstrap_config.yaml " - "--yes --workers-only") - if docker: - shutdown_cmd = wrap_docker(shutdown_cmd) - cmd += ("; {}; sudo shutdown -h now".format(shutdown_cmd)) + if stop: + shutdown_cmd = ( + "ray stop; ray teardown ~/ray_bootstrap_config.yaml " + "--yes --workers-only") + if docker: + shutdown_cmd = wrap_docker(shutdown_cmd) + cmd += ("; {}; sudo shutdown -h now".format(shutdown_cmd)) _exec(updater, cmd, screen, tmux, port_forward=port_forward) @@ -434,11 +442,8 @@ def _exec(updater, cmd, screen, tmux, port_forward=None): quote(cmd + "; exec bash") ] cmd = " ".join(cmd) - updater.cmd_runner.run( - cmd, - allocate_tty=True, - exit_on_fail=True, - port_forward=port_forward) + updater.cmd_runner.run( + cmd, allocate_tty=True, exit_on_fail=True, port_forward=port_forward) def rsync(config_file, source, target, override_cluster_name, down): diff --git a/python/ray/autoscaler/kubernetes/example-full.yaml b/python/ray/autoscaler/kubernetes/example-full.yaml index aabc21c98..7742fddab 100644 --- a/python/ray/autoscaler/kubernetes/example-full.yaml +++ b/python/ray/autoscaler/kubernetes/example-full.yaml @@ -235,9 +235,10 @@ head_setup_commands: [] worker_setup_commands: [] # Command to start ray on the head node. You don't need to change this. +# Note webui-host is set to 0.0.0.0 so that kubernetes can port forward. head_start_ray_commands: - ray stop - - ulimit -n 65536; ray start --head --num-cpus=$MY_CPU_REQUEST --redis-port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml + - ulimit -n 65536; ray start --head --num-cpus=$MY_CPU_REQUEST --redis-port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml --webui-host 0.0.0.0 # Command to start ray on worker nodes. You don't need to change this. worker_start_ray_commands: diff --git a/python/ray/autoscaler/updater.py b/python/ray/autoscaler/updater.py index b5bae21ae..043adab52 100644 --- a/python/ray/autoscaler/updater.py +++ b/python/ray/autoscaler/updater.py @@ -44,13 +44,15 @@ class KubernetesCommandRunner: self.kubectl = ["kubectl", "-n", self.namespace] def run(self, - cmd, + cmd=None, timeout=120, allocate_tty=False, exit_on_fail=False, port_forward=None): - - logger.info(self.log_prefix + "Running {}...".format(cmd)) + if cmd and port_forward: + raise Exception( + "exec with Kubernetes can't forward ports and execute" + "commands together.") if port_forward: if not isinstance(port_forward, list): @@ -58,47 +60,39 @@ class KubernetesCommandRunner: port_forward_cmd = self.kubectl + [ "port-forward", self.node_id, - ] + [str(fwd) for fwd in port_forward] + ] + [ + "{}:{}".format(local, remote) for local, remote in port_forward + ] + logger.info("Port forwarding with: {}".format( + " ".join(port_forward_cmd))) port_forward_process = subprocess.Popen(port_forward_cmd) - # Give port-forward a grace period to run and print output before - # running the actual command. This is a little ugly, but it should - # work in most scenarios and nothing should go very wrong if the - # command starts running before the port forward starts. - time.sleep(1) - - final_cmd = self.kubectl + [ - "exec", - "-it" if allocate_tty else "-i", - self.node_id, - "--", - ] + with_interactive(cmd) - try: - self.process_runner.check_call(" ".join(final_cmd), shell=True) - except subprocess.CalledProcessError: - if exit_on_fail: - quoted_cmd = " ".join(final_cmd[:-1] + [quote(final_cmd[-1])]) - logger.error(self.log_prefix + - "Command failed: \n\n {}\n".format(quoted_cmd)) - sys.exit(1) - else: - raise - finally: - # Clean up the port forward process. First, try to let it exit - # gracefull with SIGTERM. If that doesn't work after 1s, send - # SIGKILL. - if port_forward: - port_forward_process.terminate() - for _ in range(10): - time.sleep(0.1) - port_forward_process.poll() - if port_forward_process.returncode: - break - logger.info(self.log_prefix + - "Waiting for port forward to die...") + port_forward_process.wait() + # We should never get here, this indicates that port forwarding + # failed, likely because we couldn't bind to a port. + pout, perr = port_forward_process.communicate() + exception_str = " ".join( + port_forward_cmd) + " failed with error: " + perr + raise Exception(exception_str) + else: + logger.info(self.log_prefix + "Running {}...".format(cmd)) + final_cmd = self.kubectl + [ + "exec", + "-it" if allocate_tty else "-i", + self.node_id, + "--", + ] + with_interactive(cmd) + try: + self.process_runner.check_call(" ".join(final_cmd), shell=True) + except subprocess.CalledProcessError: + if exit_on_fail: + quoted_cmd = " ".join(final_cmd[:-1] + + [quote(final_cmd[-1])]) + logger.error( + self.log_prefix + + "Command failed: \n\n {}\n".format(quoted_cmd)) + sys.exit(1) else: - logger.warning(self.log_prefix + - "Killing port forward with SIGKILL.") - port_forward_process.kill() + raise def run_rsync_up(self, source, target): if target.startswith("~"): @@ -172,7 +166,15 @@ class SSHCommandRunner: ("ControlMaster", "auto"), ("ControlPath", "{}/%C".format(self.ssh_control_path)), ("ControlPersist", "10s"), + # Try fewer extraneous key pairs. ("IdentitiesOnly", "yes"), + # Abort if port forwarding fails (instead of just printing to + # stderr). + ("ExitOnForwardFailure", "yes"), + # Quickly kill the connection if network connection breaks (as + # opposed to hanging/blocking). + ("ServerAliveInterval", 5), + ("ServerAliveCountMax", 3), ] return ["-i", self.ssh_private_key] + [ @@ -243,20 +245,25 @@ class SSHCommandRunner: if port_forward: if not isinstance(port_forward, list): port_forward = [port_forward] - for fwd in port_forward: - ssh += ["-L", "{}:localhost:{}".format(fwd, fwd)] + for local, remote in port_forward: + ssh += ["-L", "{}:localhost:{}".format(local, remote)] final_cmd = ssh + self.get_default_ssh_options(timeout) + [ "{}@{}".format(self.ssh_user, self.ssh_ip) - ] + with_interactive(cmd) + ] + if cmd: + final_cmd += with_interactive(cmd) + else: + # We do this because `-o ControlMaster` causes the `-N` flag to + # still create an interactive shell in some ssh versions. + final_cmd.append("while true; do sleep 86400; done") try: self.process_runner.check_call(final_cmd) except subprocess.CalledProcessError: if exit_on_fail: quoted_cmd = " ".join(final_cmd[:-1] + [quote(final_cmd[-1])]) - logger.error(self.log_prefix + - "Command failed: \n\n {}\n".format(quoted_cmd)) - sys.exit(1) + raise Exception( + "Command failed: \n\n {}\n".format(quoted_cmd)) else: raise diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index c7af6d288..1d3626c41 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -60,6 +60,55 @@ def cli(logging_level, logging_format): ray.utils.setup_logger(level, logging_format) +@click.command() +@click.argument("cluster_config_file", required=True, type=str) +@click.option( + "--cluster-name", + "-n", + required=False, + type=str, + help="Override the configured cluster name.") +@click.option( + "--port", + "-p", + required=False, + type=int, + default=8265, + help="The local port to forward to the dashboard") +def dashboard(cluster_config_file, cluster_name, port): + # Sleeping in a loop is preferable to `sleep infinity` because the latter + # only works on linux. + remote_port = 8265 + if port: + dashboard_port = port + else: + dashboard_port = remote_port + + port_taken = True + + # Find the first open port sequentially from `remote_port`. + while port_taken: + try: + port_forward = [ + (dashboard_port, remote_port), + ] + click.echo( + "Attempting to establish dashboard at localhost:{}".format( + port_forward[0][0])) + # We want to probe with a no-op that returns quickly to avoid + # exceptions caused by network errors. + exec_cluster( + cluster_config_file, + override_cluster_name=cluster_name, + port_forward=port_forward) + port_taken = False + except Exception: + click.echo("Failed to forward dashboard, trying a new port...") + port_taken = True + dashboard_port += 1 + pass + + @cli.command() @click.option( "--node-ip-address", @@ -287,7 +336,6 @@ def start(node_ip_address, redis_address, address, redis_port, load_code_from_local=load_code_from_local, use_pickle=use_pickle, _internal_config=internal_config) - if head: # Start Ray on the head node. if redis_shard_ports is not None: @@ -713,9 +761,19 @@ def submit(cluster_config_file, docker, screen, tmux, stop, start, command_parts = ["python", target] if args is not None: command_parts += [args] + + port_forward = [(port, port) for port in list(port_forward)] cmd = " ".join(command_parts) - exec_cluster(cluster_config_file, cmd, docker, screen, tmux, stop, False, - cluster_name, list(port_forward)) + exec_cluster( + cluster_config_file, + cmd, + docker, + screen, + tmux, + stop, + start=False, + override_cluster_name=cluster_name, + port_forward=port_forward) @cli.command() @@ -757,8 +815,9 @@ def submit(cluster_config_file, docker, screen, tmux, stop, start, help="Port to forward. Use this multiple times to forward multiple ports.") def exec_cmd(cluster_config_file, cmd, docker, screen, tmux, stop, start, cluster_name, port_forward): + port_forward = [(port, port) for port in list(port_forward)] exec_cluster(cluster_config_file, cmd, docker, screen, tmux, stop, start, - cluster_name, list(port_forward)) + cluster_name, port_forward) @cli.command() @@ -872,6 +931,7 @@ def stat(address): print(reply) +cli.add_command(dashboard) cli.add_command(start) cli.add_command(stop) cli.add_command(create_or_update, name="up")