Fix abstraction violations in command_runner interface (#10715)

* Fix abstraction violations in command_runner interface

* user guide

* lint

* breaking abstraction in commands

* extra initialization commands

* more cleanup

* small fixes

* fix test_integration_kubernetes.py

* lint

Co-authored-by: root <root@ip-172-31-28-155.us-west-2.compute.internal>
Co-authored-by: Ameer Haj Ali <ameerhajali@Ameers-MacBook-Pro.local>
This commit is contained in:
Ameer Haj Ali
2020-09-15 06:28:38 +03:00
committed by Barak Michener
parent f907c1a715
commit b9e907aa4d
6 changed files with 72 additions and 120 deletions
+1 -7
View File
@@ -461,19 +461,13 @@ You have to specify your Kubernetes namespace explicitly:
from ray.tune.integration.kubernetes import NamespacedKubernetesSyncer
sync_config = tune.SyncConfig(
sync_to_driver=NamespacedKubernetesSyncer("ray", use_rsync=True)
sync_to_driver=NamespacedKubernetesSyncer("ray")
)
tune.run(train, sync_config=sync_config)
The ``KubernetesSyncer`` supports two modes for file synchronisation. Per
default, files are synchronized with ``kubectl cp``, requiring the ``tar``
binary in your pods. If you would like to use ``rsync`` instead your pods
will have to have ``rsync`` installed. Use the ``use_rsync`` parameter to
decide between the two options.
.. _tune-log_to_file:
Redirecting stdout and stderr to files
+44 -29
View File
@@ -9,6 +9,7 @@ import os
import subprocess
import sys
import time
import warnings
from ray.autoscaler.docker import check_bind_mounts_cmd, \
check_docker_running_cmd, \
@@ -114,6 +115,7 @@ class CommandRunnerInterface:
environment_variables: Dict[str, object] = None,
run_env: str = "auto",
ssh_options_override_ssh_key: str = "",
shutdown_after_run: bool = False,
) -> str:
"""Run the given command on the cluster node and optionally get output.
@@ -133,6 +135,8 @@ class CommandRunnerInterface:
DockerCommandRunner to determine the run environment.
ssh_options_override_ssh_key (str): if provided, overwrites
SSHOptions class with SSHOptions(ssh_options_override_ssh_key).
shutdown_after_run (bool): if provided, shutdowns down the machine
after executing the command with `sudo shutdown -h now`.
"""
raise NotImplementedError
@@ -164,6 +168,15 @@ class CommandRunnerInterface:
"""Return the command the user can use to open a shell."""
raise NotImplementedError
def run_init(self, *, as_head: bool, file_mounts: Dict[str, str]) -> None:
"""Used to run extra initialization commands.
Args:
as_head (bool): Run as head image or worker.
file_mounts (dict): Files to copy to the head and worker nodes.
"""
pass
class KubernetesCommandRunner(CommandRunnerInterface):
def __init__(self, log_prefix, namespace, node_id, auth_config,
@@ -185,7 +198,10 @@ class KubernetesCommandRunner(CommandRunnerInterface):
environment_variables: Dict[str, object] = None,
run_env="auto", # Unused argument.
ssh_options_override_ssh_key="", # Unused argument.
shutdown_after_run=False,
):
if shutdown_after_run:
cmd += "; sudo shutdown -h now"
if cmd and port_forward:
raise Exception(
"exec with Kubernetes can't forward ports and execute"
@@ -250,19 +266,17 @@ class KubernetesCommandRunner(CommandRunnerInterface):
"{}@{}:{}".format(self.node_id, self.namespace, target),
])
except Exception as e:
logger.warning(self.log_prefix +
"rsync failed: '{}'. Falling back to 'kubectl cp'"
.format(e))
self.run_cp_up(source, target)
warnings.warn(
self.log_prefix +
"rsync failed: '{}'. Falling back to 'kubectl cp'".format(e),
UserWarning)
if target.startswith("~"):
target = "/root" + target[1:]
def run_cp_up(self, source, target):
if target.startswith("~"):
target = "/root" + target[1:]
self.process_runner.check_call(self.kubectl + [
"cp", source, "{}/{}:{}".format(self.namespace, self.node_id,
target)
])
self.process_runner.check_call(self.kubectl + [
"cp", source, "{}/{}:{}".format(self.namespace, self.node_id,
target)
])
def run_rsync_down(self, source, target, options=None):
if target.startswith("~"):
@@ -276,19 +290,17 @@ class KubernetesCommandRunner(CommandRunnerInterface):
target,
])
except Exception as e:
logger.warning(self.log_prefix +
"rsync failed: '{}'. Falling back to 'kubectl cp'"
.format(e))
self.run_cp_down(source, target)
warnings.warn(
self.log_prefix +
"rsync failed: '{}'. Falling back to 'kubectl cp'".format(e),
UserWarning)
if target.startswith("~"):
target = "/root" + target[1:]
def run_cp_down(self, source, target):
if target.startswith("~"):
target = "/root" + target[1:]
self.process_runner.check_call(self.kubectl + [
"cp", "{}/{}:{}".format(self.namespace, self.node_id, source),
target
])
self.process_runner.check_call(self.kubectl + [
"cp", "{}/{}:{}".format(self.namespace, self.node_id, source),
target
])
def remote_shell_command_str(self):
return "{} exec -it {} bash".format(" ".join(self.kubectl),
@@ -480,7 +492,10 @@ class SSHCommandRunner(CommandRunnerInterface):
environment_variables: Dict[str, object] = None,
run_env="auto", # Unused argument.
ssh_options_override_ssh_key="",
shutdown_after_run=False,
):
if shutdown_after_run:
cmd += "; sudo shutdown -h now"
if ssh_options_override_ssh_key:
ssh_options = SSHOptions(ssh_options_override_ssh_key)
else:
@@ -579,7 +594,6 @@ class DockerCommandRunner(CommandRunnerInterface):
self.container_name = docker_config["container_name"]
self.docker_config = docker_config
self.home_dir = None
self.shutdown = False
self.initialized = False
def run(
@@ -592,6 +606,7 @@ class DockerCommandRunner(CommandRunnerInterface):
environment_variables: Dict[str, object] = None,
run_env="auto",
ssh_options_override_ssh_key="",
shutdown_after_run=False,
):
if run_env == "auto":
run_env = "host" if cmd.find("docker") == 0 else "docker"
@@ -607,8 +622,11 @@ class DockerCommandRunner(CommandRunnerInterface):
container_name=self.container_name,
with_interactive=True)[0]
if self.shutdown:
if shutdown_after_run:
# sudo shutdown should run after `with_docker_exec` command above
cmd += "; sudo shutdown -h now"
# Do not pass shutdown_after_run argument to ssh_command_runner.run()
# since it is handled above.
return self.ssh_command_runner.run(
cmd,
timeout=timeout,
@@ -675,9 +693,6 @@ class DockerCommandRunner(CommandRunnerInterface):
"following commands to 'initialization_commands':\n" +
"\n".join(install_commands))
def _shutdown_after_next_cmd(self):
self.shutdown = True
def _check_container_status(self):
if self.initialized:
return True
+8 -11
View File
@@ -33,7 +33,6 @@ from ray.ray_constants import AUTOSCALER_RESOURCE_REQUEST_CHANNEL
from ray.autoscaler.updater import NodeUpdaterThread
from ray.autoscaler.command_runner import set_using_login_shells, \
set_rsync_silent
from ray.autoscaler.command_runner import DockerCommandRunner
from ray.autoscaler.log_timer import LogTimer
from ray.worker import global_worker
from ray.util.debug import log_once
@@ -857,18 +856,13 @@ def exec_cluster(config_file: str,
file_mounts_contents_hash="",
is_head_node=True,
docker_config=config.get("docker"))
is_docker = isinstance(updater.cmd_runner, DockerCommandRunner)
shutdown_after_run = False
if cmd and stop:
cmd += "; ".join([
"ray stop",
"ray teardown ~/ray_bootstrap_config.yaml --yes --workers-only"
])
if is_docker and run_env == "docker":
updater.cmd_runner.shutdown_after_next_cmd()
else:
cmd += "; sudo shutdown -h now"
shutdown_after_run = True
result = _exec(
updater,
@@ -877,7 +871,8 @@ def exec_cluster(config_file: str,
tmux,
port_forward=port_forward,
with_output=with_output,
run_env=run_env)
run_env=run_env,
shutdown_after_run=shutdown_after_run)
if tmux or screen:
attach_command_parts = ["ray attach", config_file]
if override_cluster_name is not None:
@@ -906,7 +901,8 @@ def _exec(updater,
tmux,
port_forward=None,
with_output=False,
run_env="auto"):
run_env="auto",
shutdown_after_run=False):
if cmd:
if screen:
cmd = [
@@ -926,7 +922,8 @@ def _exec(updater,
exit_on_fail=True,
port_forward=port_forward,
with_output=with_output,
run_env=run_env)
run_env=run_env,
shutdown_after_run=shutdown_after_run)
def rsync(config_file: str,
+10 -10
View File
@@ -11,7 +11,7 @@ from ray.autoscaler.tags import TAG_RAY_NODE_STATUS, TAG_RAY_RUNTIME_CONFIG, \
STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED, STATUS_WAITING_FOR_SSH, \
STATUS_SETTING_UP, STATUS_SYNCING_FILES
from ray.autoscaler.command_runner import NODE_START_WAIT_S, \
ProcessRunnerError, DockerCommandRunner
ProcessRunnerError
from ray.autoscaler.log_timer import LogTimer
import ray.autoscaler.subprocess_output_util as cmd_output_util
@@ -91,6 +91,7 @@ class NodeUpdater:
self.cluster_synced_files = cluster_synced_files
self.auth_config = auth_config
self.is_head_node = is_head_node
self.docker_config = docker_config
def run(self):
cli_logger.old_info(logger, "{}Updating to {}", self.log_prefix,
@@ -181,8 +182,10 @@ class NodeUpdater:
with LogTimer(self.log_prefix +
"Synced {} to {}".format(local_path, remote_path)):
if not isinstance(self.cmd_runner, DockerCommandRunner):
# The DockerCommandRunner handles this internally
is_docker = (self.docker_config
and self.docker_config["container_name"] != "")
if not is_docker:
# The DockerCommandRunner handles this internally.
self.cmd_runner.run(
"mkdir -p {}".format(os.path.dirname(remote_path)),
run_env="host")
@@ -292,9 +295,8 @@ class NodeUpdater:
# When resuming from a stopped instance the runtime_hash may be the
# same, but the container will not be started.
if isinstance(self.cmd_runner, DockerCommandRunner):
self.cmd_runner.run_init(
as_head=self.is_head_node, file_mounts=self.file_mounts)
self.cmd_runner.run_init(
as_head=self.is_head_node, file_mounts=self.file_mounts)
else:
cli_logger.print(
@@ -347,10 +349,8 @@ class NodeUpdater:
cli_logger.print(
"No initialization commands to run.",
_numbered=("[]", 3, 6))
if isinstance(self.cmd_runner, DockerCommandRunner):
self.cmd_runner.run_init(
as_head=self.is_head_node,
file_mounts=self.file_mounts)
self.cmd_runner.run_init(
as_head=self.is_head_node, file_mounts=self.file_mounts)
if self.setup_commands:
with cli_logger.group(
"Running setup commands",
+5 -23
View File
@@ -7,15 +7,11 @@ from ray.tune.syncer import NodeSyncer
from ray.tune.sync_client import SyncClient
def NamespacedKubernetesSyncer(namespace, use_rsync=False):
def NamespacedKubernetesSyncer(namespace):
"""Wrapper to return a ``KubernetesSyncer`` for a Kubernetes namespace.
Args:
namespace (str): Kubernetes namespace.
use_rsync (bool): Use ``rsync`` if True or ``kubectl cp``
if False. If True, ``rsync`` will need to be
installed in the Kubernetes pods for this to work.
If False, ``tar`` will need to be installed instead.
Returns: A ``KubernetesSyncer`` class to be passed to ``tune.run()``.
@@ -31,7 +27,6 @@ def NamespacedKubernetesSyncer(namespace, use_rsync=False):
class _NamespacedKubernetesSyncer(KubernetesSyncer):
_namespace = namespace
_use_rsync = use_rsync
return _NamespacedKubernetesSyncer
@@ -49,7 +44,6 @@ class KubernetesSyncer(NodeSyncer):
"""
_namespace = "ray"
_use_rsync = False
def __init__(self, local_dir, remote_dir, sync_client=None):
self.local_ip = services.get_node_ip_address()
@@ -58,8 +52,7 @@ class KubernetesSyncer(NodeSyncer):
self.worker_node = None
sync_client = sync_client or KubernetesSyncClient(
namespace=self.__class__._namespace,
use_rsync=self.__class__._use_rsync)
namespace=self.__class__._namespace)
super(NodeSyncer, self).__init__(local_dir, remote_dir, sync_client)
@@ -97,18 +90,13 @@ class KubernetesSyncClient(SyncClient):
Args:
namespace (str): Namespace in which the pods live.
use_rsync (bool): Use ``rsync`` if True or ``kubectl cp``
if False. If True, ``rsync`` will need to be
installed in the Kubernetes pods for this to work.
If False, ``tar`` will need to be installed instead.
process_runner: How commands should be called.
Defaults to ``subprocess``.
"""
def __init__(self, namespace, use_rsync=False, process_runner=subprocess):
def __init__(self, namespace, process_runner=subprocess):
self.namespace = namespace
self.use_rsync = use_rsync
self._process_runner = process_runner
self._command_runners = {}
@@ -141,10 +129,7 @@ class KubernetesSyncClient(SyncClient):
target_dir += "/" if not target_dir.endswith("/") else ""
command_runner = self._get_command_runner(target_node)
if self.use_rsync:
command_runner.run_rsync_up(source, target_dir)
else:
command_runner.run_cp_up(source, target_dir)
command_runner.run_rsync_up(source, target_dir)
return True
def sync_down(self, source, target):
@@ -156,10 +141,7 @@ class KubernetesSyncClient(SyncClient):
target += "/" if not target.endswith("/") else ""
command_runner = self._get_command_runner(source_node)
if self.use_rsync:
command_runner.run_rsync_down(source_dir, target)
else:
command_runner.run_cp_down(source_dir, target)
command_runner.run_rsync_down(source_dir, target)
return True
def delete(self, target):
@@ -33,11 +33,10 @@ class _MockLookup:
return self.ip_to_node[ip]
def _create_mock_syncer(namespace, lookup, use_rsync, process_runner, local_ip,
local_dir, remote_dir):
def _create_mock_syncer(namespace, lookup, process_runner, local_ip, local_dir,
remote_dir):
class _MockSyncer(KubernetesSyncer):
_namespace = namespace
_use_rsync = use_rsync
_get_kubernetes_node_by_ip = lookup
def __init__(self, local_dir, remote_dir, sync_client):
@@ -54,9 +53,7 @@ def _create_mock_syncer(namespace, lookup, use_rsync, process_runner, local_ip,
local_dir,
remote_dir,
sync_client=KubernetesSyncClient(
namespace=namespace,
use_rsync=use_rsync,
process_runner=process_runner))
namespace=namespace, process_runner=process_runner))
class KubernetesIntegrationTest(unittest.TestCase):
@@ -74,42 +71,9 @@ class KubernetesIntegrationTest(unittest.TestCase):
def tearDown(self):
pass
def testKubernetesCpUpDown(self):
syncer = _create_mock_syncer(
self.namespace, self.lookup, False, self.process_runner,
self.lookup.get_ip("head"), self.local_dir, self.remote_dir)
syncer.set_worker_ip(self.lookup.get_ip("w1"))
# Test sync up. Should add / to the dirs and call kubectl cp
syncer.sync_up()
self.assertEqual(self.process_runner.history[-1], [
"kubectl", "-n", self.namespace, "cp", self.local_dir + "/",
"{}/{}:{}".format(self.namespace, "w1", self.remote_dir + "/")
])
# Test sync down.
syncer.sync_down()
self.assertEqual(self.process_runner.history[-1], [
"kubectl", "-n", self.namespace, "cp", "{}/{}:{}".format(
self.namespace,
"w1",
self.remote_dir + "/",
), self.local_dir + "/"
])
# Sync to same node should be ignored
syncer.set_worker_ip(self.lookup.get_ip("head"))
syncer.sync_up()
self.assertTrue(len(self.process_runner.history) == 2)
syncer.sync_down()
self.assertTrue(len(self.process_runner.history) == 2)
def testKubernetesRsyncUpDown(self):
syncer = _create_mock_syncer(
self.namespace, self.lookup, True, self.process_runner,
self.namespace, self.lookup, self.process_runner,
self.lookup.get_ip("head"), self.local_dir, self.remote_dir)
syncer.set_worker_ip(self.lookup.get_ip("w1"))