From 8f0f7371a055257b4c704bad49edccbfbfc01831 Mon Sep 17 00:00:00 2001 From: krfricke Date: Sun, 16 Aug 2020 23:09:28 +0200 Subject: [PATCH] [tune] Added Kubernetes syncer and sync client (#10097) Co-authored-by: Edward Oakes Co-authored-by: Richard Liaw Co-authored-by: Kai Fricke --- doc/source/cluster/cloud.rst | 2 + doc/source/tune/user-guide.rst | 23 +++ python/ray/autoscaler/command_runner.py | 28 ++- .../autoscaler/kubernetes/kubectl-rsync.sh | 3 +- python/ray/tune/BUILD | 8 + python/ray/tune/integration/kubernetes.py | 169 ++++++++++++++++++ python/ray/tune/syncer.py | 4 + .../tune/tests/test_integration_kubernetes.py | 147 +++++++++++++++ python/requirements_tune.txt | 1 + 9 files changed, 376 insertions(+), 9 deletions(-) create mode 100644 python/ray/tune/integration/kubernetes.py create mode 100644 python/ray/tune/tests/test_integration_kubernetes.py diff --git a/doc/source/cluster/cloud.rst b/doc/source/cluster/cloud.rst index e945639a9..5ff96df54 100644 --- a/doc/source/cluster/cloud.rst +++ b/doc/source/cluster/cloud.rst @@ -178,6 +178,8 @@ Test that it works by running the following commands from your local machine: .. tip:: This section describes the easiest way to launch a Ray cluster on Kubernetes. See this :ref:`document for advanced usage ` of Kubernetes with Ray. +.. tip:: If you would like to use Ray Tune in your Kubernetes cluster, have a look at :ref:`this short guide to make it work <tune-kubernetes>`. + .. _cluster-private-setup: Local On Premise Cluster (List of nodes) diff --git a/doc/source/tune/user-guide.rst b/doc/source/tune/user-guide.rst index 3a350a3ef..8f99e24b1 100644 --- a/doc/source/tune/user-guide.rst +++ b/doc/source/tune/user-guide.rst @@ -431,6 +431,29 @@ If a string is provided, then it must include replacement fields ``{source}`` an By default, syncing occurs every 300 seconds. To change the frequency of syncing, set the ``TUNE_CLOUD_SYNC_S`` environment variable in the driver to the desired syncing period. 
Note that uploading only happens when global experiment state is collected, and the frequency of this is determined by the ``global_checkpoint_period`` argument. So the true upload period is given by ``max(TUNE_CLOUD_SYNC_S, global_checkpoint_period)``. +.. _tune-kubernetes: + +Using Tune with Kubernetes +-------------------------- +Tune automatically syncs files and checkpoints between different remote +nodes as needed. +To make this work in your Kubernetes cluster, you will need to pass a +``KubernetesSyncer`` to the ``sync_to_driver`` argument of ``tune.run()``. +You have to specify your Kubernetes namespace explicitly: + +.. code-block:: python + + from ray.tune.integration.kubernetes import NamespacedKubernetesSyncer + tune.run(train, + sync_to_driver=NamespacedKubernetesSyncer("ray", use_rsync=True)) + + +The ``KubernetesSyncer`` supports two modes for file synchronization. By +default, files are synchronized with ``kubectl cp``, requiring the ``tar`` +binary in your pods. If you would like to use ``rsync`` instead, your pods +will need to have ``rsync`` installed. Use the ``use_rsync`` parameter to +decide between the two options. + .. _tune-log_to_file: Redirecting stdout and stderr to files diff --git a/python/ray/autoscaler/command_runner.py b/python/ray/autoscaler/command_runner.py index 6d7ef5e1c..ca0375bbf 100644 --- a/python/ray/autoscaler/command_runner.py +++ b/python/ray/autoscaler/command_runner.py @@ -215,10 +215,16 @@ class KubernetesCommandRunner(CommandRunnerInterface): logger.warning(self.log_prefix + "rsync failed: '{}'. 
Falling back to 'kubectl cp'" .format(e)) - self.process_runner.check_call(self.kubectl + [ - "cp", source, "{}/{}:{}".format(self.namespace, self.node_id, - target) - ]) + self.run_cp_up(source, target) + + def run_cp_up(self, source, target): + if target.startswith("~"): + target = "/root" + target[1:] + + self.process_runner.check_call(self.kubectl + [ + "cp", source, "{}/{}:{}".format(self.namespace, self.node_id, + target) + ]) def run_rsync_down(self, source, target): if target.startswith("~"): @@ -235,10 +241,16 @@ class KubernetesCommandRunner(CommandRunnerInterface): logger.warning(self.log_prefix + "rsync failed: '{}'. Falling back to 'kubectl cp'" .format(e)) - self.process_runner.check_call(self.kubectl + [ - "cp", "{}/{}:{}".format(self.namespace, self.node_id, source), - target - ]) + self.run_cp_down(source, target) + + def run_cp_down(self, source, target): + if target.startswith("~"): + target = "/root" + target[1:] + + self.process_runner.check_call(self.kubectl + [ + "cp", "{}/{}:{}".format(self.namespace, self.node_id, source), + target + ]) def remote_shell_command_str(self): return "{} exec -it {} bash".format(" ".join(self.kubectl), diff --git a/python/ray/autoscaler/kubernetes/kubectl-rsync.sh b/python/ray/autoscaler/kubernetes/kubectl-rsync.sh index 583da4ad1..3bda9ca9d 100755 --- a/python/ray/autoscaler/kubernetes/kubectl-rsync.sh +++ b/python/ray/autoscaler/kubernetes/kubectl-rsync.sh @@ -18,7 +18,8 @@ shift if [ "X$pod" = "X-l" ]; then pod=$1 shift - namespace="-n $1" + # Space before $1 leads to namespace errors + namespace="-n$1" shift fi diff --git a/python/ray/tune/BUILD b/python/ray/tune/BUILD index 99b10b280..049fa1d23 100644 --- a/python/ray/tune/BUILD +++ b/python/ray/tune/BUILD @@ -93,6 +93,14 @@ py_test( tags = ["exclusive"], ) +py_test( + name = "test_integration_kubernetes", + size = "small", + srcs = ["tests/test_integration_kubernetes.py"], + deps = [":tune_lib"], + tags = ["exclusive"], +) + py_test( name = 
"test_integration_wandb", size = "small", diff --git a/python/ray/tune/integration/kubernetes.py b/python/ray/tune/integration/kubernetes.py new file mode 100644 index 000000000..c83e20ac3 --- /dev/null +++ b/python/ray/tune/integration/kubernetes.py @@ -0,0 +1,169 @@ +import kubernetes +import subprocess + +from ray import services, logger +from ray.autoscaler.command_runner import KubernetesCommandRunner +from ray.tune.syncer import NodeSyncer +from ray.tune.sync_client import SyncClient + + +def NamespacedKubernetesSyncer(namespace, use_rsync=False): + """Wrapper to return a ``KubernetesSyncer`` for a Kubernetes namespace. + + Args: + namespace (str): Kubernetes namespace. + use_rsync (bool): Use ``rsync`` if True or ``kubectl cp`` + if False. If True, ``rsync`` will need to be + installed in the Kubernetes pods for this to work. + If False, ``tar`` will need to be installed instead. + + Returns: A ``KubernetesSyncer`` class to be passed to + ``tune.run(sync_to_driver)``. + + Example: + + .. code-block:: python + + from ray.tune.integration.kubernetes import NamespacedKubernetesSyncer + tune.run(train, + sync_to_driver=NamespacedKubernetesSyncer("ray")) + + """ + + class _NamespacedKubernetesSyncer(KubernetesSyncer): + _namespace = namespace + _use_rsync = use_rsync + + return _NamespacedKubernetesSyncer + + +class KubernetesSyncer(NodeSyncer): + """KubernetesSyncer used for synchronization between Kubernetes pods. + + This syncer extends the node syncer, but is usually instantiated + without a custom sync client. The sync client defaults to + ``KubernetesSyncClient`` instead. + + KubernetesSyncer uses the default namespace ``ray``. You should + probably use ``NamespacedKubernetesSyncer`` to return a class + with a custom namespace instead. 
+ """ + + _namespace = "ray" + _use_rsync = False + + def __init__(self, local_dir, remote_dir, sync_client=None): + self.local_ip = services.get_node_ip_address() + self.local_node = self._get_kubernetes_node_by_ip(self.local_ip) + self.worker_ip = None + self.worker_node = None + + sync_client = sync_client or KubernetesSyncClient( + namespace=self.__class__._namespace, + use_rsync=self.__class__._use_rsync) + + super(NodeSyncer, self).__init__(local_dir, remote_dir, sync_client) + + def set_worker_ip(self, worker_ip): + self.worker_ip = worker_ip + self.worker_node = self._get_kubernetes_node_by_ip(worker_ip) + + def _get_kubernetes_node_by_ip(self, node_ip): + """Return node name by internal or external IP""" + kubernetes.config.load_incluster_config() + api = kubernetes.client.CoreV1Api() + pods = api.list_namespaced_pod(self._namespace) + for pod in pods.items: + if pod.status.host_ip == node_ip or \ + pod.status.pod_ip == node_ip: + return pod.metadata.name + + logger.error( + "Could not find Kubernetes pod name for IP {}".format(node_ip)) + return None + + @property + def _remote_path(self): + return (self.worker_node, self._remote_dir) + + +class KubernetesSyncClient(SyncClient): + """KubernetesSyncClient to be used by KubernetesSyncer. + + This client takes care of executing the synchronization + commands for Kubernetes clients. In its ``sync_down`` and + ``sync_up`` commands, it expects tuples for the source + and target, respectively, for compatibility with the + KubernetesCommandRunner. + + Args: + namespace (str): Namespace in which the pods live. + use_rsync (bool): Use ``rsync`` if True or ``kubectl cp`` + if False. If True, ``rsync`` will need to be + installed in the Kubernetes pods for this to work. + If False, ``tar`` will need to be installed instead. + process_runner: How commands should be called. + Defaults to ``subprocess``. 
+ + """ + + def __init__(self, namespace, use_rsync=False, process_runner=subprocess): + self.namespace = namespace + self.use_rsync = use_rsync + self._process_runner = process_runner + self._command_runners = {} + + def _create_command_runner(self, node_id): + """Create a command runner for one Kubernetes node""" + return KubernetesCommandRunner( + log_prefix="KubernetesSyncClient: {}:".format(node_id), + namespace=self.namespace, + node_id=node_id, + auth_config=None, + process_runner=self._process_runner) + + def _get_command_runner(self, node_id): + """Create command runner if it doesn't exist""" + # Todo(krfricke): These cached runners are currently + # never cleaned up. They are cheap so this shouldn't + # cause much problems, but should be addressed if + # the SyncClient is used more extensively in the future. + if node_id not in self._command_runners: + command_runner = self._create_command_runner(node_id) + self._command_runners[node_id] = command_runner + return self._command_runners[node_id] + + def sync_up(self, source, target): + """Here target is a tuple (target_node, target_dir)""" + target_node, target_dir = target + + # Add trailing slashes for rsync + source += "/" if not source.endswith("/") else "" + target_dir += "/" if not target_dir.endswith("/") else "" + + command_runner = self._get_command_runner(target_node) + if self.use_rsync: + command_runner.run_rsync_up(source, target_dir) + else: + command_runner.run_cp_up(source, target_dir) + return True + + def sync_down(self, source, target): + """Here source is a tuple (source_node, source_dir)""" + source_node, source_dir = source + + # Add trailing slashes for rsync + source_dir += "/" if not source_dir.endswith("/") else "" + target += "/" if not target.endswith("/") else "" + + command_runner = self._get_command_runner(source_node) + if self.use_rsync: + command_runner.run_rsync_down(source_dir, target) + else: + command_runner.run_cp_down(source_dir, target) + return True + + def 
delete(self, target): + """No delete function because it is only used by + the KubernetesSyncer, which doesn't call delete.""" + return True diff --git a/python/ray/tune/syncer.py b/python/ray/tune/syncer.py index 1ceaec242..0e5ef79f6 100644 --- a/python/ray/tune/syncer.py +++ b/python/ray/tune/syncer.py @@ -3,6 +3,7 @@ import logging import os import time +from inspect import isclass from shlex import quote from ray import ray_constants @@ -287,6 +288,9 @@ def get_node_syncer(local_dir, remote_dir=None, sync_function=None): key = (local_dir, remote_dir) if key in _syncers: return _syncers[key] + elif isclass(sync_function) and issubclass(sync_function, Syncer): + _syncers[key] = sync_function(local_dir, remote_dir, None) + return _syncers[key] elif not remote_dir or sync_function is False: sync_client = NOOP elif sync_function and sync_function is not True: diff --git a/python/ray/tune/tests/test_integration_kubernetes.py b/python/ray/tune/tests/test_integration_kubernetes.py new file mode 100644 index 000000000..588179a32 --- /dev/null +++ b/python/ray/tune/tests/test_integration_kubernetes.py @@ -0,0 +1,147 @@ +import unittest + +from ray.autoscaler.command_runner import KUBECTL_RSYNC +from ray.tune.integration.kubernetes import KubernetesSyncer, \ + KubernetesSyncClient +from ray.tune.syncer import NodeSyncer + + +class _MockProcessRunner: + def __init__(self): + self.history = [] + + def check_call(self, command): + self.history.append(command) + return True + + +class _MockLookup: + def __init__(self, node_ips): + self.node_to_ip = {} + self.ip_to_node = {} + for node, ip in node_ips.items(): + self.node_to_ip[node] = ip + self.ip_to_node[ip] = node + + def get_ip(self, node): + return self.node_to_ip[node] + + def get_node(self, ip): + return self.ip_to_node[ip] + + def __call__(self, ip): + return self.ip_to_node[ip] + + +def _create_mock_syncer(namespace, lookup, use_rsync, process_runner, local_ip, + local_dir, remote_dir): + class 
_MockSyncer(KubernetesSyncer): + _namespace = namespace + _use_rsync = use_rsync + _get_kubernetes_node_by_ip = lookup + + def __init__(self, local_dir, remote_dir, sync_client): + self.local_ip = local_ip + self.local_node = self._get_kubernetes_node_by_ip(self.local_ip) + self.worker_ip = None + self.worker_node = None + + sync_client = sync_client + super(NodeSyncer, self).__init__(local_dir, remote_dir, + sync_client) + + return _MockSyncer( + local_dir, + remote_dir, + sync_client=KubernetesSyncClient( + namespace=namespace, + use_rsync=use_rsync, + process_runner=process_runner)) + + +class KubernetesIntegrationTest(unittest.TestCase): + def setUp(self): + self.namespace = "test_ray" + self.lookup = _MockLookup({ + "head": "1.0.0.0", + "w1": "1.0.0.1", + "w2": "1.0.0.2" + }) + self.process_runner = _MockProcessRunner() + self.local_dir = "/tmp/local" + self.remote_dir = "/tmp/remote" + + def tearDown(self): + pass + + def testKubernetesCpUpDown(self): + syncer = _create_mock_syncer( + self.namespace, self.lookup, False, self.process_runner, + self.lookup.get_ip("head"), self.local_dir, self.remote_dir) + + syncer.set_worker_ip(self.lookup.get_ip("w1")) + + # Test sync up. Should add / to the dirs and call kubectl cp + syncer.sync_up() + + self.assertEqual(self.process_runner.history[-1], [ + "kubectl", "-n", self.namespace, "cp", self.local_dir + "/", + "{}/{}:{}".format(self.namespace, "w1", self.remote_dir + "/") + ]) + + # Test sync down. 
+ syncer.sync_down() + self.assertEqual(self.process_runner.history[-1], [ + "kubectl", "-n", self.namespace, "cp", "{}/{}:{}".format( + self.namespace, + "w1", + self.remote_dir + "/", + ), self.local_dir + "/" + ]) + + # Sync to same node should be ignored + syncer.set_worker_ip(self.lookup.get_ip("head")) + syncer.sync_up() + self.assertTrue(len(self.process_runner.history) == 2) + + syncer.sync_down() + self.assertTrue(len(self.process_runner.history) == 2) + + def testKubernetesRsyncUpDown(self): + syncer = _create_mock_syncer( + self.namespace, self.lookup, True, self.process_runner, + self.lookup.get_ip("head"), self.local_dir, self.remote_dir) + + syncer.set_worker_ip(self.lookup.get_ip("w1")) + + # Test sync up. Should add / to the dirs and call rsync + syncer.sync_up() + self.assertEqual(self.process_runner.history[-1][0], KUBECTL_RSYNC) + self.assertEqual(self.process_runner.history[-1][-2], + self.local_dir + "/") + self.assertEqual( + self.process_runner.history[-1][-1], "{}@{}:{}".format( + "w1", self.namespace, self.remote_dir + "/")) + + # Test sync down. 
+ syncer.sync_down() + self.assertEqual(self.process_runner.history[-1][0], KUBECTL_RSYNC) + self.assertEqual( + self.process_runner.history[-1][-2], "{}@{}:{}".format( + "w1", self.namespace, self.remote_dir + "/")) + self.assertEqual(self.process_runner.history[-1][-1], + self.local_dir + "/") + + # Sync to same node should be ignored + syncer.set_worker_ip(self.lookup.get_ip("head")) + syncer.sync_up() + self.assertTrue(len(self.process_runner.history) == 2) + + syncer.sync_down() + self.assertTrue(len(self.process_runner.history) == 2) + + +if __name__ == "__main__": + import pytest + import sys + sys.exit(pytest.main(["-v", __file__])) diff --git a/python/requirements_tune.txt b/python/requirements_tune.txt index 976149566..1123fc7ed 100644 --- a/python/requirements_tune.txt +++ b/python/requirements_tune.txt @@ -9,6 +9,7 @@ hpbandster hyperopt==0.1.2 jupyter keras +kubernetes lightgbm matplotlib mlflow