[CLI] Docker Support (#11761)

Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
This commit is contained in:
Maksim Smolin
2020-11-17 00:04:39 -08:00
committed by GitHub
parent bea0031491
commit 23926f3e6e
9 changed files with 200 additions and 55 deletions
@@ -431,7 +431,7 @@ class SSHCommandRunner(CommandRunnerInterface):
run_env="auto", # Unused argument.
ssh_options_override_ssh_key="",
shutdown_after_run=False,
):
silent=False):
if shutdown_after_run:
cmd += "; sudo shutdown -h now"
if ssh_options_override_ssh_key:
@@ -483,9 +483,11 @@ class SSHCommandRunner(CommandRunnerInterface):
if cli_logger.verbosity > 0:
with cli_logger.indented():
return self._run_helper(final_cmd, with_output, exit_on_fail)
return self._run_helper(
final_cmd, with_output, exit_on_fail, silent=silent)
else:
return self._run_helper(final_cmd, with_output, exit_on_fail)
return self._run_helper(
final_cmd, with_output, exit_on_fail, silent=silent)
def _create_rsync_filter_args(self, options):
rsync_excludes = options.get("rsync_exclude") or []
@@ -574,11 +576,12 @@ class DockerCommandRunner(CommandRunnerInterface):
if run_env == "docker":
cmd = self._docker_expand_user(cmd, any_char=True)
cmd = " ".join(_with_interactive(cmd))
if is_using_login_shells():
cmd = " ".join(_with_interactive(cmd))
cmd = with_docker_exec(
[cmd],
container_name=self.container_name,
with_interactive=True)[0]
with_interactive=is_using_login_shells())[0]
if shutdown_after_run:
# sudo shutdown should run after `with_docker_exec` command above
@@ -600,7 +603,8 @@ class DockerCommandRunner(CommandRunnerInterface):
self.ssh_command_runner.cluster_name), target.lstrip("/"))
self.ssh_command_runner.run(
f"mkdir -p {os.path.dirname(host_destination.rstrip('/'))}")
f"mkdir -p {os.path.dirname(host_destination.rstrip('/'))}",
silent=is_rsync_silent())
self.ssh_command_runner.run_rsync_up(
source, host_destination, options=options)
@@ -610,9 +614,11 @@ class DockerCommandRunner(CommandRunnerInterface):
# Adding a "." means that docker copies the *contents*
# Without it, docker copies the source *into* the target
host_destination += "/."
self.ssh_command_runner.run("docker cp {} {}:{}".format(
host_destination, self.container_name,
self._docker_expand_user(target)))
self.ssh_command_runner.run(
"docker cp {} {}:{}".format(host_destination,
self.container_name,
self._docker_expand_user(target)),
silent=is_rsync_silent())
def run_rsync_down(self, source, target, options=None):
options = options or {}
@@ -620,15 +626,18 @@ class DockerCommandRunner(CommandRunnerInterface):
self._get_docker_host_mount_location(
self.ssh_command_runner.cluster_name), source.lstrip("/"))
self.ssh_command_runner.run(
f"mkdir -p {os.path.dirname(host_source.rstrip('/'))}")
f"mkdir -p {os.path.dirname(host_source.rstrip('/'))}",
silent=is_rsync_silent())
if source[-1] == "/":
source += "."
# Adding a "." means that docker copies the *contents*
# Without it, docker copies the source *into* the target
if not options.get("docker_mount_if_possible", False):
self.ssh_command_runner.run("docker cp {}:{} {}".format(
self.container_name, self._docker_expand_user(source),
host_source))
self.ssh_command_runner.run(
"docker cp {}:{} {}".format(self.container_name,
self._docker_expand_user(source),
host_source),
silent=is_rsync_silent())
self.ssh_command_runner.run_rsync_down(
host_source, target, options=options)
@@ -164,7 +164,7 @@ def _read_subprocess_stream(f, output_file, is_stdout=False):
cli_logger.error(line)
if output_file is not None:
if output_file is not None and output_file != subprocess.DEVNULL:
output_file.write(line + "\n")
return detected_special_case
@@ -238,10 +238,6 @@ def _run_and_process_output(cmd,
# See implementation note #1
if use_login_shells or process_runner != subprocess:
if stdout_file is None:
stdout_file = sys.stdout
if stderr_file is None:
stderr_file = sys.stderr
return process_runner.check_call(
cmd,
# See implementation note #2
@@ -333,7 +329,8 @@ def run_cmd_redirected(cmd,
return _run_and_process_output(
cmd,
process_runner=process_runner,
stdout_file=None,
stdout_file=process_runner.DEVNULL,
stderr_file=process_runner.DEVNULL,
use_login_shells=use_login_shells)
if not is_output_redirected():
@@ -341,6 +338,7 @@ def run_cmd_redirected(cmd,
cmd,
process_runner=process_runner,
stdout_file=sys.stdout,
stderr_file=sys.stderr,
use_login_shells=use_login_shells)
else:
tmpfile_path = os.path.join(
+21 -11
View File
@@ -22,6 +22,7 @@ from ray.autoscaler._private.event_system import (CreateClusterEvent,
logger = logging.getLogger(__name__)
NUM_SETUP_STEPS = 7
READY_CHECK_INTERVAL = 5
@@ -225,7 +226,8 @@ class NodeUpdater:
def wait_ready(self, deadline):
with cli_logger.group(
"Waiting for SSH to become available", _numbered=("[]", 1, 6)):
"Waiting for SSH to become available",
_numbered=("[]", 1, NUM_SETUP_STEPS)):
with LogTimer(self.log_prefix + "Got remote shell"):
cli_logger.print("Running `{}` as a test.", cf.bold("uptime"))
@@ -252,7 +254,7 @@ class NodeUpdater:
# however threading this configuration state
# is a pain and I'm leaving it for later
retry_str = str(e)
retry_str = "(" + str(e) + ")"
if hasattr(e, "cmd"):
retry_str = "(Exit Status {}): {}".format(
e.returncode, " ".join(e.cmd))
@@ -296,7 +298,7 @@ class NodeUpdater:
cli_logger.print(
"Configuration already up to date, "
"skipping file mounts, initalization and setup commands.",
_numbered=("[]", "2-5", 6))
_numbered=("[]", "2-6", NUM_SETUP_STEPS))
else:
cli_logger.print(
@@ -306,7 +308,8 @@ class NodeUpdater:
self.provider.set_node_tags(
self.node_id, {TAG_RAY_NODE_STATUS: STATUS_SYNCING_FILES})
cli_logger.labeled_value("New status", STATUS_SYNCING_FILES)
self.sync_file_mounts(self.rsync_up, step_numbers=(2, 6))
self.sync_file_mounts(
self.rsync_up, step_numbers=(1, NUM_SETUP_STEPS))
# Only run setup commands if runtime_hash has changed because
# we don't want to run setup_commands every time the head node
@@ -320,7 +323,7 @@ class NodeUpdater:
if self.initialization_commands:
with cli_logger.group(
"Running initialization commands",
_numbered=("[]", 3, 5)):
_numbered=("[]", 4, NUM_SETUP_STEPS)):
global_event_system.execute_callback(
CreateClusterEvent.run_initialization_cmd)
with LogTimer(
@@ -353,14 +356,19 @@ class NodeUpdater:
else:
cli_logger.print(
"No initialization commands to run.",
_numbered=("[]", 3, 6))
self.cmd_runner.run_init(
as_head=self.is_head_node, file_mounts=self.file_mounts)
_numbered=("[]", 4, NUM_SETUP_STEPS))
with cli_logger.group(
"Initalizing command runner",
# todo: fix command numbering
_numbered=("[]", 5, NUM_SETUP_STEPS)):
self.cmd_runner.run_init(
as_head=self.is_head_node,
file_mounts=self.file_mounts)
if self.setup_commands:
with cli_logger.group(
"Running setup commands",
# todo: fix command numbering
_numbered=("[]", 4, 6)):
_numbered=("[]", 6, NUM_SETUP_STEPS)):
global_event_system.execute_callback(
CreateClusterEvent.run_setup_cmd)
with LogTimer(
@@ -395,10 +403,12 @@ class NodeUpdater:
"Setup command failed.")
else:
cli_logger.print(
"No setup commands to run.", _numbered=("[]", 4, 6))
"No setup commands to run.",
_numbered=("[]", 6, NUM_SETUP_STEPS))
with cli_logger.group(
"Starting the Ray runtime", _numbered=("[]", 6, 6)):
"Starting the Ray runtime", _numbered=("[]", 7,
NUM_SETUP_STEPS)):
global_event_system.execute_callback(
CreateClusterEvent.start_ray_runtime)
with LogTimer(
+47 -12
View File
@@ -44,9 +44,9 @@ boto3_list = [{
"DefaultVCpus": 1
}
}, {
"InstanceType": "m4.xlarge",
"InstanceType": "t3a.small",
"VCpuInfo": {
"DefaultVCpus": 4
"DefaultVCpus": 2
}
}, {
"InstanceType": "m4.4xlarge",
@@ -184,6 +184,10 @@ def _check_output_via_pattern(name, result):
DEFAULT_TEST_CONFIG_PATH = str(
Path(__file__).parent / "test_cli_patterns" / "test_ray_up_config.yaml")
DOCKER_TEST_CONFIG_PATH = str(
Path(__file__).parent / "test_cli_patterns" /
"test_ray_up_docker_config.yaml")
@pytest.mark.skipif(
sys.platform == "darwin" and "travis" in os.environ.get("USER", ""),
@@ -210,12 +214,12 @@ def test_ray_up(configure_lang, _unlink_test_ssh_key, configure_aws):
# unfortunately, cutting out SSH prefixes and such
# is, to put it lightly, non-trivial
if "uptime" in command:
return PopenBehaviour(stdout="MOCKED uptime")
return PopenBehaviour(stdout=b"MOCKED uptime")
if "rsync" in command:
return PopenBehaviour(stdout="MOCKED rsync")
return PopenBehaviour(stdout=b"MOCKED rsync")
if "ray" in command:
return PopenBehaviour(stdout="MOCKED ray")
return PopenBehaviour(stdout="MOCKED GENERIC")
return PopenBehaviour(stdout=b"MOCKED ray")
return PopenBehaviour(stdout=b"MOCKED GENERIC")
with _setup_popen_mock(commands_mock):
# config cache does not work with mocks
@@ -227,6 +231,37 @@ def test_ray_up(configure_lang, _unlink_test_ssh_key, configure_aws):
_check_output_via_pattern("test_ray_up.txt", result)
@pytest.mark.skipif(
sys.platform == "darwin" and "travis" in os.environ.get("USER", ""),
reason=("Mac builds don't provide proper locale support"))
@mock_ec2
@mock_iam
def test_ray_up_docker(configure_lang, _unlink_test_ssh_key, configure_aws):
def commands_mock(command, stdin):
# if we want to have e.g. some commands fail,
# we can have overrides happen here.
# unfortunately, cutting out SSH prefixes and such
# is, to put it lightly, non-trivial
if ".Config.Env" in command:
return PopenBehaviour(stdout=b"{}")
if "uptime" in command:
return PopenBehaviour(stdout=b"MOCKED uptime")
if "rsync" in command:
return PopenBehaviour(stdout=b"MOCKED rsync")
if "ray" in command:
return PopenBehaviour(stdout=b"MOCKED ray")
return PopenBehaviour(stdout=b"MOCKED GENERIC")
with _setup_popen_mock(commands_mock):
# config cache does not work with mocks
runner = CliRunner()
result = runner.invoke(scripts.up, [
DOCKER_TEST_CONFIG_PATH, "--no-config-cache", "-y",
"--log-style=pretty", "--log-color", "False"
])
_check_output_via_pattern("test_ray_up_docker.txt", result)
@pytest.mark.skipif(
sys.platform == "darwin" and "travis" in os.environ.get("USER", ""),
reason=("Mac builds don't provide proper locale support"))
@@ -239,12 +274,12 @@ def test_ray_up_record(configure_lang, _unlink_test_ssh_key, configure_aws):
# unfortunately, cutting out SSH prefixes and such
# is, to put it lightly, non-trivial
if "uptime" in command:
return PopenBehaviour(stdout="MOCKED uptime")
return PopenBehaviour(stdout=b"MOCKED uptime")
if "rsync" in command:
return PopenBehaviour(stdout="MOCKED rsync")
return PopenBehaviour(stdout=b"MOCKED rsync")
if "ray" in command:
return PopenBehaviour(stdout="MOCKED ray")
return PopenBehaviour(stdout="MOCKED GENERIC")
return PopenBehaviour(stdout=b"MOCKED ray")
return PopenBehaviour(stdout=b"MOCKED GENERIC")
with _setup_popen_mock(commands_mock):
# config cache does not work with mocks
@@ -294,7 +329,7 @@ def test_ray_exec(configure_lang, configure_aws, _unlink_test_ssh_key):
# TODO(maximsmol): this is a hack since stdout=sys.stdout
# doesn't work with the mock for some reason
print("This is a test!")
return PopenBehaviour(stdout="This is a test!")
return PopenBehaviour(stdout=b"This is a test!")
with _setup_popen_mock(commands_mock):
runner = CliRunner()
@@ -327,7 +362,7 @@ def test_ray_submit(configure_lang, configure_aws, _unlink_test_ssh_key):
# doesn't work with the mock for some reason
if "rsync" not in command:
print("This is a test!")
return PopenBehaviour(stdout="This is a test!")
return PopenBehaviour(stdout=b"This is a test!")
with _setup_popen_mock(commands_mock):
runner = CliRunner()
@@ -19,23 +19,24 @@ Acquiring an up-to-date head node
<1/1> Setting up head node
Prepared bootstrap config
New status: waiting-for-ssh
\[1/6\] Waiting for SSH to become available
\[1/7\] Waiting for SSH to become available
Running `uptime` as a test\.
Fetched IP: .+
Success\.
Updating cluster configuration\. \[hash=.+\]
New status: syncing-files
\[3/6\] Processing file mounts
\[2/7\] Processing file mounts
~/tests/ from ./
\[4/6\] No worker file mounts to sync
\[3/7\] No worker file mounts to sync
New status: setting-up
\[3/5\] Running initialization commands
\[4/6\] Running setup commands
\[4/7\] Running initialization commands
\[5/7\] Initalizing command runner
\[6/7\] Running setup commands
\(0/4\) echo a
\(1/4\) echo b
\(2/4\) echo \${echo hi}
\(3/4\) echo head
\[6/6\] Starting the Ray runtime
\[7/7\] Starting the Ray runtime
New status: up-to-date
Useful commands
@@ -5,7 +5,7 @@ file_mounts:
~/tests: .
head_node:
ImageId: latest_dlami
InstanceType: t1.micro
InstanceType: t1.micro
head_setup_commands:
- echo head
head_start_ray_commands:
@@ -0,0 +1,48 @@
Cluster: test-cli
Checking AWS environment settings
AWS config
IAM Profile: .+ \[default\]
EC2 Key pair \(head & workers\): .+ \[default\]
VPC Subnets \(head & workers\): subnet-.+ \[default\]
EC2 Security groups \(head & workers\): sg-.+ \[default\]
EC2 AMI \(head & workers\): ami-.+ \[dlami\]
No head node found\. Launching a new cluster\. Confirm \[y/N\]: y \[automatic, due to --yes\]
Acquiring an up-to-date head node
Launched 1 nodes \[subnet_id=subnet-.+\]
Launched instance i-.+ \[state=pending, info=pending\]
Launched a new head node
Fetching the new head node
<1/1> Setting up head node
Prepared bootstrap config
New status: waiting-for-ssh
\[1/7\] Waiting for SSH to become available
Running `uptime` as a test\.
Fetched IP: .+
Success\.
Updating cluster configuration\. \[hash=.+\]
New status: syncing-files
\[2/7\] Processing file mounts
~/tests/ from ./
\[3/7\] No worker file mounts to sync
New status: setting-up
\[4/7\] Running initialization commands
\[5/7\] Initalizing command runner
\[6/7\] Running setup commands
\(0/4\) echo a
\(1/4\) echo b
\(2/4\) echo \${echo hi}
\(3/4\) echo head
\[7/7\] Starting the Ray runtime
New status: up-to-date
Useful commands
Monitor autoscaling with
ray exec .+ 'tail -n 100 -f /tmp/ray/session_latest/logs/monitor\*'
Connect to a terminal on the cluster head:
ray attach .+
Get a remote shell to the cluster manually:
ssh .+
@@ -0,0 +1,43 @@
auth:
ssh_user: ubuntu
cluster_name: test-cli
docker:
image: rayproject/ray:1.0.0
container_name: raydocker
pull_before_run: True
run_options: []
file_mounts:
~/tests: .
head_node:
ImageId: latest_dlami
InstanceType: t3a.small
head_setup_commands:
- echo head
head_start_ray_commands:
- ray stop
- ray start --head --autoscaling-config=~/ray_bootstrap_config.yaml
idle_timeout_minutes: 5
initial_workers: 1
initialization_commands:
- echo init
max_workers: 2
min_workers: 1
provider:
availability_zone: us-west-2a
key_pair:
key_name: __test-cli
region: us-west-2
type: aws
setup_commands:
- echo a
- echo b
- echo ${echo hi}
target_utilization_fraction: 0.9
worker_nodes:
ImageId: latest_dlami
InstanceType: t3a.small
worker_setup_commands:
- echo worker
worker_start_ray_commands:
- ray stop
- ray start --address=$RAY_HEAD_IP
@@ -20,7 +20,7 @@
.+\.py.*Prepared bootstrap config
.+\.py.*AWSNodeProvider: Set tag ray-node-status=waiting-for-ssh on \['.+'\] \[LogTimer=.+\]
.+\.py.*New status: waiting-for-ssh
.+\.py.*\[1/6\] Waiting for SSH to become available
.+\.py.*\[1/7\] Waiting for SSH to become available
.+\.py.*Running `uptime` as a test\.
.+\.py.*Fetched IP: .+
.+\.py.*NodeUpdater: .+: Got IP \[LogTimer=.+\]
@@ -31,7 +31,7 @@
.+\.py.*Updating cluster configuration\. \[hash=.+\]
.+\.py.*AWSNodeProvider: Set tag ray-node-status=syncing-files on \['.+'\] \[LogTimer=.+\]
.+\.py.*New status: syncing-files
.+\.py.*\[3/6\] Processing file mounts
.+\.py.*\[2/7\] Processing file mounts
.+\.py.*Running `mkdir -p ~/tests`
.+\.py.*Full command is `ssh.+`
.+\.py.*Running `rsync.+`
@@ -50,14 +50,15 @@
.+\.py.*`rsync`ed .+__test-cli_key-1\.pem \(local\) to ~/ray_bootstrap_key\.pem \(remote\)
.+\.py.*~/ray_bootstrap_key\.pem from .+__test-cli_key-1\.pem
.+\.py.*NodeUpdater: i-.+: Synced .+.pem to ~/ray_bootstrap_key\.pem \[LogTimer=.+\]
.+\.py.*\[4/6\] No worker file mounts to sync
.+\.py.*\[3/7\] No worker file mounts to sync
.+\.py.*AWSNodeProvider: Set tag ray-node-status=setting-up on \['.+'\] \[LogTimer=.+\]
.+\.py.*New status: setting-up
.+\.py.*\[3/5\] Running initialization commands
.+\.py.*\[4/7\] Running initialization commands
.+\.py.*Running `echo init`
.+\.py.*Full command is `ssh.+`
.+\.py.*NodeUpdater: i-.+: Initialization commands succeeded \[LogTimer=.+\]
.+\.py.*\[4/6\] Running setup commands
.+\.py.*\[5/7\] Initalizing command runner
.+\.py.*\[6/7\] Running setup commands
.+\.py.*\(0/4\) echo a
.+\.py.*Running `echo a`
.+\.py.*Full command is `ssh.+`
@@ -71,7 +72,7 @@
.+\.py.*Running `echo head`
.+\.py.*Full command is `ssh.+`
.+\.py.*NodeUpdater: i-.+: Setup commands succeeded \[LogTimer=.+\]
.+\.py.*\[6/6\] Starting the Ray runtime
.+\.py.*\[7/7\] Starting the Ray runtime
.+\.py.*Running `export RAY_OVERRIDE_RESOURCES='{"CPU":1}';ray stop`
.+\.py.*Full command is `ssh.+`
.+\.py.*Running `export RAY_OVERRIDE_RESOURCES='{"CPU":1}';ray start --head --autoscaling-config=~/ray_bootstrap_config\.yaml`