From dc378a80b77685cc45c2b53e2f484ed9c6d8c44c Mon Sep 17 00:00:00 2001 From: Ian Rodney Date: Wed, 26 Aug 2020 10:29:06 -0700 Subject: [PATCH] [autoscaler/docker] Docker Inititialization Revamp (#9515) * Basic idea * Small fixes * dockerize start commands in Command Runner * Remove run_init from CommandRunnerInterface * Add Parens Co-authored-by: Simon Mo * Cleaning up * Response to richards comments * Further small fixes * Fix Json * schema format fix * cleanup * run more often * fix indent * Fix richards responses * fix ups * remove docker_commands from schema * default to list * fix docker cmd runner test * lint fix Co-authored-by: Simon Mo --- python/ray/autoscaler/autoscaler.py | 2 + python/ray/autoscaler/command_runner.py | 74 ++++++++++++++++++------ python/ray/autoscaler/commands.py | 15 ++--- python/ray/autoscaler/docker.py | 75 ++++++------------------- python/ray/autoscaler/updater.py | 48 ++++++++++++++-- python/ray/autoscaler/util.py | 6 +- python/ray/tests/test_command_runner.py | 3 +- 7 files changed, 132 insertions(+), 91 deletions(-) diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index 46c0cf4ea..d37d18408 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -385,6 +385,7 @@ class StandardAutoscaler: file_mounts_contents_hash=self.file_mounts_contents_hash, process_runner=self.process_runner, use_internal_ip=True, + is_head_node=False, docker_config=self.config.get("docker")) updater.start() self.updaters[node_id] = updater @@ -441,6 +442,7 @@ class StandardAutoscaler: ray_start_commands=with_head_node_ip(ray_start_commands), runtime_hash=self.runtime_hash, file_mounts_contents_hash=self.file_mounts_contents_hash, + is_head_node=False, cluster_synced_files=self.config["cluster_synced_files"], process_runner=self.process_runner, use_internal_ip=True, diff --git a/python/ray/autoscaler/command_runner.py b/python/ray/autoscaler/command_runner.py index 1da2f8836..cd646be1f 100644 --- a/python/ray/autoscaler/command_runner.py +++ b/python/ray/autoscaler/command_runner.py @@ -9,7 +9,11 @@ import subprocess import sys import time -from ray.autoscaler.docker import check_docker_running_cmd, with_docker_exec +from ray.autoscaler.docker import check_docker_running_cmd, \ + check_docker_image, \ + docker_autoscaler_setup, \ + docker_start_cmds, \ + with_docker_exec from ray.autoscaler.log_timer import LogTimer from ray.autoscaler.subprocess_output_util import (run_cmd_redirected, @@ -565,14 +569,14 @@ class SSHCommandRunner(CommandRunnerInterface): self.ssh_user, self.ssh_ip) -class DockerCommandRunner(SSHCommandRunner): +class DockerCommandRunner(CommandRunnerInterface): def __init__(self, docker_config, **common_args): self.ssh_command_runner = SSHCommandRunner(**common_args) - self.docker_name = docker_config["container_name"] + self.container_name = docker_config["container_name"] self.docker_config = docker_config self.home_dir = None - self._check_docker_installed() self.shutdown = False + self.initialized = False def run( self, @@ -595,7 +599,8 @@ class DockerCommandRunner(SSHCommandRunner): cmd = self._docker_expand_user(cmd, any_char=True) cmd = " ".join(_with_interactive(cmd)) cmd = with_docker_exec( - [cmd], container_name=self.docker_name, + [cmd], + container_name=self.container_name, with_interactive=True)[0] if self.shutdown: @@ -622,7 +627,7 @@ class DockerCommandRunner(SSHCommandRunner): # Without it, docker copies the source *into* the target target += "/." self.ssh_command_runner.run("docker cp {} {}:{}".format( - target, self.docker_name, + target, self.container_name, self._docker_expand_user(protected_path))) def run_rsync_down(self, source, target): @@ -636,7 +641,7 @@ class DockerCommandRunner(SSHCommandRunner): # Adding a "." means that docker copies the *contents* # Without it, docker copies the source *into* the target self.ssh_command_runner.run("docker cp {}:{} {}".format( - self.docker_name, self._docker_expand_user(protected_path), + self.container_name, self._docker_expand_user(protected_path), source)) self.ssh_command_runner.run_rsync_down(source, target) @@ -644,7 +649,7 @@ class DockerCommandRunner(SSHCommandRunner): inner_str = self.ssh_command_runner.remote_shell_command_str().replace( "ssh", "ssh -tt", 1).strip("\n") return inner_str + " docker exec -it {} /bin/bash\n".format( - self.docker_name) + self.container_name) def _check_docker_installed(self): try: @@ -665,14 +670,14 @@ class DockerCommandRunner(SSHCommandRunner): self.shutdown = True def _check_container_status(self): - no_exist = "not_present" - cmd = check_docker_running_cmd(self.docker_name) + " ".join( - ["||", "echo", quote(no_exist)]) + if self.initialized: + return True output = self.ssh_command_runner.run( - cmd, with_output=True).decode("utf-8").strip() - if no_exist in output: - return False - return "true" in output.lower() + check_docker_running_cmd(self.container_name), + with_output=True).decode("utf-8").strip() + # Checks for the false positive where "true" is in the container name + return ("true" in output.lower() + and "no such object" not in output.lower()) def _docker_expand_user(self, string, any_char=False): user_pos = string.find("~") @@ -680,7 +685,7 @@ class DockerCommandRunner(SSHCommandRunner): if self.home_dir is None: self.home_dir = self.ssh_command_runner.run( "docker exec {} env | grep HOME | cut -d'=' -f2".format( - self.docker_name), + self.container_name), with_output=True).decode("utf-8").strip() if any_char: @@ -690,3 +695,40 @@ class DockerCommandRunner(SSHCommandRunner): return string.replace("~", self.home_dir, 1) return string + + def run_init(self, *, as_head, file_mounts): + image = self.docker_config.get("image") + if image is None: + image = self.docker_config.get( + f"{'head' if as_head else 'worker'}_image") + + self._check_docker_installed() + if self.docker_config.get("pull_before_run", True): + assert image, "Image must be included in config if " + \ + "pull_before_run is specified" + + self.run("docker pull {}".format(image), run_env="host") + + start_command = docker_start_cmds( + self.ssh_command_runner.ssh_user, image, file_mounts, + self.container_name, + self.docker_config.get("run_options", []) + self.docker_config.get( + f"{'head' if as_head else 'worker'}_run_options", [])) + + if not self._check_container_status(): + self.run(start_command, run_env="host") + else: + running_image = self.run( + check_docker_image(self.container_name), + with_output=True, + run_env="host").decode("utf-8").strip() + if running_image != image: + logger.error(f"A container with name {self.container_name} " + + f"is running image {running_image} instead " + + f"of {image} (which was provided in the YAML") + + # Copy bootstrap config & key over + if as_head: + for copy_cmd in docker_autoscaler_setup(self.container_name): + self.run(copy_cmd, run_env="host") + self.initialized = True diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py index 1ffc6a58e..16cbfa0c8 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/commands.py @@ -405,6 +405,7 @@ def kill_node(config_file, yes, hard, override_cluster_name): ray_start_commands=[], runtime_hash="", file_mounts_contents_hash="", + is_head_node=False, docker_config=config.get("docker")) _exec(updater, "ray stop", False, False) @@ -658,6 +659,7 @@ def get_or_create_head_node(config, process_runner=_runner, runtime_hash=runtime_hash, file_mounts_contents_hash=file_mounts_contents_hash, + is_head_node=True, docker_config=config.get("docker")) updater.start() updater.join() @@ -822,6 +824,7 @@ def exec_cluster(config_file: str, ray_start_commands=[], runtime_hash="", file_mounts_contents_hash="", + is_head_node=True, docker_config=config.get("docker")) is_docker = isinstance(updater.cmd_runner, DockerCommandRunner) @@ -933,13 +936,10 @@ def rsync(config_file: str, # and _get_head_node does this too nodes = _get_worker_nodes(config, override_cluster_name) - nodes += [ - _get_head_node( - config, - config_file, - override_cluster_name, - create_if_needed=False) - ] + head_node = _get_head_node( + config, config_file, override_cluster_name, create_if_needed=False) + + nodes += [head_node] for node_id in nodes: updater = NodeUpdaterThread( @@ -954,6 +954,7 @@ def rsync(config_file: str, ray_start_commands=[], runtime_hash="", file_mounts_contents_hash="", + is_head_node=(node_id == head_node), docker_config=config.get("docker")) if down: rsync = updater.rsync_down diff --git a/python/ray/autoscaler/docker.py b/python/ray/autoscaler/docker.py index 98b334a57..ad643bf10 100644 --- a/python/ray/autoscaler/docker.py +++ b/python/ray/autoscaler/docker.py @@ -8,67 +8,23 @@ except ImportError: # py2 logger = logging.getLogger(__name__) -def dockerize_if_needed(config): +def validate_docker_config(config): if "docker" not in config: return config docker_image = config["docker"].get("image") - docker_pull = config["docker"].get("pull_before_run", True) cname = config["docker"].get("container_name") - run_options = config["docker"].get("run_options", []) head_docker_image = config["docker"].get("head_image", docker_image) - head_run_options = config["docker"].get("head_run_options", []) worker_docker_image = config["docker"].get("worker_image", docker_image) - worker_run_options = config["docker"].get("worker_run_options", []) - if not docker_image and not (head_docker_image and worker_docker_image): - if cname: - logger.warning( - "dockerize_if_needed: " - "Container name given but no Docker image(s) - continuing...") - return config + image_present = docker_image or (head_docker_image and worker_docker_image) + if (not cname) and (not image_present): + return else: - assert cname, "Must provide container name!" - ssh_user = config["auth"]["ssh_user"] - docker_mounts = {dst: dst for dst in config["file_mounts"]} + assert cname and image_present, "Must provide a container & image name" - if docker_pull: - docker_pull_cmd = "docker pull {}".format(docker_image) - config["initialization_commands"].append(docker_pull_cmd) - for node_type_config in config.get("available_node_types", - {}).values(): - node_type_config["initialization_commands"].append(docker_pull_cmd) - pass - - head_docker_start = docker_start_cmds(ssh_user, head_docker_image, - docker_mounts, cname, - run_options + head_run_options) - - worker_docker_start = docker_start_cmds(ssh_user, worker_docker_image, - docker_mounts, cname, - run_options + worker_run_options) - - config["head_setup_commands"] = head_docker_start + (with_docker_exec( - config["head_setup_commands"], container_name=cname)) - config["head_start_ray_commands"] = ( - docker_autoscaler_setup(cname) + with_docker_exec( - config["head_start_ray_commands"], container_name=cname)) - - config["worker_setup_commands"] = worker_docker_start + (with_docker_exec( - config["worker_setup_commands"], container_name=cname)) - config["worker_start_ray_commands"] = with_docker_exec( - config["worker_start_ray_commands"], - container_name=cname, - env_vars=["RAY_HEAD_IP"]) - - for node_type_config in config.get("available_node_types", {}).values(): - if "worker_setup_commands" in node_type_config: - node_type_config["worker_setup_commands"] = worker_docker_start + ( - with_docker_exec( - node_type_config["worker_setup_commands"], - container_name=cname)) return config @@ -91,11 +47,19 @@ def with_docker_exec(cmds, def check_docker_running_cmd(cname): - return " ".join(["docker", "inspect", "-f", "'{{.State.Running}}'", cname]) + return " ".join([ + "docker", "inspect", "-f", "'{{.State.Running}}'", cname, "||", "true" + ]) -def docker_start_cmds(user, image, mount, cname, user_options): - cmds = [] +def check_docker_image(cname): + return " ".join([ + "docker", "inspect", "-f", "'{{.Config.Image}}'", cname, "||", "true" + ]) + + +def docker_start_cmds(user, image, mount_dict, cname, user_options): + mount = {dst: dst for dst in mount_dict} # TODO(ilr) Move away from defaulting to /root/ mount_flags = " ".join([ @@ -109,16 +73,11 @@ def docker_start_cmds(user, image, mount, cname, user_options): ["-e {name}={val}".format(name=k, val=v) for k, v in env_vars.items()]) user_options_str = " ".join(user_options) - # TODO(ilr) Check command type - # docker run command - docker_check = check_docker_running_cmd(cname) + " || " docker_run = [ "docker", "run", "--rm", "--name {}".format(cname), "-d", "-it", mount_flags, env_flags, user_options_str, "--net=host", image, "bash" ] - cmds.append(docker_check + " ".join(docker_run)) - - return cmds + return " ".join(docker_run) def docker_autoscaler_setup(cname): diff --git a/python/ray/autoscaler/updater.py b/python/ray/autoscaler/updater.py index b25da5d9b..8f27a4677 100644 --- a/python/ray/autoscaler/updater.py +++ b/python/ray/autoscaler/updater.py @@ -10,7 +10,8 @@ from ray.autoscaler.tags import TAG_RAY_NODE_STATUS, TAG_RAY_RUNTIME_CONFIG, \ TAG_RAY_FILE_MOUNTS_CONTENTS, \ STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED, STATUS_WAITING_FOR_SSH, \ STATUS_SETTING_UP, STATUS_SYNCING_FILES -from ray.autoscaler.command_runner import NODE_START_WAIT_S, ProcessRunnerError +from ray.autoscaler.command_runner import NODE_START_WAIT_S, \ + ProcessRunnerError, DockerCommandRunner from ray.autoscaler.log_timer import LogTimer import ray.autoscaler.subprocess_output_util as cmd_output_util @@ -25,7 +26,27 @@ READY_CHECK_INTERVAL = 5 class NodeUpdater: - """A process for syncing files and running init commands on a node.""" + """A process for syncing files and running init commands on a node. + + Arguments: + node_id: the Node ID + provider_config: Provider section of autoscaler yaml + provider: NodeProvider Class + auth_config: Auth section of autoscaler yaml + cluster_name: the name of the cluster. + file_mounts: Map of remote to local paths + initialization_commands: Commands run before container launch + setup_commands: Commands run before ray starts + ray_start_commands: Commands to start ray + runtime_hash: Used to check for config changes + file_mounts_contents_hash: Used to check for changes to file mounts + is_head_node: Whether to use head start/setup commands + process_runner: the module to use to run the commands + in the CommandRunner. E.g., subprocess. + use_internal_ip: Wwhether the node_id belongs to an internal ip + or external ip. + docker_config: Docker section of autoscaler yaml + """ def __init__(self, node_id, @@ -39,6 +60,7 @@ class NodeUpdater: ray_start_commands, runtime_hash, file_mounts_contents_hash, + is_head_node, node_resources=None, cluster_synced_files=None, process_runner=subprocess, @@ -68,6 +90,7 @@ class NodeUpdater: self.file_mounts_contents_hash = file_mounts_contents_hash self.cluster_synced_files = cluster_synced_files self.auth_config = auth_config + self.is_head_node = is_head_node def run(self): cli_logger.old_info(logger, "{}Updating to {}", self.log_prefix, @@ -202,6 +225,7 @@ class NodeUpdater: "{}Waiting for remote shell...", self.log_prefix) + # Run outside of the container self.cmd_runner.run("uptime", run_env="host") cli_logger.old_debug(logger, "Uptime succeeded.") cli_logger.success("Success.") @@ -264,6 +288,12 @@ class NodeUpdater: "{}{} already up-to-date, skip to ray start", self.log_prefix, self.node_id) + # When resuming from a stopped instance the runtime_hash may be the + # same, but the container will not be started. + if isinstance(self.cmd_runner, DockerCommandRunner): + self.cmd_runner.run_init( + as_head=self.is_head_node, file_mounts=self.file_mounts) + else: cli_logger.print( "Updating cluster configuration.", @@ -296,6 +326,7 @@ class NodeUpdater: # with a new SSHOptions class that uses # this ssh_private_key as its only __init__ # argument. + # Run outside docker. self.cmd_runner.run( cmd, ssh_options_override_ssh_key=self. @@ -314,7 +345,10 @@ class NodeUpdater: cli_logger.print( "No initialization commands to run.", _numbered=("[]", 3, 6)) - + if isinstance(self.cmd_runner, DockerCommandRunner): + self.cmd_runner.run_init( + as_head=self.is_head_node, + file_mounts=self.file_mounts) if self.setup_commands: with cli_logger.group( "Running setup commands", @@ -337,7 +371,8 @@ class NodeUpdater: _numbered=("()", i, total)) try: - self.cmd_runner.run(cmd) + # Runs in the container if docker is in use + self.cmd_runner.run(cmd, run_env="auto") except ProcessRunnerError as e: if e.msg_type == "ssh_command_failed": cli_logger.error("Failed.") @@ -365,8 +400,11 @@ class NodeUpdater: try: old_redirected = cmd_output_util.is_output_redirected() cmd_output_util.set_output_redirected(False) + # Runs in the container if docker is in use self.cmd_runner.run( - cmd, environment_variables=env_vars) + cmd, + environment_variables=env_vars, + run_env="auto") cmd_output_util.set_output_redirected(old_redirected) except ProcessRunnerError as e: if e.msg_type == "ssh_command_failed": diff --git a/python/ray/autoscaler/util.py b/python/ray/autoscaler/util.py index 8525a180b..1dbbbfd88 100644 --- a/python/ray/autoscaler/util.py +++ b/python/ray/autoscaler/util.py @@ -9,7 +9,7 @@ from typing import Any, Dict import ray import ray.services as services from ray.autoscaler.node_provider import get_default_config -from ray.autoscaler.docker import dockerize_if_needed +from ray.autoscaler.docker import validate_docker_config REQUIRED, OPTIONAL = True, False RAY_SCHEMA_PATH = os.path.join( @@ -56,7 +56,7 @@ def validate_config(config: Dict[str, Any]) -> None: try: jsonschema.validate(config, schema) except jsonschema.ValidationError as e: - raise jsonschema.ValidationError(message=e.message) from None + raise e from None # Detect out of date defaults. This happens when the autoscaler that filled # out the default values is older than the version of the autoscaler that @@ -90,7 +90,7 @@ def validate_config(config: Dict[str, Any]) -> None: def prepare_config(config): with_defaults = fillout_defaults(config) merge_setup_commands(with_defaults) - dockerize_if_needed(with_defaults) + validate_docker_config(with_defaults) return with_defaults diff --git a/python/ray/tests/test_command_runner.py b/python/ray/tests/test_command_runner.py index 7e94d1c61..c540a6989 100644 --- a/python/ray/tests/test_command_runner.py +++ b/python/ray/tests/test_command_runner.py @@ -114,8 +114,7 @@ def test_docker_command_runner(): "docker_config": docker_config, } cmd_runner = DockerCommandRunner(**args) - process_runner.assert_has_call("1.2.3.4", "command -v docker") - process_runner.clear_history() + assert len(process_runner.calls) == 0, "No calls should be made in ctor" env_vars = {"var1": "quote between this \" and this", "var2": "123"} cmd_runner.run("echo hello", environment_variables=env_vars)