mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 19:49:04 +08:00
[autoscaler/docker] Docker Initialization Revamp (#9515)
* Basic idea * Small fixes * dockerize start commands in Command Runner * Remove run_init from CommandRunnerInterface * Add Parens Co-authored-by: Simon Mo <simon.mo@hey.com> * Cleaning up * Response to richards comments * Further small fixes * Fix Json * schema format fix * cleanup * run more often * fix indent * Fix richards responses * fix ups * remove docker_commands from schema * default to list * fix docker cmd runner test * lint fix Co-authored-by: Simon Mo <simon.mo@hey.com>
This commit is contained in:
@@ -385,6 +385,7 @@ class StandardAutoscaler:
|
||||
file_mounts_contents_hash=self.file_mounts_contents_hash,
|
||||
process_runner=self.process_runner,
|
||||
use_internal_ip=True,
|
||||
is_head_node=False,
|
||||
docker_config=self.config.get("docker"))
|
||||
updater.start()
|
||||
self.updaters[node_id] = updater
|
||||
@@ -441,6 +442,7 @@ class StandardAutoscaler:
|
||||
ray_start_commands=with_head_node_ip(ray_start_commands),
|
||||
runtime_hash=self.runtime_hash,
|
||||
file_mounts_contents_hash=self.file_mounts_contents_hash,
|
||||
is_head_node=False,
|
||||
cluster_synced_files=self.config["cluster_synced_files"],
|
||||
process_runner=self.process_runner,
|
||||
use_internal_ip=True,
|
||||
|
||||
@@ -9,7 +9,11 @@ import subprocess
|
||||
import sys
|
||||
import time
|
||||
|
||||
from ray.autoscaler.docker import check_docker_running_cmd, with_docker_exec
|
||||
from ray.autoscaler.docker import check_docker_running_cmd, \
|
||||
check_docker_image, \
|
||||
docker_autoscaler_setup, \
|
||||
docker_start_cmds, \
|
||||
with_docker_exec
|
||||
from ray.autoscaler.log_timer import LogTimer
|
||||
|
||||
from ray.autoscaler.subprocess_output_util import (run_cmd_redirected,
|
||||
@@ -565,14 +569,14 @@ class SSHCommandRunner(CommandRunnerInterface):
|
||||
self.ssh_user, self.ssh_ip)
|
||||
|
||||
|
||||
class DockerCommandRunner(SSHCommandRunner):
|
||||
class DockerCommandRunner(CommandRunnerInterface):
|
||||
def __init__(self, docker_config, **common_args):
|
||||
self.ssh_command_runner = SSHCommandRunner(**common_args)
|
||||
self.docker_name = docker_config["container_name"]
|
||||
self.container_name = docker_config["container_name"]
|
||||
self.docker_config = docker_config
|
||||
self.home_dir = None
|
||||
self._check_docker_installed()
|
||||
self.shutdown = False
|
||||
self.initialized = False
|
||||
|
||||
def run(
|
||||
self,
|
||||
@@ -595,7 +599,8 @@ class DockerCommandRunner(SSHCommandRunner):
|
||||
cmd = self._docker_expand_user(cmd, any_char=True)
|
||||
cmd = " ".join(_with_interactive(cmd))
|
||||
cmd = with_docker_exec(
|
||||
[cmd], container_name=self.docker_name,
|
||||
[cmd],
|
||||
container_name=self.container_name,
|
||||
with_interactive=True)[0]
|
||||
|
||||
if self.shutdown:
|
||||
@@ -622,7 +627,7 @@ class DockerCommandRunner(SSHCommandRunner):
|
||||
# Without it, docker copies the source *into* the target
|
||||
target += "/."
|
||||
self.ssh_command_runner.run("docker cp {} {}:{}".format(
|
||||
target, self.docker_name,
|
||||
target, self.container_name,
|
||||
self._docker_expand_user(protected_path)))
|
||||
|
||||
def run_rsync_down(self, source, target):
|
||||
@@ -636,7 +641,7 @@ class DockerCommandRunner(SSHCommandRunner):
|
||||
# Adding a "." means that docker copies the *contents*
|
||||
# Without it, docker copies the source *into* the target
|
||||
self.ssh_command_runner.run("docker cp {}:{} {}".format(
|
||||
self.docker_name, self._docker_expand_user(protected_path),
|
||||
self.container_name, self._docker_expand_user(protected_path),
|
||||
source))
|
||||
self.ssh_command_runner.run_rsync_down(source, target)
|
||||
|
||||
@@ -644,7 +649,7 @@ class DockerCommandRunner(SSHCommandRunner):
|
||||
inner_str = self.ssh_command_runner.remote_shell_command_str().replace(
|
||||
"ssh", "ssh -tt", 1).strip("\n")
|
||||
return inner_str + " docker exec -it {} /bin/bash\n".format(
|
||||
self.docker_name)
|
||||
self.container_name)
|
||||
|
||||
def _check_docker_installed(self):
|
||||
try:
|
||||
@@ -665,14 +670,14 @@ class DockerCommandRunner(SSHCommandRunner):
|
||||
self.shutdown = True
|
||||
|
||||
def _check_container_status(self):
|
||||
no_exist = "not_present"
|
||||
cmd = check_docker_running_cmd(self.docker_name) + " ".join(
|
||||
["||", "echo", quote(no_exist)])
|
||||
if self.initialized:
|
||||
return True
|
||||
output = self.ssh_command_runner.run(
|
||||
cmd, with_output=True).decode("utf-8").strip()
|
||||
if no_exist in output:
|
||||
return False
|
||||
return "true" in output.lower()
|
||||
check_docker_running_cmd(self.container_name),
|
||||
with_output=True).decode("utf-8").strip()
|
||||
# Checks for the false positive where "true" is in the container name
|
||||
return ("true" in output.lower()
|
||||
and "no such object" not in output.lower())
|
||||
|
||||
def _docker_expand_user(self, string, any_char=False):
|
||||
user_pos = string.find("~")
|
||||
@@ -680,7 +685,7 @@ class DockerCommandRunner(SSHCommandRunner):
|
||||
if self.home_dir is None:
|
||||
self.home_dir = self.ssh_command_runner.run(
|
||||
"docker exec {} env | grep HOME | cut -d'=' -f2".format(
|
||||
self.docker_name),
|
||||
self.container_name),
|
||||
with_output=True).decode("utf-8").strip()
|
||||
|
||||
if any_char:
|
||||
@@ -690,3 +695,40 @@ class DockerCommandRunner(SSHCommandRunner):
|
||||
return string.replace("~", self.home_dir, 1)
|
||||
|
||||
return string
|
||||
|
||||
def run_init(self, *, as_head, file_mounts):
|
||||
image = self.docker_config.get("image")
|
||||
if image is None:
|
||||
image = self.docker_config.get(
|
||||
f"{'head' if as_head else 'worker'}_image")
|
||||
|
||||
self._check_docker_installed()
|
||||
if self.docker_config.get("pull_before_run", True):
|
||||
assert image, "Image must be included in config if " + \
|
||||
"pull_before_run is specified"
|
||||
|
||||
self.run("docker pull {}".format(image), run_env="host")
|
||||
|
||||
start_command = docker_start_cmds(
|
||||
self.ssh_command_runner.ssh_user, image, file_mounts,
|
||||
self.container_name,
|
||||
self.docker_config.get("run_options", []) + self.docker_config.get(
|
||||
f"{'head' if as_head else 'worker'}_run_options", []))
|
||||
|
||||
if not self._check_container_status():
|
||||
self.run(start_command, run_env="host")
|
||||
else:
|
||||
running_image = self.run(
|
||||
check_docker_image(self.container_name),
|
||||
with_output=True,
|
||||
run_env="host").decode("utf-8").strip()
|
||||
if running_image != image:
|
||||
logger.error(f"A container with name {self.container_name} " +
|
||||
f"is running image {running_image} instead " +
|
||||
f"of {image} (which was provided in the YAML")
|
||||
|
||||
# Copy bootstrap config & key over
|
||||
if as_head:
|
||||
for copy_cmd in docker_autoscaler_setup(self.container_name):
|
||||
self.run(copy_cmd, run_env="host")
|
||||
self.initialized = True
|
||||
|
||||
@@ -405,6 +405,7 @@ def kill_node(config_file, yes, hard, override_cluster_name):
|
||||
ray_start_commands=[],
|
||||
runtime_hash="",
|
||||
file_mounts_contents_hash="",
|
||||
is_head_node=False,
|
||||
docker_config=config.get("docker"))
|
||||
|
||||
_exec(updater, "ray stop", False, False)
|
||||
@@ -658,6 +659,7 @@ def get_or_create_head_node(config,
|
||||
process_runner=_runner,
|
||||
runtime_hash=runtime_hash,
|
||||
file_mounts_contents_hash=file_mounts_contents_hash,
|
||||
is_head_node=True,
|
||||
docker_config=config.get("docker"))
|
||||
updater.start()
|
||||
updater.join()
|
||||
@@ -822,6 +824,7 @@ def exec_cluster(config_file: str,
|
||||
ray_start_commands=[],
|
||||
runtime_hash="",
|
||||
file_mounts_contents_hash="",
|
||||
is_head_node=True,
|
||||
docker_config=config.get("docker"))
|
||||
|
||||
is_docker = isinstance(updater.cmd_runner, DockerCommandRunner)
|
||||
@@ -933,13 +936,10 @@ def rsync(config_file: str,
|
||||
# and _get_head_node does this too
|
||||
nodes = _get_worker_nodes(config, override_cluster_name)
|
||||
|
||||
nodes += [
|
||||
_get_head_node(
|
||||
config,
|
||||
config_file,
|
||||
override_cluster_name,
|
||||
create_if_needed=False)
|
||||
]
|
||||
head_node = _get_head_node(
|
||||
config, config_file, override_cluster_name, create_if_needed=False)
|
||||
|
||||
nodes += [head_node]
|
||||
|
||||
for node_id in nodes:
|
||||
updater = NodeUpdaterThread(
|
||||
@@ -954,6 +954,7 @@ def rsync(config_file: str,
|
||||
ray_start_commands=[],
|
||||
runtime_hash="",
|
||||
file_mounts_contents_hash="",
|
||||
is_head_node=(node_id == head_node),
|
||||
docker_config=config.get("docker"))
|
||||
if down:
|
||||
rsync = updater.rsync_down
|
||||
|
||||
@@ -8,67 +8,23 @@ except ImportError: # py2
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def dockerize_if_needed(config):
|
||||
def validate_docker_config(config):
|
||||
if "docker" not in config:
|
||||
return config
|
||||
|
||||
docker_image = config["docker"].get("image")
|
||||
docker_pull = config["docker"].get("pull_before_run", True)
|
||||
cname = config["docker"].get("container_name")
|
||||
run_options = config["docker"].get("run_options", [])
|
||||
|
||||
head_docker_image = config["docker"].get("head_image", docker_image)
|
||||
head_run_options = config["docker"].get("head_run_options", [])
|
||||
|
||||
worker_docker_image = config["docker"].get("worker_image", docker_image)
|
||||
worker_run_options = config["docker"].get("worker_run_options", [])
|
||||
|
||||
if not docker_image and not (head_docker_image and worker_docker_image):
|
||||
if cname:
|
||||
logger.warning(
|
||||
"dockerize_if_needed: "
|
||||
"Container name given but no Docker image(s) - continuing...")
|
||||
return config
|
||||
image_present = docker_image or (head_docker_image and worker_docker_image)
|
||||
if (not cname) and (not image_present):
|
||||
return
|
||||
else:
|
||||
assert cname, "Must provide container name!"
|
||||
ssh_user = config["auth"]["ssh_user"]
|
||||
docker_mounts = {dst: dst for dst in config["file_mounts"]}
|
||||
assert cname and image_present, "Must provide a container & image name"
|
||||
|
||||
if docker_pull:
|
||||
docker_pull_cmd = "docker pull {}".format(docker_image)
|
||||
config["initialization_commands"].append(docker_pull_cmd)
|
||||
for node_type_config in config.get("available_node_types",
|
||||
{}).values():
|
||||
node_type_config["initialization_commands"].append(docker_pull_cmd)
|
||||
pass
|
||||
|
||||
head_docker_start = docker_start_cmds(ssh_user, head_docker_image,
|
||||
docker_mounts, cname,
|
||||
run_options + head_run_options)
|
||||
|
||||
worker_docker_start = docker_start_cmds(ssh_user, worker_docker_image,
|
||||
docker_mounts, cname,
|
||||
run_options + worker_run_options)
|
||||
|
||||
config["head_setup_commands"] = head_docker_start + (with_docker_exec(
|
||||
config["head_setup_commands"], container_name=cname))
|
||||
config["head_start_ray_commands"] = (
|
||||
docker_autoscaler_setup(cname) + with_docker_exec(
|
||||
config["head_start_ray_commands"], container_name=cname))
|
||||
|
||||
config["worker_setup_commands"] = worker_docker_start + (with_docker_exec(
|
||||
config["worker_setup_commands"], container_name=cname))
|
||||
config["worker_start_ray_commands"] = with_docker_exec(
|
||||
config["worker_start_ray_commands"],
|
||||
container_name=cname,
|
||||
env_vars=["RAY_HEAD_IP"])
|
||||
|
||||
for node_type_config in config.get("available_node_types", {}).values():
|
||||
if "worker_setup_commands" in node_type_config:
|
||||
node_type_config["worker_setup_commands"] = worker_docker_start + (
|
||||
with_docker_exec(
|
||||
node_type_config["worker_setup_commands"],
|
||||
container_name=cname))
|
||||
return config
|
||||
|
||||
|
||||
@@ -91,11 +47,19 @@ def with_docker_exec(cmds,
|
||||
|
||||
|
||||
def check_docker_running_cmd(cname):
|
||||
return " ".join(["docker", "inspect", "-f", "'{{.State.Running}}'", cname])
|
||||
return " ".join([
|
||||
"docker", "inspect", "-f", "'{{.State.Running}}'", cname, "||", "true"
|
||||
])
|
||||
|
||||
|
||||
def docker_start_cmds(user, image, mount, cname, user_options):
|
||||
cmds = []
|
||||
def check_docker_image(cname):
|
||||
return " ".join([
|
||||
"docker", "inspect", "-f", "'{{.Config.Image}}'", cname, "||", "true"
|
||||
])
|
||||
|
||||
|
||||
def docker_start_cmds(user, image, mount_dict, cname, user_options):
|
||||
mount = {dst: dst for dst in mount_dict}
|
||||
|
||||
# TODO(ilr) Move away from defaulting to /root/
|
||||
mount_flags = " ".join([
|
||||
@@ -109,16 +73,11 @@ def docker_start_cmds(user, image, mount, cname, user_options):
|
||||
["-e {name}={val}".format(name=k, val=v) for k, v in env_vars.items()])
|
||||
|
||||
user_options_str = " ".join(user_options)
|
||||
# TODO(ilr) Check command type
|
||||
# docker run command
|
||||
docker_check = check_docker_running_cmd(cname) + " || "
|
||||
docker_run = [
|
||||
"docker", "run", "--rm", "--name {}".format(cname), "-d", "-it",
|
||||
mount_flags, env_flags, user_options_str, "--net=host", image, "bash"
|
||||
]
|
||||
cmds.append(docker_check + " ".join(docker_run))
|
||||
|
||||
return cmds
|
||||
return " ".join(docker_run)
|
||||
|
||||
|
||||
def docker_autoscaler_setup(cname):
|
||||
|
||||
@@ -10,7 +10,8 @@ from ray.autoscaler.tags import TAG_RAY_NODE_STATUS, TAG_RAY_RUNTIME_CONFIG, \
|
||||
TAG_RAY_FILE_MOUNTS_CONTENTS, \
|
||||
STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED, STATUS_WAITING_FOR_SSH, \
|
||||
STATUS_SETTING_UP, STATUS_SYNCING_FILES
|
||||
from ray.autoscaler.command_runner import NODE_START_WAIT_S, ProcessRunnerError
|
||||
from ray.autoscaler.command_runner import NODE_START_WAIT_S, \
|
||||
ProcessRunnerError, DockerCommandRunner
|
||||
from ray.autoscaler.log_timer import LogTimer
|
||||
|
||||
import ray.autoscaler.subprocess_output_util as cmd_output_util
|
||||
@@ -25,7 +26,27 @@ READY_CHECK_INTERVAL = 5
|
||||
|
||||
|
||||
class NodeUpdater:
|
||||
"""A process for syncing files and running init commands on a node."""
|
||||
"""A process for syncing files and running init commands on a node.
|
||||
|
||||
Arguments:
|
||||
node_id: the Node ID
|
||||
provider_config: Provider section of autoscaler yaml
|
||||
provider: NodeProvider Class
|
||||
auth_config: Auth section of autoscaler yaml
|
||||
cluster_name: the name of the cluster.
|
||||
file_mounts: Map of remote to local paths
|
||||
initialization_commands: Commands run before container launch
|
||||
setup_commands: Commands run before ray starts
|
||||
ray_start_commands: Commands to start ray
|
||||
runtime_hash: Used to check for config changes
|
||||
file_mounts_contents_hash: Used to check for changes to file mounts
|
||||
is_head_node: Whether to use head start/setup commands
|
||||
process_runner: the module to use to run the commands
|
||||
in the CommandRunner. E.g., subprocess.
|
||||
use_internal_ip: Whether the node_id belongs to an internal ip
|
||||
or external ip.
|
||||
docker_config: Docker section of autoscaler yaml
|
||||
"""
|
||||
|
||||
def __init__(self,
|
||||
node_id,
|
||||
@@ -39,6 +60,7 @@ class NodeUpdater:
|
||||
ray_start_commands,
|
||||
runtime_hash,
|
||||
file_mounts_contents_hash,
|
||||
is_head_node,
|
||||
node_resources=None,
|
||||
cluster_synced_files=None,
|
||||
process_runner=subprocess,
|
||||
@@ -68,6 +90,7 @@ class NodeUpdater:
|
||||
self.file_mounts_contents_hash = file_mounts_contents_hash
|
||||
self.cluster_synced_files = cluster_synced_files
|
||||
self.auth_config = auth_config
|
||||
self.is_head_node = is_head_node
|
||||
|
||||
def run(self):
|
||||
cli_logger.old_info(logger, "{}Updating to {}", self.log_prefix,
|
||||
@@ -202,6 +225,7 @@ class NodeUpdater:
|
||||
"{}Waiting for remote shell...",
|
||||
self.log_prefix)
|
||||
|
||||
# Run outside of the container
|
||||
self.cmd_runner.run("uptime", run_env="host")
|
||||
cli_logger.old_debug(logger, "Uptime succeeded.")
|
||||
cli_logger.success("Success.")
|
||||
@@ -264,6 +288,12 @@ class NodeUpdater:
|
||||
"{}{} already up-to-date, skip to ray start",
|
||||
self.log_prefix, self.node_id)
|
||||
|
||||
# When resuming from a stopped instance the runtime_hash may be the
|
||||
# same, but the container will not be started.
|
||||
if isinstance(self.cmd_runner, DockerCommandRunner):
|
||||
self.cmd_runner.run_init(
|
||||
as_head=self.is_head_node, file_mounts=self.file_mounts)
|
||||
|
||||
else:
|
||||
cli_logger.print(
|
||||
"Updating cluster configuration.",
|
||||
@@ -296,6 +326,7 @@ class NodeUpdater:
|
||||
# with a new SSHOptions class that uses
|
||||
# this ssh_private_key as its only __init__
|
||||
# argument.
|
||||
# Run outside docker.
|
||||
self.cmd_runner.run(
|
||||
cmd,
|
||||
ssh_options_override_ssh_key=self.
|
||||
@@ -314,7 +345,10 @@ class NodeUpdater:
|
||||
cli_logger.print(
|
||||
"No initialization commands to run.",
|
||||
_numbered=("[]", 3, 6))
|
||||
|
||||
if isinstance(self.cmd_runner, DockerCommandRunner):
|
||||
self.cmd_runner.run_init(
|
||||
as_head=self.is_head_node,
|
||||
file_mounts=self.file_mounts)
|
||||
if self.setup_commands:
|
||||
with cli_logger.group(
|
||||
"Running setup commands",
|
||||
@@ -337,7 +371,8 @@ class NodeUpdater:
|
||||
_numbered=("()", i, total))
|
||||
|
||||
try:
|
||||
self.cmd_runner.run(cmd)
|
||||
# Runs in the container if docker is in use
|
||||
self.cmd_runner.run(cmd, run_env="auto")
|
||||
except ProcessRunnerError as e:
|
||||
if e.msg_type == "ssh_command_failed":
|
||||
cli_logger.error("Failed.")
|
||||
@@ -365,8 +400,11 @@ class NodeUpdater:
|
||||
try:
|
||||
old_redirected = cmd_output_util.is_output_redirected()
|
||||
cmd_output_util.set_output_redirected(False)
|
||||
# Runs in the container if docker is in use
|
||||
self.cmd_runner.run(
|
||||
cmd, environment_variables=env_vars)
|
||||
cmd,
|
||||
environment_variables=env_vars,
|
||||
run_env="auto")
|
||||
cmd_output_util.set_output_redirected(old_redirected)
|
||||
except ProcessRunnerError as e:
|
||||
if e.msg_type == "ssh_command_failed":
|
||||
|
||||
@@ -9,7 +9,7 @@ from typing import Any, Dict
|
||||
import ray
|
||||
import ray.services as services
|
||||
from ray.autoscaler.node_provider import get_default_config
|
||||
from ray.autoscaler.docker import dockerize_if_needed
|
||||
from ray.autoscaler.docker import validate_docker_config
|
||||
|
||||
REQUIRED, OPTIONAL = True, False
|
||||
RAY_SCHEMA_PATH = os.path.join(
|
||||
@@ -56,7 +56,7 @@ def validate_config(config: Dict[str, Any]) -> None:
|
||||
try:
|
||||
jsonschema.validate(config, schema)
|
||||
except jsonschema.ValidationError as e:
|
||||
raise jsonschema.ValidationError(message=e.message) from None
|
||||
raise e from None
|
||||
|
||||
# Detect out of date defaults. This happens when the autoscaler that filled
|
||||
# out the default values is older than the version of the autoscaler that
|
||||
@@ -90,7 +90,7 @@ def validate_config(config: Dict[str, Any]) -> None:
|
||||
def prepare_config(config):
|
||||
with_defaults = fillout_defaults(config)
|
||||
merge_setup_commands(with_defaults)
|
||||
dockerize_if_needed(with_defaults)
|
||||
validate_docker_config(with_defaults)
|
||||
return with_defaults
|
||||
|
||||
|
||||
|
||||
@@ -114,8 +114,7 @@ def test_docker_command_runner():
|
||||
"docker_config": docker_config,
|
||||
}
|
||||
cmd_runner = DockerCommandRunner(**args)
|
||||
process_runner.assert_has_call("1.2.3.4", "command -v docker")
|
||||
process_runner.clear_history()
|
||||
assert len(process_runner.calls) == 0, "No calls should be made in ctor"
|
||||
|
||||
env_vars = {"var1": "quote between this \" and this", "var2": "123"}
|
||||
cmd_runner.run("echo hello", environment_variables=env_vars)
|
||||
|
||||
Reference in New Issue
Block a user