From 82cdcff898468d81c755ebb73acf7322f620bbd2 Mon Sep 17 00:00:00 2001 From: Ameer Haj Ali Date: Wed, 12 Aug 2020 01:09:49 +0300 Subject: [PATCH] Removing kwargs & SSHOptions args from command runners (#10014) --- python/ray/autoscaler/command_runner.py | 50 +++++++++++++++++-------- python/ray/autoscaler/updater.py | 12 +++--- 2 files changed, 42 insertions(+), 20 deletions(-) diff --git a/python/ray/autoscaler/command_runner.py b/python/ray/autoscaler/command_runner.py index d2b9ed76b..beac9d946 100644 --- a/python/ray/autoscaler/command_runner.py +++ b/python/ray/autoscaler/command_runner.py @@ -78,15 +78,21 @@ class CommandRunnerInterface: Command runner instances are returned by provider.get_command_runner().""" - def run(self, + def run( + self, cmd: str = None, timeout: int = 120, exit_on_fail: bool = False, port_forward: List[Tuple[int, int]] = None, with_output: bool = False, - **kwargs) -> str: + run_env: str = "auto", + ssh_options_override_ssh_key: str = "", + ) -> str: """Run the given command on the cluster node and optionally get output. + WARNING: the cloudgateway needs arguments of "run" function to be json + dumpable to send them over HTTP requests. + Args: cmd (str): The command to run. timeout (int): The command timeout in seconds. @@ -94,6 +100,10 @@ class CommandRunnerInterface: port_forward (list): List of (local, remote) ports to forward, or a single tuple. with_output (bool): Whether to return output. + run_env (str): Options: docker/host/auto. Used in + DockerCommandRunner to determine the run environment. + ssh_options_override_ssh_key (str): if provided, overwrites + SSHOptions class with SSHOptions(ssh_options_override_ssh_key). """ raise NotImplementedError @@ -130,13 +140,16 @@ class KubernetesCommandRunner(CommandRunnerInterface): self.namespace = namespace self.kubectl = ["kubectl", "-n", self.namespace] - def run(self, + def run( + self, cmd=None, timeout=120, exit_on_fail=False, port_forward=None, with_output=False, - **kwargs): + run_env="auto", # Unused argument. + ssh_options_override_ssh_key="", # Unused argument. + ): if cmd and port_forward: raise Exception( "exec with Kubernetes can't forward ports and execute" @@ -402,15 +415,21 @@ class SSHCommandRunner(CommandRunnerInterface): "SSH command Failed. See above for the output from the" " failure.") from None - def run(self, + def run( + self, cmd, timeout=120, exit_on_fail=False, port_forward=None, with_output=False, - ssh_options_override=None, - **kwargs): - ssh_options = ssh_options_override or self.ssh_options + run_env="auto", # Unused argument. + ssh_options_override_ssh_key="", + ): + + if ssh_options_override_ssh_key: + ssh_options = SSHOptions(ssh_options_override_ssh_key) + else: + ssh_options = self.ssh_options assert isinstance( ssh_options, SSHOptions @@ -506,15 +525,16 @@ class DockerCommandRunner(SSHCommandRunner): self._check_docker_installed() self.shutdown = False - def run(self, + def run( + self, cmd, timeout=120, exit_on_fail=False, port_forward=None, with_output=False, - run_env=True, - ssh_options_override=None, - **kwargs): + run_env="auto", + ssh_options_override_ssh_key="", + ): if run_env == "auto": run_env = "host" if cmd.find("docker") == 0 else "docker" @@ -533,14 +553,14 @@ class DockerCommandRunner(SSHCommandRunner): exit_on_fail=exit_on_fail, port_forward=port_forward, with_output=with_output, - ssh_options_override=ssh_options_override) + ssh_options_override_ssh_key=ssh_options_override_ssh_key) def run_rsync_up(self, source, target): protected_path = target if target.find("/root") == 0: target = target.replace("/root", "/tmp/root") self.ssh_command_runner.run( - f"mkdir -p {os.path.dirname(target.rstrip('/'))}", run_env="host") + f"mkdir -p {os.path.dirname(target.rstrip('/'))}") self.ssh_command_runner.run_rsync_up(source, target) if self._check_container_status(): self.ssh_command_runner.run("docker cp {} {}:{}".format( @@ -552,7 +572,7 @@ class DockerCommandRunner(SSHCommandRunner): if source.find("/root") == 0: source = source.replace("/root", "/tmp/root") self.ssh_command_runner.run( - f"mkdir -p {os.path.dirname(source.rstrip('/'))}", run_env="host") + f"mkdir -p {os.path.dirname(source.rstrip('/'))}") self.ssh_command_runner.run("docker cp {}:{} {}".format( self.docker_name, self._docker_expand_user(protected_path), source)) diff --git a/python/ray/autoscaler/updater.py b/python/ray/autoscaler/updater.py index 1b39e72e4..eaa3f7571 100644 --- a/python/ray/autoscaler/updater.py +++ b/python/ray/autoscaler/updater.py @@ -10,8 +10,7 @@ from ray.autoscaler.tags import TAG_RAY_NODE_STATUS, TAG_RAY_RUNTIME_CONFIG, \ TAG_RAY_FILE_MOUNTS_CONTENTS, \ STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED, STATUS_WAITING_FOR_SSH, \ STATUS_SETTING_UP, STATUS_SYNCING_FILES -from ray.autoscaler.command_runner import NODE_START_WAIT_S, SSHOptions, \ - ProcessRunnerError +from ray.autoscaler.command_runner import NODE_START_WAIT_S, ProcessRunnerError from ray.autoscaler.log_timer import LogTimer import ray.autoscaler.subprocess_output_util as cmd_output_util @@ -279,11 +278,14 @@ class NodeUpdater: show_status=True): for cmd in self.initialization_commands: try: + # Overriding the existing SSHOptions class + # with a new SSHOptions class that uses + # this ssh_private_key as its only __init__ + # argument. self.cmd_runner.run( cmd, - ssh_options_override=SSHOptions( - self.auth_config.get( - "ssh_private_key"))) + ssh_options_override_ssh_key=self. + auth_config.get("ssh_private_key")) except ProcessRunnerError as e: if e.msg_type == "ssh_command_failed": cli_logger.error("Failed.")