[tune] Added Kubernetes syncer and sync client (#10097)

Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
Co-authored-by: Kai Fricke <kai@anyscale.com>
This commit is contained in:
krfricke
2020-08-16 23:09:28 +02:00
committed by GitHub
parent dc659ae89a
commit 8f0f7371a0
9 changed files with 376 additions and 9 deletions
+20 -8
View File
@@ -215,10 +215,16 @@ class KubernetesCommandRunner(CommandRunnerInterface):
logger.warning(self.log_prefix +
"rsync failed: '{}'. Falling back to 'kubectl cp'"
.format(e))
self.process_runner.check_call(self.kubectl + [
"cp", source, "{}/{}:{}".format(self.namespace, self.node_id,
target)
])
self.run_cp_up(source, target)
def run_cp_up(self, source, target):
    """Copy a local path into the pod using ``kubectl cp``.

    Args:
        source (str): Path on the machine running this command.
        target (str): Destination path inside the pod. A leading ``~``
            is rewritten to ``/root``.
    """
    remote_path = target
    if remote_path.startswith("~"):
        remote_path = "/root" + remote_path[1:]

    # kubectl addresses a file in a pod as <namespace>/<pod>:<path>.
    destination = "{}/{}:{}".format(self.namespace, self.node_id,
                                    remote_path)
    copy_command = self.kubectl + ["cp", source, destination]
    self.process_runner.check_call(copy_command)
def run_rsync_down(self, source, target):
if target.startswith("~"):
@@ -235,10 +241,16 @@ class KubernetesCommandRunner(CommandRunnerInterface):
logger.warning(self.log_prefix +
"rsync failed: '{}'. Falling back to 'kubectl cp'"
.format(e))
self.process_runner.check_call(self.kubectl + [
"cp", "{}/{}:{}".format(self.namespace, self.node_id, source),
target
])
self.run_cp_down(source, target)
def run_cp_down(self, source, target):
    """Copy a path out of the pod using ``kubectl cp``.

    Args:
        source (str): Path inside the pod.
        target (str): Destination path on the machine running this
            command. A leading ``~`` is rewritten to ``/root``.
    """
    # NOTE(review): the ``~`` rewrite is applied to the local target, not
    # to the in-pod source -- confirm this is the intended side.
    local_path = target
    if local_path.startswith("~"):
        local_path = "/root" + local_path[1:]

    # kubectl addresses a file in a pod as <namespace>/<pod>:<path>.
    origin = "{}/{}:{}".format(self.namespace, self.node_id, source)
    copy_command = self.kubectl + ["cp", origin, local_path]
    self.process_runner.check_call(copy_command)
def remote_shell_command_str(self):
return "{} exec -it {} bash".format(" ".join(self.kubectl),
@@ -18,7 +18,8 @@ shift
if [ "X$pod" = "X-l" ]; then
pod=$1
shift
namespace="-n $1"
# Space before $1 leads to namespace errors
namespace="-n$1"
shift
fi
+8
View File
@@ -93,6 +93,14 @@ py_test(
tags = ["exclusive"],
)
# Test target for the Tune Kubernetes integration.
py_test(
    name = "test_integration_kubernetes",
    size = "small",
    srcs = ["tests/test_integration_kubernetes.py"],
    deps = [":tune_lib"],
    tags = ["exclusive"],
)
py_test(
name = "test_integration_wandb",
size = "small",
+169
View File
@@ -0,0 +1,169 @@
import kubernetes
import subprocess
from ray import services, logger
from ray.autoscaler.command_runner import KubernetesCommandRunner
from ray.tune.syncer import NodeSyncer
from ray.tune.sync_client import SyncClient
def NamespacedKubernetesSyncer(namespace, use_rsync=False):
    """Build a ``KubernetesSyncer`` subclass bound to one namespace.

    Args:
        namespace (str): Kubernetes namespace the pods live in.
        use_rsync (bool): If True, sync with ``rsync`` (which then has
            to be installed in the pods). If False, sync with
            ``kubectl cp`` (which needs ``tar`` in the pods instead).

    Returns:
        A ``KubernetesSyncer`` subclass (not an instance) that can be
        passed as ``sync_to_driver`` to ``tune.run``.

    Example:

    .. code-block:: python

        from ray.tune.integration.kubernetes import \
            NamespacedKubernetesSyncer

        tune.run(train,
                 sync_to_driver=NamespacedKubernetesSyncer("ray"))

    """

    class _NamespacedKubernetesSyncer(KubernetesSyncer):
        # Class-level settings; ``KubernetesSyncer`` reads these when it
        # builds its default sync client.
        _use_rsync = use_rsync
        _namespace = namespace

    return _NamespacedKubernetesSyncer
class KubernetesSyncer(NodeSyncer):
    """Node syncer that moves files between Kubernetes pods.

    Unless a ``sync_client`` is passed explicitly, a
    ``KubernetesSyncClient`` configured from the class attributes
    ``_namespace`` and ``_use_rsync`` is created.

    The default namespace is ``ray``. Use ``NamespacedKubernetesSyncer``
    to obtain a subclass bound to a different namespace.
    """

    # Kubernetes namespace that is searched for pods.
    _namespace = "ray"
    # Forwarded to KubernetesSyncClient: rsync if True, else kubectl cp.
    _use_rsync = False

    def __init__(self, local_dir, remote_dir, sync_client=None):
        """Resolve the local pod and set up the sync client.

        Args:
            local_dir (str): Directory on the local pod.
            remote_dir (str): Directory on the worker pod.
            sync_client: Optional client used for the transfers.
                Defaults to a ``KubernetesSyncClient``.
        """
        self.local_ip = services.get_node_ip_address()
        self.local_node = self._get_kubernetes_node_by_ip(self.local_ip)
        self.worker_ip = None
        self.worker_node = None

        sync_client = sync_client or KubernetesSyncClient(
            namespace=self.__class__._namespace,
            use_rsync=self.__class__._use_rsync)

        # NOTE(review): starting the lookup after ``NodeSyncer`` skips
        # ``NodeSyncer.__init__``; presumably intentional because the IP
        # attributes are already set above -- confirm.
        super(NodeSyncer, self).__init__(local_dir, remote_dir, sync_client)

    def set_worker_ip(self, worker_ip):
        """Remember the worker IP and resolve it to a pod name."""
        self.worker_ip = worker_ip
        self.worker_node = self._get_kubernetes_node_by_ip(worker_ip)

    def _get_kubernetes_node_by_ip(self, node_ip):
        """Return the name of the pod that matches ``node_ip``.

        A pod whose own IP equals ``node_ip`` is preferred. Only if no
        such pod exists is the first pod whose host IP equals
        ``node_ip`` returned.

        Returns:
            The pod name, or None (after logging an error) if no pod in
            the namespace matches.
        """
        kubernetes.config.load_incluster_config()
        api = kubernetes.client.CoreV1Api()
        pods = api.list_namespaced_pod(self._namespace).items

        # A host IP is shared by all pods scheduled on that host, so an
        # exact pod IP match must win over a host IP match.
        for pod in pods:
            if pod.status.pod_ip == node_ip:
                return pod.metadata.name
        for pod in pods:
            if pod.status.host_ip == node_ip:
                return pod.metadata.name

        logger.error(
            "Could not find Kubernetes pod name for IP {}".format(node_ip))
        return None

    @property
    def _remote_path(self):
        """Remote location as a ``(pod_name, remote_dir)`` tuple."""
        return (self.worker_node, self._remote_dir)
class KubernetesSyncClient(SyncClient):
    """Sync client that runs the transfer commands for Kubernetes pods.

    Unlike path-only clients, ``sync_up`` expects its target and
    ``sync_down`` its source as a ``(pod_name, directory)`` tuple, which
    is what the ``KubernetesCommandRunner`` needs to address a pod.

    Args:
        namespace (str): Namespace in which the pods live.
        use_rsync (bool): If True, transfer with ``rsync`` (must be
            installed in the pods). If False, transfer with
            ``kubectl cp`` (needs ``tar`` in the pods instead).
        process_runner: Object used to invoke commands.
            Defaults to ``subprocess``.
    """

    def __init__(self, namespace, use_rsync=False, process_runner=subprocess):
        self.namespace = namespace
        self.use_rsync = use_rsync
        self._process_runner = process_runner
        # Maps pod name -> KubernetesCommandRunner.
        self._command_runners = {}

    @staticmethod
    def _with_trailing_slash(path):
        """Return ``path`` ending in exactly the slash rsync expects."""
        if path.endswith("/"):
            return path
        return path + "/"

    def _create_command_runner(self, node_id):
        """Build a fresh command runner for the pod ``node_id``."""
        prefix = "KubernetesSyncClient: {}:".format(node_id)
        return KubernetesCommandRunner(
            log_prefix=prefix,
            namespace=self.namespace,
            node_id=node_id,
            auth_config=None,
            process_runner=self._process_runner)

    def _get_command_runner(self, node_id):
        """Return the cached runner for ``node_id``, creating it once."""
        # Todo(krfricke): cached runners are never evicted. They are
        # cheap, so this is not a problem right now, but it should be
        # revisited if the SyncClient sees heavier use.
        runners = self._command_runners
        if node_id in runners:
            return runners[node_id]
        runners[node_id] = self._create_command_runner(node_id)
        return runners[node_id]

    def sync_up(self, source, target):
        """Copy local ``source`` to ``target = (target_node, target_dir)``."""
        target_node, target_dir = target

        # Both directories get a trailing slash for rsync.
        source = self._with_trailing_slash(source)
        target_dir = self._with_trailing_slash(target_dir)

        runner = self._get_command_runner(target_node)
        if self.use_rsync:
            transfer = runner.run_rsync_up
        else:
            transfer = runner.run_cp_up
        transfer(source, target_dir)
        return True

    def sync_down(self, source, target):
        """Copy ``source = (source_node, source_dir)`` to local ``target``."""
        source_node, source_dir = source

        # Both directories get a trailing slash for rsync.
        source_dir = self._with_trailing_slash(source_dir)
        target = self._with_trailing_slash(target)

        runner = self._get_command_runner(source_node)
        if self.use_rsync:
            transfer = runner.run_rsync_down
        else:
            transfer = runner.run_cp_down
        transfer(source_dir, target)
        return True

    def delete(self, target):
        """Do nothing and report success.

        Deleting is not implemented because this client is only used by
        the KubernetesSyncer, which doesn't call delete.
        """
        return True
+4
View File
@@ -3,6 +3,7 @@ import logging
import os
import time
from inspect import isclass
from shlex import quote
from ray import ray_constants
@@ -287,6 +288,9 @@ def get_node_syncer(local_dir, remote_dir=None, sync_function=None):
key = (local_dir, remote_dir)
if key in _syncers:
return _syncers[key]
elif isclass(sync_function) and issubclass(sync_function, Syncer):
_syncers[key] = sync_function(local_dir, remote_dir, None)
return _syncers[key]
elif not remote_dir or sync_function is False:
sync_client = NOOP
elif sync_function and sync_function is not True:
@@ -0,0 +1,147 @@
import unittest
from ray.autoscaler.command_runner import KUBECTL_RSYNC
from ray.tune.integration.kubernetes import KubernetesSyncer, \
KubernetesSyncClient
from ray.tune.syncer import NodeSyncer
class _MockProcessRunner:
    """Stand-in for ``subprocess`` that records commands instead of
    running them."""

    def __init__(self):
        # Every command passed to ``check_call``, oldest first.
        self.history = list()

    def check_call(self, command):
        """Record ``command`` and report success."""
        recorded = self.history
        recorded.append(command)
        return True
class _MockLookup:
    """Two-way mapping between node names and IPs.

    Instances are callable with an IP and return the node name.
    """

    def __init__(self, node_ips):
        pairs = list(node_ips.items())
        self.node_to_ip = {name: address for name, address in pairs}
        self.ip_to_node = {address: name for name, address in pairs}

    def get_ip(self, node):
        """Return the IP registered for ``node``."""
        return self.node_to_ip[node]

    def get_node(self, ip):
        """Return the node name registered for ``ip``."""
        return self.ip_to_node[ip]

    def __call__(self, ip):
        """Return the node name registered for ``ip``."""
        return self.ip_to_node[ip]
def _create_mock_syncer(namespace, lookup, use_rsync, process_runner, local_ip,
                        local_dir, remote_dir):
    """Build a ``KubernetesSyncer`` whose pod lookup, local IP and
    process runner are replaced by the given test doubles."""
    client = KubernetesSyncClient(
        namespace=namespace,
        use_rsync=use_rsync,
        process_runner=process_runner)

    class _MockSyncer(KubernetesSyncer):
        _namespace = namespace
        _use_rsync = use_rsync
        # ``lookup`` is called with the IP and returns the node name.
        _get_kubernetes_node_by_ip = lookup

        def __init__(self, local_dir, remote_dir, sync_client):
            # Use the injected ``local_ip`` from the enclosing function.
            self.local_ip = local_ip
            self.local_node = self._get_kubernetes_node_by_ip(self.local_ip)
            self.worker_ip = None
            self.worker_node = None

            super(NodeSyncer, self).__init__(local_dir, remote_dir,
                                             sync_client)

    return _MockSyncer(local_dir, remote_dir, sync_client=client)
class KubernetesIntegrationTest(unittest.TestCase):
    """Checks the commands issued by the Kubernetes syncer.

    Commands are captured by a ``_MockProcessRunner`` and pod lookup is
    served by a ``_MockLookup``.
    """

    def setUp(self):
        self.namespace = "test_ray"
        self.lookup = _MockLookup({
            "head": "1.0.0.0",
            "w1": "1.0.0.1",
            "w2": "1.0.0.2"
        })
        self.process_runner = _MockProcessRunner()
        self.local_dir = "/tmp/local"
        self.remote_dir = "/tmp/remote"

    def _make_syncer(self, use_rsync):
        """Return a mock syncer on "head" that targets worker "w1"."""
        syncer = _create_mock_syncer(
            self.namespace, self.lookup, use_rsync, self.process_runner,
            self.lookup.get_ip("head"), self.local_dir, self.remote_dir)
        syncer.set_worker_ip(self.lookup.get_ip("w1"))
        return syncer

    def _assert_sync_to_self_ignored(self, syncer):
        """Syncing to the local node must not issue further commands."""
        syncer.set_worker_ip(self.lookup.get_ip("head"))
        syncer.sync_up()
        # assertEqual reports the actual count on failure.
        self.assertEqual(len(self.process_runner.history), 2)
        syncer.sync_down()
        self.assertEqual(len(self.process_runner.history), 2)

    def testKubernetesCpUpDown(self):
        syncer = self._make_syncer(use_rsync=False)

        # Test sync up. Should add / to the dirs and call kubectl cp
        syncer.sync_up()
        self.assertEqual(self.process_runner.history[-1], [
            "kubectl", "-n", self.namespace, "cp", self.local_dir + "/",
            "{}/{}:{}".format(self.namespace, "w1", self.remote_dir + "/")
        ])

        # Test sync down.
        syncer.sync_down()
        self.assertEqual(self.process_runner.history[-1], [
            "kubectl", "-n", self.namespace, "cp", "{}/{}:{}".format(
                self.namespace,
                "w1",
                self.remote_dir + "/",
            ), self.local_dir + "/"
        ])

        # Sync to same node should be ignored
        self._assert_sync_to_self_ignored(syncer)

    def testKubernetesRsyncUpDown(self):
        syncer = self._make_syncer(use_rsync=True)
        remote_spec = "{}@{}:{}".format("w1", self.namespace,
                                        self.remote_dir + "/")

        # Test sync up. Should add / to the dirs and call rsync
        syncer.sync_up()
        self.assertEqual(self.process_runner.history[-1][0], KUBECTL_RSYNC)
        self.assertEqual(self.process_runner.history[-1][-2],
                         self.local_dir + "/")
        self.assertEqual(self.process_runner.history[-1][-1], remote_spec)

        # Test sync down.
        syncer.sync_down()
        self.assertEqual(self.process_runner.history[-1][0], KUBECTL_RSYNC)
        self.assertEqual(self.process_runner.history[-1][-2], remote_spec)
        self.assertEqual(self.process_runner.history[-1][-1],
                         self.local_dir + "/")

        # Sync to same node should be ignored
        self._assert_sync_to_self_ignored(syncer)
if __name__ == "__main__":
    # Run this file's tests verbosely when executed as a script and
    # hand pytest's result to the shell as the exit status.
    import sys

    import pytest

    exit_code = pytest.main(["-v", __file__])
    sys.exit(exit_code)
+1
View File
@@ -9,6 +9,7 @@ hpbandster
hyperopt==0.1.2
jupyter
keras
kubernetes
lightgbm
matplotlib
mlflow