diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index 0d2f39563..be08b86d6 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -321,7 +321,8 @@ class StandardAutoscaler: self.config["worker_start_ray_commands"]), runtime_hash=self.runtime_hash, process_runner=self.process_runner, - use_internal_ip=True) + use_internal_ip=True, + docker_config=self.config["docker"]) updater.start() self.updaters[node_id] = updater @@ -360,7 +361,8 @@ class StandardAutoscaler: ray_start_commands=with_head_node_ip(ray_start_commands), runtime_hash=self.runtime_hash, process_runner=self.process_runner, - use_internal_ip=True) + use_internal_ip=True, + docker_config=self.config["docker"]) updater.start() self.updaters[node_id] = updater diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py index 121d2970d..e8260545d 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/commands.py @@ -147,7 +147,8 @@ def kill_node(config_file, yes, hard, override_cluster_name): initialization_commands=[], setup_commands=[], ray_start_commands=[], - runtime_hash="") + runtime_hash="", + docker_config=config["docker"]) _exec(updater, "ray stop", False, False) @@ -286,7 +287,7 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes, setup_commands=init_commands, ray_start_commands=ray_start_commands, runtime_hash=runtime_hash, - ) + docker_config=config["docker"]) updater.start() updater.join() @@ -407,7 +408,7 @@ def exec_cluster(config_file, setup_commands=[], ray_start_commands=[], runtime_hash="", - ) + docker_config=config["docker"]) def wrap_docker(command): container_name = config["docker"]["container_name"] @@ -529,7 +530,7 @@ def rsync(config_file, setup_commands=[], ray_start_commands=[], runtime_hash="", - ) + docker_config=config["docker"]) if down: rsync = updater.rsync_down else: diff --git a/python/ray/autoscaler/docker.py b/python/ray/autoscaler/docker.py index b3d5f7789..7dde54503 100644 --- a/python/ray/autoscaler/docker.py +++ b/python/ray/autoscaler/docker.py @@ -80,6 +80,10 @@ def aptwait_cmd(): "do echo 'Waiting for release of dpkg/apt locks'; sleep 5; done") +def check_docker_running_cmd(cname): + return " ".join(["docker", "inspect", "-f", "'{{.State.Running}}'", cname]) + + def docker_start_cmds(user, image, mount, cname, user_options): cmds = [] @@ -99,15 +103,13 @@ def docker_start_cmds(user, image, mount, cname, user_options): user_options_str = " ".join(user_options) # docker run command - docker_check = [ - "docker", "inspect", "-f", "'{{.State.Running}}'", cname, "||" - ] + docker_check = check_docker_running_cmd(cname) + " || " docker_run = [ "docker", "run", "--rm", "--name {}".format(cname), "-d", "-it", port_flags, mount_flags, env_flags, user_options_str, "--net=host", image, "bash" ] - cmds.append(" ".join(docker_check + docker_run)) + cmds.append(docker_check + " ".join(docker_run)) return cmds diff --git a/python/ray/autoscaler/kubernetes/node_provider.py b/python/ray/autoscaler/kubernetes/node_provider.py index 3471e4e76..88c3a6d83 100644 --- a/python/ray/autoscaler/kubernetes/node_provider.py +++ b/python/ray/autoscaler/kubernetes/node_provider.py @@ -87,7 +87,13 @@ class KubernetesNodeProvider(NodeProvider): for node_id in node_ids: self.terminate_node(node_id) - def get_command_runner(self, log_prefix, node_id, auth_config, - cluster_name, process_runner, use_internal_ip): + def get_command_runner(self, + log_prefix, + node_id, + auth_config, + cluster_name, + process_runner, + use_internal_ip, + docker_config=None): return KubernetesCommandRunner(log_prefix, self.namespace, node_id, auth_config, process_runner) diff --git a/python/ray/autoscaler/node_provider.py b/python/ray/autoscaler/node_provider.py index b0cc39f2a..15b6f8573 100644 --- a/python/ray/autoscaler/node_provider.py +++ b/python/ray/autoscaler/node_provider.py @@ -3,7 +3,7 @@ import logging import os import yaml -from ray.autoscaler.updater import SSHCommandRunner +from ray.autoscaler.updater import SSHCommandRunner, DockerCommandRunner logger = logging.getLogger(__name__) @@ -211,8 +211,14 @@ class NodeProvider: """Clean-up when a Provider is no longer required.""" pass - def get_command_runner(self, log_prefix, node_id, auth_config, - cluster_name, process_runner, use_internal_ip): + def get_command_runner(self, + log_prefix, + node_id, + auth_config, + cluster_name, + process_runner, + use_internal_ip, + docker_config=None): """ Returns the CommandRunner class used to perform SSH commands. Args: @@ -226,7 +232,19 @@ class NodeProvider: in the CommandRunner. E.g., subprocess. use_internal_ip(bool): whether the node_id belongs to an internal ip or external ip. + docker_config(dict): If set, the docker information of the docker + container that commands should be run on. """ - - return SSHCommandRunner(log_prefix, node_id, self, auth_config, - cluster_name, process_runner, use_internal_ip) + common_args = { + "log_prefix": log_prefix, + "node_id": node_id, + "provider": self, + "auth_config": auth_config, + "cluster_name": cluster_name, + "process_runner": process_runner, + "use_internal_ip": use_internal_ip + } + if docker_config and docker_config["container_name"] != "": + return DockerCommandRunner(docker_config, **common_args) + else: + return SSHCommandRunner(**common_args) diff --git a/python/ray/autoscaler/updater.py b/python/ray/autoscaler/updater.py index cf253e775..07e36ca37 100644 --- a/python/ray/autoscaler/updater.py +++ b/python/ray/autoscaler/updater.py @@ -17,6 +17,7 @@ from ray.autoscaler.tags import TAG_RAY_NODE_STATUS, TAG_RAY_RUNTIME_CONFIG, \ STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED, STATUS_WAITING_FOR_SSH, \ STATUS_SETTING_UP, STATUS_SYNCING_FILES from ray.autoscaler.log_timer import LogTimer +from ray.autoscaler.docker import check_docker_running_cmd logger = logging.getLogger(__name__) @@ -294,6 +295,66 @@ class SSHCommandRunner: self.ssh_private_key, self.ssh_user, self.ssh_ip) +class DockerCommandRunner(SSHCommandRunner): + def __init__(self, docker_config, **common_args): + self.ssh_command_runner = SSHCommandRunner(**common_args) + self.docker_name = docker_config["container_name"] + self.docker_config = docker_config + self.home_dir = None + + def run(self, + cmd, + timeout=120, + exit_on_fail=False, + port_forward=None, + with_output=False): + + return self.ssh_command_runner.run( + cmd, + timeout=timeout, + exit_on_fail=exit_on_fail, + port_forward=None, + with_output=False) + + def check_container_status(self): + no_exist = "not_present" + cmd = check_docker_running_cmd(self.docker_name) + " ".join( + ["||", "echo", quote(no_exist)]) + output = self.ssh_command_runner.run( + cmd, with_output=True).decode("utf-8").strip() + if no_exist in output: + return False + return output + + def run_rsync_up(self, source, target): + self.ssh_command_runner.run_rsync_up(source, target) + if self.check_container_status(): + self.ssh_command_runner.run("docker cp {} {}:{}".format( + target, self.docker_name, self.docker_expand_user(target))) + + def run_rsync_down(self, source, target): + self.ssh_command_runner.run("docker cp {}:{} {}".format( + self.docker_name, self.docker_expand_user(source), source)) + self.ssh_command_runner.run_rsync_down(source, target) + + def remote_shell_command_str(self): + inner_str = self.ssh_command_runner.remote_shell_command_str().replace( + "ssh", "ssh -tt", 1).strip("\n") + return inner_str + " docker exec -it {} /bin/bash\n".format( + self.docker_name) + + def docker_expand_user(self, string): + if string.find("~") == 0: + if self.home_dir is None: + self.home_dir = self.ssh_command_runner.run( + "docker exec {} env | grep HOME | cut -d'=' -f2".format( + self.docker_name), + with_output=True).decode("utf-8").strip() + return string.replace("~", self.home_dir) + else: + return string + + class NodeUpdater: """A process for syncing files and running init commands on a node.""" @@ -309,14 +370,15 @@ class NodeUpdater: ray_start_commands, runtime_hash, process_runner=subprocess, - use_internal_ip=False): + use_internal_ip=False, + docker_config=None): self.log_prefix = "NodeUpdater: {}: ".format(node_id) use_internal_ip = (use_internal_ip or provider_config.get("use_internal_ips", False)) self.cmd_runner = provider.get_command_runner( self.log_prefix, node_id, auth_config, cluster_name, - process_runner, use_internal_ip) + process_runner, use_internal_ip, docker_config) self.daemon = True self.process_runner = process_runner diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 462ac440e..86c5f09ce 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -847,8 +847,9 @@ def submit(cluster_config_file, docker, screen, tmux, stop, start, if start: create_or_update_cluster(cluster_config_file, None, None, False, False, True, cluster_name) - - target = os.path.join("~", os.path.basename(script)) + target = os.path.basename(script) + if not docker: + target = os.path.join("~", target) rsync(cluster_config_file, script, target, cluster_name, down=False) command_parts = ["python", target]