[tune] docker syncer (#11035)

* Add DockerSyncer

* Add docs

* Update python/ray/tune/integration/docker.py

Co-authored-by: Richard Liaw <rliaw@berkeley.edu>

* Updated docs

* fix dir

* Added docker integration test

* added docker integration test to bazel build

* Use sdk.rsync API

Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
This commit is contained in:
Kai Fricke
2020-10-01 19:59:23 +01:00
committed by GitHub
parent 98ebf8e2d8
commit bdf647c4ec
6 changed files with 261 additions and 5 deletions
+8
View File
@@ -93,6 +93,14 @@ py_test(
tags = ["exclusive"],
)
py_test(
name = "test_integration_docker",
size = "small",
srcs = ["tests/test_integration_docker.py"],
deps = [":tune_lib"],
tags = ["exclusive"],
)
py_test(
name = "test_integration_kubernetes",
size = "small",
+106
View File
@@ -0,0 +1,106 @@
import os
from typing import Optional, Tuple
from ray import services
from ray.autoscaler.sdk import rsync
from ray.tune.syncer import NodeSyncer
from ray.tune.sync_client import SyncClient
class DockerSyncer(NodeSyncer):
"""DockerSyncer used for synchronization between Docker containers.
This syncer extends the node syncer, but is usually instantiated
without a custom sync client. The sync client defaults to
``DockerSyncClient`` instead.
.. note::
This syncer only works with the Ray cluster launcher.
If you use your own Docker setup, make sure the nodes can connect
to each other via SSH, and try the regular SSH-based syncer instead.
Example:
.. code-block:: python
from ray.tune.integration.docker import DockerSyncer
tune.run(train,
sync_config=tune.SyncConfig(
sync_to_driver=DockerSyncer))
"""
_cluster_config_file = os.path.expanduser("~/ray_bootstrap_config.yaml")
def __init__(self,
local_dir: str,
remote_dir: str,
sync_client: Optional[SyncClient] = None):
self.local_ip = services.get_node_ip_address()
self.worker_ip = None
sync_client = sync_client or DockerSyncClient()
sync_client.configure(self._cluster_config_file)
super(NodeSyncer, self).__init__(local_dir, remote_dir, sync_client)
def set_worker_ip(self, worker_ip: str):
self.worker_ip = worker_ip
@property
def _remote_path(self) -> Tuple[str, str]:
return (self.worker_ip, self._remote_dir)
class DockerSyncClient(SyncClient):
"""DockerSyncClient to be used by DockerSyncer.
This client takes care of executing the synchronization
commands for Docker nodes. In its ``sync_down`` and
``sync_up`` commands, it expects tuples for the source
and target, respectively, for compatibility with docker.
"""
def __init__(self):
self._command_runners = {}
self._cluster_config = None
def configure(self, cluster_config_file: str):
self._cluster_config_file = cluster_config_file
def sync_up(self, source: str, target: Tuple[str, str]) -> bool:
"""Here target is a tuple (target_node, target_dir)"""
target_node, target_dir = target
# Add trailing slashes for rsync
source = os.path.join(source, "")
target_dir = os.path.join(target_dir, "")
rsync(
cluster_config=self._cluster_config_file,
source=source,
target=target_dir,
down=False,
ip_address=target_node,
use_internal_ip=True)
return True
def sync_down(self, source: Tuple[str, str], target: str) -> bool:
"""Here source is a tuple (source_node, source_dir)"""
source_node, source_dir = source
# Add trailing slashes for rsync
source_dir = os.path.join(source_dir, "")
target = os.path.join(target, "")
rsync(
cluster_config=self._cluster_config_file,
source=source_dir,
target=target,
down=True,
ip_address=source_node,
use_internal_ip=True)
return True
def delete(self, target: str) -> bool:
raise NotImplementedError
+7 -5
View File
@@ -1,3 +1,4 @@
import os
from typing import Any, Optional, Tuple
import kubernetes
@@ -23,7 +24,8 @@ def NamespacedKubernetesSyncer(namespace):
from ray.tune.integration.kubernetes import NamespacedKubernetesSyncer
tune.run(train,
sync_to_driver=NamespacedKubernetesSyncer("ray"))
sync_config=tune.SyncConfig(
sync_to_driver=NamespacedKubernetesSyncer("ray")))
"""
@@ -130,8 +132,8 @@ class KubernetesSyncClient(SyncClient):
target_node, target_dir = target
# Add trailing slashes for rsync
source += "/" if not source.endswith("/") else ""
target_dir += "/" if not target_dir.endswith("/") else ""
source = os.path.join(source, "")
target_dir = os.path.join(target_dir, "")
command_runner = self._get_command_runner(target_node)
command_runner.run_rsync_up(source, target_dir)
@@ -142,8 +144,8 @@ class KubernetesSyncClient(SyncClient):
source_node, source_dir = source
# Add trailing slashes for rsync
source_dir += "/" if not source_dir.endswith("/") else ""
target += "/" if not target.endswith("/") else ""
source_dir = os.path.join(source_dir, "")
target = os.path.join(target, "")
command_runner = self._get_command_runner(source_node)
command_runner.run_rsync_down(source_dir, target)
@@ -0,0 +1,110 @@
import unittest
from typing import Optional
from ray.tune.integration.docker import DockerSyncer, DockerSyncClient
from ray.tune.sync_client import SyncClient
from ray.tune.syncer import NodeSyncer
class _MockRsync:
def __init__(self):
self.history = []
def __call__(self, *args, **kwargs):
self.history.append(kwargs)
class _MockLookup:
def __init__(self, node_ips):
self.node_to_ip = {}
self.ip_to_node = {}
for node, ip in node_ips.items():
self.node_to_ip[node] = ip
self.ip_to_node[ip] = node
def get_ip(self, node):
return self.node_to_ip[node]
def get_node(self, ip):
return self.ip_to_node[ip]
def _create_mock_syncer(local_ip, local_dir, remote_dir):
class _MockSyncer(DockerSyncer):
def __init__(self,
local_dir: str,
remote_dir: str,
sync_client: Optional[SyncClient] = None):
self.local_ip = local_ip
self.worker_ip = None
sync_client = sync_client or DockerSyncClient()
sync_client.configure("__nofile__")
super(NodeSyncer, self).__init__(local_dir, remote_dir,
sync_client)
return _MockSyncer(local_dir, remote_dir)
class DockerIntegrationTest(unittest.TestCase):
def setUp(self):
self.lookup = _MockLookup({
"head": "1.0.0.0",
"w1": "1.0.0.1",
"w2": "1.0.0.2"
})
self.local_dir = "/tmp/local"
self.remote_dir = "/tmp/remote"
self.mock_command = _MockRsync()
from ray.tune.integration import docker
docker.rsync = self.mock_command
def tearDown(self):
pass
def testDockerRsyncUpDown(self):
syncer = _create_mock_syncer(
self.lookup.get_ip("head"), self.local_dir, self.remote_dir)
syncer.set_worker_ip(self.lookup.get_ip("w1"))
# Test sync up. Should add / to the dirs and call rsync
syncer.sync_up()
print(self.mock_command.history[-1])
self.assertEqual(self.mock_command.history[-1]["source"],
self.local_dir + "/")
self.assertEqual(self.mock_command.history[-1]["target"],
self.remote_dir + "/")
self.assertEqual(self.mock_command.history[-1]["down"], False)
self.assertEqual(self.mock_command.history[-1]["ip_address"],
self.lookup.get_ip("w1"))
# Test sync down.
syncer.sync_down()
print(self.mock_command.history[-1])
self.assertEqual(self.mock_command.history[-1]["target"],
self.local_dir + "/")
self.assertEqual(self.mock_command.history[-1]["source"],
self.remote_dir + "/")
self.assertEqual(self.mock_command.history[-1]["down"], True)
self.assertEqual(self.mock_command.history[-1]["ip_address"],
self.lookup.get_ip("w1"))
# Sync to same node should be ignored
prev = len(self.mock_command.history)
syncer.set_worker_ip(self.lookup.get_ip("head"))
syncer.sync_up()
self.assertEqual(len(self.mock_command.history), prev)
syncer.sync_down()
self.assertEqual(len(self.mock_command.history), prev)
if __name__ == "__main__":
import pytest
import sys
sys.exit(pytest.main(["-v", __file__]))