diff --git a/python/ray/autoscaler/updater.py b/python/ray/autoscaler/updater.py index 00e6205b9..b94200907 100644 --- a/python/ray/autoscaler/updater.py +++ b/python/ray/autoscaler/updater.py @@ -50,7 +50,6 @@ class KubernetesCommandRunner(object): def run(self, cmd, timeout=120, - redirect=None, allocate_tty=False, exit_on_fail=False, port_forward=None): @@ -76,11 +75,7 @@ class KubernetesCommandRunner(object): "--", ] + with_interactive(cmd) try: - self.process_runner.check_call( - " ".join(final_cmd), - shell=True, - stdout=redirect, - stderr=redirect) + self.process_runner.check_call(" ".join(final_cmd), shell=True) except subprocess.CalledProcessError: if exit_on_fail: quoted_cmd = " ".join(final_cmd[:-1] + [quote(final_cmd[-1])]) @@ -107,57 +102,45 @@ class KubernetesCommandRunner(object): "Killing port forward with SIGKILL.") port_forward_process.kill() - def run_rsync_up(self, source, target, redirect=None): + def run_rsync_up(self, source, target): if target.startswith("~"): target = "/root" + target[1:] try: - self.process_runner.check_call( - [ - KUBECTL_RSYNC, - "-avz", - source, - "{}@{}:{}".format(self.node_id, self.namespace, target), - ], - stdout=redirect, - stderr=redirect) + self.process_runner.check_call([ + KUBECTL_RSYNC, + "-avz", + source, + "{}@{}:{}".format(self.node_id, self.namespace, target), + ]) except Exception as e: logger.warning(self.log_prefix + "rsync failed: '{}'. Falling back to 'kubectl cp'" .format(e)) - self.process_runner.check_call( - self.kubectl + [ - "cp", source, "{}/{}:{}".format(self.namespace, - self.node_id, target) - ], - stdout=redirect, - stderr=redirect) + self.process_runner.check_call(self.kubectl + [ + "cp", source, "{}/{}:{}".format(self.namespace, self.node_id, + target) + ]) - def run_rsync_down(self, source, target, redirect=None): + def run_rsync_down(self, source, target): if target.startswith("~"): target = "/root" + target[1:] try: - self.process_runner.check_call( - [ - KUBECTL_RSYNC, - "-avz", - "{}@{}:{}".format(self.node_id, self.namespace, source), - target, - ], - stdout=redirect, - stderr=redirect) + self.process_runner.check_call([ + KUBECTL_RSYNC, + "-avz", + "{}@{}:{}".format(self.node_id, self.namespace, source), + target, + ]) except Exception as e: logger.warning(self.log_prefix + "rsync failed: '{}'. Falling back to 'kubectl cp'" .format(e)) - self.process_runner.check_call( - self.kubectl + [ - "cp", "{}/{}:{}".format(self.namespace, self.node_id, - source), target - ], - stdout=redirect, - stderr=redirect) + self.process_runner.check_call(self.kubectl + [ + "cp", "{}/{}:{}".format(self.namespace, self.node_id, source), + target + ]) def remote_shell_command_str(self): return "{} exec -it {} bash".format(" ".join(self.kubectl), @@ -231,27 +214,21 @@ class SSHCommandRunner(object): # This should run before any SSH commands and therefore ensure that # the ControlPath directory exists, allowing SSH to maintain # persistent sessions later on. - with open("/dev/null", "w") as redirect: - try: - self.process_runner.check_call( - ["mkdir", "-p", self.ssh_control_path], - stdout=redirect, - stderr=redirect) - except subprocess.CalledProcessError as e: - logger.warning(e) + try: + self.process_runner.check_call( + ["mkdir", "-p", self.ssh_control_path]) + except subprocess.CalledProcessError as e: + logger.warning(e) - try: - self.process_runner.check_call( - ["chmod", "0700", self.ssh_control_path], - stdout=redirect, - stderr=redirect) - except subprocess.CalledProcessError as e: - logger.warning(self.log_prefix + str(e)) + try: + self.process_runner.check_call( + ["chmod", "0700", self.ssh_control_path]) + except subprocess.CalledProcessError as e: + logger.warning(self.log_prefix + str(e)) def run(self, cmd, timeout=120, - redirect=None, allocate_tty=False, exit_on_fail=False, port_forward=None): @@ -271,8 +248,7 @@ class SSHCommandRunner(object): "{}@{}".format(self.ssh_user, self.ssh_ip) ] + with_interactive(cmd) try: - self.process_runner.check_call( - final_cmd, stdout=redirect, stderr=redirect) + self.process_runner.check_call(final_cmd) except subprocess.CalledProcessError: if exit_on_fail: quoted_cmd = " ".join(final_cmd[:-1] + [quote(final_cmd[-1])]) @@ -282,27 +258,21 @@ class SSHCommandRunner(object): else: raise - def run_rsync_up(self, source, target, redirect=None): + def run_rsync_up(self, source, target): self.set_ssh_ip_if_required() - self.process_runner.check_call( - [ - "rsync", "--rsh", - " ".join(["ssh"] + self.get_default_ssh_options(120)), "-avz", - source, "{}@{}:{}".format(self.ssh_user, self.ssh_ip, target) - ], - stdout=redirect, - stderr=redirect) + self.process_runner.check_call([ + "rsync", "--rsh", + " ".join(["ssh"] + self.get_default_ssh_options(120)), "-avz", + source, "{}@{}:{}".format(self.ssh_user, self.ssh_ip, target) + ]) - def run_rsync_down(self, source, target, redirect=None): + def run_rsync_down(self, source, target): self.set_ssh_ip_if_required() - self.process_runner.check_call( - [ - "rsync", "--rsh", - " ".join(["ssh"] + self.get_default_ssh_options(120)), "-avz", - "{}@{}:{}".format(self.ssh_user, self.ssh_ip, source), target - ], - stdout=redirect, - stderr=redirect) + self.process_runner.check_call([ + "rsync", "--rsh", + " ".join(["ssh"] + self.get_default_ssh_options(120)), "-avz", + "{}@{}:{}".format(self.ssh_user, self.ssh_ip, source), target + ]) def remote_shell_command_str(self): return "ssh -i {} {}@{}\n".format(self.ssh_private_key, self.ssh_user, @@ -391,7 +361,7 @@ class NodeUpdater(object): "Synced {} to {}".format(local_path, remote_path)): self.cmd_runner.run("mkdir -p {}".format( os.path.dirname(remote_path))) - sync_cmd(local_path, remote_path, redirect=None) + sync_cmd(local_path, remote_path) def wait_ready(self, deadline): with LogTimer(self.log_prefix + "Got remote shell"): @@ -403,10 +373,7 @@ class NodeUpdater(object): logger.debug(self.log_prefix + "Waiting for remote shell...") - # Setting redirect=None allows the user to see errors like - # unix_listener: path "/tmp/rkn_ray_ssh_sockets/..." too - # long for Unix domain socket. - self.cmd_runner.run("uptime", timeout=5, redirect=None) + self.cmd_runner.run("uptime", timeout=5) logger.debug("Uptime succeeded.") return True @@ -455,15 +422,15 @@ class NodeUpdater(object): for cmd in self.ray_start_commands: self.cmd_runner.run(cmd) - def rsync_up(self, source, target, redirect=None): + def rsync_up(self, source, target): logger.info(self.log_prefix + "Syncing {} to {}...".format(source, target)) - self.cmd_runner.run_rsync_up(source, target, redirect=None) + self.cmd_runner.run_rsync_up(source, target) - def rsync_down(self, source, target, redirect=None): + def rsync_down(self, source, target): logger.info(self.log_prefix + "Syncing {} from {}...".format(source, target)) - self.cmd_runner.run_rsync_down(source, target, redirect=None) + self.cmd_runner.run_rsync_down(source, target) class NodeUpdaterThread(NodeUpdater, Thread):