ray exec and ray attach commands (#2560)

ray exec CLUSTER CMD [--screen] [--start] [--stop]
ray attach CLUSTER [--start]

Example:
ray exec sgd.yaml 'source activate tensorflow_p27 && cd ~/ray/python/ray/rllib && ./train.py --run=PPO --env=CartPole-v0' --screen --start --stop

This will in one command create a cluster and run the command on it in a screen session. The screen can later be attached to via ray attach. After the command finishes, the cluster workers will be terminated and the head node stopped.
This commit is contained in:
Eric Liang
2018-08-15 14:31:50 -07:00
committed by GitHub
parent 53f9755594
commit 079c4e482a
10 changed files with 322 additions and 75 deletions
+12 -2
View File
@@ -188,6 +188,9 @@ class LoadMetrics(object):
max_frac = frac
nodes_used += max_frac
idle_times = [now - t for t in self.last_used_time_by_ip.values()]
heartbeat_times = [
now - t for t in self.last_heartbeat_time_by_ip.values()
]
return {
"ResourceUsage": ", ".join([
"{}/{} {}".format(
@@ -201,6 +204,10 @@ class LoadMetrics(object):
int(np.min(idle_times)) if idle_times else -1,
int(np.mean(idle_times)) if idle_times else -1,
int(np.max(idle_times)) if idle_times else -1),
"TimeSinceLastHeartbeat": "Min={} Mean={} Max={}".format(
int(np.min(heartbeat_times)) if heartbeat_times else -1,
int(np.mean(heartbeat_times)) if heartbeat_times else -1,
int(np.max(heartbeat_times)) if heartbeat_times else -1),
}
@@ -504,14 +511,17 @@ class StandardAutoscaler(object):
return
if self.files_up_to_date(node_id):
return
if self.config.get("no_restart", False) and \
self.num_successful_updates.get(node_id, 0) > 0:
successful_updated = self.num_successful_updates.get(node_id, 0) > 0
if successful_updated and self.config.get("restart_only", False):
init_commands = self.config["worker_start_ray_commands"]
elif successful_updated and self.config.get("no_restart", False):
init_commands = (self.config["setup_commands"] +
self.config["worker_setup_commands"])
else:
init_commands = (self.config["setup_commands"] +
self.config["worker_setup_commands"] +
self.config["worker_start_ray_commands"])
updater = self.node_updater_cls(
node_id,
self.config["provider"],
+116 -23
View File
@@ -3,7 +3,9 @@ from __future__ import division
from __future__ import print_function
import copy
import hashlib
import json
import os
import tempfile
import time
import sys
@@ -24,17 +26,31 @@ from ray.autoscaler.updater import NodeUpdaterProcess
def create_or_update_cluster(config_file, override_min_workers,
override_max_workers, no_restart, yes):
override_max_workers, no_restart, restart_only,
yes, override_cluster_name):
"""Create or updates an autoscaling Ray cluster from a config json."""
config = yaml.load(open(config_file).read())
validate_config(config)
config = fillout_defaults(config)
if override_min_workers is not None:
config["min_workers"] = override_min_workers
if override_max_workers is not None:
config["max_workers"] = override_max_workers
if override_cluster_name is not None:
config["cluster_name"] = override_cluster_name
config = _bootstrap_config(config)
get_or_create_head_node(config, config_file, no_restart, restart_only, yes)
def _bootstrap_config(config):
hasher = hashlib.sha1()
hasher.update(json.dumps([config], sort_keys=True).encode("utf-8"))
cache_key = os.path.join(tempfile.gettempdir(),
"ray-config-{}".format(hasher.hexdigest()))
if os.path.exists(cache_key):
print("Cached settings:", cache_key)
return json.loads(open(cache_key).read())
validate_config(config)
config = fillout_defaults(config)
importer = NODE_PROVIDERS.get(config["provider"]["type"])
if not importer:
@@ -42,36 +58,41 @@ def create_or_update_cluster(config_file, override_min_workers,
config["provider"]))
bootstrap_config, _ = importer()
config = bootstrap_config(config)
get_or_create_head_node(config, no_restart, yes)
resolved_config = bootstrap_config(config)
with open(cache_key, "w") as f:
f.write(json.dumps(resolved_config))
return resolved_config
def teardown_cluster(config_file, yes):
def teardown_cluster(config_file, yes, workers_only, override_cluster_name):
"""Destroys all nodes of a Ray cluster described by a config json."""
config = yaml.load(open(config_file).read())
if override_cluster_name is not None:
config["cluster_name"] = override_cluster_name
validate_config(config)
config = fillout_defaults(config)
confirm("This will destroy your cluster", yes)
provider = get_node_provider(config["provider"], config["cluster_name"])
head_node_tags = {
TAG_RAY_NODE_TYPE: "head",
}
for node in provider.nodes(head_node_tags):
print("Terminating head node {}".format(node))
provider.terminate_node(node)
nodes = provider.nodes({})
if not workers_only:
for node in provider.nodes({TAG_RAY_NODE_TYPE: "head"}):
print("Terminating head node {}".format(node))
provider.terminate_node(node)
nodes = provider.nodes({TAG_RAY_NODE_TYPE: "worker"})
while nodes:
for node in nodes:
print("Terminating worker {}".format(node))
provider.terminate_node(node)
time.sleep(5)
nodes = provider.nodes({})
nodes = provider.nodes({TAG_RAY_NODE_TYPE: "worker"})
def get_or_create_head_node(config, no_restart, yes):
def get_or_create_head_node(config, config_file, no_restart, restart_only,
yes):
"""Create the cluster head node, which in turn creates the workers."""
provider = get_node_provider(config["provider"], config["cluster_name"])
@@ -133,7 +154,9 @@ def get_or_create_head_node(config, no_restart, yes):
"~/ray_bootstrap_config.yaml": remote_config_file.name
})
if no_restart:
if restart_only:
init_commands = config["head_start_ray_commands"]
elif no_restart:
init_commands = (
config["setup_commands"] + config["head_setup_commands"])
else:
@@ -170,20 +193,82 @@ def get_or_create_head_node(config, no_restart, yes):
monitor_str = "docker exec {} /bin/sh -c {}".format(
config["docker"]["container_name"], quote(monitor_str))
print("To monitor auto-scaling activity, you can run:\n\n"
" ssh -i {} {}@{} {}\n".format(config["auth"]["ssh_private_key"],
config["auth"]["ssh_user"],
provider.external_ip(head_node),
quote(monitor_str)))
" ray exec {} {} --cluster-name={}\n".format(
config_file, quote(monitor_str), quote(config["cluster_name"])))
print("To login to the cluster, run:\n\n"
" ssh -i {} {}@{}\n".format(config["auth"]["ssh_private_key"],
config["auth"]["ssh_user"],
provider.external_ip(head_node)))
def get_head_node_ip(config_file):
def attach_cluster(config_file, start, override_cluster_name):
"""Attaches to a screen for the specified cluster.
Arguments:
config_file: path to the cluster yaml
start: whether to start the cluster if it isn't up
override_cluster_name: set the name of the cluster
"""
exec_cluster(config_file, "screen -L -xRR", False, False, start,
override_cluster_name)
def exec_cluster(config_file, cmd, screen, stop, start, override_cluster_name):
"""Runs a command on the specified cluster.
Arguments:
config_file: path to the cluster yaml
cmd: command to run
screen: whether to run in a screen
stop: whether to stop the cluster after command run
start: whether to start the cluster if it isn't up
override_cluster_name: set the name of the cluster
"""
config = yaml.load(open(config_file).read())
if override_cluster_name is not None:
config["cluster_name"] = override_cluster_name
config = _bootstrap_config(config)
head_node = _get_head_node(config, config_file, create_if_needed=start)
updater = NodeUpdaterProcess(
head_node,
config["provider"],
config["auth"],
config["cluster_name"],
config["file_mounts"], [],
"",
redirect_output=False)
if stop:
cmd += ("; ray stop; ray teardown ~/ray_bootstrap_config.yaml --yes "
"--workers-only; sudo shutdown -h now")
_exec(updater, cmd, screen, expect_error=stop)
def _exec(updater, cmd, screen, expect_error=False):
if cmd:
if screen:
cmd = [
"screen", "-L", "-dm", "bash", "-c",
quote(cmd + "; exec bash")
]
cmd = " ".join(cmd)
updater.ssh_cmd(
cmd, verbose=True, allocate_tty=True, expect_error=expect_error)
def get_head_node_ip(config_file, override_cluster_name):
"""Returns head node IP for given configuration file if exists."""
config = yaml.load(open(config_file).read())
if override_cluster_name is not None:
config["cluster_name"] = override_cluster_name
provider = get_node_provider(config["provider"], config["cluster_name"])
head_node = _get_head_node(config, config_file)
return provider.external_ip(head_node)
def _get_head_node(config, config_file, create_if_needed=False):
provider = get_node_provider(config["provider"], config["cluster_name"])
head_node_tags = {
TAG_RAY_NODE_TYPE: "head",
@@ -191,7 +276,15 @@ def get_head_node_ip(config_file):
nodes = provider.nodes(head_node_tags)
if len(nodes) > 0:
head_node = nodes[0]
return provider.external_ip(head_node)
return head_node
elif create_if_needed:
get_or_create_head_node(
config,
config_file,
restart_only=False,
no_restart=False,
yes=True)
return _get_head_node(config, config_file, create_if_needed=False)
else:
print("Head node of cluster ({}) not found!".format(
config["cluster_name"]))
+23 -8
View File
@@ -181,19 +181,34 @@ class NodeUpdater(object):
for cmd in self.setup_cmds:
self.ssh_cmd(cmd, verbose=True)
def ssh_cmd(self, cmd, connect_timeout=120, redirect=None, verbose=False):
def ssh_cmd(self,
cmd,
connect_timeout=120,
redirect=None,
verbose=False,
allocate_tty=False,
emulate_interactive=True,
expect_error=False):
if verbose:
print(
"NodeUpdater: running {} on {}...".format(
pretty_cmd(cmd), self.ssh_ip),
file=self.stdout)
force_interactive = "set -i || true && source ~/.bashrc && "
self.process_runner.check_call(
[
"ssh", "-o", "ConnectTimeout={}s".format(connect_timeout),
"-o", "StrictHostKeyChecking=no", "-i", self.ssh_private_key,
"{}@{}".format(self.ssh_user, self.ssh_ip),
"bash --login -c {}".format(quote(force_interactive + cmd))
ssh = ["ssh"]
if allocate_tty:
ssh.append("-tt")
if emulate_interactive:
force_interactive = "set -i || true && source ~/.bashrc && "
cmd = "bash --login -c {}".format(quote(force_interactive + cmd))
if expect_error:
call = self.process_runner.call
else:
call = self.process_runner.check_call
call(
ssh + [
"-o", "ConnectTimeout={}s".format(connect_timeout), "-o",
"StrictHostKeyChecking=no", "-i", self.ssh_private_key,
"{}@{}".format(self.ssh_user, self.ssh_ip), cmd
],
stdout=redirect or self.stdout,
stderr=redirect or self.stderr)
+4 -4
View File
@@ -316,8 +316,8 @@ class Monitor(object):
if ip:
self.load_metrics.update(ip, static_resources, dynamic_resources)
else:
print("Warning: could not find ip for client {}."
.format(client_id))
print("Warning: could not find ip for client {} in {}.".format(
client_id, self.local_scheduler_id_to_ip_map))
def xray_heartbeat_handler(self, unused_channel, data):
"""Handle an xray heartbeat message from Redis."""
@@ -342,8 +342,8 @@ class Monitor(object):
if ip:
self.load_metrics.update(ip, static_resources, dynamic_resources)
else:
print("Warning: could not find ip for client {}."
.format(client_id))
print("Warning: could not find ip for client {} in {}.".format(
client_id, self.local_scheduler_id_to_ip_map))
def plasma_manager_heartbeat_handler(self, unused_channel, data):
"""Handle a plasma manager heartbeat from Redis.
+95 -18
View File
@@ -8,7 +8,8 @@ import os
import subprocess
import ray.services as services
from ray.autoscaler.commands import (create_or_update_cluster,
from ray.autoscaler.commands import (attach_cluster, exec_cluster,
create_or_update_cluster,
teardown_cluster, get_head_node_ip)
import ray.utils
@@ -370,6 +371,12 @@ def stop():
default=False,
help=("Whether to skip restarting Ray services during the update. "
"This avoids interrupting running jobs."))
@click.option(
"--restart-only",
is_flag=True,
default=False,
help=("Whether to skip running setup commands and only restart Ray. "
"This cannot be used with 'no-restart'."))
@click.option(
"--min-workers",
required=False,
@@ -381,39 +388,109 @@ def stop():
type=int,
help=("Override the configured max worker node count for the cluster."))
@click.option(
"--yes",
"-y",
is_flag=True,
default=False,
help=("Don't ask for confirmation."))
def create_or_update(cluster_config_file, min_workers, max_workers, no_restart,
yes):
create_or_update_cluster(cluster_config_file, min_workers, max_workers,
no_restart, yes)
@click.command()
@click.argument("cluster_config_file", required=True, type=str)
"--cluster-name",
required=False,
type=str,
help=("Override the configured cluster name."))
@click.option(
"--yes",
"-y",
is_flag=True,
default=False,
help=("Don't ask for confirmation."))
def teardown(cluster_config_file, yes):
teardown_cluster(cluster_config_file, yes)
def create_or_update(cluster_config_file, min_workers, max_workers, no_restart,
restart_only, yes, cluster_name):
if restart_only or no_restart:
assert restart_only != no_restart, "Cannot set both 'restart_only' " \
"and 'no_restart' at the same time!"
create_or_update_cluster(cluster_config_file, min_workers, max_workers,
no_restart, restart_only, yes, cluster_name)
@click.command()
@click.argument("cluster_config_file", required=True, type=str)
def get_head_ip(cluster_config_file):
click.echo(get_head_node_ip(cluster_config_file))
@click.option(
"--workers-only",
is_flag=True,
default=False,
help=("Only destroy the workers."))
@click.option(
"--yes",
"-y",
is_flag=True,
default=False,
help=("Don't ask for confirmation."))
@click.option(
"--cluster-name",
required=False,
type=str,
help=("Override the configured cluster name."))
def teardown(cluster_config_file, yes, workers_only, cluster_name):
teardown_cluster(cluster_config_file, yes, workers_only, cluster_name)
@click.command()
@click.argument("cluster_config_file", required=True, type=str)
@click.option(
"--start",
is_flag=True,
default=False,
help=("Start the cluster if needed."))
@click.option(
"--cluster-name",
required=False,
type=str,
help=("Override the configured cluster name."))
def attach(cluster_config_file, start, cluster_name):
attach_cluster(cluster_config_file, start, cluster_name)
@click.command()
@click.argument("cluster_config_file", required=True, type=str)
@click.argument("cmd", required=True, type=str)
@click.option(
"--stop",
is_flag=True,
default=False,
help=("Stop the cluster after the command finishes running."))
@click.option(
"--start",
is_flag=True,
default=False,
help=("Start the cluster if needed."))
@click.option(
"--screen",
is_flag=True,
default=False,
help=("Run the command in a screen."))
@click.option(
"--cluster-name",
required=False,
type=str,
help=("Override the configured cluster name."))
def exec_cmd(cluster_config_file, cmd, screen, stop, start, cluster_name):
exec_cluster(cluster_config_file, cmd, screen, stop, start, cluster_name)
@click.command()
@click.argument("cluster_config_file", required=True, type=str)
@click.option(
"--cluster-name",
required=False,
type=str,
help=("Override the configured cluster name."))
def get_head_ip(cluster_config_file, cluster_name):
click.echo(get_head_node_ip(cluster_config_file, cluster_name))
cli.add_command(start)
cli.add_command(stop)
cli.add_command(create_or_update)
cli.add_command(create_or_update, name="up")
cli.add_command(attach)
cli.add_command(exec_cmd, name="exec")
cli.add_command(teardown)
cli.add_command(teardown, name="down")
cli.add_command(get_head_ip)