From 283f4d1060060d3a10a62ac5d74c9ca45c596791 Mon Sep 17 00:00:00 2001 From: Ian Rodney Date: Tue, 1 Sep 2020 13:14:35 -0700 Subject: [PATCH] [docker] Use tmp paths for rsync and fix file_mounts on docker (#10368) --- python/ray/autoscaler/command_runner.py | 99 +++++++++++++++--------- python/ray/autoscaler/commands.py | 8 +- python/ray/autoscaler/docker.py | 36 ++++----- python/ray/autoscaler/updater.py | 22 ++++-- python/ray/tests/test_autoscaler.py | 56 +++++++++----- python/ray/tests/test_command_runner.py | 83 ++++++++++++++++++++ src/ray/core_worker/lib/java/jni_utils.h | 3 +- 7 files changed, 215 insertions(+), 92 deletions(-) diff --git a/python/ray/autoscaler/command_runner.py b/python/ray/autoscaler/command_runner.py index c6ae24669..5b6f2df71 100644 --- a/python/ray/autoscaler/command_runner.py +++ b/python/ray/autoscaler/command_runner.py @@ -1,19 +1,20 @@ from getpass import getuser from shlex import quote -from typing import List, Tuple, Dict +from typing import Any, List, Tuple, Dict, Optional import click import hashlib +import json import logging import os import subprocess import sys import time -import json -from ray.autoscaler.docker import check_docker_running_cmd, \ +from ray.autoscaler.docker import check_bind_mounts_cmd, \ + check_docker_running_cmd, \ check_docker_image, \ - docker_autoscaler_setup, \ docker_start_cmds, \ + DOCKER_MOUNT_PREFIX, \ with_docker_exec from ray.autoscaler.log_timer import LogTimer @@ -134,7 +135,10 @@ class CommandRunnerInterface: """ raise NotImplementedError - def run_rsync_up(self, source: str, target: str) -> None: + def run_rsync_up(self, + source: str, + target: str, + options: Optional[Dict[str, Any]] = None) -> None: """Rsync files up to the cluster node. Args: @@ -143,7 +147,10 @@ class CommandRunnerInterface: """ raise NotImplementedError - def run_rsync_down(self, source: str, target: str) -> None: + def run_rsync_down(self, + source: str, + target: str, + options: Optional[Dict[str, Any]] = None) -> None: """Rsync files down from the cluster node. Args: @@ -231,7 +238,7 @@ class KubernetesCommandRunner(CommandRunnerInterface): else: raise - def run_rsync_up(self, source, target): + def run_rsync_up(self, source, target, options=None): if target.startswith("~"): target = "/root" + target[1:] @@ -257,7 +264,7 @@ class KubernetesCommandRunner(CommandRunnerInterface): target) ]) - def run_rsync_down(self, source, target): + def run_rsync_down(self, source, target, options=None): if target.startswith("~"): target = "/root" + target[1:] @@ -531,7 +538,7 @@ class SSHCommandRunner(CommandRunnerInterface): else: return self._run_helper(final_cmd, with_output, exit_on_fail) - def run_rsync_up(self, source, target): + def run_rsync_up(self, source, target, options=None): self._set_ssh_ip_if_required() command = [ "rsync", "--rsh", @@ -543,7 +550,7 @@ class SSHCommandRunner(CommandRunnerInterface): cli_logger.verbose("Running `{}`", cf.bold(" ".join(command))) self._run_helper(command, silent=is_rsync_silent()) - def run_rsync_down(self, source, target): + def run_rsync_down(self, source, target, options=None): self._set_ssh_ip_if_required() command = [ @@ -609,37 +616,41 @@ class DockerCommandRunner(CommandRunnerInterface): with_output=with_output, ssh_options_override_ssh_key=ssh_options_override_ssh_key) - def run_rsync_up(self, source, target): - # TODO(ilr) Expose this to before NodeUpdater::sync_file_mounts - protected_path = target - if target.find("/root") == 0: - target = target.replace("/root", "/tmp/root") + def run_rsync_up(self, source, target, options=None): + options = options or {} + host_destination = os.path.join(DOCKER_MOUNT_PREFIX, + target.lstrip("/")) + self.ssh_command_runner.run( - f"mkdir -p {os.path.dirname(target.rstrip('/'))}") - self.ssh_command_runner.run_rsync_up(source, target) - if self._check_container_status(): + f"mkdir -p {os.path.dirname(host_destination.rstrip('/'))}") + + self.ssh_command_runner.run_rsync_up( + source, host_destination, options=None) + if self._check_container_status() and not options.get( + "file_mount", False): if os.path.isdir(source): # Adding a "." means that docker copies the *contents* # Without it, docker copies the source *into* the target - target += "/." + host_destination += "/." self.ssh_command_runner.run("docker cp {} {}:{}".format( - target, self.container_name, - self._docker_expand_user(protected_path))) + host_destination, self.container_name, + self._docker_expand_user(target))) - def run_rsync_down(self, source, target): - protected_path = source - if source.find("/root") == 0: - source = source.replace("/root", "/tmp/root") + def run_rsync_down(self, source, target, options=None): + options = options or {} + host_source = os.path.join(DOCKER_MOUNT_PREFIX, source.lstrip("/")) self.ssh_command_runner.run( - f"mkdir -p {os.path.dirname(source.rstrip('/'))}") - if protected_path[-1] == "/": - protected_path += "." + f"mkdir -p {os.path.dirname(host_source.rstrip('/'))}") + if source[-1] == "/": + source += "." # Adding a "." means that docker copies the *contents* # Without it, docker copies the source *into* the target - self.ssh_command_runner.run("docker cp {}:{} {}".format( - self.container_name, self._docker_expand_user(protected_path), - source)) - self.ssh_command_runner.run_rsync_down(source, target) + if not options.get("file_mount", False): + self.ssh_command_runner.run("docker cp {}:{} {}".format( + self.container_name, self._docker_expand_user(source), + host_source)) + self.ssh_command_runner.run_rsync_down( + host_source, target, options=None) def remote_shell_command_str(self): inner_str = self.ssh_command_runner.remote_shell_command_str().replace( @@ -722,9 +733,23 @@ class DockerCommandRunner(CommandRunnerInterface): logger.error(f"A container with name {self.container_name} " + f"is running image {running_image} instead " + f"of {image} (which was provided in the YAML") - - # Copy bootstrap config & key over - if as_head: - for copy_cmd in docker_autoscaler_setup(self.container_name): - self.run(copy_cmd, run_env="host") + mounts = self.run( + check_bind_mounts_cmd(self.container_name), + with_output=True, + run_env="host").decode("utf-8").strip() + try: + active_mounts = json.loads(mounts) + active_remote_mounts = [ + mnt["Destination"] for mnt in active_mounts + ] + for remote, local in file_mounts.items(): + remote = self._docker_expand_user(remote) + if remote not in active_remote_mounts: + cli_logger.error( + "Please ray stop & restart cluster to " + f"allow mount {remote}:{local} to take hold") + except json.JSONDecodeError: + cli_logger.verbose( + "Unable to check if file_mounts specified in the YAML " + "differ from those on the running container.") self.initialized = True diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py index dd7a00599..83c13d145 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/commands.py @@ -664,13 +664,13 @@ def get_or_create_head_node(config, cli_logger.print("Prepared bootstrap config") if restart_only: - init_commands = [] + setup_commands = [] ray_start_commands = config["head_start_ray_commands"] elif no_restart: - init_commands = config["head_setup_commands"] + setup_commands = config["head_setup_commands"] ray_start_commands = [] else: - init_commands = config["head_setup_commands"] + setup_commands = config["head_setup_commands"] ray_start_commands = config["head_start_ray_commands"] if not no_restart: @@ -684,7 +684,7 @@ def get_or_create_head_node(config, cluster_name=config["cluster_name"], file_mounts=config["file_mounts"], initialization_commands=config["initialization_commands"], - setup_commands=init_commands, + setup_commands=setup_commands, ray_start_commands=ray_start_commands, process_runner=_runner, runtime_hash=runtime_hash, diff --git a/python/ray/autoscaler/docker.py b/python/ray/autoscaler/docker.py index ad643bf10..80b99b7d7 100644 --- a/python/ray/autoscaler/docker.py +++ b/python/ray/autoscaler/docker.py @@ -1,4 +1,3 @@ -import os import logging try: # py3 from shlex import quote @@ -7,6 +6,8 @@ except ImportError: # py2 logger = logging.getLogger(__name__) +DOCKER_MOUNT_PREFIX = "/tmp/ray_tmp_mount" + def validate_docker_config(config): if "docker" not in config: @@ -46,20 +47,27 @@ def with_docker_exec(cmds, ] -def check_docker_running_cmd(cname): +def _check_helper(cname, template): return " ".join([ - "docker", "inspect", "-f", "'{{.State.Running}}'", cname, "||", "true" + "docker", "inspect", "-f", "'{{" + template + "}}'", cname, "||", + "true" ]) +def check_docker_running_cmd(cname): + return _check_helper(cname, ".State.Running") + + +def check_bind_mounts_cmd(cname): + return _check_helper(cname, "json .Mounts") + + def check_docker_image(cname): - return " ".join([ - "docker", "inspect", "-f", "'{{.Config.Image}}'", cname, "||", "true" - ]) + return _check_helper(cname, ".Config.Image") def docker_start_cmds(user, image, mount_dict, cname, user_options): - mount = {dst: dst for dst in mount_dict} + mount = {f"{DOCKER_MOUNT_PREFIX}/{dst}": dst for dst in mount_dict} # TODO(ilr) Move away from defaulting to /root/ mount_flags = " ".join([ @@ -78,17 +86,3 @@ def docker_start_cmds(user, image, mount_dict, cname, user_options): mount_flags, env_flags, user_options_str, "--net=host", image, "bash" ] return " ".join(docker_run) - - -def docker_autoscaler_setup(cname): - cmds = [] - for path in ["~/ray_bootstrap_config.yaml", "~/ray_bootstrap_key.pem"]: - # needed because docker doesn't allow relative paths - base_path = os.path.basename(path) - cmds.append("docker cp {path} {cname}:{dpath}".format( - path=path, dpath=base_path, cname=cname)) - cmds.extend( - with_docker_exec( - ["cp {} {}".format("/" + base_path, path)], - container_name=cname)) - return cmds diff --git a/python/ray/autoscaler/updater.py b/python/ray/autoscaler/updater.py index 8f27a4677..351f9f24e 100644 --- a/python/ray/autoscaler/updater.py +++ b/python/ray/autoscaler/updater.py @@ -181,10 +181,12 @@ class NodeUpdater: with LogTimer(self.log_prefix + "Synced {} to {}".format(local_path, remote_path)): - self.cmd_runner.run( - "mkdir -p {}".format(os.path.dirname(remote_path)), - run_env="host") - sync_cmd(local_path, remote_path) + if not isinstance(self.cmd_runner, DockerCommandRunner): + # The DockerCommandRunner handles this internally + self.cmd_runner.run( + "mkdir -p {}".format(os.path.dirname(remote_path)), + run_env="host") + sync_cmd(local_path, remote_path, file_mount=True) if remote_path not in nolog_paths: # todo: timed here? @@ -413,19 +415,23 @@ class NodeUpdater: raise click.ClickException("Start command failed.") - def rsync_up(self, source, target): + def rsync_up(self, source, target, file_mount=False): cli_logger.old_info(logger, "{}Syncing {} to {}...", self.log_prefix, source, target) - self.cmd_runner.run_rsync_up(source, target) + options = {} + options["file_mount"] = file_mount + self.cmd_runner.run_rsync_up(source, target, options=options) cli_logger.verbose("`rsync`ed {} (local) to {} (remote)", cf.bold(source), cf.bold(target)) - def rsync_down(self, source, target): + def rsync_down(self, source, target, file_mount=False): cli_logger.old_info(logger, "{}Syncing {} from {}...", self.log_prefix, source, target) - self.cmd_runner.run_rsync_down(source, target) + options = {} + options["file_mount"] = file_mount + self.cmd_runner.run_rsync_down(source, target, options=options) cli_logger.verbose("`rsync`ed {} (remote) to {} (local)", cf.bold(source), cf.bold(target)) diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index 1ad1ed69a..ba7eb9588 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -12,6 +12,7 @@ import ray import ray.services as services from ray.autoscaler.util import prepare_config, validate_config from ray.autoscaler.commands import get_or_create_head_node +from ray.autoscaler.docker import DOCKER_MOUNT_PREFIX from ray.autoscaler.load_metrics import LoadMetrics from ray.autoscaler.autoscaler import StandardAutoscaler from ray.autoscaler.tags import TAG_RAY_NODE_KIND, TAG_RAY_NODE_STATUS, \ @@ -42,6 +43,7 @@ class MockProcessRunner: def __init__(self, fail_cmds=[]): self.calls = [] self.fail_cmds = fail_cmds + self.call_response = {} def check_call(self, cmd, *args, **kwargs): for token in self.fail_cmds: @@ -51,7 +53,18 @@ class MockProcessRunner: def check_output(self, cmd): self.check_call(cmd) - return "command-output".encode() + return_string = "command-output" + key_to_delete = None + for pattern, pair in self.call_response.items(): + if pattern in str(cmd): + return_string = pair[0] + if pair[1] - 1 == 0: + key_to_delete = pattern + break + if key_to_delete: + del self.call_response[key_to_delete] + + return return_string.encode() def assert_has_call(self, ip, pattern=None, exact=None): assert pattern or exact, \ @@ -95,6 +108,9 @@ class MockProcessRunner: def clear_history(self): self.calls = [] + def respond_to_call(self, pattern, response, num_times=1): + self.call_response[pattern] = (response, num_times) + class MockProvider(NodeProvider): def __init__(self, cache_stopped=False): @@ -1144,11 +1160,10 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() for i in [0, 1]: - runner.assert_has_call("172.0.0.{}".format(i), "setup_cmd") + runner.assert_has_call(f"172.0.0.{i}", "setup_cmd") runner.assert_has_call( - "172.0.0.{}".format(i), - "{}/ ubuntu@172.0.0.{}:/home/test-folder/".format( - file_mount_dir, i)) + f"172.0.0.{i}", f"{file_mount_dir}/ ubuntu@172.0.0.{i}:" + f"{DOCKER_MOUNT_PREFIX}/home/test-folder/") runner.clear_history() @@ -1164,11 +1179,11 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() for i in [0, 1]: - runner.assert_not_has_call("172.0.0.{}".format(i), "setup_cmd") + runner.assert_not_has_call(f"172.0.0.{i}", "setup_cmd") runner.assert_has_call( - "172.0.0.{}".format(i), - "{}/ ubuntu@172.0.0.{}:/home/test-folder/".format( - file_mount_dir, i)) + f"172.0.0.{i}", f"172.0.0.{i}", + f"{file_mount_dir}/ ubuntu@172.0.0.{i}:" + f"{DOCKER_MOUNT_PREFIX}/home/test-folder/") def testFileMountsNonContinuous(self): file_mount_dir = tempfile.mkdtemp() @@ -1197,11 +1212,11 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() for i in [0, 1]: - runner.assert_has_call("172.0.0.{}".format(i), "setup_cmd") + runner.assert_has_call(f"172.0.0.{i}", "setup_cmd") runner.assert_has_call( - "172.0.0.{}".format(i), - "{}/ ubuntu@172.0.0.{}:/home/test-folder/".format( - file_mount_dir, i)) + f"172.0.0.{i}", f"172.0.0.{i}", + f"{file_mount_dir}/ ubuntu@172.0.0.{i}:" + f"{DOCKER_MOUNT_PREFIX}/home/test-folder/") runner.clear_history() @@ -1215,11 +1230,10 @@ class AutoscalingTest(unittest.TestCase): 2, tag_filters={TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE}) for i in [0, 1]: - runner.assert_not_has_call("172.0.0.{}".format(i), "setup_cmd") + runner.assert_not_has_call(f"172.0.0.{i}", "setup_cmd") runner.assert_not_has_call( - "172.0.0.{}".format(i), - "{}/ ubuntu@172.0.0.{}:/home/test-folder/".format( - file_mount_dir, i)) + f"172.0.0.{i}", f"{file_mount_dir}/ ubuntu@172.0.0.{i}:" + f"{DOCKER_MOUNT_PREFIX}/home/test-folder/") # Simulate a second `ray up` call from ray.autoscaler import util @@ -1241,11 +1255,11 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() for i in [0, 1]: - runner.assert_has_call("172.0.0.{}".format(i), "setup_cmd") + runner.assert_has_call(f"172.0.0.{i}", "setup_cmd") runner.assert_has_call( - "172.0.0.{}".format(i), - "{}/ ubuntu@172.0.0.{}:/home/test-folder/".format( - file_mount_dir, i)) + f"172.0.0.{i}", f"172.0.0.{i}", + f"{file_mount_dir}/ ubuntu@172.0.0.{i}:" + f"{DOCKER_MOUNT_PREFIX}/home/test-folder/") if __name__ == "__main__": diff --git a/python/ray/tests/test_command_runner.py b/python/ray/tests/test_command_runner.py index 164ee32aa..12f34eb7b 100644 --- a/python/ray/tests/test_command_runner.py +++ b/python/ray/tests/test_command_runner.py @@ -2,6 +2,7 @@ import pytest from ray.tests.test_autoscaler import MockProvider, MockProcessRunner from ray.autoscaler.command_runner import SSHCommandRunner, \ _with_environment_variables, DockerCommandRunner +from ray.autoscaler.docker import DOCKER_MOUNT_PREFIX from getpass import getuser import hashlib @@ -141,6 +142,88 @@ def test_docker_command_runner(): process_runner.assert_has_call("1.2.3.4", exact=expected) +def test_docker_rsync(): + process_runner = MockProcessRunner() + provider = MockProvider() + provider.create_node({}, {}, 1) + cluster_name = "cluster" + docker_config = {"container_name": "container"} + args = { + "log_prefix": "prefix", + "node_id": 0, + "provider": provider, + "auth_config": auth_config, + "cluster_name": cluster_name, + "process_runner": process_runner, + "use_internal_ip": False, + "docker_config": docker_config, + } + cmd_runner = DockerCommandRunner(**args) + + local_mount = "/home/ubuntu/base/mount/" + remote_mount = "/root/protected_mount/" + remote_host_mount = f"{DOCKER_MOUNT_PREFIX}{remote_mount}" + + local_file = "/home/ubuntu/base-file" + remote_file = "/root/protected-file" + remote_host_file = f"{DOCKER_MOUNT_PREFIX}{remote_file}" + + process_runner.respond_to_call("docker inspect -f", "true") + cmd_runner.run_rsync_up( + local_mount, remote_mount, options={"file_mount": True}) + + # Make sure we do not copy directly to raw destination + process_runner.assert_not_has_call( + "1.2.3.4", pattern=f"-avz {local_mount} ray@1.2.3.4:{remote_mount}") + process_runner.assert_not_has_call( + "1.2.3.4", pattern=f"mkdir -p {remote_mount}") + # No docker cp for file_mounts + process_runner.assert_not_has_call("1.2.3.4", pattern=f"docker cp") + process_runner.assert_has_call( + "1.2.3.4", + pattern=f"-avz {local_mount} ray@1.2.3.4:{remote_host_mount}") + process_runner.clear_history() + ############################## + + process_runner.respond_to_call("docker inspect -f", "true") + cmd_runner.run_rsync_up( + local_file, remote_file, options={"file_mount": False}) + + # Make sure we do not copy directly to raw destination + process_runner.assert_not_has_call( + "1.2.3.4", pattern=f"-avz {local_file} ray@1.2.3.4:{remote_file}") + process_runner.assert_not_has_call( + "1.2.3.4", pattern=f"mkdir -p {remote_file}") + + process_runner.assert_has_call("1.2.3.4", pattern=f"docker cp") + process_runner.assert_has_call( + "1.2.3.4", pattern=f"-avz {local_file} ray@1.2.3.4:{remote_host_file}") + process_runner.clear_history() + ############################## + + cmd_runner.run_rsync_down( + remote_mount, local_mount, options={"file_mount": True}) + + process_runner.assert_not_has_call("1.2.3.4", pattern=f"docker cp") + process_runner.assert_not_has_call( + "1.2.3.4", pattern=f"-avz ray@1.2.3.4:{remote_mount} {local_mount}") + process_runner.assert_has_call( + "1.2.3.4", + pattern=f"-avz ray@1.2.3.4:{remote_host_mount} {local_mount}") + + process_runner.clear_history() + ############################## + + cmd_runner.run_rsync_down( + remote_file, local_file, options={"file_mount": False}) + + process_runner.assert_has_call("1.2.3.4", pattern=f"docker cp") + process_runner.assert_not_has_call( + "1.2.3.4", pattern=f"-avz ray@1.2.3.4:{remote_file} {local_file}") + process_runner.assert_has_call( + "1.2.3.4", pattern=f"-avz ray@1.2.3.4:{remote_host_file} {local_file}") + + if __name__ == "__main__": import sys sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/core_worker/lib/java/jni_utils.h b/src/ray/core_worker/lib/java/jni_utils.h index 02e9f1731..058810dcd 100644 --- a/src/ray/core_worker/lib/java/jni_utils.h +++ b/src/ray/core_worker/lib/java/jni_utils.h @@ -257,7 +257,8 @@ inline ID JavaByteArrayToId(JNIEnv *env, const jbyteArray &bytes) { env->GetByteArrayRegion(bytes, 0, ID::Size(), reinterpret_cast(&id_str.front())); auto arr_size = env->GetArrayLength(bytes); - RAY_CHECK(arr_size == ID::Size()) << "ID length should be " << ID::Size() << " instead of " << arr_size; + RAY_CHECK(arr_size == ID::Size()) + << "ID length should be " << ID::Size() << " instead of " << arr_size; return ID::FromBinary(id_str); }