diff --git a/doc/source/autoscaling.rst b/doc/source/autoscaling.rst index ab2627b51..370618944 100644 --- a/doc/source/autoscaling.rst +++ b/doc/source/autoscaling.rst @@ -89,11 +89,21 @@ You can use ``ray attach`` to attach to an interactive console on the cluster. Port-forwarding applications ---------------------------- -To run connect to applications running on the cluster (e.g. Jupyter notebook) using a web browser, you can forward the port to your local machine using SSH: +To run connect to applications running on the cluster (e.g. Jupyter notebook) using a web browser, you can use the port-forward option for ``ray exec``. The local port opened is the same as the remote port: .. code-block:: bash - $ ssh -L 8899:localhost:8899 -i @ 'source ~/anaconda3/bin/activate tensorflow_p36 && jupyter notebook --port=8899' + $ ray exec cluster.yaml --port-forward=8899 'source ~/anaconda3/bin/activate tensorflow_p36 && jupyter notebook --port=8899' + +Manually synchronizing files +---------------------------- + +To download or upload files to the cluster head node, use ``ray rsync_down`` or ``ray rsync_up``: + +.. code-block:: bash + + $ ray rsync_down cluster.yaml '/path/on/cluster' '/local/path' + $ ray rsync_up cluster.yaml '/local/path' '/path/on/cluster' Updating your cluster --------------------- diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index 91c8eb1bd..e42af69a0 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -65,6 +65,8 @@ CLUSTER_CONFIG_SCHEMA = { "module": (str, OPTIONAL), # module, if using external node provider "project_id": (None, OPTIONAL), # gcp project id, if using gcp + "head_ip": (str, OPTIONAL), # local cluster head node + "worker_ips": (list, OPTIONAL), # local cluster worker nodes }, REQUIRED), diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py index 236541ec1..2d4a93660 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/commands.py @@ -38,19 +38,20 @@ def create_or_update_cluster(config_file, override_min_workers, if override_cluster_name is not None: config["cluster_name"] = override_cluster_name config = _bootstrap_config(config) - get_or_create_head_node(config, config_file, no_restart, restart_only, yes) + get_or_create_head_node(config, config_file, no_restart, restart_only, yes, + override_cluster_name) def _bootstrap_config(config): + config = fillout_defaults(config) + hasher = hashlib.sha1() hasher.update(json.dumps([config], sort_keys=True).encode("utf-8")) cache_key = os.path.join(tempfile.gettempdir(), "ray-config-{}".format(hasher.hexdigest())) if os.path.exists(cache_key): - print("Cached settings:", cache_key) return json.loads(open(cache_key).read()) validate_config(config) - config = fillout_defaults(config) importer = NODE_PROVIDERS.get(config["provider"]["type"]) if not importer: @@ -91,8 +92,8 @@ def teardown_cluster(config_file, yes, workers_only, override_cluster_name): nodes = provider.nodes({TAG_RAY_NODE_TYPE: "worker"}) -def get_or_create_head_node(config, config_file, no_restart, restart_only, - yes): +def get_or_create_head_node(config, config_file, no_restart, restart_only, yes, + override_cluster_name): """Create the cluster head node, which in turn creates the workers.""" provider = get_node_provider(config["provider"], config["cluster_name"]) @@ -192,10 +193,16 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, and "--autoscaling-config" in s): monitor_str = "docker exec {} /bin/sh -c {}".format( config["docker"]["container_name"], quote(monitor_str)) + if override_cluster_name: + modifiers = " --cluster-name={}".format(quote(override_cluster_name)) + else: + modifiers = "" print("To monitor auto-scaling activity, you can run:\n\n" - " ray exec {} {} --cluster-name={}\n".format( - config_file, quote(monitor_str), quote(config["cluster_name"]))) - print("To login to the cluster, run:\n\n" + " ray exec {} {}{}\n".format(config_file, quote(monitor_str), + modifiers)) + print("To open a console on the cluster:\n\n" + " ray attach {}{}\n".format(config_file, modifiers)) + print("To ssh manually to the cluster, run:\n\n" " ssh -i {} {}@{}\n".format(config["auth"]["ssh_private_key"], config["auth"]["ssh_user"], provider.external_ip(head_node))) @@ -211,10 +218,11 @@ def attach_cluster(config_file, start, override_cluster_name): """ exec_cluster(config_file, "screen -L -xRR", False, False, start, - override_cluster_name) + override_cluster_name, None) -def exec_cluster(config_file, cmd, screen, stop, start, override_cluster_name): +def exec_cluster(config_file, cmd, screen, stop, start, override_cluster_name, + port_forward): """Runs a command on the specified cluster. Arguments: @@ -224,13 +232,15 @@ def exec_cluster(config_file, cmd, screen, stop, start, override_cluster_name): stop: whether to stop the cluster after command run start: whether to start the cluster if it isn't up override_cluster_name: set the name of the cluster + port_forward: port to forward """ config = yaml.load(open(config_file).read()) if override_cluster_name is not None: config["cluster_name"] = override_cluster_name config = _bootstrap_config(config) - head_node = _get_head_node(config, config_file, create_if_needed=start) + head_node = _get_head_node( + config, config_file, override_cluster_name, create_if_needed=start) updater = NodeUpdaterProcess( head_node, config["provider"], @@ -242,10 +252,10 @@ def exec_cluster(config_file, cmd, screen, stop, start, override_cluster_name): if stop: cmd += ("; ray stop; ray teardown ~/ray_bootstrap_config.yaml --yes " "--workers-only; sudo shutdown -h now") - _exec(updater, cmd, screen, expect_error=stop) + _exec(updater, cmd, screen, expect_error=stop, port_forward=port_forward) -def _exec(updater, cmd, screen, expect_error=False): +def _exec(updater, cmd, screen, expect_error=False, port_forward=None): if cmd: if screen: cmd = [ @@ -254,7 +264,43 @@ def _exec(updater, cmd, screen, expect_error=False): ] cmd = " ".join(cmd) updater.ssh_cmd( - cmd, verbose=True, allocate_tty=True, expect_error=expect_error) + cmd, + verbose=False, + allocate_tty=True, + expect_error=expect_error, + port_forward=port_forward) + + +def rsync(config_file, source, target, override_cluster_name, down): + """Rsyncs files. + + Arguments: + config_file: path to the cluster yaml + source: source dir + target: target dir + override_cluster_name: set the name of the cluster + down: whether we're syncing remote -> local + """ + + config = yaml.load(open(config_file).read()) + if override_cluster_name is not None: + config["cluster_name"] = override_cluster_name + config = _bootstrap_config(config) + head_node = _get_head_node( + config, config_file, override_cluster_name, create_if_needed=False) + updater = NodeUpdaterProcess( + head_node, + config["provider"], + config["auth"], + config["cluster_name"], + config["file_mounts"], [], + "", + redirect_output=False) + if down: + rsync = updater.rsync_down + else: + rsync = updater.rsync_up + rsync(source, target, check_error=False) def get_head_node_ip(config_file, override_cluster_name): @@ -264,11 +310,14 @@ def get_head_node_ip(config_file, override_cluster_name): if override_cluster_name is not None: config["cluster_name"] = override_cluster_name provider = get_node_provider(config["provider"], config["cluster_name"]) - head_node = _get_head_node(config, config_file) + head_node = _get_head_node(config, config_file, override_cluster_name) return provider.external_ip(head_node) -def _get_head_node(config, config_file, create_if_needed=False): +def _get_head_node(config, + config_file, + override_cluster_name, + create_if_needed=False): provider = get_node_provider(config["provider"], config["cluster_name"]) head_node_tags = { TAG_RAY_NODE_TYPE: "head", @@ -283,8 +332,10 @@ def _get_head_node(config, config_file, create_if_needed=False): config_file, restart_only=False, no_restart=False, - yes=True) - return _get_head_node(config, config_file, create_if_needed=False) + yes=True, + override_cluster_name=override_cluster_name) + return _get_head_node( + config, config_file, override_cluster_name, create_if_needed=False) else: print("Head node of cluster ({}) not found!".format( config["cluster_name"])) diff --git a/python/ray/autoscaler/local/__init__.py b/python/ray/autoscaler/local/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/ray/autoscaler/local/config.py b/python/ray/autoscaler/local/config.py new file mode 100644 index 000000000..ab73bbfa2 --- /dev/null +++ b/python/ray/autoscaler/local/config.py @@ -0,0 +1,7 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + + +def bootstrap_local(config): + return config diff --git a/python/ray/autoscaler/local/example-full.yaml b/python/ray/autoscaler/local/example-full.yaml new file mode 100644 index 000000000..11f7c960f --- /dev/null +++ b/python/ray/autoscaler/local/example-full.yaml @@ -0,0 +1,32 @@ +cluster_name: default +min_workers: 0 +max_workers: 0 +docker: + image: "" + container_name: "" +target_utilization_fraction: 0.8 +idle_timeout_minutes: 5 +provider: + type: local + head_ip: YOUR_HEAD_NODE_HOSTNAME + worker_ips: [] +auth: + ssh_user: YOUR_USERNAME + ssh_private_key: ~/.ssh/id_rsa +head_node: {} +worker_nodes: {} +file_mounts: + "/tmp/ray_sha": "/YOUR/LOCAL/RAY/REPO/.git/refs/heads/YOUR_BRANCH" +setup_commands: [] +head_setup_commands: [] +worker_setup_commands: [] +setup_commands: + - source activate ray && test -e ray || git clone https://github.com/YOUR_GITHUB/ray.git + - source activate ray && cd ray && git fetch && git reset --hard `cat /tmp/ray_sha` +# - source activate ray && cd ray/python && pip install -e . +head_start_ray_commands: + - source activate ray && ray stop + - source activate ray && ulimit -c unlimited && ray start --head --redis-port=6379 --autoscaling-config=~/ray_bootstrap_config.yaml +worker_start_ray_commands: + - source activate ray && ray stop + - source activate ray && ray start --redis-address=$RAY_HEAD_IP:6379 diff --git a/python/ray/autoscaler/local/node_provider.py b/python/ray/autoscaler/local/node_provider.py new file mode 100644 index 000000000..886d5c2a8 --- /dev/null +++ b/python/ray/autoscaler/local/node_provider.py @@ -0,0 +1,126 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from filelock import FileLock +import json +import os +import socket + +from ray.autoscaler.node_provider import NodeProvider +from ray.autoscaler.tags import TAG_RAY_NODE_TYPE + + +class ClusterState(object): + def __init__(self, lock_path, save_path, provider_config): + self.file_lock = FileLock(lock_path) + self.save_path = save_path + + with self.file_lock: + if os.path.exists(self.save_path): + workers = json.loads(open(self.save_path).read()) + else: + workers = {} + print("Loaded cluster state", workers) + for worker_ip in provider_config["worker_ips"]: + if worker_ip not in workers: + workers[worker_ip] = { + "tags": { + TAG_RAY_NODE_TYPE: "worker" + }, + "state": "terminated", + } + else: + assert workers[worker_ip]["tags"][ + TAG_RAY_NODE_TYPE] == "worker" + if provider_config["head_ip"] not in workers: + workers[provider_config["head_ip"]] = { + "tags": { + TAG_RAY_NODE_TYPE: "head" + }, + "state": "terminated", + } + else: + assert workers[provider_config["head_ip"]]["tags"][ + TAG_RAY_NODE_TYPE] == "head" + assert len(workers) == len(provider_config["worker_ips"]) + 1 + with open(self.save_path, "w") as f: + print("Writing cluster state", workers) + f.write(json.dumps(workers)) + + def get(self): + with self.file_lock: + workers = json.loads(open(self.save_path).read()) + return workers + + def put(self, worker_id, info): + assert "tags" in info + assert "state" in info + with self.file_lock: + workers = self.get() + workers[worker_id] = info + with open(self.save_path, "w") as f: + print("Writing cluster state", workers) + f.write(json.dumps(workers)) + + +class LocalNodeProvider(NodeProvider): + def __init__(self, provider_config, cluster_name): + NodeProvider.__init__(self, provider_config, cluster_name) + self.state = ClusterState("/tmp/cluster-{}.lock".format(cluster_name), + "/tmp/cluster-{}.state".format(cluster_name), + provider_config) + + def nodes(self, tag_filters): + workers = self.state.get() + matching_ips = [] + for worker_ip, info in workers.items(): + if info["state"] == "terminated": + continue + ok = True + for k, v in tag_filters.items(): + if info["tags"].get(k) != v: + ok = False + break + if ok: + matching_ips.append(worker_ip) + return matching_ips + + def is_running(self, node_id): + return self.state.get()[node_id]["state"] == "running" + + def is_terminated(self, node_id): + return not self.is_running(node_id) + + def node_tags(self, node_id): + return self.state.get()[node_id]["tags"] + + def external_ip(self, node_id): + return socket.gethostbyname(node_id) + + def internal_ip(self, node_id): + return socket.gethostbyname(node_id) + + def set_node_tags(self, node_id, tags): + with self.state.file_lock: + info = self.state.get()[node_id] + info["tags"].update(tags) + self.state.put(node_id, info) + + def create_node(self, node_config, tags, count): + node_type = tags[TAG_RAY_NODE_TYPE] + with self.state.file_lock: + workers = self.state.get() + for node_id, info in workers.items(): + if (info["state"] == "terminated" + and info["tags"][TAG_RAY_NODE_TYPE] == node_type): + info["tags"] = tags + info["state"] = "running" + self.state.put(node_id, info) + return + + def terminate_node(self, node_id): + workers = self.state.get() + info = workers[node_id] + info["state"] = "terminated" + self.state.put(node_id, info) diff --git a/python/ray/autoscaler/node_provider.py b/python/ray/autoscaler/node_provider.py index 463346551..acbe96772 100644 --- a/python/ray/autoscaler/node_provider.py +++ b/python/ray/autoscaler/node_provider.py @@ -19,6 +19,18 @@ def import_gcp(): return bootstrap_gcp, GCPNodeProvider +def import_local(): + from ray.autoscaler.local.config import bootstrap_local + from ray.autoscaler.local.node_provider import LocalNodeProvider + return bootstrap_local, LocalNodeProvider + + +def load_local_example_config(): + import ray.autoscaler.local as ray_local + return os.path.join( + os.path.dirname(ray_local.__file__), "example-full.yaml") + + def load_aws_example_config(): import ray.autoscaler.aws as ray_aws return os.path.join(os.path.dirname(ray_aws.__file__), "example-full.yaml") @@ -39,22 +51,22 @@ def import_external(): NODE_PROVIDERS = { + "local": import_local, "aws": import_aws, "gcp": import_gcp, "azure": None, # TODO: support more node providers "kubernetes": None, "docker": None, - "local_cluster": None, "external": import_external # Import an external module } DEFAULT_CONFIGS = { + "local": load_local_example_config, "aws": load_aws_example_config, "gcp": load_gcp_example_config, "azure": None, # TODO: support more node providers "kubernetes": None, "docker": None, - "local_cluster": None, } diff --git a/python/ray/autoscaler/updater.py b/python/ray/autoscaler/updater.py index d5838f7c1..27f9aceb6 100644 --- a/python/ray/autoscaler/updater.py +++ b/python/ray/autoscaler/updater.py @@ -165,15 +165,7 @@ class NodeUpdater(object): if not remote_path.endswith("/"): remote_path += "/" self.ssh_cmd("mkdir -p {}".format(os.path.dirname(remote_path))) - self.process_runner.check_call( - [ - "rsync", "-e", "ssh -i {} ".format(self.ssh_private_key) + - "-o ConnectTimeout=120s -o StrictHostKeyChecking=no", - "--delete", "-avz", "{}".format(local_path), - "{}@{}:{}".format(self.ssh_user, self.ssh_ip, remote_path) - ], - stdout=self.stdout, - stderr=self.stderr) + self.rsync_up(local_path, remote_path) # Run init commands self.provider.set_node_tags(self.node_id, @@ -181,6 +173,35 @@ class NodeUpdater(object): for cmd in self.setup_cmds: self.ssh_cmd(cmd, verbose=True) + def rsync_up(self, source, target, check_error=True): + if check_error: + call = self.process_runner.call + else: + call = self.process_runner.check_call + call( + [ + "rsync", "-e", "ssh -i {} ".format(self.ssh_private_key) + + "-o ConnectTimeout=120s -o StrictHostKeyChecking=no", + "--delete", "-avz", source, "{}@{}:{}".format( + self.ssh_user, self.ssh_ip, target) + ], + stdout=self.stdout, + stderr=self.stderr) + + def rsync_down(self, source, target, check_error=True): + if check_error: + call = self.process_runner.call + else: + call = self.process_runner.check_call + call( + [ + "rsync", "-e", "ssh -i {} ".format(self.ssh_private_key) + + "-o ConnectTimeout=120s -o StrictHostKeyChecking=no", "-avz", + "{}@{}:{}".format(self.ssh_user, self.ssh_ip, source), target + ], + stdout=self.stdout, + stderr=self.stderr) + def ssh_cmd(self, cmd, connect_timeout=120, @@ -188,7 +209,8 @@ class NodeUpdater(object): verbose=False, allocate_tty=False, emulate_interactive=True, - expect_error=False): + expect_error=False, + port_forward=None): if verbose: print( "NodeUpdater: running {} on {}...".format( @@ -198,14 +220,22 @@ class NodeUpdater(object): if allocate_tty: ssh.append("-tt") if emulate_interactive: - force_interactive = "set -i || true && source ~/.bashrc && " + force_interactive = ( + "set -i || true && source ~/.bashrc && " + "export OMP_NUM_THREADS=1 PYTHONWARNINGS=ignore && ") cmd = "bash --login -c {}".format(quote(force_interactive + cmd)) if expect_error: call = self.process_runner.call else: call = self.process_runner.check_call + if port_forward is None: + ssh_opt = [] + else: + ssh_opt = [ + "-L", "{}:localhost:{}".format(port_forward, port_forward) + ] call( - ssh + [ + ssh + ssh_opt + [ "-o", "ConnectTimeout={}s".format(connect_timeout), "-o", "StrictHostKeyChecking=no", "-i", self.ssh_private_key, "{}@{}".format(self.ssh_user, self.ssh_ip), cmd diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index a15295d84..13aea1e57 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -9,7 +9,7 @@ import subprocess import ray.services as services from ray.autoscaler.commands import (attach_cluster, exec_cluster, - create_or_update_cluster, + create_or_update_cluster, rsync, teardown_cluster, get_head_node_ip) import ray.utils @@ -445,6 +445,32 @@ def attach(cluster_config_file, start, cluster_name): attach_cluster(cluster_config_file, start, cluster_name) +@click.command() +@click.argument("cluster_config_file", required=True, type=str) +@click.argument("source", required=True, type=str) +@click.argument("target", required=True, type=str) +@click.option( + "--cluster-name", + required=False, + type=str, + help=("Override the configured cluster name.")) +def rsync_down(cluster_config_file, source, target, cluster_name): + rsync(cluster_config_file, source, target, cluster_name, down=True) + + +@click.command() +@click.argument("cluster_config_file", required=True, type=str) +@click.argument("source", required=True, type=str) +@click.argument("target", required=True, type=str) +@click.option( + "--cluster-name", + required=False, + type=str, + help=("Override the configured cluster name.")) +def rsync_up(cluster_config_file, source, target, cluster_name): + rsync(cluster_config_file, source, target, cluster_name, down=False) + + @click.command() @click.argument("cluster_config_file", required=True, type=str) @click.argument("cmd", required=True, type=str) @@ -468,8 +494,12 @@ def attach(cluster_config_file, start, cluster_name): required=False, type=str, help=("Override the configured cluster name.")) -def exec_cmd(cluster_config_file, cmd, screen, stop, start, cluster_name): - exec_cluster(cluster_config_file, cmd, screen, stop, start, cluster_name) +@click.option( + "--port-forward", required=False, type=int, help=("Port to forward.")) +def exec_cmd(cluster_config_file, cmd, screen, stop, start, cluster_name, + port_forward): + exec_cluster(cluster_config_file, cmd, screen, stop, start, cluster_name, + port_forward) @click.command() @@ -489,6 +519,8 @@ cli.add_command(create_or_update) cli.add_command(create_or_update, name="up") cli.add_command(attach) cli.add_command(exec_cmd, name="exec") +cli.add_command(rsync_down) +cli.add_command(rsync_up) cli.add_command(teardown) cli.add_command(teardown, name="down") cli.add_command(get_head_ip)