diff --git a/python/ray/autoscaler/_private/command_runner.py b/python/ray/autoscaler/_private/command_runner.py index 00b1b194e..6db2c92fa 100644 --- a/python/ray/autoscaler/_private/command_runner.py +++ b/python/ray/autoscaler/_private/command_runner.py @@ -16,7 +16,6 @@ from ray.autoscaler._private.docker import check_bind_mounts_cmd, \ check_docker_running_cmd, \ check_docker_image, \ docker_start_cmds, \ - DOCKER_MOUNT_PREFIX, \ with_docker_exec from ray.autoscaler._private.log_timer import LogTimer @@ -292,6 +291,7 @@ class SSHCommandRunner(CommandRunnerInterface): ssh_user_hash[:HASH_MAX_LENGTH], ssh_control_hash[:HASH_MAX_LENGTH]) + self.cluster_name = cluster_name self.log_prefix = log_prefix self.process_runner = process_runner self.node_id = node_id @@ -597,8 +597,9 @@ class DockerCommandRunner(CommandRunnerInterface): def run_rsync_up(self, source, target, options=None): options = options or {} - host_destination = os.path.join(DOCKER_MOUNT_PREFIX, - target.lstrip("/")) + host_destination = os.path.join( + self._get_docker_host_mount_location( + self.ssh_command_runner.cluster_name), target.lstrip("/")) self.ssh_command_runner.run( f"mkdir -p {os.path.dirname(host_destination.rstrip('/'))}") @@ -617,7 +618,9 @@ class DockerCommandRunner(CommandRunnerInterface): def run_rsync_down(self, source, target, options=None): options = options or {} - host_source = os.path.join(DOCKER_MOUNT_PREFIX, source.lstrip("/")) + host_source = os.path.join( + self._get_docker_host_mount_location( + self.ssh_command_runner.cluster_name), source.lstrip("/")) self.ssh_command_runner.run( f"mkdir -p {os.path.dirname(host_source.rstrip('/'))}") if source[-1] == "/": @@ -709,7 +712,8 @@ class DockerCommandRunner(CommandRunnerInterface): self.docker_config.get( "run_options", []) + self.docker_config.get( f"{'head' if as_head else 'worker'}_run_options", - []) + self._configure_runtime()) + []) + self._configure_runtime(), + self.ssh_command_runner.cluster_name) self.run(start_command, run_env="host") else: running_image = self.run( @@ -746,7 +750,9 @@ class DockerCommandRunner(CommandRunnerInterface): if mount in file_mounts: self.ssh_command_runner.run( "docker cp {src} {container}:{dst}".format( - src=os.path.join(DOCKER_MOUNT_PREFIX, mount), + src=os.path.join( + self._get_docker_host_mount_location( + self.ssh_command_runner.cluster_name), mount), container=self.container_name, dst=self._docker_expand_user(mount))) self.initialized = True @@ -769,3 +775,9 @@ class DockerCommandRunner(CommandRunnerInterface): return [] return [] + + def _get_docker_host_mount_location(self, cluster_name: str) -> str: + """Return the docker host mount directory location.""" + # Imported here due to circular dependency in imports. + from ray.autoscaler.sdk import get_docker_host_mount_location + return get_docker_host_mount_location(cluster_name) diff --git a/python/ray/autoscaler/_private/constants.py b/python/ray/autoscaler/_private/constants.py index 0991c86d6..4aca9eb49 100644 --- a/python/ray/autoscaler/_private/constants.py +++ b/python/ray/autoscaler/_private/constants.py @@ -1,6 +1,6 @@ import os -from ray.ray_constants import ( # noqa F401 +from ray.ray_constants import ( # noqa F401 AUTOSCALER_RESOURCE_REQUEST_CHANNEL, LOGGER_FORMAT, MEMORY_RESOURCE_UNIT_BYTES, RESOURCES_ENVIRONMENT_VARIABLE) @@ -36,6 +36,3 @@ AUTOSCALER_HEARTBEAT_TIMEOUT_S = env_integer("AUTOSCALER_HEARTBEAT_TIMEOUT_S", BOTO_MAX_RETRIES = env_integer("BOTO_MAX_RETRIES", 12) # Max number of retries to create an EC2 node (retry different subnet) BOTO_CREATE_MAX_RETRIES = env_integer("BOTO_CREATE_MAX_RETRIES", 5) - -# Host path that Docker mounts attach to -DOCKER_MOUNT_PREFIX = "/tmp/ray_tmp_mount" diff --git a/python/ray/autoscaler/_private/docker.py b/python/ray/autoscaler/_private/docker.py index 4d7f20733..62d49e590 100644 --- a/python/ray/autoscaler/_private/docker.py +++ b/python/ray/autoscaler/_private/docker.py @@ -4,8 +4,6 @@ try: # py3 except ImportError: # py2 from pipes import quote -from ray.autoscaler._private.constants import DOCKER_MOUNT_PREFIX - logger = logging.getLogger(__name__) @@ -66,8 +64,12 @@ def check_docker_image(cname): return _check_helper(cname, ".Config.Image") -def docker_start_cmds(user, image, mount_dict, cname, user_options): - mount = {f"{DOCKER_MOUNT_PREFIX}/{dst}": dst for dst in mount_dict} +def docker_start_cmds(user, image, mount_dict, container_name, user_options, + cluster_name): + # Imported here due to circular dependency. + from ray.autoscaler.sdk import get_docker_host_mount_location + docker_mount_prefix = get_docker_host_mount_location(cluster_name) + mount = {f"{docker_mount_prefix}/{dst}": dst for dst in mount_dict} # TODO(ilr) Move away from defaulting to /root/ mount_flags = " ".join([ @@ -82,7 +84,8 @@ def docker_start_cmds(user, image, mount_dict, cname, user_options): user_options_str = " ".join(user_options) docker_run = [ - "docker", "run", "--rm", "--name {}".format(cname), "-d", "-it", - mount_flags, env_flags, user_options_str, "--net=host", image, "bash" + "docker", "run", "--rm", "--name {}".format(container_name), "-d", + "-it", mount_flags, env_flags, user_options_str, "--net=host", image, + "bash" ] return " ".join(docker_run) diff --git a/python/ray/autoscaler/sdk.py b/python/ray/autoscaler/sdk.py index 334f45b3c..5e6e02637 100644 --- a/python/ray/autoscaler/sdk.py +++ b/python/ray/autoscaler/sdk.py @@ -208,3 +208,9 @@ def fillout_defaults(config: Dict[str, Any]) -> Dict[str, Any]: """Fillout default values for a cluster_config based on the provider.""" from ray.autoscaler._private.util import fillout_defaults return fillout_defaults(config) + + +def get_docker_host_mount_location(cluster_name: str) -> str: + """Return host path that Docker mounts attach to.""" + docker_mount_prefix = "/tmp/ray_tmp_mount/{cluster_name}" + return docker_mount_prefix.format(cluster_name=cluster_name) diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index 29e49f926..3f1e24e3f 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -15,7 +15,7 @@ import ray import ray._private.services as services from ray.autoscaler._private.util import prepare_config, validate_config from ray.autoscaler._private import commands -from ray.autoscaler._private.docker import DOCKER_MOUNT_PREFIX +from ray.autoscaler.sdk import get_docker_host_mount_location from ray.autoscaler._private.load_metrics import LoadMetrics from ray.autoscaler._private.autoscaler import StandardAutoscaler from ray.autoscaler._private.providers import (_NODE_PROVIDERS, @@ -502,14 +502,18 @@ class AutoscalingTest(unittest.TestCase): runner.assert_has_call("1.2.3.4", "start_ray_head") self.assertEqual(self.provider.mock_nodes[0].node_type, None) runner.assert_has_call("1.2.3.4", pattern="docker run") + + docker_mount_prefix = get_docker_host_mount_location( + SMALL_CLUSTER["cluster_name"]) runner.assert_not_has_call( - "1.2.3.4", pattern="-v /tmp/ray_tmp_mount/~/ray_bootstrap_config") + "1.2.3.4", + pattern=f"-v {docker_mount_prefix}/~/ray_bootstrap_config") runner.assert_has_call( "1.2.3.4", - pattern="docker cp /tmp/ray_tmp_mount/~/ray_bootstrap_key.pem") - runner.assert_has_call( - "1.2.3.4", - pattern="docker cp /tmp/ray_tmp_mount/~/ray_bootstrap_config.yaml") + pattern=f"docker cp {docker_mount_prefix}/~/ray_bootstrap_key.pem") + pattern_to_assert = \ + f"docker cp {docker_mount_prefix}/~/ray_bootstrap_config.yaml" + runner.assert_has_call("1.2.3.4", pattern=pattern_to_assert) @unittest.skipIf(sys.platform == "win32", "Failing on Windows.") def testRsyncCommandWithDocker(self): @@ -1473,12 +1477,13 @@ class AutoscalingTest(unittest.TestCase): self.waitForNodes( 2, tag_filters={TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE}) autoscaler.update() - + docker_mount_prefix = get_docker_host_mount_location( + config["cluster_name"]) for i in [0, 1]: runner.assert_has_call(f"172.0.0.{i}", "setup_cmd") runner.assert_has_call( f"172.0.0.{i}", f"{file_mount_dir}/ ubuntu@172.0.0.{i}:" - f"{DOCKER_MOUNT_PREFIX}/home/test-folder/") + f"{docker_mount_prefix}/home/test-folder/") runner.clear_history() @@ -1498,7 +1503,7 @@ class AutoscalingTest(unittest.TestCase): runner.assert_has_call( f"172.0.0.{i}", f"172.0.0.{i}", f"{file_mount_dir}/ ubuntu@172.0.0.{i}:" - f"{DOCKER_MOUNT_PREFIX}/home/test-folder/") + f"{docker_mount_prefix}/home/test-folder/") def testFileMountsNonContinuous(self): file_mount_dir = tempfile.mkdtemp() @@ -1525,13 +1530,15 @@ class AutoscalingTest(unittest.TestCase): self.waitForNodes( 2, tag_filters={TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE}) autoscaler.update() + docker_mount_prefix = get_docker_host_mount_location( + config["cluster_name"]) for i in [0, 1]: runner.assert_has_call(f"172.0.0.{i}", "setup_cmd") runner.assert_has_call( f"172.0.0.{i}", f"172.0.0.{i}", f"{file_mount_dir}/ ubuntu@172.0.0.{i}:" - f"{DOCKER_MOUNT_PREFIX}/home/test-folder/") + f"{docker_mount_prefix}/home/test-folder/") runner.clear_history() @@ -1548,7 +1555,7 @@ class AutoscalingTest(unittest.TestCase): runner.assert_not_has_call(f"172.0.0.{i}", "setup_cmd") runner.assert_not_has_call( f"172.0.0.{i}", f"{file_mount_dir}/ ubuntu@172.0.0.{i}:" - f"{DOCKER_MOUNT_PREFIX}/home/test-folder/") + f"{docker_mount_prefix}/home/test-folder/") # Simulate a second `ray up` call from ray.autoscaler._private import util @@ -1574,7 +1581,7 @@ class AutoscalingTest(unittest.TestCase): runner.assert_has_call( f"172.0.0.{i}", f"172.0.0.{i}", f"{file_mount_dir}/ ubuntu@172.0.0.{i}:" - f"{DOCKER_MOUNT_PREFIX}/home/test-folder/") + f"{docker_mount_prefix}/home/test-folder/") if __name__ == "__main__": diff --git a/python/ray/tests/test_command_runner.py b/python/ray/tests/test_command_runner.py index c28d25130..3df75cbde 100644 --- a/python/ray/tests/test_command_runner.py +++ b/python/ray/tests/test_command_runner.py @@ -7,7 +7,7 @@ from ray.tests.test_autoscaler import MockProvider, MockProcessRunner from ray.autoscaler.command_runner import CommandRunnerInterface from ray.autoscaler._private.command_runner import SSHCommandRunner, \ DockerCommandRunner, KubernetesCommandRunner, _with_environment_variables -from ray.autoscaler._private.docker import DOCKER_MOUNT_PREFIX +from ray.autoscaler.sdk import get_docker_host_mount_location from getpass import getuser import hashlib @@ -242,11 +242,12 @@ def test_docker_rsync(): local_mount = "/home/ubuntu/base/mount/" remote_mount = "/root/protected_mount/" - remote_host_mount = f"{DOCKER_MOUNT_PREFIX}{remote_mount}" + docker_mount_prefix = get_docker_host_mount_location(cluster_name) + remote_host_mount = f"{docker_mount_prefix}{remote_mount}" local_file = "/home/ubuntu/base-file" remote_file = "/root/protected-file" - remote_host_file = f"{DOCKER_MOUNT_PREFIX}{remote_file}" + remote_host_file = f"{docker_mount_prefix}{remote_file}" process_runner.respond_to_call("docker inspect -f", ["true"]) cmd_runner.run_rsync_up(