mirror of
https://github.com/wassname/ray.git
synced 2026-06-29 22:17:21 +08:00
[autoscaler] Create Docker Command Runner (v2) (#8840)
Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
This commit is contained in:
@@ -321,7 +321,8 @@ class StandardAutoscaler:
|
||||
self.config["worker_start_ray_commands"]),
|
||||
runtime_hash=self.runtime_hash,
|
||||
process_runner=self.process_runner,
|
||||
use_internal_ip=True)
|
||||
use_internal_ip=True,
|
||||
docker_config=self.config["docker"])
|
||||
updater.start()
|
||||
self.updaters[node_id] = updater
|
||||
|
||||
@@ -360,7 +361,8 @@ class StandardAutoscaler:
|
||||
ray_start_commands=with_head_node_ip(ray_start_commands),
|
||||
runtime_hash=self.runtime_hash,
|
||||
process_runner=self.process_runner,
|
||||
use_internal_ip=True)
|
||||
use_internal_ip=True,
|
||||
docker_config=self.config["docker"])
|
||||
updater.start()
|
||||
self.updaters[node_id] = updater
|
||||
|
||||
|
||||
@@ -147,7 +147,8 @@ def kill_node(config_file, yes, hard, override_cluster_name):
|
||||
initialization_commands=[],
|
||||
setup_commands=[],
|
||||
ray_start_commands=[],
|
||||
runtime_hash="")
|
||||
runtime_hash="",
|
||||
docker_config=config["docker"])
|
||||
|
||||
_exec(updater, "ray stop", False, False)
|
||||
|
||||
@@ -286,7 +287,7 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
|
||||
setup_commands=init_commands,
|
||||
ray_start_commands=ray_start_commands,
|
||||
runtime_hash=runtime_hash,
|
||||
)
|
||||
docker_config=config["docker"])
|
||||
updater.start()
|
||||
updater.join()
|
||||
|
||||
@@ -407,7 +408,7 @@ def exec_cluster(config_file,
|
||||
setup_commands=[],
|
||||
ray_start_commands=[],
|
||||
runtime_hash="",
|
||||
)
|
||||
docker_config=config["docker"])
|
||||
|
||||
def wrap_docker(command):
|
||||
container_name = config["docker"]["container_name"]
|
||||
@@ -529,7 +530,7 @@ def rsync(config_file,
|
||||
setup_commands=[],
|
||||
ray_start_commands=[],
|
||||
runtime_hash="",
|
||||
)
|
||||
docker_config=config["docker"])
|
||||
if down:
|
||||
rsync = updater.rsync_down
|
||||
else:
|
||||
|
||||
@@ -80,6 +80,10 @@ def aptwait_cmd():
|
||||
"do echo 'Waiting for release of dpkg/apt locks'; sleep 5; done")
|
||||
|
||||
|
||||
def check_docker_running_cmd(cname):
    """Build the shell command that queries a container's running state.

    Args:
        cname (str): Name of the docker container to inspect.

    Returns:
        str: A single ``docker inspect`` command line that formats the
            result with the ``{{.State.Running}}`` template for ``cname``.
    """
    # Imported locally so this helper does not depend on the module's
    # top-level import block.
    from shlex import quote

    # The result is executed through a shell. Quoting leaves ordinary
    # container names untouched but keeps a name containing whitespace or
    # shell metacharacters as one literal argument.
    return " ".join(
        ["docker", "inspect", "-f", "'{{.State.Running}}'",
         quote(cname)])
