remove evil redirects (#5919)

This commit is contained in:
Eric Liang
2019-10-14 19:41:04 -07:00
committed by GitHub
parent 5382a26c2e
commit 69d5c1b53a
+52 -85
View File
@@ -50,7 +50,6 @@ class KubernetesCommandRunner(object):
def run(self,
cmd,
timeout=120,
redirect=None,
allocate_tty=False,
exit_on_fail=False,
port_forward=None):
@@ -76,11 +75,7 @@ class KubernetesCommandRunner(object):
"--",
] + with_interactive(cmd)
try:
self.process_runner.check_call(
" ".join(final_cmd),
shell=True,
stdout=redirect,
stderr=redirect)
self.process_runner.check_call(" ".join(final_cmd), shell=True)
except subprocess.CalledProcessError:
if exit_on_fail:
quoted_cmd = " ".join(final_cmd[:-1] + [quote(final_cmd[-1])])
@@ -107,57 +102,45 @@ class KubernetesCommandRunner(object):
"Killing port forward with SIGKILL.")
port_forward_process.kill()
def run_rsync_up(self, source, target, redirect=None):
def run_rsync_up(self, source, target):
if target.startswith("~"):
target = "/root" + target[1:]
try:
self.process_runner.check_call(
[
KUBECTL_RSYNC,
"-avz",
source,
"{}@{}:{}".format(self.node_id, self.namespace, target),
],
stdout=redirect,
stderr=redirect)
self.process_runner.check_call([
KUBECTL_RSYNC,
"-avz",
source,
"{}@{}:{}".format(self.node_id, self.namespace, target),
])
except Exception as e:
logger.warning(self.log_prefix +
"rsync failed: '{}'. Falling back to 'kubectl cp'"
.format(e))
self.process_runner.check_call(
self.kubectl + [
"cp", source, "{}/{}:{}".format(self.namespace,
self.node_id, target)
],
stdout=redirect,
stderr=redirect)
self.process_runner.check_call(self.kubectl + [
"cp", source, "{}/{}:{}".format(self.namespace, self.node_id,
target)
])
def run_rsync_down(self, source, target, redirect=None):
def run_rsync_down(self, source, target):
if target.startswith("~"):
target = "/root" + target[1:]
try:
self.process_runner.check_call(
[
KUBECTL_RSYNC,
"-avz",
"{}@{}:{}".format(self.node_id, self.namespace, source),
target,
],
stdout=redirect,
stderr=redirect)
self.process_runner.check_call([
KUBECTL_RSYNC,
"-avz",
"{}@{}:{}".format(self.node_id, self.namespace, source),
target,
])
except Exception as e:
logger.warning(self.log_prefix +
"rsync failed: '{}'. Falling back to 'kubectl cp'"
.format(e))
self.process_runner.check_call(
self.kubectl + [
"cp", "{}/{}:{}".format(self.namespace, self.node_id,
source), target
],
stdout=redirect,
stderr=redirect)
self.process_runner.check_call(self.kubectl + [
"cp", "{}/{}:{}".format(self.namespace, self.node_id, source),
target
])
def remote_shell_command_str(self):
return "{} exec -it {} bash".format(" ".join(self.kubectl),
@@ -231,27 +214,21 @@ class SSHCommandRunner(object):
# This should run before any SSH commands and therefore ensure that
# the ControlPath directory exists, allowing SSH to maintain
# persistent sessions later on.
with open("/dev/null", "w") as redirect:
try:
self.process_runner.check_call(
["mkdir", "-p", self.ssh_control_path],
stdout=redirect,
stderr=redirect)
except subprocess.CalledProcessError as e:
logger.warning(e)
try:
self.process_runner.check_call(
["mkdir", "-p", self.ssh_control_path])
except subprocess.CalledProcessError as e:
logger.warning(e)
try:
self.process_runner.check_call(
["chmod", "0700", self.ssh_control_path],
stdout=redirect,
stderr=redirect)
except subprocess.CalledProcessError as e:
logger.warning(self.log_prefix + str(e))
try:
self.process_runner.check_call(
["chmod", "0700", self.ssh_control_path])
except subprocess.CalledProcessError as e:
logger.warning(self.log_prefix + str(e))
def run(self,
cmd,
timeout=120,
redirect=None,
allocate_tty=False,
exit_on_fail=False,
port_forward=None):
@@ -271,8 +248,7 @@ class SSHCommandRunner(object):
"{}@{}".format(self.ssh_user, self.ssh_ip)
] + with_interactive(cmd)
try:
self.process_runner.check_call(
final_cmd, stdout=redirect, stderr=redirect)
self.process_runner.check_call(final_cmd)
except subprocess.CalledProcessError:
if exit_on_fail:
quoted_cmd = " ".join(final_cmd[:-1] + [quote(final_cmd[-1])])
@@ -282,27 +258,21 @@ class SSHCommandRunner(object):
else:
raise
def run_rsync_up(self, source, target, redirect=None):
def run_rsync_up(self, source, target):
self.set_ssh_ip_if_required()
self.process_runner.check_call(
[
"rsync", "--rsh",
" ".join(["ssh"] + self.get_default_ssh_options(120)), "-avz",
source, "{}@{}:{}".format(self.ssh_user, self.ssh_ip, target)
],
stdout=redirect,
stderr=redirect)
self.process_runner.check_call([
"rsync", "--rsh",
" ".join(["ssh"] + self.get_default_ssh_options(120)), "-avz",
source, "{}@{}:{}".format(self.ssh_user, self.ssh_ip, target)
])
def run_rsync_down(self, source, target, redirect=None):
def run_rsync_down(self, source, target):
self.set_ssh_ip_if_required()
self.process_runner.check_call(
[
"rsync", "--rsh",
" ".join(["ssh"] + self.get_default_ssh_options(120)), "-avz",
"{}@{}:{}".format(self.ssh_user, self.ssh_ip, source), target
],
stdout=redirect,
stderr=redirect)
self.process_runner.check_call([
"rsync", "--rsh",
" ".join(["ssh"] + self.get_default_ssh_options(120)), "-avz",
"{}@{}:{}".format(self.ssh_user, self.ssh_ip, source), target
])
def remote_shell_command_str(self):
return "ssh -i {} {}@{}\n".format(self.ssh_private_key, self.ssh_user,
@@ -391,7 +361,7 @@ class NodeUpdater(object):
"Synced {} to {}".format(local_path, remote_path)):
self.cmd_runner.run("mkdir -p {}".format(
os.path.dirname(remote_path)))
sync_cmd(local_path, remote_path, redirect=None)
sync_cmd(local_path, remote_path)
def wait_ready(self, deadline):
with LogTimer(self.log_prefix + "Got remote shell"):
@@ -403,10 +373,7 @@ class NodeUpdater(object):
logger.debug(self.log_prefix +
"Waiting for remote shell...")
# Setting redirect=None allows the user to see errors like
# unix_listener: path "/tmp/rkn_ray_ssh_sockets/..." too
# long for Unix domain socket.
self.cmd_runner.run("uptime", timeout=5, redirect=None)
self.cmd_runner.run("uptime", timeout=5)
logger.debug("Uptime succeeded.")
return True
@@ -455,15 +422,15 @@ class NodeUpdater(object):
for cmd in self.ray_start_commands:
self.cmd_runner.run(cmd)
def rsync_up(self, source, target, redirect=None):
def rsync_up(self, source, target):
logger.info(self.log_prefix +
"Syncing {} to {}...".format(source, target))
self.cmd_runner.run_rsync_up(source, target, redirect=None)
self.cmd_runner.run_rsync_up(source, target)
def rsync_down(self, source, target, redirect=None):
def rsync_down(self, source, target):
logger.info(self.log_prefix +
"Syncing {} from {}...".format(source, target))
self.cmd_runner.run_rsync_down(source, target, redirect=None)
self.cmd_runner.run_rsync_down(source, target)
class NodeUpdaterThread(NodeUpdater, Thread):