mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 18:44:07 +08:00
[autoscaler] Add the cluster_name to docker file mounts directory prefix to make it more unique (#11600)
This commit is contained in:
committed by
Edward Oakes
parent
bcc92f59fd
commit
9ba8f72ff1
@@ -16,7 +16,6 @@ from ray.autoscaler._private.docker import check_bind_mounts_cmd, \
|
||||
check_docker_running_cmd, \
|
||||
check_docker_image, \
|
||||
docker_start_cmds, \
|
||||
DOCKER_MOUNT_PREFIX, \
|
||||
with_docker_exec
|
||||
from ray.autoscaler._private.log_timer import LogTimer
|
||||
|
||||
@@ -292,6 +291,7 @@ class SSHCommandRunner(CommandRunnerInterface):
|
||||
ssh_user_hash[:HASH_MAX_LENGTH],
|
||||
ssh_control_hash[:HASH_MAX_LENGTH])
|
||||
|
||||
self.cluster_name = cluster_name
|
||||
self.log_prefix = log_prefix
|
||||
self.process_runner = process_runner
|
||||
self.node_id = node_id
|
||||
@@ -597,8 +597,9 @@ class DockerCommandRunner(CommandRunnerInterface):
|
||||
|
||||
def run_rsync_up(self, source, target, options=None):
|
||||
options = options or {}
|
||||
host_destination = os.path.join(DOCKER_MOUNT_PREFIX,
|
||||
target.lstrip("/"))
|
||||
host_destination = os.path.join(
|
||||
self._get_docker_host_mount_location(
|
||||
self.ssh_command_runner.cluster_name), target.lstrip("/"))
|
||||
|
||||
self.ssh_command_runner.run(
|
||||
f"mkdir -p {os.path.dirname(host_destination.rstrip('/'))}")
|
||||
@@ -617,7 +618,9 @@ class DockerCommandRunner(CommandRunnerInterface):
|
||||
|
||||
def run_rsync_down(self, source, target, options=None):
|
||||
options = options or {}
|
||||
host_source = os.path.join(DOCKER_MOUNT_PREFIX, source.lstrip("/"))
|
||||
host_source = os.path.join(
|
||||
self._get_docker_host_mount_location(
|
||||
self.ssh_command_runner.cluster_name), source.lstrip("/"))
|
||||
self.ssh_command_runner.run(
|
||||
f"mkdir -p {os.path.dirname(host_source.rstrip('/'))}")
|
||||
if source[-1] == "/":
|
||||
@@ -709,7 +712,8 @@ class DockerCommandRunner(CommandRunnerInterface):
|
||||
self.docker_config.get(
|
||||
"run_options", []) + self.docker_config.get(
|
||||
f"{'head' if as_head else 'worker'}_run_options",
|
||||
[]) + self._configure_runtime())
|
||||
[]) + self._configure_runtime(),
|
||||
self.ssh_command_runner.cluster_name)
|
||||
self.run(start_command, run_env="host")
|
||||
else:
|
||||
running_image = self.run(
|
||||
@@ -746,7 +750,9 @@ class DockerCommandRunner(CommandRunnerInterface):
|
||||
if mount in file_mounts:
|
||||
self.ssh_command_runner.run(
|
||||
"docker cp {src} {container}:{dst}".format(
|
||||
src=os.path.join(DOCKER_MOUNT_PREFIX, mount),
|
||||
src=os.path.join(
|
||||
self._get_docker_host_mount_location(
|
||||
self.ssh_command_runner.cluster_name), mount),
|
||||
container=self.container_name,
|
||||
dst=self._docker_expand_user(mount)))
|
||||
self.initialized = True
|
||||
@@ -769,3 +775,9 @@ class DockerCommandRunner(CommandRunnerInterface):
|
||||
return []
|
||||
|
||||
return []
|
||||
|
||||
def _get_docker_host_mount_location(self, cluster_name: str) -> str:
    """Return the docker host mount directory location.

    Thin wrapper that forwards to
    ``ray.autoscaler.sdk.get_docker_host_mount_location``; ``self`` is not
    read.

    Args:
        cluster_name: Name of the cluster whose host-side mount directory
            is requested.

    Returns:
        The host path returned by the SDK helper for ``cluster_name``.
    """
    # Imported here due to circular dependency in imports.
    from ray.autoscaler.sdk import get_docker_host_mount_location

    return get_docker_host_mount_location(cluster_name)
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import os
|
||||
|
||||
from ray.ray_constants import ( # noqa F401
|
||||
from ray.ray_constants import ( # noqa F401
|
||||
AUTOSCALER_RESOURCE_REQUEST_CHANNEL, LOGGER_FORMAT,
|
||||
MEMORY_RESOURCE_UNIT_BYTES, RESOURCES_ENVIRONMENT_VARIABLE)
|
||||
|
||||
@@ -36,6 +36,3 @@ AUTOSCALER_HEARTBEAT_TIMEOUT_S = env_integer("AUTOSCALER_HEARTBEAT_TIMEOUT_S",
|
||||
BOTO_MAX_RETRIES = env_integer("BOTO_MAX_RETRIES", 12)
|
||||
# Max number of retries to create an EC2 node (retry different subnet)
|
||||
BOTO_CREATE_MAX_RETRIES = env_integer("BOTO_CREATE_MAX_RETRIES", 5)
|
||||
|
||||
# Host path that Docker mounts attach to
|
||||
DOCKER_MOUNT_PREFIX = "/tmp/ray_tmp_mount"
|
||||
|
||||
@@ -4,8 +4,6 @@ try: # py3
|
||||
except ImportError: # py2
|
||||
from pipes import quote
|
||||
|
||||
from ray.autoscaler._private.constants import DOCKER_MOUNT_PREFIX
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -66,8 +64,12 @@ def check_docker_image(cname):
|
||||
return _check_helper(cname, ".Config.Image")
|
||||
|
||||
|
||||
def docker_start_cmds(user, image, mount_dict, cname, user_options):
|
||||
mount = {f"{DOCKER_MOUNT_PREFIX}/{dst}": dst for dst in mount_dict}
|
||||
def docker_start_cmds(user, image, mount_dict, container_name, user_options,
|
||||
cluster_name):
|
||||
# Imported here due to circular dependency.
|
||||
from ray.autoscaler.sdk import get_docker_host_mount_location
|
||||
docker_mount_prefix = get_docker_host_mount_location(cluster_name)
|
||||
mount = {f"{docker_mount_prefix}/{dst}": dst for dst in mount_dict}
|
||||
|
||||
# TODO(ilr) Move away from defaulting to /root/
|
||||
mount_flags = " ".join([
|
||||
@@ -82,7 +84,8 @@ def docker_start_cmds(user, image, mount_dict, cname, user_options):
|
||||
|
||||
user_options_str = " ".join(user_options)
|
||||
docker_run = [
|
||||
"docker", "run", "--rm", "--name {}".format(cname), "-d", "-it",
|
||||
mount_flags, env_flags, user_options_str, "--net=host", image, "bash"
|
||||
"docker", "run", "--rm", "--name {}".format(container_name), "-d",
|
||||
"-it", mount_flags, env_flags, user_options_str, "--net=host", image,
|
||||
"bash"
|
||||
]
|
||||
return " ".join(docker_run)
|
||||
|
||||
@@ -208,3 +208,9 @@ def fillout_defaults(config: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Fillout default values for a cluster_config based on the provider."""
|
||||
from ray.autoscaler._private.util import fillout_defaults
|
||||
return fillout_defaults(config)
|
||||
|
||||
|
||||
def get_docker_host_mount_location(cluster_name: str) -> str:
    """Return host path that Docker mounts attach to.

    The cluster name is appended to the fixed ``/tmp/ray_tmp_mount`` prefix,
    so different cluster names map to different host directories.

    Args:
        cluster_name: Name of the cluster; inserted verbatim as the final
            path component (no escaping or validation is applied).

    Returns:
        The string ``/tmp/ray_tmp_mount/<cluster_name>``.
    """
    return f"/tmp/ray_tmp_mount/{cluster_name}"
|
||||
|
||||
@@ -15,7 +15,7 @@ import ray
|
||||
import ray._private.services as services
|
||||
from ray.autoscaler._private.util import prepare_config, validate_config
|
||||
from ray.autoscaler._private import commands
|
||||
from ray.autoscaler._private.docker import DOCKER_MOUNT_PREFIX
|
||||
from ray.autoscaler.sdk import get_docker_host_mount_location
|
||||
from ray.autoscaler._private.load_metrics import LoadMetrics
|
||||
from ray.autoscaler._private.autoscaler import StandardAutoscaler
|
||||
from ray.autoscaler._private.providers import (_NODE_PROVIDERS,
|
||||
@@ -502,14 +502,18 @@ class AutoscalingTest(unittest.TestCase):
|
||||
runner.assert_has_call("1.2.3.4", "start_ray_head")
|
||||
self.assertEqual(self.provider.mock_nodes[0].node_type, None)
|
||||
runner.assert_has_call("1.2.3.4", pattern="docker run")
|
||||
|
||||
docker_mount_prefix = get_docker_host_mount_location(
|
||||
SMALL_CLUSTER["cluster_name"])
|
||||
runner.assert_not_has_call(
|
||||
"1.2.3.4", pattern="-v /tmp/ray_tmp_mount/~/ray_bootstrap_config")
|
||||
"1.2.3.4",
|
||||
pattern=f"-v {docker_mount_prefix}/~/ray_bootstrap_config")
|
||||
runner.assert_has_call(
|
||||
"1.2.3.4",
|
||||
pattern="docker cp /tmp/ray_tmp_mount/~/ray_bootstrap_key.pem")
|
||||
runner.assert_has_call(
|
||||
"1.2.3.4",
|
||||
pattern="docker cp /tmp/ray_tmp_mount/~/ray_bootstrap_config.yaml")
|
||||
pattern=f"docker cp {docker_mount_prefix}/~/ray_bootstrap_key.pem")
|
||||
pattern_to_assert = \
|
||||
f"docker cp {docker_mount_prefix}/~/ray_bootstrap_config.yaml"
|
||||
runner.assert_has_call("1.2.3.4", pattern=pattern_to_assert)
|
||||
|
||||
@unittest.skipIf(sys.platform == "win32", "Failing on Windows.")
|
||||
def testRsyncCommandWithDocker(self):
|
||||
@@ -1473,12 +1477,13 @@ class AutoscalingTest(unittest.TestCase):
|
||||
self.waitForNodes(
|
||||
2, tag_filters={TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE})
|
||||
autoscaler.update()
|
||||
|
||||
docker_mount_prefix = get_docker_host_mount_location(
|
||||
config["cluster_name"])
|
||||
for i in [0, 1]:
|
||||
runner.assert_has_call(f"172.0.0.{i}", "setup_cmd")
|
||||
runner.assert_has_call(
|
||||
f"172.0.0.{i}", f"{file_mount_dir}/ ubuntu@172.0.0.{i}:"
|
||||
f"{DOCKER_MOUNT_PREFIX}/home/test-folder/")
|
||||
f"{docker_mount_prefix}/home/test-folder/")
|
||||
|
||||
runner.clear_history()
|
||||
|
||||
@@ -1498,7 +1503,7 @@ class AutoscalingTest(unittest.TestCase):
|
||||
runner.assert_has_call(
|
||||
f"172.0.0.{i}", f"172.0.0.{i}",
|
||||
f"{file_mount_dir}/ ubuntu@172.0.0.{i}:"
|
||||
f"{DOCKER_MOUNT_PREFIX}/home/test-folder/")
|
||||
f"{docker_mount_prefix}/home/test-folder/")
|
||||
|
||||
def testFileMountsNonContinuous(self):
|
||||
file_mount_dir = tempfile.mkdtemp()
|
||||
@@ -1525,13 +1530,15 @@ class AutoscalingTest(unittest.TestCase):
|
||||
self.waitForNodes(
|
||||
2, tag_filters={TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE})
|
||||
autoscaler.update()
|
||||
docker_mount_prefix = get_docker_host_mount_location(
|
||||
config["cluster_name"])
|
||||
|
||||
for i in [0, 1]:
|
||||
runner.assert_has_call(f"172.0.0.{i}", "setup_cmd")
|
||||
runner.assert_has_call(
|
||||
f"172.0.0.{i}", f"172.0.0.{i}",
|
||||
f"{file_mount_dir}/ ubuntu@172.0.0.{i}:"
|
||||
f"{DOCKER_MOUNT_PREFIX}/home/test-folder/")
|
||||
f"{docker_mount_prefix}/home/test-folder/")
|
||||
|
||||
runner.clear_history()
|
||||
|
||||
@@ -1548,7 +1555,7 @@ class AutoscalingTest(unittest.TestCase):
|
||||
runner.assert_not_has_call(f"172.0.0.{i}", "setup_cmd")
|
||||
runner.assert_not_has_call(
|
||||
f"172.0.0.{i}", f"{file_mount_dir}/ ubuntu@172.0.0.{i}:"
|
||||
f"{DOCKER_MOUNT_PREFIX}/home/test-folder/")
|
||||
f"{docker_mount_prefix}/home/test-folder/")
|
||||
|
||||
# Simulate a second `ray up` call
|
||||
from ray.autoscaler._private import util
|
||||
@@ -1574,7 +1581,7 @@ class AutoscalingTest(unittest.TestCase):
|
||||
runner.assert_has_call(
|
||||
f"172.0.0.{i}", f"172.0.0.{i}",
|
||||
f"{file_mount_dir}/ ubuntu@172.0.0.{i}:"
|
||||
f"{DOCKER_MOUNT_PREFIX}/home/test-folder/")
|
||||
f"{docker_mount_prefix}/home/test-folder/")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -7,7 +7,7 @@ from ray.tests.test_autoscaler import MockProvider, MockProcessRunner
|
||||
from ray.autoscaler.command_runner import CommandRunnerInterface
|
||||
from ray.autoscaler._private.command_runner import SSHCommandRunner, \
|
||||
DockerCommandRunner, KubernetesCommandRunner, _with_environment_variables
|
||||
from ray.autoscaler._private.docker import DOCKER_MOUNT_PREFIX
|
||||
from ray.autoscaler.sdk import get_docker_host_mount_location
|
||||
from getpass import getuser
|
||||
import hashlib
|
||||
|
||||
@@ -242,11 +242,12 @@ def test_docker_rsync():
|
||||
|
||||
local_mount = "/home/ubuntu/base/mount/"
|
||||
remote_mount = "/root/protected_mount/"
|
||||
remote_host_mount = f"{DOCKER_MOUNT_PREFIX}{remote_mount}"
|
||||
docker_mount_prefix = get_docker_host_mount_location(cluster_name)
|
||||
remote_host_mount = f"{docker_mount_prefix}{remote_mount}"
|
||||
|
||||
local_file = "/home/ubuntu/base-file"
|
||||
remote_file = "/root/protected-file"
|
||||
remote_host_file = f"{DOCKER_MOUNT_PREFIX}{remote_file}"
|
||||
remote_host_file = f"{docker_mount_prefix}{remote_file}"
|
||||
|
||||
process_runner.respond_to_call("docker inspect -f", ["true"])
|
||||
cmd_runner.run_rsync_up(
|
||||
|
||||
Reference in New Issue
Block a user