|
||||
|
||||
|
||||
def docker_start_cmds(user, image, mount, cname, user_options):
|
||||
cmds = []
|
||||
|
||||
@@ -99,15 +103,13 @@ def docker_start_cmds(user, image, mount, cname, user_options):
|
||||
|
||||
user_options_str = " ".join(user_options)
|
||||
# docker run command
|
||||
docker_check = [
|
||||
"docker", "inspect", "-f", "'{{.State.Running}}'", cname, "||"
|
||||
]
|
||||
docker_check = check_docker_running_cmd(cname) + " || "
|
||||
docker_run = [
|
||||
"docker", "run", "--rm", "--name {}".format(cname), "-d", "-it",
|
||||
port_flags, mount_flags, env_flags, user_options_str, "--net=host",
|
||||
image, "bash"
|
||||
]
|
||||
cmds.append(" ".join(docker_check + docker_run))
|
||||
cmds.append(docker_check + " ".join(docker_run))
|
||||
|
||||
return cmds
|
||||
|
||||
|
||||
@@ -87,7 +87,13 @@ class KubernetesNodeProvider(NodeProvider):
|
||||
for node_id in node_ids:
|
||||
self.terminate_node(node_id)
|
||||
|
||||
def get_command_runner(self, log_prefix, node_id, auth_config,
|
||||
cluster_name, process_runner, use_internal_ip):
|
||||
def get_command_runner(self,
|
||||
log_prefix,
|
||||
node_id,
|
||||
auth_config,
|
||||
cluster_name,
|
||||
process_runner,
|
||||
use_internal_ip,
|
||||
docker_config=None):
|
||||
return KubernetesCommandRunner(log_prefix, self.namespace, node_id,
|
||||
auth_config, process_runner)
|
||||
|
||||
@@ -3,7 +3,7 @@ import logging
|
||||
import os
|
||||
import yaml
|
||||
|
||||
from ray.autoscaler.updater import SSHCommandRunner
|
||||
from ray.autoscaler.updater import SSHCommandRunner, DockerCommandRunner
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -211,8 +211,14 @@ class NodeProvider:
|
||||
"""Clean-up when a Provider is no longer required."""
|
||||
pass
|
||||
|
||||
def get_command_runner(self, log_prefix, node_id, auth_config,
|
||||
cluster_name, process_runner, use_internal_ip):
|
||||
def get_command_runner(self,
|
||||
log_prefix,
|
||||
node_id,
|
||||
auth_config,
|
||||
cluster_name,
|
||||
process_runner,
|
||||
use_internal_ip,
|
||||
docker_config=None):
|
||||
""" Returns the CommandRunner class used to perform SSH commands.
|
||||
|
||||
Args:
|
||||
@@ -226,7 +232,19 @@ class NodeProvider:
|
||||
in the CommandRunner. E.g., subprocess.
|
||||
use_internal_ip(bool): whether the node_id belongs to an internal ip
|
||||
or external ip.
|
||||
docker_config(dict): If set, the docker information of the docker
|
||||
container that commands should be run on.
|
||||
"""
|
||||
|
||||
return SSHCommandRunner(log_prefix, node_id, self, auth_config,
|
||||
cluster_name, process_runner, use_internal_ip)
|
||||
common_args = {
|
||||
"log_prefix": log_prefix,
|
||||
"node_id": node_id,
|
||||
"provider": self,
|
||||
"auth_config": auth_config,
|
||||
"cluster_name": cluster_name,
|
||||
"process_runner": process_runner,
|
||||
"use_internal_ip": use_internal_ip
|
||||
}
|
||||
if docker_config and docker_config["container_name"] != "":
|
||||
return DockerCommandRunner(docker_config, **common_args)
|
||||
else:
|
||||
return SSHCommandRunner(**common_args)
|
||||
|
||||
@@ -17,6 +17,7 @@ from ray.autoscaler.tags import TAG_RAY_NODE_STATUS, TAG_RAY_RUNTIME_CONFIG, \
|
||||
STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED, STATUS_WAITING_FOR_SSH, \
|
||||
STATUS_SETTING_UP, STATUS_SYNCING_FILES
|
||||
from ray.autoscaler.log_timer import LogTimer
|
||||
from ray.autoscaler.docker import check_docker_running_cmd
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -294,6 +295,66 @@ class SSHCommandRunner:
|
||||
self.ssh_private_key, self.ssh_user, self.ssh_ip)
|
||||
|
||||
|
||||
class DockerCommandRunner(SSHCommandRunner):
    """Command runner for nodes that host a named docker container.

    Every operation is delegated to a wrapped ``SSHCommandRunner``; file
    syncs additionally copy data between the host and the container with
    ``docker cp``.
    """

    def __init__(self, docker_config, **common_args):
        """Create the runner.

        Args:
            docker_config (dict): Docker settings; ``container_name`` is
                read from it.
            **common_args: Keyword arguments passed unchanged to the
                ``SSHCommandRunner`` constructor.
        """
        # NOTE(review): the parent initializer is not called on ``self``;
        # SSH state lives only on the wrapped runner, so any inherited
        # method that is not overridden here would lack its attributes.
        self.ssh_command_runner = SSHCommandRunner(**common_args)
        self.docker_name = docker_config["container_name"]
        self.docker_config = docker_config
        # Container home directory, looked up lazily and then cached by
        # docker_expand_user().
        self.home_dir = None

    def run(self,
            cmd,
            timeout=120,
            exit_on_fail=False,
            port_forward=None,
            with_output=False):
        """Run ``cmd`` through the wrapped SSH command runner.

        All arguments are forwarded as given. Previously ``port_forward``
        and ``with_output`` were accepted but replaced by ``None`` and
        ``False``, so callers could never get output or port forwarding.

        Returns:
            Whatever the wrapped runner's ``run`` returns.
        """
        return self.ssh_command_runner.run(
            cmd,
            timeout=timeout,
            exit_on_fail=exit_on_fail,
            port_forward=port_forward,
            with_output=with_output)

    def check_container_status(self):
        """Query the state of the container on the node.

        Returns:
            ``False`` when the inspect command fails (the fallback marker
            is echoed instead); otherwise the stripped command output.
        """
        no_exist = "not_present"
        # If ``docker inspect`` fails, the shell falls through to the echo
        # so the marker shows up in the captured output.
        cmd = check_docker_running_cmd(self.docker_name) + " ".join(
            ["||", "echo", quote(no_exist)])
        output = self.ssh_command_runner.run(
            cmd, with_output=True).decode("utf-8").strip()
        if no_exist in output:
            return False
        # NOTE(review): the raw output string is returned, so a container
        # that exists but is stopped (presumably "false") is still truthy
        # to callers -- confirm whether that is intended.
        return output

    def run_rsync_up(self, source, target):
        """Sync ``source`` to ``target`` on the node, then into the container.

        The copy into the container is skipped when
        check_container_status() returns a falsy value.
        """
        self.ssh_command_runner.run_rsync_up(source, target)
        if self.check_container_status():
            self.ssh_command_runner.run("docker cp {} {}:{}".format(
                target, self.docker_name, self.docker_expand_user(target)))

    def run_rsync_down(self, source, target):
        """Copy ``source`` out of the container, then sync it to ``target``.

        The file is first copied from the container to the same path on
        the node, and from there synced down by the wrapped runner.
        """
        self.ssh_command_runner.run("docker cp {}:{} {}".format(
            self.docker_name, self.docker_expand_user(source), source))
        self.ssh_command_runner.run_rsync_down(source, target)

    def remote_shell_command_str(self):
        """Return a command string that opens a shell inside the container.

        The wrapped runner's command has its first ``ssh`` replaced by
        ``ssh -tt`` and a ``docker exec -it <name> /bin/bash`` appended.
        """
        inner_str = self.ssh_command_runner.remote_shell_command_str().replace(
            "ssh", "ssh -tt", 1).strip("\n")
        return inner_str + " docker exec -it {} /bin/bash\n".format(
            self.docker_name)

    def docker_expand_user(self, string):
        """Expand a leading ``~`` to the container's home directory.

        Args:
            string (str): A path that may start with ``~``.

        Returns:
            str: ``string`` with only its leading ``~`` replaced by the
                container's ``HOME``; unchanged if it has no leading ``~``.
        """
        if not string.startswith("~"):
            return string
        if self.home_dir is None:
            # Anchor the match so variables such as JAVA_HOME are not
            # picked up, and keep everything after the first '=' in case
            # the value itself contains one.
            self.home_dir = self.ssh_command_runner.run(
                "docker exec {} env | grep '^HOME=' | cut -d'=' -f2-".format(
                    self.docker_name),
                with_output=True).decode("utf-8").strip()
        # Replace only the leading tilde; later '~' characters are part of
        # the path and must be left alone.
        return self.home_dir + string[1:]
