mirror of
https://github.com/wassname/ray.git
synced 2026-07-04 21:04:35 +08:00
[docker] Support non-root container (#11407)
This commit is contained in:
@@ -25,6 +25,8 @@ from ray.autoscaler._private.subprocess_output_util import (
|
||||
from ray.autoscaler._private.cli_logger import cli_logger, cf
|
||||
from ray.util.debug import log_once
|
||||
|
||||
from ray.autoscaler._private.constants import RAY_HOME
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# How long to wait for a node to start, in seconds
|
||||
@@ -190,7 +192,7 @@ class KubernetesCommandRunner(CommandRunnerInterface):
|
||||
logger.warning("'rsync_filter' detected but is currently "
|
||||
"unsupported for k8s.")
|
||||
if target.startswith("~"):
|
||||
target = "/root" + target[1:]
|
||||
target = RAY_HOME + target[1:]
|
||||
|
||||
try:
|
||||
flags = "-aqz" if is_rsync_silent() else "-avz"
|
||||
@@ -206,7 +208,7 @@ class KubernetesCommandRunner(CommandRunnerInterface):
|
||||
"rsync failed: '{}'. Falling back to 'kubectl cp'".format(e),
|
||||
UserWarning)
|
||||
if target.startswith("~"):
|
||||
target = "/root" + target[1:]
|
||||
target = RAY_HOME + target[1:]
|
||||
|
||||
self.process_runner.check_call(self.kubectl + [
|
||||
"cp", source, "{}/{}:{}".format(self.namespace, self.node_id,
|
||||
@@ -215,7 +217,7 @@ class KubernetesCommandRunner(CommandRunnerInterface):
|
||||
|
||||
def run_rsync_down(self, source, target, options=None):
|
||||
if target.startswith("~"):
|
||||
target = "/root" + target[1:]
|
||||
target = RAY_HOME + target[1:]
|
||||
|
||||
try:
|
||||
flags = "-aqz" if is_rsync_silent() else "-avz"
|
||||
@@ -231,7 +233,7 @@ class KubernetesCommandRunner(CommandRunnerInterface):
|
||||
"rsync failed: '{}'. Falling back to 'kubectl cp'".format(e),
|
||||
UserWarning)
|
||||
if target.startswith("~"):
|
||||
target = "/root" + target[1:]
|
||||
target = RAY_HOME + target[1:]
|
||||
|
||||
self.process_runner.check_call(self.kubectl + [
|
||||
"cp", "{}/{}:{}".format(self.namespace, self.node_id, source),
|
||||
@@ -699,6 +701,16 @@ class DockerCommandRunner(CommandRunnerInterface):
|
||||
cleaned_bind_mounts.pop(mnt, None)
|
||||
|
||||
if not self._check_container_status():
|
||||
# Get the home directory from the image's HOME env var (default: /root)
|
||||
image_env = self.ssh_command_runner.run(
|
||||
"docker inspect -f '{{json .Config.Env}}' " + image,
|
||||
with_output=True).decode().strip()
|
||||
home_directory = "/root"
|
||||
for env_var in json.loads(image_env):
|
||||
if env_var.startswith("HOME="):
|
||||
home_directory = env_var.split("HOME=")[1]
|
||||
break
|
||||
|
||||
start_command = docker_start_cmds(
|
||||
self.ssh_command_runner.ssh_user, image, cleaned_bind_mounts,
|
||||
self.container_name,
|
||||
@@ -706,7 +718,7 @@ class DockerCommandRunner(CommandRunnerInterface):
|
||||
"run_options", []) + self.docker_config.get(
|
||||
f"{'head' if as_head else 'worker'}_run_options",
|
||||
[]) + self._configure_runtime(),
|
||||
self.ssh_command_runner.cluster_name)
|
||||
self.ssh_command_runner.cluster_name, home_directory)
|
||||
self.run(start_command, run_env="host")
|
||||
else:
|
||||
running_image = self.run(
|
||||
|
||||
@@ -41,3 +41,6 @@ AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE = 1000
|
||||
BOTO_MAX_RETRIES = env_integer("BOTO_MAX_RETRIES", 12)
|
||||
# Max number of retries to create an EC2 node (retry different subnet)
|
||||
BOTO_CREATE_MAX_RETRIES = env_integer("BOTO_CREATE_MAX_RETRIES", 5)
|
||||
|
||||
# Ray home path in the container image
|
||||
RAY_HOME = "/home/ray"
|
||||
|
||||
@@ -65,15 +65,15 @@ def check_docker_image(cname):
|
||||
|
||||
|
||||
def docker_start_cmds(user, image, mount_dict, container_name, user_options,
|
||||
cluster_name):
|
||||
cluster_name, home_directory):
|
||||
# Imported here due to circular dependency.
|
||||
from ray.autoscaler.sdk import get_docker_host_mount_location
|
||||
docker_mount_prefix = get_docker_host_mount_location(cluster_name)
|
||||
mount = {f"{docker_mount_prefix}/{dst}": dst for dst in mount_dict}
|
||||
|
||||
# TODO(ilr) Move away from defaulting to /root/
|
||||
mount_flags = " ".join([
|
||||
"-v {src}:{dest}".format(src=k, dest=v.replace("~/", "/root/"))
|
||||
"-v {src}:{dest}".format(
|
||||
src=k, dest=v.replace("~/", home_directory + "/"))
|
||||
for k, v in mount.items()
|
||||
])
|
||||
|
||||
|
||||
@@ -17,21 +17,3 @@ class StaroidCommandRunner(KubernetesCommandRunner):
|
||||
if kube_api_server is not None:
|
||||
self.kubectl.extend(["--server", kube_api_server])
|
||||
os.environ["KUBE_API_SERVER"] = kube_api_server
|
||||
|
||||
def _rewrite_target_home_dir(self, target):
|
||||
# Staroid forces containers to run with non-root permissions. The Ray docker
|
||||
# image does not support a non-root user at the moment.
|
||||
# Use /home/ray as the home directory until the docker image supports a
|
||||
# non-root user.
|
||||
|
||||
if target.startswith("~/"):
|
||||
return "/home/ray" + target[1:]
|
||||
return target
|
||||
|
||||
def run_rsync_up(self, source, target, options=None):
|
||||
target = self._rewrite_target_home_dir(target)
|
||||
super().run_rsync_up(source, target, options)
|
||||
|
||||
def run_rsync_down(self, source, target, options=None):
|
||||
target = self._rewrite_target_home_dir(target)
|
||||
super().run_rsync_down(source, target, options)
|
||||
|
||||
@@ -390,6 +390,7 @@ class AutoscalingTest(unittest.TestCase):
|
||||
# Two initial calls to docker cp, one before run, two final calls to cp
|
||||
runner.respond_to_call(".State.Running",
|
||||
["false", "false", "false", "true", "true"])
|
||||
runner.respond_to_call("json .Config.Env", ["[]"])
|
||||
commands.get_or_create_head_node(
|
||||
SMALL_CLUSTER,
|
||||
config_path,
|
||||
@@ -968,6 +969,7 @@ class AutoscalingTest(unittest.TestCase):
|
||||
config_path = self.write_config(SMALL_CLUSTER)
|
||||
self.provider = MockProvider()
|
||||
runner = MockProcessRunner()
|
||||
runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)])
|
||||
autoscaler = StandardAutoscaler(
|
||||
config_path,
|
||||
LoadMetrics(),
|
||||
@@ -1006,6 +1008,7 @@ class AutoscalingTest(unittest.TestCase):
|
||||
config_path = self.write_config(SMALL_CLUSTER)
|
||||
self.provider = MockProvider()
|
||||
runner = MockProcessRunner()
|
||||
runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)])
|
||||
autoscaler = StandardAutoscaler(
|
||||
config_path,
|
||||
LoadMetrics(),
|
||||
@@ -1159,6 +1162,7 @@ class AutoscalingTest(unittest.TestCase):
|
||||
config_path = self.write_config(SMALL_CLUSTER)
|
||||
self.provider = MockProvider()
|
||||
runner = MockProcessRunner()
|
||||
runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)])
|
||||
lm = LoadMetrics()
|
||||
autoscaler = StandardAutoscaler(
|
||||
config_path,
|
||||
@@ -1227,6 +1231,7 @@ class AutoscalingTest(unittest.TestCase):
|
||||
config_path = self.write_config(config)
|
||||
self.provider = MockProvider(cache_stopped=False)
|
||||
runner = MockProcessRunner()
|
||||
runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)])
|
||||
lm = LoadMetrics()
|
||||
autoscaler = StandardAutoscaler(
|
||||
config_path,
|
||||
@@ -1269,6 +1274,7 @@ class AutoscalingTest(unittest.TestCase):
|
||||
config_path = self.write_config(config)
|
||||
self.provider = MockProvider(cache_stopped=True)
|
||||
runner = MockProcessRunner()
|
||||
runner.respond_to_call("json .Config.Env", ["[]" for i in range(3)])
|
||||
lm = LoadMetrics()
|
||||
autoscaler = StandardAutoscaler(
|
||||
config_path,
|
||||
@@ -1335,6 +1341,7 @@ class AutoscalingTest(unittest.TestCase):
|
||||
config_path = self.write_config(config)
|
||||
self.provider = MockProvider(cache_stopped=True)
|
||||
runner = MockProcessRunner()
|
||||
runner.respond_to_call("json .Config.Env", ["[]" for i in range(13)])
|
||||
lm = LoadMetrics()
|
||||
autoscaler = StandardAutoscaler(
|
||||
config_path,
|
||||
@@ -1385,6 +1392,7 @@ class AutoscalingTest(unittest.TestCase):
|
||||
config["max_workers"] = 2
|
||||
config_path = self.write_config(config)
|
||||
runner = MockProcessRunner()
|
||||
runner.respond_to_call("json .Config.Env", ["[]" for i in range(4)])
|
||||
lm = LoadMetrics()
|
||||
autoscaler = StandardAutoscaler(
|
||||
config_path,
|
||||
@@ -1438,6 +1446,7 @@ class AutoscalingTest(unittest.TestCase):
|
||||
config["max_workers"] = 2
|
||||
config_path = self.write_config(config)
|
||||
runner = MockProcessRunner()
|
||||
runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)])
|
||||
lm = LoadMetrics()
|
||||
autoscaler = StandardAutoscaler(
|
||||
config_path,
|
||||
@@ -1484,6 +1493,7 @@ class AutoscalingTest(unittest.TestCase):
|
||||
from ray.autoscaler._private import util
|
||||
util._hash_cache = {}
|
||||
runner = MockProcessRunner()
|
||||
runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)])
|
||||
lm = LoadMetrics()
|
||||
autoscaler = StandardAutoscaler(
|
||||
config_path,
|
||||
|
||||
@@ -1002,6 +1002,7 @@ class AutoscalingTest(unittest.TestCase):
|
||||
config_path = self.write_config(MULTI_WORKER_CLUSTER)
|
||||
self.provider = MockProvider()
|
||||
runner = MockProcessRunner()
|
||||
runner.respond_to_call("json .Config.Env", ["[]"])
|
||||
get_or_create_head_node(
|
||||
MULTI_WORKER_CLUSTER,
|
||||
config_path,
|
||||
@@ -1272,6 +1273,7 @@ class AutoscalingTest(unittest.TestCase):
|
||||
config_path = self.write_config(config)
|
||||
self.provider = MockProvider()
|
||||
runner = MockProcessRunner()
|
||||
runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)])
|
||||
autoscaler = StandardAutoscaler(
|
||||
config_path,
|
||||
LoadMetrics(),
|
||||
@@ -1353,6 +1355,7 @@ class AutoscalingTest(unittest.TestCase):
|
||||
config_path = self.write_config(config)
|
||||
self.provider = MockProvider()
|
||||
runner = MockProcessRunner()
|
||||
runner.respond_to_call("json .Config.Env", ["[]" for i in range(2)])
|
||||
autoscaler = StandardAutoscaler(
|
||||
config_path,
|
||||
LoadMetrics(),
|
||||
@@ -1405,6 +1408,7 @@ class AutoscalingTest(unittest.TestCase):
|
||||
config_path = self.write_config(config)
|
||||
self.provider = MockProvider()
|
||||
runner = MockProcessRunner()
|
||||
runner.respond_to_call("json .Config.Env", ["[]" for i in range(4)])
|
||||
autoscaler = StandardAutoscaler(
|
||||
config_path,
|
||||
LoadMetrics(),
|
||||
|
||||
Reference in New Issue
Block a user