From bdf647c4ec4cd50e686df9fc3c749373e9e30a45 Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Thu, 1 Oct 2020 19:59:23 +0100 Subject: [PATCH] [tune] docker syncer (#11035) * Add DockerSyncer * Add docs * Update python/ray/tune/integration/docker.py Co-authored-by: Richard Liaw * Updated docs * fix dir * Added docker integration test * added docker integration test to bazel build * Use sdk.rsync API Co-authored-by: Richard Liaw --- doc/source/tune/api_docs/integration.rst | 9 ++ doc/source/tune/user-guide.rst | 21 ++++ python/ray/tune/BUILD | 8 ++ python/ray/tune/integration/docker.py | 106 +++++++++++++++++ python/ray/tune/integration/kubernetes.py | 12 +- .../ray/tune/tests/test_integration_docker.py | 110 ++++++++++++++++++ 6 files changed, 261 insertions(+), 5 deletions(-) create mode 100644 python/ray/tune/integration/docker.py create mode 100644 python/ray/tune/tests/test_integration_docker.py diff --git a/doc/source/tune/api_docs/integration.rst b/doc/source/tune/api_docs/integration.rst index 24debbf94..4731b8d51 100644 --- a/doc/source/tune/api_docs/integration.rst +++ b/doc/source/tune/api_docs/integration.rst @@ -7,6 +7,15 @@ External library integrations (tune.integration) :local: :depth: 1 + +.. _tune-integration-docker: + +Docker (tune.integration.docker) +-------------------------------- + +.. autoclass:: ray.tune.integration.docker.DockerSyncer + + .. _tune-integration-keras: Keras (tune.integration.keras) diff --git a/doc/source/tune/user-guide.rst b/doc/source/tune/user-guide.rst index 385d014e3..8458fcabc 100644 --- a/doc/source/tune/user-guide.rst +++ b/doc/source/tune/user-guide.rst @@ -448,6 +448,27 @@ By default, syncing occurs every 300 seconds. To change the frequency of syncing Note that uploading only happens when global experiment state is collected, and the frequency of this is determined by the ``TUNE_GLOBAL_CHECKPOINT_S`` environment variable. 
So the true upload period is given by ``max(TUNE_CLOUD_SYNC_S, TUNE_GLOBAL_CHECKPOINT_S)``. + +.. _tune-docker: + +Using Tune with Docker +---------------------- +Tune automatically syncs files and checkpoints between different remote +containers as needed. + +To make this work in your Docker cluster, e.g. when you are using the Ray autoscaler +with docker containers, you will need to pass a +``DockerSyncer`` to the ``sync_to_driver`` argument of ``tune.SyncConfig``. + +.. code-block:: python + + from ray.tune.integration.docker import DockerSyncer + sync_config = tune.SyncConfig( + sync_to_driver=DockerSyncer) + + tune.run(train, sync_config=sync_config) + + .. _tune-kubernetes: Using Tune with Kubernetes diff --git a/python/ray/tune/BUILD b/python/ray/tune/BUILD index 5b77bbfdf..8bad0a87f 100644 --- a/python/ray/tune/BUILD +++ b/python/ray/tune/BUILD @@ -93,6 +93,14 @@ py_test( tags = ["exclusive"], ) +py_test( + name = "test_integration_docker", + size = "small", + srcs = ["tests/test_integration_docker.py"], + deps = [":tune_lib"], + tags = ["exclusive"], +) + py_test( name = "test_integration_kubernetes", size = "small", diff --git a/python/ray/tune/integration/docker.py b/python/ray/tune/integration/docker.py new file mode 100644 index 000000000..3b1290de7 --- /dev/null +++ b/python/ray/tune/integration/docker.py @@ -0,0 +1,106 @@ +import os +from typing import Optional, Tuple + +from ray import services +from ray.autoscaler.sdk import rsync +from ray.tune.syncer import NodeSyncer +from ray.tune.sync_client import SyncClient + + +class DockerSyncer(NodeSyncer): + """DockerSyncer used for synchronization between Docker containers. + This syncer extends the node syncer, but is usually instantiated + without a custom sync client. The sync client defaults to + ``DockerSyncClient`` instead. + + .. note:: + This syncer only works with the Ray cluster launcher. 
+ If you use your own Docker setup, make sure the nodes can connect + to each other via SSH, and try the regular SSH-based syncer instead. + + Example: + + .. code-block:: python + + from ray.tune.integration.docker import DockerSyncer + tune.run(train, + sync_config=tune.SyncConfig( + sync_to_driver=DockerSyncer)) + + """ + + _cluster_config_file = os.path.expanduser("~/ray_bootstrap_config.yaml") + + def __init__(self, + local_dir: str, + remote_dir: str, + sync_client: Optional[SyncClient] = None): + self.local_ip = services.get_node_ip_address() + self.worker_ip = None + + sync_client = sync_client or DockerSyncClient() + sync_client.configure(self._cluster_config_file) + + super(NodeSyncer, self).__init__(local_dir, remote_dir, sync_client) + + def set_worker_ip(self, worker_ip: str): + self.worker_ip = worker_ip + + @property + def _remote_path(self) -> Tuple[str, str]: + return (self.worker_ip, self._remote_dir) + + +class DockerSyncClient(SyncClient): + """DockerSyncClient to be used by DockerSyncer. + This client takes care of executing the synchronization + commands for Docker nodes. In its ``sync_down`` and + ``sync_up`` commands, it expects tuples for the source + and target, respectively, for compatibility with docker. 
+ """ + + def __init__(self): + self._command_runners = {} + self._cluster_config = None + + def configure(self, cluster_config_file: str): + self._cluster_config_file = cluster_config_file + + def sync_up(self, source: str, target: Tuple[str, str]) -> bool: + """Here target is a tuple (target_node, target_dir)""" + target_node, target_dir = target + + # Add trailing slashes for rsync + source = os.path.join(source, "") + target_dir = os.path.join(target_dir, "") + + rsync( + cluster_config=self._cluster_config_file, + source=source, + target=target_dir, + down=False, + ip_address=target_node, + use_internal_ip=True) + + return True + + def sync_down(self, source: Tuple[str, str], target: str) -> bool: + """Here source is a tuple (source_node, source_dir)""" + source_node, source_dir = source + + # Add trailing slashes for rsync + source_dir = os.path.join(source_dir, "") + target = os.path.join(target, "") + + rsync( + cluster_config=self._cluster_config_file, + source=source_dir, + target=target, + down=True, + ip_address=source_node, + use_internal_ip=True) + + return True + + def delete(self, target: str) -> bool: + raise NotImplementedError diff --git a/python/ray/tune/integration/kubernetes.py b/python/ray/tune/integration/kubernetes.py index adb67a57f..3f776b168 100644 --- a/python/ray/tune/integration/kubernetes.py +++ b/python/ray/tune/integration/kubernetes.py @@ -1,3 +1,4 @@ +import os from typing import Any, Optional, Tuple import kubernetes @@ -23,7 +24,8 @@ def NamespacedKubernetesSyncer(namespace): from ray.tune.integration.kubernetes import NamespacedKubernetesSyncer tune.run(train, - sync_to_driver=NamespacedKubernetesSyncer("ray")) + sync_config=tune.SyncConfig( + sync_to_driver=NamespacedKubernetesSyncer("ray"))) """ @@ -130,8 +132,8 @@ class KubernetesSyncClient(SyncClient): target_node, target_dir = target # Add trailing slashes for rsync - source += "/" if not source.endswith("/") else "" - target_dir += "/" if not target_dir.endswith("/") 
else "" + source = os.path.join(source, "") + target_dir = os.path.join(target_dir, "") command_runner = self._get_command_runner(target_node) command_runner.run_rsync_up(source, target_dir) @@ -142,8 +144,8 @@ class KubernetesSyncClient(SyncClient): source_node, source_dir = source # Add trailing slashes for rsync - source_dir += "/" if not source_dir.endswith("/") else "" - target += "/" if not target.endswith("/") else "" + source_dir = os.path.join(source_dir, "") + target = os.path.join(target, "") command_runner = self._get_command_runner(source_node) command_runner.run_rsync_down(source_dir, target) diff --git a/python/ray/tune/tests/test_integration_docker.py b/python/ray/tune/tests/test_integration_docker.py new file mode 100644 index 000000000..6efc56ec1 --- /dev/null +++ b/python/ray/tune/tests/test_integration_docker.py @@ -0,0 +1,110 @@ +import unittest +from typing import Optional + +from ray.tune.integration.docker import DockerSyncer, DockerSyncClient +from ray.tune.sync_client import SyncClient +from ray.tune.syncer import NodeSyncer + + +class _MockRsync: + def __init__(self): + self.history = [] + + def __call__(self, *args, **kwargs): + self.history.append(kwargs) + + +class _MockLookup: + def __init__(self, node_ips): + self.node_to_ip = {} + self.ip_to_node = {} + for node, ip in node_ips.items(): + self.node_to_ip[node] = ip + self.ip_to_node[ip] = node + + def get_ip(self, node): + return self.node_to_ip[node] + + def get_node(self, ip): + return self.ip_to_node[ip] + + +def _create_mock_syncer(local_ip, local_dir, remote_dir): + class _MockSyncer(DockerSyncer): + def __init__(self, + local_dir: str, + remote_dir: str, + sync_client: Optional[SyncClient] = None): + self.local_ip = local_ip + self.worker_ip = None + + sync_client = sync_client or DockerSyncClient() + sync_client.configure("__nofile__") + + super(NodeSyncer, self).__init__(local_dir, remote_dir, + sync_client) + + return _MockSyncer(local_dir, remote_dir) + + +class 
DockerIntegrationTest(unittest.TestCase): + def setUp(self): + self.lookup = _MockLookup({ + "head": "1.0.0.0", + "w1": "1.0.0.1", + "w2": "1.0.0.2" + }) + self.local_dir = "/tmp/local" + self.remote_dir = "/tmp/remote" + + self.mock_command = _MockRsync() + + from ray.tune.integration import docker + docker.rsync = self.mock_command + + def tearDown(self): + pass + + def testDockerRsyncUpDown(self): + syncer = _create_mock_syncer( + self.lookup.get_ip("head"), self.local_dir, self.remote_dir) + + syncer.set_worker_ip(self.lookup.get_ip("w1")) + + # Test sync up. Should add / to the dirs and call rsync + syncer.sync_up() + print(self.mock_command.history[-1]) + self.assertEqual(self.mock_command.history[-1]["source"], + self.local_dir + "/") + self.assertEqual(self.mock_command.history[-1]["target"], + self.remote_dir + "/") + self.assertEqual(self.mock_command.history[-1]["down"], False) + self.assertEqual(self.mock_command.history[-1]["ip_address"], + self.lookup.get_ip("w1")) + + # Test sync down. + syncer.sync_down() + print(self.mock_command.history[-1]) + + self.assertEqual(self.mock_command.history[-1]["target"], + self.local_dir + "/") + self.assertEqual(self.mock_command.history[-1]["source"], + self.remote_dir + "/") + self.assertEqual(self.mock_command.history[-1]["down"], True) + self.assertEqual(self.mock_command.history[-1]["ip_address"], + self.lookup.get_ip("w1")) + + # Sync to same node should be ignored + prev = len(self.mock_command.history) + syncer.set_worker_ip(self.lookup.get_ip("head")) + syncer.sync_up() + self.assertEqual(len(self.mock_command.history), prev) + + syncer.sync_down() + self.assertEqual(len(self.mock_command.history), prev) + + +if __name__ == "__main__": + import pytest + import sys + sys.exit(pytest.main(["-v", __file__]))