|
||||
|
||||
|
||||
class NodeUpdater:
|
||||
"""A process for syncing files and running init commands on a node."""
|
||||
|
||||
@@ -309,14 +370,15 @@ class NodeUpdater:
|
||||
ray_start_commands,
|
||||
runtime_hash,
|
||||
process_runner=subprocess,
|
||||
use_internal_ip=False):
|
||||
use_internal_ip=False,
|
||||
docker_config=None):
|
||||
|
||||
self.log_prefix = "NodeUpdater: {}: ".format(node_id)
|
||||
use_internal_ip = (use_internal_ip
|
||||
or provider_config.get("use_internal_ips", False))
|
||||
self.cmd_runner = provider.get_command_runner(
|
||||
self.log_prefix, node_id, auth_config, cluster_name,
|
||||
process_runner, use_internal_ip)
|
||||
process_runner, use_internal_ip, docker_config)
|
||||
|
||||
self.daemon = True
|
||||
self.process_runner = process_runner
|
||||
|
||||
@@ -847,8 +847,9 @@ def submit(cluster_config_file, docker, screen, tmux, stop, start,
|
||||
if start:
|
||||
create_or_update_cluster(cluster_config_file, None, None, False, False,
|
||||
True, cluster_name)
|
||||
|
||||
target = os.path.join("~", os.path.basename(script))
|
||||
target = os.path.basename(script)
|
||||
if not docker:
|
||||
target = os.path.join("~", target)
|
||||
rsync(cluster_config_file, script, target, cluster_name, down=False)
|
||||
|
||||
command_parts = ["python", target]
|
||||
|
||||
Reference in New Issue
Block a user