From 23926f3e6e07c586d747e37d4c5e41140e16947e Mon Sep 17 00:00:00 2001 From: Maksim Smolin Date: Tue, 17 Nov 2020 00:04:39 -0800 Subject: [PATCH] [CLI] Docker Support (#11761) Co-authored-by: Richard Liaw --- .../ray/autoscaler/_private/command_runner.py | 35 +++++++---- .../_private/subprocess_output_util.py | 10 ++-- python/ray/autoscaler/_private/updater.py | 32 ++++++---- python/ray/tests/test_cli.py | 59 +++++++++++++++---- .../tests/test_cli_patterns/test_ray_up.txt | 13 ++-- .../test_cli_patterns/test_ray_up_config.yaml | 2 +- .../test_cli_patterns/test_ray_up_docker.txt | 48 +++++++++++++++ .../test_ray_up_docker_config.yaml | 43 ++++++++++++++ .../test_cli_patterns/test_ray_up_record.txt | 13 ++-- 9 files changed, 200 insertions(+), 55 deletions(-) create mode 100644 python/ray/tests/test_cli_patterns/test_ray_up_docker.txt create mode 100644 python/ray/tests/test_cli_patterns/test_ray_up_docker_config.yaml diff --git a/python/ray/autoscaler/_private/command_runner.py b/python/ray/autoscaler/_private/command_runner.py index 3b3692a2c..99367f96a 100644 --- a/python/ray/autoscaler/_private/command_runner.py +++ b/python/ray/autoscaler/_private/command_runner.py @@ -431,7 +431,7 @@ class SSHCommandRunner(CommandRunnerInterface): run_env="auto", # Unused argument. ssh_options_override_ssh_key="", shutdown_after_run=False, - ): + silent=False): if shutdown_after_run: cmd += "; sudo shutdown -h now" if ssh_options_override_ssh_key: @@ -483,9 +483,11 @@ class SSHCommandRunner(CommandRunnerInterface): if cli_logger.verbosity > 0: with cli_logger.indented(): - return self._run_helper(final_cmd, with_output, exit_on_fail) + return self._run_helper( + final_cmd, with_output, exit_on_fail, silent=silent) else: - return self._run_helper(final_cmd, with_output, exit_on_fail) + return self._run_helper( + final_cmd, with_output, exit_on_fail, silent=silent) def _create_rsync_filter_args(self, options): rsync_excludes = options.get("rsync_exclude") or [] @@ -574,11 +576,12 @@ class DockerCommandRunner(CommandRunnerInterface): if run_env == "docker": cmd = self._docker_expand_user(cmd, any_char=True) - cmd = " ".join(_with_interactive(cmd)) + if is_using_login_shells(): + cmd = " ".join(_with_interactive(cmd)) cmd = with_docker_exec( [cmd], container_name=self.container_name, - with_interactive=True)[0] + with_interactive=is_using_login_shells())[0] if shutdown_after_run: # sudo shutdown should run after `with_docker_exec` command above @@ -600,7 +603,8 @@ class DockerCommandRunner(CommandRunnerInterface): self.ssh_command_runner.cluster_name), target.lstrip("/")) self.ssh_command_runner.run( - f"mkdir -p {os.path.dirname(host_destination.rstrip('/'))}") + f"mkdir -p {os.path.dirname(host_destination.rstrip('/'))}", + silent=is_rsync_silent()) self.ssh_command_runner.run_rsync_up( source, host_destination, options=options) @@ -610,9 +614,11 @@ class DockerCommandRunner(CommandRunnerInterface): # Adding a "." means that docker copies the *contents* # Without it, docker copies the source *into* the target host_destination += "/." - self.ssh_command_runner.run("docker cp {} {}:{}".format( - host_destination, self.container_name, - self._docker_expand_user(target))) + self.ssh_command_runner.run( + "docker cp {} {}:{}".format(host_destination, + self.container_name, + self._docker_expand_user(target)), + silent=is_rsync_silent()) def run_rsync_down(self, source, target, options=None): options = options or {} @@ -620,15 +626,18 @@ class DockerCommandRunner(CommandRunnerInterface): self._get_docker_host_mount_location( self.ssh_command_runner.cluster_name), source.lstrip("/")) self.ssh_command_runner.run( - f"mkdir -p {os.path.dirname(host_source.rstrip('/'))}") + f"mkdir -p {os.path.dirname(host_source.rstrip('/'))}", + silent=is_rsync_silent()) if source[-1] == "/": source += "." # Adding a "." means that docker copies the *contents* # Without it, docker copies the source *into* the target if not options.get("docker_mount_if_possible", False): - self.ssh_command_runner.run("docker cp {}:{} {}".format( - self.container_name, self._docker_expand_user(source), - host_source)) + self.ssh_command_runner.run( + "docker cp {}:{} {}".format(self.container_name, + self._docker_expand_user(source), + host_source), + silent=is_rsync_silent()) self.ssh_command_runner.run_rsync_down( host_source, target, options=options) diff --git a/python/ray/autoscaler/_private/subprocess_output_util.py b/python/ray/autoscaler/_private/subprocess_output_util.py index 22bc981e5..88074975b 100644 --- a/python/ray/autoscaler/_private/subprocess_output_util.py +++ b/python/ray/autoscaler/_private/subprocess_output_util.py @@ -164,7 +164,7 @@ def _read_subprocess_stream(f, output_file, is_stdout=False): cli_logger.error(line) - if output_file is not None: + if output_file is not None and output_file != subprocess.DEVNULL: output_file.write(line + "\n") return detected_special_case @@ -238,10 +238,6 @@ def _run_and_process_output(cmd, # See implementation note #1 if use_login_shells or process_runner != subprocess: - if stdout_file is None: - stdout_file = sys.stdout - if stderr_file is None: - stderr_file = sys.stderr return process_runner.check_call( cmd, # See implementation note #2 @@ -333,7 +329,8 @@ def run_cmd_redirected(cmd, return _run_and_process_output( cmd, process_runner=process_runner, - stdout_file=None, + stdout_file=process_runner.DEVNULL, + stderr_file=process_runner.DEVNULL, use_login_shells=use_login_shells) if not is_output_redirected(): @@ -341,6 +338,7 @@ def run_cmd_redirected(cmd, cmd, process_runner=process_runner, stdout_file=sys.stdout, + stderr_file=sys.stderr, use_login_shells=use_login_shells) else: tmpfile_path = os.path.join( diff --git a/python/ray/autoscaler/_private/updater.py b/python/ray/autoscaler/_private/updater.py index d32c75364..0fbf84d94 100644 --- a/python/ray/autoscaler/_private/updater.py +++ b/python/ray/autoscaler/_private/updater.py @@ -22,6 +22,7 @@ from ray.autoscaler._private.event_system import (CreateClusterEvent, logger = logging.getLogger(__name__) +NUM_SETUP_STEPS = 7 READY_CHECK_INTERVAL = 5 @@ -225,7 +226,8 @@ class NodeUpdater: def wait_ready(self, deadline): with cli_logger.group( - "Waiting for SSH to become available", _numbered=("[]", 1, 6)): + "Waiting for SSH to become available", + _numbered=("[]", 1, NUM_SETUP_STEPS)): with LogTimer(self.log_prefix + "Got remote shell"): cli_logger.print("Running `{}` as a test.", cf.bold("uptime")) @@ -252,7 +254,7 @@ class NodeUpdater: # however threading this configuration state # is a pain and I'm leaving it for later - retry_str = str(e) + retry_str = "(" + str(e) + ")" if hasattr(e, "cmd"): retry_str = "(Exit Status {}): {}".format( e.returncode, " ".join(e.cmd)) @@ -296,7 +298,7 @@ class NodeUpdater: cli_logger.print( "Configuration already up to date, " "skipping file mounts, initalization and setup commands.", - _numbered=("[]", "2-5", 6)) + _numbered=("[]", "2-6", NUM_SETUP_STEPS)) else: cli_logger.print( @@ -306,7 +308,8 @@ class NodeUpdater: self.provider.set_node_tags( self.node_id, {TAG_RAY_NODE_STATUS: STATUS_SYNCING_FILES}) cli_logger.labeled_value("New status", STATUS_SYNCING_FILES) - self.sync_file_mounts(self.rsync_up, step_numbers=(2, 6)) + self.sync_file_mounts( + self.rsync_up, step_numbers=(1, NUM_SETUP_STEPS)) # Only run setup commands if runtime_hash has changed because # we don't want to run setup_commands every time the head node @@ -320,7 +323,7 @@ class NodeUpdater: if self.initialization_commands: with cli_logger.group( "Running initialization commands", - _numbered=("[]", 3, 5)): + _numbered=("[]", 4, NUM_SETUP_STEPS)): global_event_system.execute_callback( CreateClusterEvent.run_initialization_cmd) with LogTimer( @@ -353,14 +356,19 @@ class NodeUpdater: else: cli_logger.print( "No initialization commands to run.", - _numbered=("[]", 3, 6)) - self.cmd_runner.run_init( - as_head=self.is_head_node, file_mounts=self.file_mounts) + _numbered=("[]", 4, NUM_SETUP_STEPS)) + with cli_logger.group( + "Initalizing command runner", + # todo: fix command numbering + _numbered=("[]", 5, NUM_SETUP_STEPS)): + self.cmd_runner.run_init( + as_head=self.is_head_node, + file_mounts=self.file_mounts) if self.setup_commands: with cli_logger.group( "Running setup commands", # todo: fix command numbering - _numbered=("[]", 4, 6)): + _numbered=("[]", 6, NUM_SETUP_STEPS)): global_event_system.execute_callback( CreateClusterEvent.run_setup_cmd) with LogTimer( @@ -395,10 +403,12 @@ class NodeUpdater: "Setup command failed.") else: cli_logger.print( - "No setup commands to run.", _numbered=("[]", 4, 6)) + "No setup commands to run.", + _numbered=("[]", 6, NUM_SETUP_STEPS)) with cli_logger.group( - "Starting the Ray runtime", _numbered=("[]", 6, 6)): + "Starting the Ray runtime", _numbered=("[]", 7, + NUM_SETUP_STEPS)): global_event_system.execute_callback( CreateClusterEvent.start_ray_runtime) with LogTimer( diff --git a/python/ray/tests/test_cli.py b/python/ray/tests/test_cli.py index 696a0271a..1f940674a 100644 --- a/python/ray/tests/test_cli.py +++ b/python/ray/tests/test_cli.py @@ -44,9 +44,9 @@ boto3_list = [{ "DefaultVCpus": 1 } }, { - "InstanceType": "m4.xlarge", + "InstanceType": "t3a.small", "VCpuInfo": { - "DefaultVCpus": 4 + "DefaultVCpus": 2 } }, { "InstanceType": "m4.4xlarge", @@ -184,6 +184,10 @@ def _check_output_via_pattern(name, result): DEFAULT_TEST_CONFIG_PATH = str( Path(__file__).parent / "test_cli_patterns" / "test_ray_up_config.yaml") +DOCKER_TEST_CONFIG_PATH = str( + Path(__file__).parent / "test_cli_patterns" / + "test_ray_up_docker_config.yaml") + @pytest.mark.skipif( sys.platform == "darwin" and "travis" in os.environ.get("USER", ""), @@ -210,12 +214,12 @@ def test_ray_up(configure_lang, _unlink_test_ssh_key, configure_aws): # unfortunately, cutting out SSH prefixes and such # is, to put it lightly, non-trivial if "uptime" in command: - return PopenBehaviour(stdout="MOCKED uptime") + return PopenBehaviour(stdout=b"MOCKED uptime") if "rsync" in command: - return PopenBehaviour(stdout="MOCKED rsync") + return PopenBehaviour(stdout=b"MOCKED rsync") if "ray" in command: - return PopenBehaviour(stdout="MOCKED ray") - return PopenBehaviour(stdout="MOCKED GENERIC") + return PopenBehaviour(stdout=b"MOCKED ray") + return PopenBehaviour(stdout=b"MOCKED GENERIC") with _setup_popen_mock(commands_mock): # config cache does not work with mocks @@ -227,6 +231,37 @@ def test_ray_up(configure_lang, _unlink_test_ssh_key, configure_aws): _check_output_via_pattern("test_ray_up.txt", result) +@pytest.mark.skipif( + sys.platform == "darwin" and "travis" in os.environ.get("USER", ""), + reason=("Mac builds don't provide proper locale support")) +@mock_ec2 +@mock_iam +def test_ray_up_docker(configure_lang, _unlink_test_ssh_key, configure_aws): + def commands_mock(command, stdin): + # if we want to have e.g. some commands fail, + # we can have overrides happen here. + # unfortunately, cutting out SSH prefixes and such + # is, to put it lightly, non-trivial + if ".Config.Env" in command: + return PopenBehaviour(stdout=b"{}") + if "uptime" in command: + return PopenBehaviour(stdout=b"MOCKED uptime") + if "rsync" in command: + return PopenBehaviour(stdout=b"MOCKED rsync") + if "ray" in command: + return PopenBehaviour(stdout=b"MOCKED ray") + return PopenBehaviour(stdout=b"MOCKED GENERIC") + + with _setup_popen_mock(commands_mock): + # config cache does not work with mocks + runner = CliRunner() + result = runner.invoke(scripts.up, [ + DOCKER_TEST_CONFIG_PATH, "--no-config-cache", "-y", + "--log-style=pretty", "--log-color", "False" + ]) + _check_output_via_pattern("test_ray_up_docker.txt", result) + + @pytest.mark.skipif( sys.platform == "darwin" and "travis" in os.environ.get("USER", ""), reason=("Mac builds don't provide proper locale support")) @@ -239,12 +274,12 @@ def test_ray_up_record(configure_lang, _unlink_test_ssh_key, configure_aws): # unfortunately, cutting out SSH prefixes and such # is, to put it lightly, non-trivial if "uptime" in command: - return PopenBehaviour(stdout="MOCKED uptime") + return PopenBehaviour(stdout=b"MOCKED uptime") if "rsync" in command: - return PopenBehaviour(stdout="MOCKED rsync") + return PopenBehaviour(stdout=b"MOCKED rsync") if "ray" in command: - return PopenBehaviour(stdout="MOCKED ray") - return PopenBehaviour(stdout="MOCKED GENERIC") + return PopenBehaviour(stdout=b"MOCKED ray") + return PopenBehaviour(stdout=b"MOCKED GENERIC") with _setup_popen_mock(commands_mock): # config cache does not work with mocks @@ -294,7 +329,7 @@ def test_ray_exec(configure_lang, configure_aws, _unlink_test_ssh_key): # TODO(maximsmol): this is a hack since stdout=sys.stdout # doesn't work with the mock for some reason print("This is a test!") - return PopenBehaviour(stdout="This is a test!") + return PopenBehaviour(stdout=b"This is a test!") with _setup_popen_mock(commands_mock): runner = CliRunner() @@ -327,7 +362,7 @@ def test_ray_submit(configure_lang, configure_aws, _unlink_test_ssh_key): # doesn't work with the mock for some reason if "rsync" not in command: print("This is a test!") - return PopenBehaviour(stdout="This is a test!") + return PopenBehaviour(stdout=b"This is a test!") with _setup_popen_mock(commands_mock): runner = CliRunner() diff --git a/python/ray/tests/test_cli_patterns/test_ray_up.txt b/python/ray/tests/test_cli_patterns/test_ray_up.txt index 48bc59c71..6cf8867d5 100644 --- a/python/ray/tests/test_cli_patterns/test_ray_up.txt +++ b/python/ray/tests/test_cli_patterns/test_ray_up.txt @@ -19,23 +19,24 @@ Acquiring an up-to-date head node <1/1> Setting up head node Prepared bootstrap config New status: waiting-for-ssh - \[1/6\] Waiting for SSH to become available + \[1/7\] Waiting for SSH to become available Running `uptime` as a test\. Fetched IP: .+ Success\. Updating cluster configuration\. \[hash=.+\] New status: syncing-files - \[3/6\] Processing file mounts + \[2/7\] Processing file mounts ~/tests/ from ./ - \[4/6\] No worker file mounts to sync + \[3/7\] No worker file mounts to sync New status: setting-up - \[3/5\] Running initialization commands - \[4/6\] Running setup commands + \[4/7\] Running initialization commands + \[5/7\] Initalizing command runner + \[6/7\] Running setup commands \(0/4\) echo a \(1/4\) echo b \(2/4\) echo \${echo hi} \(3/4\) echo head - \[6/6\] Starting the Ray runtime + \[7/7\] Starting the Ray runtime New status: up-to-date Useful commands diff --git a/python/ray/tests/test_cli_patterns/test_ray_up_config.yaml b/python/ray/tests/test_cli_patterns/test_ray_up_config.yaml index 1410708f5..4d6342009 100644 --- a/python/ray/tests/test_cli_patterns/test_ray_up_config.yaml +++ b/python/ray/tests/test_cli_patterns/test_ray_up_config.yaml @@ -5,7 +5,7 @@ file_mounts: ~/tests: . head_node: ImageId: latest_dlami - InstanceType: t1.micro + InstanceType: t1.micro head_setup_commands: - echo head head_start_ray_commands: diff --git a/python/ray/tests/test_cli_patterns/test_ray_up_docker.txt b/python/ray/tests/test_cli_patterns/test_ray_up_docker.txt new file mode 100644 index 000000000..6cf8867d5 --- /dev/null +++ b/python/ray/tests/test_cli_patterns/test_ray_up_docker.txt @@ -0,0 +1,48 @@ +Cluster: test-cli + +Checking AWS environment settings +AWS config + IAM Profile: .+ \[default\] + EC2 Key pair \(head & workers\): .+ \[default\] + VPC Subnets \(head & workers\): subnet-.+ \[default\] + EC2 Security groups \(head & workers\): sg-.+ \[default\] + EC2 AMI \(head & workers\): ami-.+ \[dlami\] + +No head node found\. Launching a new cluster\. Confirm \[y/N\]: y \[automatic, due to --yes\] + +Acquiring an up-to-date head node + Launched 1 nodes \[subnet_id=subnet-.+\] + Launched instance i-.+ \[state=pending, info=pending\] + Launched a new head node + Fetching the new head node + +<1/1> Setting up head node + Prepared bootstrap config + New status: waiting-for-ssh + \[1/7\] Waiting for SSH to become available + Running `uptime` as a test\. + Fetched IP: .+ + Success\. + Updating cluster configuration\. \[hash=.+\] + New status: syncing-files + \[2/7\] Processing file mounts + ~/tests/ from ./ + \[3/7\] No worker file mounts to sync + New status: setting-up + \[4/7\] Running initialization commands + \[5/7\] Initalizing command runner + \[6/7\] Running setup commands + \(0/4\) echo a + \(1/4\) echo b + \(2/4\) echo \${echo hi} + \(3/4\) echo head + \[7/7\] Starting the Ray runtime + New status: up-to-date + +Useful commands + Monitor autoscaling with + ray exec .+ 'tail -n 100 -f /tmp/ray/session_latest/logs/monitor\*' + Connect to a terminal on the cluster head: + ray attach .+ + Get a remote shell to the cluster manually: + ssh .+ diff --git a/python/ray/tests/test_cli_patterns/test_ray_up_docker_config.yaml b/python/ray/tests/test_cli_patterns/test_ray_up_docker_config.yaml new file mode 100644 index 000000000..8d898f749 --- /dev/null +++ b/python/ray/tests/test_cli_patterns/test_ray_up_docker_config.yaml @@ -0,0 +1,43 @@ +auth: + ssh_user: ubuntu +cluster_name: test-cli +docker: + image: rayproject/ray:1.0.0 + container_name: raydocker + pull_before_run: True + run_options: [] +file_mounts: + ~/tests: . +head_node: + ImageId: latest_dlami + InstanceType: t3a.small +head_setup_commands: + - echo head +head_start_ray_commands: + - ray stop + - ray start --head --autoscaling-config=~/ray_bootstrap_config.yaml +idle_timeout_minutes: 5 +initial_workers: 1 +initialization_commands: + - echo init +max_workers: 2 +min_workers: 1 +provider: + availability_zone: us-west-2a + key_pair: + key_name: __test-cli + region: us-west-2 + type: aws +setup_commands: + - echo a + - echo b + - echo ${echo hi} +target_utilization_fraction: 0.9 +worker_nodes: + ImageId: latest_dlami + InstanceType: t3a.small +worker_setup_commands: + - echo worker +worker_start_ray_commands: + - ray stop + - ray start --address=$RAY_HEAD_IP diff --git a/python/ray/tests/test_cli_patterns/test_ray_up_record.txt b/python/ray/tests/test_cli_patterns/test_ray_up_record.txt index f64150fac..9828de215 100644 --- a/python/ray/tests/test_cli_patterns/test_ray_up_record.txt +++ b/python/ray/tests/test_cli_patterns/test_ray_up_record.txt @@ -20,7 +20,7 @@ .+\.py.*Prepared bootstrap config .+\.py.*AWSNodeProvider: Set tag ray-node-status=waiting-for-ssh on \['.+'\] \[LogTimer=.+\] .+\.py.*New status: waiting-for-ssh -.+\.py.*\[1/6\] Waiting for SSH to become available +.+\.py.*\[1/7\] Waiting for SSH to become available .+\.py.*Running `uptime` as a test\. .+\.py.*Fetched IP: .+ .+\.py.*NodeUpdater: .+: Got IP \[LogTimer=.+\] @@ -31,7 +31,7 @@ .+\.py.*Updating cluster configuration\. \[hash=.+\] .+\.py.*AWSNodeProvider: Set tag ray-node-status=syncing-files on \['.+'\] \[LogTimer=.+\] .+\.py.*New status: syncing-files -.+\.py.*\[3/6\] Processing file mounts +.+\.py.*\[2/7\] Processing file mounts .+\.py.*Running `mkdir -p ~/tests` .+\.py.*Full command is `ssh.+` .+\.py.*Running `rsync.+` @@ -50,14 +50,15 @@ .+\.py.*`rsync`ed .+__test-cli_key-1\.pem \(local\) to ~/ray_bootstrap_key\.pem \(remote\) .+\.py.*~/ray_bootstrap_key\.pem from .+__test-cli_key-1\.pem .+\.py.*NodeUpdater: i-.+: Synced .+.pem to ~/ray_bootstrap_key\.pem \[LogTimer=.+\] -.+\.py.*\[4/6\] No worker file mounts to sync +.+\.py.*\[3/7\] No worker file mounts to sync .+\.py.*AWSNodeProvider: Set tag ray-node-status=setting-up on \['.+'\] \[LogTimer=.+\] .+\.py.*New status: setting-up -.+\.py.*\[3/5\] Running initialization commands +.+\.py.*\[4/7\] Running initialization commands .+\.py.*Running `echo init` .+\.py.*Full command is `ssh.+` .+\.py.*NodeUpdater: i-.+: Initialization commands succeeded \[LogTimer=.+\] -.+\.py.*\[4/6\] Running setup commands +.+\.py.*\[5/7\] Initalizing command runner +.+\.py.*\[6/7\] Running setup commands .+\.py.*\(0/4\) echo a .+\.py.*Running `echo a` .+\.py.*Full command is `ssh.+` @@ -71,7 +72,7 @@ .+\.py.*Running `echo head` .+\.py.*Full command is `ssh.+` .+\.py.*NodeUpdater: i-.+: Setup commands succeeded \[LogTimer=.+\] -.+\.py.*\[6/6\] Starting the Ray runtime +.+\.py.*\[7/7\] Starting the Ray runtime .+\.py.*Running `export RAY_OVERRIDE_RESOURCES='{"CPU":1}';ray stop` .+\.py.*Full command is `ssh.+` .+\.py.*Running `export RAY_OVERRIDE_RESOURCES='{"CPU":1}';ray start --head --autoscaling-config=~/ray_bootstrap_config\.yaml`