From b9e907aa4d3a4b7db367eb5e33829d3eddbdadc7 Mon Sep 17 00:00:00 2001 From: Ameer Haj Ali Date: Tue, 15 Sep 2020 06:28:38 +0300 Subject: [PATCH] Fix abstraction violations in command_runner interface (#10715) * Fix abstraction violations in command_runner interface * user guide * lint * breaking abstraction in commands * extra initialization commands * more cleanup * small fixes * fix test_integration_kubernetes.py * lint Co-authored-by: root Co-authored-by: Ameer Haj Ali --- doc/source/tune/user-guide.rst | 8 +- python/ray/autoscaler/command_runner.py | 73 +++++++++++-------- python/ray/autoscaler/commands.py | 19 ++--- python/ray/autoscaler/updater.py | 20 ++--- python/ray/tune/integration/kubernetes.py | 28 ++----- .../tune/tests/test_integration_kubernetes.py | 44 +---------- 6 files changed, 72 insertions(+), 120 deletions(-) diff --git a/doc/source/tune/user-guide.rst b/doc/source/tune/user-guide.rst index 2d5b4b55b..e65acd087 100644 --- a/doc/source/tune/user-guide.rst +++ b/doc/source/tune/user-guide.rst @@ -461,19 +461,13 @@ You have to specify your Kubernetes namespace explicitly: from ray.tune.integration.kubernetes import NamespacedKubernetesSyncer sync_config = tune.SyncConfig( - sync_to_driver=NamespacedKubernetesSyncer("ray", use_rsync=True) + sync_to_driver=NamespacedKubernetesSyncer("ray") ) tune.run(train, sync_config=sync_config) -The ``KubernetesSyncer`` supports two modes for file synchronisation. Per -default, files are synchronized with ``kubectl cp``, requiring the ``tar`` -binary in your pods. If you would like to use ``rsync`` instead your pods -will have to have ``rsync`` installed. Use the ``use_rsync`` parameter to -decide between the two options. - .. _tune-log_to_file: Redirecting stdout and stderr to files diff --git a/python/ray/autoscaler/command_runner.py b/python/ray/autoscaler/command_runner.py index 3df2ae4fb..8ef132043 100644 --- a/python/ray/autoscaler/command_runner.py +++ b/python/ray/autoscaler/command_runner.py @@ -9,6 +9,7 @@ import os import subprocess import sys import time +import warnings from ray.autoscaler.docker import check_bind_mounts_cmd, \ check_docker_running_cmd, \ @@ -114,6 +115,7 @@ class CommandRunnerInterface: environment_variables: Dict[str, object] = None, run_env: str = "auto", ssh_options_override_ssh_key: str = "", + shutdown_after_run: bool = False, ) -> str: """Run the given command on the cluster node and optionally get output. @@ -133,6 +135,8 @@ class CommandRunnerInterface: DockerCommandRunner to determine the run environment. ssh_options_override_ssh_key (str): if provided, overwrites SSHOptions class with SSHOptions(ssh_options_override_ssh_key). + shutdown_after_run (bool): if provided, shutdowns down the machine + after executing the command with `sudo shutdown -h now`. """ raise NotImplementedError @@ -164,6 +168,15 @@ class CommandRunnerInterface: """Return the command the user can use to open a shell.""" raise NotImplementedError + def run_init(self, *, as_head: bool, file_mounts: Dict[str, str]) -> None: + """Used to run extra initialization commands. + + Args: + as_head (bool): Run as head image or worker. + file_mounts (dict): Files to copy to the head and worker nodes. + """ + pass + class KubernetesCommandRunner(CommandRunnerInterface): def __init__(self, log_prefix, namespace, node_id, auth_config, @@ -185,7 +198,10 @@ class KubernetesCommandRunner(CommandRunnerInterface): environment_variables: Dict[str, object] = None, run_env="auto", # Unused argument. ssh_options_override_ssh_key="", # Unused argument. + shutdown_after_run=False, ): + if shutdown_after_run: + cmd += "; sudo shutdown -h now" if cmd and port_forward: raise Exception( "exec with Kubernetes can't forward ports and execute" @@ -250,19 +266,17 @@ class KubernetesCommandRunner(CommandRunnerInterface): "{}@{}:{}".format(self.node_id, self.namespace, target), ]) except Exception as e: - logger.warning(self.log_prefix + - "rsync failed: '{}'. Falling back to 'kubectl cp'" - .format(e)) - self.run_cp_up(source, target) + warnings.warn( + self.log_prefix + + "rsync failed: '{}'. Falling back to 'kubectl cp'".format(e), + UserWarning) + if target.startswith("~"): + target = "/root" + target[1:] - def run_cp_up(self, source, target): - if target.startswith("~"): - target = "/root" + target[1:] - - self.process_runner.check_call(self.kubectl + [ - "cp", source, "{}/{}:{}".format(self.namespace, self.node_id, - target) - ]) + self.process_runner.check_call(self.kubectl + [ + "cp", source, "{}/{}:{}".format(self.namespace, self.node_id, + target) + ]) def run_rsync_down(self, source, target, options=None): if target.startswith("~"): @@ -276,19 +290,17 @@ class KubernetesCommandRunner(CommandRunnerInterface): target, ]) except Exception as e: - logger.warning(self.log_prefix + - "rsync failed: '{}'. Falling back to 'kubectl cp'" - .format(e)) - self.run_cp_down(source, target) + warnings.warn( + self.log_prefix + + "rsync failed: '{}'. Falling back to 'kubectl cp'".format(e), + UserWarning) + if target.startswith("~"): + target = "/root" + target[1:] - def run_cp_down(self, source, target): - if target.startswith("~"): - target = "/root" + target[1:] - - self.process_runner.check_call(self.kubectl + [ - "cp", "{}/{}:{}".format(self.namespace, self.node_id, source), - target - ]) + self.process_runner.check_call(self.kubectl + [ + "cp", "{}/{}:{}".format(self.namespace, self.node_id, source), + target + ]) def remote_shell_command_str(self): return "{} exec -it {} bash".format(" ".join(self.kubectl), @@ -480,7 +492,10 @@ class SSHCommandRunner(CommandRunnerInterface): environment_variables: Dict[str, object] = None, run_env="auto", # Unused argument. ssh_options_override_ssh_key="", + shutdown_after_run=False, ): + if shutdown_after_run: + cmd += "; sudo shutdown -h now" if ssh_options_override_ssh_key: ssh_options = SSHOptions(ssh_options_override_ssh_key) else: @@ -579,7 +594,6 @@ class DockerCommandRunner(CommandRunnerInterface): self.container_name = docker_config["container_name"] self.docker_config = docker_config self.home_dir = None - self.shutdown = False self.initialized = False def run( @@ -592,6 +606,7 @@ class DockerCommandRunner(CommandRunnerInterface): environment_variables: Dict[str, object] = None, run_env="auto", ssh_options_override_ssh_key="", + shutdown_after_run=False, ): if run_env == "auto": run_env = "host" if cmd.find("docker") == 0 else "docker" @@ -607,8 +622,11 @@ class DockerCommandRunner(CommandRunnerInterface): container_name=self.container_name, with_interactive=True)[0] - if self.shutdown: + if shutdown_after_run: + # sudo shutdown should run after `with_docker_exec` command above cmd += "; sudo shutdown -h now" + # Do not pass shutdown_after_run argument to ssh_command_runner.run() + # since it is handled above. return self.ssh_command_runner.run( cmd, timeout=timeout, @@ -675,9 +693,6 @@ class DockerCommandRunner(CommandRunnerInterface): "following commands to 'initialization_commands':\n" + "\n".join(install_commands)) - def _shutdown_after_next_cmd(self): - self.shutdown = True - def _check_container_status(self): if self.initialized: return True diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py index 18327847b..b40b0e81a 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/commands.py @@ -33,7 +33,6 @@ from ray.ray_constants import AUTOSCALER_RESOURCE_REQUEST_CHANNEL from ray.autoscaler.updater import NodeUpdaterThread from ray.autoscaler.command_runner import set_using_login_shells, \ set_rsync_silent -from ray.autoscaler.command_runner import DockerCommandRunner from ray.autoscaler.log_timer import LogTimer from ray.worker import global_worker from ray.util.debug import log_once @@ -857,18 +856,13 @@ def exec_cluster(config_file: str, file_mounts_contents_hash="", is_head_node=True, docker_config=config.get("docker")) - - is_docker = isinstance(updater.cmd_runner, DockerCommandRunner) - + shutdown_after_run = False if cmd and stop: cmd += "; ".join([ "ray stop", "ray teardown ~/ray_bootstrap_config.yaml --yes --workers-only" ]) - if is_docker and run_env == "docker": - updater.cmd_runner.shutdown_after_next_cmd() - else: - cmd += "; sudo shutdown -h now" + shutdown_after_run = True result = _exec( updater, @@ -877,7 +871,8 @@ def exec_cluster(config_file: str, tmux, port_forward=port_forward, with_output=with_output, - run_env=run_env) + run_env=run_env, + shutdown_after_run=shutdown_after_run) if tmux or screen: attach_command_parts = ["ray attach", config_file] if override_cluster_name is not None: @@ -906,7 +901,8 @@ def _exec(updater, tmux, port_forward=None, with_output=False, - run_env="auto"): + run_env="auto", + shutdown_after_run=False): if cmd: if screen: cmd = [ @@ -926,7 +922,8 @@ def _exec(updater, exit_on_fail=True, port_forward=port_forward, with_output=with_output, - run_env=run_env) + run_env=run_env, + shutdown_after_run=shutdown_after_run) def rsync(config_file: str, diff --git a/python/ray/autoscaler/updater.py b/python/ray/autoscaler/updater.py index 8b69714cc..8040dbcbd 100644 --- a/python/ray/autoscaler/updater.py +++ b/python/ray/autoscaler/updater.py @@ -11,7 +11,7 @@ from ray.autoscaler.tags import TAG_RAY_NODE_STATUS, TAG_RAY_RUNTIME_CONFIG, \ STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED, STATUS_WAITING_FOR_SSH, \ STATUS_SETTING_UP, STATUS_SYNCING_FILES from ray.autoscaler.command_runner import NODE_START_WAIT_S, \ - ProcessRunnerError, DockerCommandRunner + ProcessRunnerError from ray.autoscaler.log_timer import LogTimer import ray.autoscaler.subprocess_output_util as cmd_output_util @@ -91,6 +91,7 @@ class NodeUpdater: self.cluster_synced_files = cluster_synced_files self.auth_config = auth_config self.is_head_node = is_head_node + self.docker_config = docker_config def run(self): cli_logger.old_info(logger, "{}Updating to {}", self.log_prefix, @@ -181,8 +182,10 @@ class NodeUpdater: with LogTimer(self.log_prefix + "Synced {} to {}".format(local_path, remote_path)): - if not isinstance(self.cmd_runner, DockerCommandRunner): - # The DockerCommandRunner handles this internally + is_docker = (self.docker_config + and self.docker_config["container_name"] != "") + if not is_docker: + # The DockerCommandRunner handles this internally. self.cmd_runner.run( "mkdir -p {}".format(os.path.dirname(remote_path)), run_env="host") @@ -292,9 +295,8 @@ class NodeUpdater: # When resuming from a stopped instance the runtime_hash may be the # same, but the container will not be started. - if isinstance(self.cmd_runner, DockerCommandRunner): - self.cmd_runner.run_init( - as_head=self.is_head_node, file_mounts=self.file_mounts) + self.cmd_runner.run_init( + as_head=self.is_head_node, file_mounts=self.file_mounts) else: cli_logger.print( @@ -347,10 +349,8 @@ class NodeUpdater: cli_logger.print( "No initialization commands to run.", _numbered=("[]", 3, 6)) - if isinstance(self.cmd_runner, DockerCommandRunner): - self.cmd_runner.run_init( - as_head=self.is_head_node, - file_mounts=self.file_mounts) + self.cmd_runner.run_init( + as_head=self.is_head_node, file_mounts=self.file_mounts) if self.setup_commands: with cli_logger.group( "Running setup commands", diff --git a/python/ray/tune/integration/kubernetes.py b/python/ray/tune/integration/kubernetes.py index 78f24ef35..0aa0c07ab 100644 --- a/python/ray/tune/integration/kubernetes.py +++ b/python/ray/tune/integration/kubernetes.py @@ -7,15 +7,11 @@ from ray.tune.syncer import NodeSyncer from ray.tune.sync_client import SyncClient -def NamespacedKubernetesSyncer(namespace, use_rsync=False): +def NamespacedKubernetesSyncer(namespace): """Wrapper to return a ``KubernetesSyncer`` for a Kubernetes namespace. Args: namespace (str): Kubernetes namespace. - use_rsync (bool): Use ``rsync`` if True or ``kubectl cp`` - if False. If True, ``rsync`` will need to be - installed in the Kubernetes pods for this to work. - If False, ``tar`` will need to be installed instead. Returns: A ``KubernetesSyncer`` class to be passed to ``tune.run()``. @@ -31,7 +27,6 @@ def NamespacedKubernetesSyncer(namespace, use_rsync=False): class _NamespacedKubernetesSyncer(KubernetesSyncer): _namespace = namespace - _use_rsync = use_rsync return _NamespacedKubernetesSyncer @@ -49,7 +44,6 @@ class KubernetesSyncer(NodeSyncer): """ _namespace = "ray" - _use_rsync = False def __init__(self, local_dir, remote_dir, sync_client=None): self.local_ip = services.get_node_ip_address() @@ -58,8 +52,7 @@ class KubernetesSyncer(NodeSyncer): self.worker_node = None sync_client = sync_client or KubernetesSyncClient( - namespace=self.__class__._namespace, - use_rsync=self.__class__._use_rsync) + namespace=self.__class__._namespace) super(NodeSyncer, self).__init__(local_dir, remote_dir, sync_client) @@ -97,18 +90,13 @@ class KubernetesSyncClient(SyncClient): Args: namespace (str): Namespace in which the pods live. - use_rsync (bool): Use ``rsync`` if True or ``kubectl cp`` - if False. If True, ``rsync`` will need to be - installed in the Kubernetes pods for this to work. - If False, ``tar`` will need to be installed instead. process_runner: How commands should be called. Defaults to ``subprocess``. """ - def __init__(self, namespace, use_rsync=False, process_runner=subprocess): + def __init__(self, namespace, process_runner=subprocess): self.namespace = namespace - self.use_rsync = use_rsync self._process_runner = process_runner self._command_runners = {} @@ -141,10 +129,7 @@ class KubernetesSyncClient(SyncClient): target_dir += "/" if not target_dir.endswith("/") else "" command_runner = self._get_command_runner(target_node) - if self.use_rsync: - command_runner.run_rsync_up(source, target_dir) - else: - command_runner.run_cp_up(source, target_dir) + command_runner.run_rsync_up(source, target_dir) return True def sync_down(self, source, target): @@ -156,10 +141,7 @@ class KubernetesSyncClient(SyncClient): target += "/" if not target.endswith("/") else "" command_runner = self._get_command_runner(source_node) - if self.use_rsync: - command_runner.run_rsync_down(source_dir, target) - else: - command_runner.run_cp_down(source_dir, target) + command_runner.run_rsync_down(source_dir, target) return True def delete(self, target): diff --git a/python/ray/tune/tests/test_integration_kubernetes.py b/python/ray/tune/tests/test_integration_kubernetes.py index 588179a32..0699cdc27 100644 --- a/python/ray/tune/tests/test_integration_kubernetes.py +++ b/python/ray/tune/tests/test_integration_kubernetes.py @@ -33,11 +33,10 @@ class _MockLookup: return self.ip_to_node[ip] -def _create_mock_syncer(namespace, lookup, use_rsync, process_runner, local_ip, - local_dir, remote_dir): +def _create_mock_syncer(namespace, lookup, process_runner, local_ip, local_dir, + remote_dir): class _MockSyncer(KubernetesSyncer): _namespace = namespace - _use_rsync = use_rsync _get_kubernetes_node_by_ip = lookup def __init__(self, local_dir, remote_dir, sync_client): @@ -54,9 +53,7 @@ def _create_mock_syncer(namespace, lookup, use_rsync, process_runner, local_ip, local_dir, remote_dir, sync_client=KubernetesSyncClient( - namespace=namespace, - use_rsync=use_rsync, - process_runner=process_runner)) + namespace=namespace, process_runner=process_runner)) class KubernetesIntegrationTest(unittest.TestCase): @@ -74,42 +71,9 @@ class KubernetesIntegrationTest(unittest.TestCase): def tearDown(self): pass - def testKubernetesCpUpDown(self): - syncer = _create_mock_syncer( - self.namespace, self.lookup, False, self.process_runner, - self.lookup.get_ip("head"), self.local_dir, self.remote_dir) - - syncer.set_worker_ip(self.lookup.get_ip("w1")) - - # Test sync up. Should add / to the dirs and call kubectl cp - syncer.sync_up() - - self.assertEqual(self.process_runner.history[-1], [ - "kubectl", "-n", self.namespace, "cp", self.local_dir + "/", - "{}/{}:{}".format(self.namespace, "w1", self.remote_dir + "/") - ]) - - # Test sync down. - syncer.sync_down() - self.assertEqual(self.process_runner.history[-1], [ - "kubectl", "-n", self.namespace, "cp", "{}/{}:{}".format( - self.namespace, - "w1", - self.remote_dir + "/", - ), self.local_dir + "/" - ]) - - # Sync to same node should be ignored - syncer.set_worker_ip(self.lookup.get_ip("head")) - syncer.sync_up() - self.assertTrue(len(self.process_runner.history) == 2) - - syncer.sync_down() - self.assertTrue(len(self.process_runner.history) == 2) - def testKubernetesRsyncUpDown(self): syncer = _create_mock_syncer( - self.namespace, self.lookup, True, self.process_runner, + self.namespace, self.lookup, self.process_runner, self.lookup.get_ip("head"), self.local_dir, self.remote_dir) syncer.set_worker_ip(self.lookup.get_ip("w1"))