From a50128079d616cc25630471ad01d09537d38657d Mon Sep 17 00:00:00 2001 From: Keqiu Hu Date: Mon, 16 Nov 2020 01:11:39 -0800 Subject: [PATCH] [tune/placement group] dist. training placement group support (#11934) Co-authored-by: Richard Liaw --- python/ray/tune/examples/ddp_mnist_torch.py | 12 ++- .../examples/tf_distributed_keras_example.py | 40 ++++++--- python/ray/tune/integration/tensorflow.py | 68 ++++++++++----- python/ray/tune/integration/torch.py | 75 ++++++++++------- .../tune/tests/test_tensorflow_trainable.py | 82 ++++++++++++++++++- python/ray/tune/tests/test_torch_trainable.py | 74 +++++++++++++++++ python/ray/tune/utils/trainable.py | 51 ++++++++++++ python/ray/tune/utils/util.py | 2 +- 8 files changed, 337 insertions(+), 67 deletions(-) diff --git a/python/ray/tune/examples/ddp_mnist_torch.py b/python/ray/tune/examples/ddp_mnist_torch.py index 889e1299f..ff5a271cf 100644 --- a/python/ray/tune/examples/ddp_mnist_torch.py +++ b/python/ray/tune/examples/ddp_mnist_torch.py @@ -53,7 +53,7 @@ if __name__ == "__main__": default=2, help="Sets number of workers for training.") parser.add_argument( - "--use-gpu", + "--num-gpus-per-worker", action="store_true", default=False, help="enables CUDA training") @@ -62,6 +62,10 @@ if __name__ == "__main__": action="store_true", default=False, help="enables multi-node tuning") + parser.add_argument( + "--workers-per-node", + type=int, + help="Forces workers to be colocated on machines if set.") args = parser.parse_args() @@ -71,7 +75,11 @@ if __name__ == "__main__": options = dict(num_cpus=2) ray.init(**options) trainable_cls = DistributedTrainableCreator( - train_mnist, num_workers=args.num_workers, use_gpu=args.use_gpu) + train_mnist, + num_workers=args.num_workers, + num_gpus_per_worker=args.num_gpus_per_worker, + num_workers_per_host=args.workers_per_node) + analysis = tune.run( trainable_cls, num_samples=4, diff --git a/python/ray/tune/examples/tf_distributed_keras_example.py b/python/ray/tune/examples/tf_distributed_keras_example.py index 643bb52ff..99e0fe217 100644 --- a/python/ray/tune/examples/tf_distributed_keras_example.py +++ b/python/ray/tune/examples/tf_distributed_keras_example.py @@ -5,6 +5,7 @@ https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras import argparse import tensorflow as tf import numpy as np +import ray from ray import tune from ray.tune.schedulers import AsyncHyperBandScheduler from ray.tune.integration.keras import TuneReportCheckpointCallback @@ -49,16 +50,14 @@ def train_mnist(config, checkpoint_dir=None): multi_worker_dataset = mnist_dataset(global_batch_size) with strategy.scope(): multi_worker_model = build_and_compile_cnn_model(config) - multi_worker_model.fit( multi_worker_dataset, epochs=2, steps_per_epoch=70, callbacks=[ - TuneReportCheckpointCallback( - { - "mean_accuracy": "accuracy" - }, filename="checkpoint") + TuneReportCheckpointCallback({ + "mean_accuracy": "accuracy" + }) ]) @@ -71,20 +70,39 @@ if __name__ == "__main__": default=2, help="Sets number of workers for training.") parser.add_argument( - "--use-gpu", - action="store_true", - default=False, - help="enables CUDA training") + "--num-workers-per-host", + "-w", + type=int, + help="Sets number of workers for training.") + parser.add_argument( + "--num-cpus-per-worker", + "-c", + type=int, + default=2, + help="number of CPUs for this worker") + parser.add_argument( + "--num-gpus-per-worker", + "-g", + type=int, + default=0, + help="number of GPUs for this worker") parser.add_argument( "--cluster", action="store_true", default=False, help="enables multi-node tuning") args = parser.parse_args() + if args.cluster: + options = dict(address="auto") + else: + options = dict(num_cpus=4) + ray.init(**options) tf_trainable = DistributedTrainableCreator( train_mnist, - use_gpu=args.use_gpu, - num_workers=2, + num_workers=args.num_workers, + num_workers_per_host=args.num_workers_per_host, + num_cpus_per_worker=args.num_cpus_per_worker, + num_gpus_per_worker=args.num_gpus_per_worker, ) sched = AsyncHyperBandScheduler(max_t=400, grace_period=20) diff --git a/python/ray/tune/integration/tensorflow.py b/python/ray/tune/integration/tensorflow.py index 62f25ffda..2a386efc5 100644 --- a/python/ray/tune/integration/tensorflow.py +++ b/python/ray/tune/integration/tensorflow.py @@ -1,13 +1,19 @@ import json +import logging + import ray import os from ray import tune from ray.tune.result import RESULT_DUPLICATE from ray.tune.function_runner import wrap_function from ray.tune.resources import Resources -from ray.tune.utils.trainable import TrainableUtil from ray.util.sgd.utils import find_free_port -from typing import Callable, Dict, Type +from ray.util.placement_group import remove_placement_group +from ray.tune.utils.trainable import PlacementGroupUtil, TrainableUtil +from ray.tune.utils.util import detect_checkpoint_function +from typing import Callable, Dict, Type, Optional + +logger = logging.getLogger(__name__) def setup_process_group(worker_addresses, index): @@ -39,16 +45,17 @@ class _TensorFlowTrainable(tune.Trainable): """Base class for distributed training on Tune.""" _function = None _num_workers = None - _use_gpu = None _num_cpus_per_worker = None + _num_gpus_per_worker = None + _num_workers_per_host = None + _placement_group = None + _timeout_s = None __slots__ = ["workers", "_finished"] - @classmethod - def get_remote_worker_options(self) -> Dict[str, int]: - num_gpus = 1 if self._use_gpu else 0 - num_cpus = int(self._num_cpus_per_worker or 1) - return dict(num_cpus=num_cpus, num_gpus=num_gpus) + @property + def should_colocate(self) -> bool: + return self._num_workers_per_host is not None def setup(self, config: Dict): self._finished = False @@ -57,8 +64,13 @@ class _TensorFlowTrainable(tune.Trainable): func_trainable = wrap_function(self.__class__._function) remote_trainable = ray.remote(func_trainable) - remote_trainable = remote_trainable.options( - **self.get_remote_worker_options()) + remote_option, self._placement_group =\ + PlacementGroupUtil.get_remote_worker_options( + self._num_workers, self._num_cpus_per_worker, + self._num_gpus_per_worker, + self._num_workers_per_host, self._timeout_s) + remote_trainable = \ + remote_trainable.options(**remote_option) self.workers = [ remote_trainable.remote(config=config, ) for _ in range(num_workers) @@ -99,13 +111,17 @@ class _TensorFlowTrainable(tune.Trainable): def stop(self): ray.get([worker.stop.remote() for worker in self.workers]) + if self.should_colocate: + remove_placement_group(self._placement_group) def DistributedTrainableCreator( func: Callable, - use_gpu: bool = False, num_workers: int = 2, - num_cpus_per_worker: int = 1) -> Type[_TensorFlowTrainable]: + num_gpus_per_worker: int = 0, + num_cpus_per_worker: int = 1, + num_workers_per_host: Optional[int] = None, + timeout_s: int = 15 * 60) -> Type[_TensorFlowTrainable]: """Converts TensorFlow MultiWorkerMirror training to be executable by Tune. Requires TensorFlow > 2.0 to work, recommends TensorFlow > 2.2. @@ -120,11 +136,17 @@ def DistributedTrainableCreator( func (Callable[[dict], None]): A training function that takes in a config dict for hyperparameters and should initialize horovod via horovod.init. - use_gpu (bool); Whether to allocate a GPU per worker. + num_gpus_per_worker (int); Number of GPUs to request + from Ray per worker. num_cpus_per_worker (int): Number of CPUs to request from Ray per worker. num_workers (int): Number of hosts that each trial is expected to use. + num_workers_per_host (Optional[int]): Number of workers to + colocate per host. None if not specified. + timeout_s (float): Seconds before triggering placement timeouts + if forcing colocation. Default to 15 minutes. + Returns: Trainable class that can be passed into `tune.run`. @@ -138,29 +160,33 @@ def DistributedTrainableCreator( # Please refer to full example in tf_distributed_keras_example.py tf_trainable = DistributedTrainableCreator( train_mnist, - use_gpu=args.use_gpu, num_workers=2) tune.run(tf_trainable, num_samples=1) """ + detect_checkpoint_function(func, abort=True) + if num_workers_per_host: + if num_workers % num_workers_per_host: + raise ValueError("`num_workers` must be an integer multiple " + f"of num_workers_per_host. Got: " + f"num_workers: {num_workers}, " + f"num_workers_per_host: {num_workers_per_host}") class WrappedDistributedTensorFlowTrainable(_TensorFlowTrainable): _function = func _num_workers = num_workers _num_cpus_per_worker = num_cpus_per_worker - _use_gpu = use_gpu + _num_workers_per_host = num_workers_per_host + _num_gpus_per_worker = num_gpus_per_worker + _timeout_s = timeout_s @classmethod def default_resource_request(cls, config: Dict) -> Resources: - num_workers_ = int(config.get("num_workers", num_workers)) - num_worker_cpus = int( - config.get("num_cpus_per_worker", num_cpus_per_worker)) - use_gpu_ = config.get("use_gpu", use_gpu) return Resources( cpu=0, gpu=0, - extra_cpu=num_workers * num_worker_cpus, - extra_gpu=num_workers_ if use_gpu_ else 0) + extra_cpu=num_workers * num_cpus_per_worker, + extra_gpu=num_workers * num_gpus_per_worker) return WrappedDistributedTensorFlowTrainable diff --git a/python/ray/tune/integration/torch.py b/python/ray/tune/integration/torch.py index d71e90cb8..b70b0b561 100644 --- a/python/ray/tune/integration/torch.py +++ b/python/ray/tune/integration/torch.py @@ -16,9 +16,10 @@ from ray.tune.result import RESULT_DUPLICATE from ray.tune.logger import NoopLogger from ray.tune.function_runner import wrap_function from ray.tune.resources import Resources -from ray.tune.utils.trainable import TrainableUtil +from ray.tune.utils.trainable import PlacementGroupUtil, TrainableUtil from ray.tune.utils import detect_checkpoint_function from ray.util.sgd.torch.utils import setup_process_group, setup_address +from ray.util.placement_group import remove_placement_group from ray.util.sgd.torch.constants import NCCL_TIMEOUT_S logger = logging.getLogger(__name__) @@ -50,20 +51,21 @@ class _TorchTrainable(tune.Trainable): """ _function = None _num_workers = None - _use_gpu = None + _num_gpus_per_worker = None _num_cpus_per_worker = None + _num_workers_per_host = None + _placement_group = None + _timeout_s = None __slots__ = ["workers", "_finished"] - @classmethod - def default_process_group_parameters(self) -> Dict: - return dict(timeout=timedelta(NCCL_TIMEOUT_S), backend="gloo") + @property + def should_colocate(self) -> bool: + return self._num_workers_per_host is not None @classmethod - def get_remote_worker_options(self) -> Dict[str, int]: - num_gpus = 1 if self._use_gpu else 0 - num_cpus = int(self._num_cpus_per_worker or 1) - return dict(num_cpus=num_cpus, num_gpus=num_gpus) + def default_process_group_parameters(cls) -> Dict: + return dict(timeout=timedelta(NCCL_TIMEOUT_S), backend="gloo") def setup(self, config: Dict): self._finished = False @@ -74,8 +76,13 @@ class _TorchTrainable(tune.Trainable): func_trainable = wrap_function(self.__class__._function) remote_trainable = ray.remote(func_trainable) - remote_trainable = remote_trainable.options( - **self.get_remote_worker_options()) + remote_option, self._placement_group = \ + PlacementGroupUtil.get_remote_worker_options( + self._num_workers, self._num_cpus_per_worker, + self._num_gpus_per_worker, + self._num_workers_per_host, self._timeout_s) + remote_trainable = \ + remote_trainable.options(**remote_option) self.workers = [ remote_trainable.remote( @@ -127,15 +134,18 @@ class _TorchTrainable(tune.Trainable): def stop(self): ray.get([worker.stop.remote() for worker in self.workers]) + if self.should_colocate: + remove_placement_group(self._placement_group) -def DistributedTrainableCreator( - func: Callable, - use_gpu: bool = False, - num_workers: int = 1, - num_cpus_per_worker: int = 1, - backend: str = "gloo", - timeout_s: int = NCCL_TIMEOUT_S) -> Type[_TorchTrainable]: +def DistributedTrainableCreator(func: Callable, + num_workers: int = 1, + num_cpus_per_worker: int = 1, + num_gpus_per_worker: int = 0, + num_workers_per_host: Optional[int] = None, + backend: str = "gloo", + timeout_s: int = NCCL_TIMEOUT_S, + use_gpu=None) -> Type[_TorchTrainable]: """Creates a class that executes distributed training. Similar to running `torch.distributed.launch`. @@ -148,17 +158,19 @@ def DistributedTrainableCreator( This function must have 2 args in the signature, and the latter arg must contain `checkpoint_dir`. For example: `func(config, checkpoint_dir=None)`. - use_gpu (bool): Sets resource allocation for workers to 1 GPU - if true. Also automatically sets CUDA_VISIBLE_DEVICES - for each training worker. num_workers (int): Number of training workers to include in world. num_cpus_per_worker (int): Number of CPU resources to reserve per training worker. + num_gpus_per_worker (int): Number of GPU resources to reserve + per training worker. + num_workers_per_host: Optional[int]: Number of workers to + colocate per host. backend (str): One of "gloo", "nccl". timeout_s (float): Seconds before the torch process group times out. Useful when machines are unreliable. Defaults - to 60 seconds. + to 60 seconds. This value is also reused for triggering + placement timeouts if forcing colocation. Returns: type(Trainable): A trainable class object that can be passed @@ -173,13 +185,22 @@ def DistributedTrainableCreator( train_func, num_workers=2) analysis = tune.run(trainable_cls) """ + if use_gpu: + raise ValueError( + "use_gpu is deprecated. Use 'num_gpus_per_worker' instead.") detect_checkpoint_function(func, abort=True) + if num_workers_per_host: + if num_workers % num_workers_per_host: + raise ValueError("`num_workers` must be an integer multiple " + "of workers_per_node.") class WrappedDistributedTorchTrainable(_TorchTrainable): _function = func _num_workers = num_workers - _use_gpu = use_gpu _num_cpus_per_worker = num_cpus_per_worker + _num_gpus_per_worker = num_gpus_per_worker + _num_workers_per_host = num_workers_per_host + _timeout_s = timeout_s @classmethod def default_process_group_parameters(self) -> Dict: @@ -187,16 +208,12 @@ def DistributedTrainableCreator( @classmethod def default_resource_request(cls, config: Dict) -> Resources: - num_workers_ = int(config.get("num_workers", num_workers)) - num_cpus = int( - config.get("num_cpus_per_worker", num_cpus_per_worker)) - use_gpu_ = config.get("use_gpu", use_gpu) return Resources( cpu=0, gpu=0, - extra_cpu=num_cpus * num_workers_, - extra_gpu=num_workers_ if use_gpu_ else 0) + extra_cpu=num_cpus_per_worker * num_workers, + extra_gpu=num_gpus_per_worker * num_workers) return WrappedDistributedTorchTrainable diff --git a/python/ray/tune/tests/test_tensorflow_trainable.py b/python/ray/tune/tests/test_tensorflow_trainable.py index aacf35267..efa90e44a 100644 --- a/python/ray/tune/tests/test_tensorflow_trainable.py +++ b/python/ray/tune/tests/test_tensorflow_trainable.py @@ -1,5 +1,9 @@ +from typing import Dict, Optional + import pytest import ray +from ray import tune +from ray.cluster_utils import Cluster from ray.tune.integration.tensorflow import DistributedTrainableCreator from ray.tune.examples.tf_distributed_keras_example import train_mnist @@ -20,6 +24,34 @@ def ray_start_4_cpus(): ray.shutdown() +@pytest.fixture +def ray_4_node(): + cluster = Cluster() + for _ in range(4): + cluster.add_node(num_cpus=1) + + ray.init(address=cluster.address) + + yield + + ray.shutdown() + cluster.shutdown() + + +@pytest.fixture +def ray_4_node_gpu(): + cluster = Cluster() + for _ in range(4): + cluster.add_node(num_cpus=2, num_gpus=2) + + ray.init(address=cluster.address) + + yield + + ray.shutdown() + cluster.shutdown() + + @pytest.fixture def ray_connect_cluster(): try: @@ -31,8 +63,16 @@ def ray_connect_cluster(): ray.shutdown() +def _train_check_global(config: Dict, checkpoint_dir: Optional[str] = None): + """For testing only. Putting this here because Ray has problems + serializing within the test file.""" + import time + time.sleep(0.1) + tune.report(is_distributed=True) + + def test_single_step(ray_start_2_cpus): # noqa: F811 - trainable_cls = DistributedTrainableCreator(train_mnist) + trainable_cls = DistributedTrainableCreator(train_mnist, num_workers=2) trainer = trainable_cls() trainer.train() trainer.stop() @@ -50,9 +90,45 @@ def test_validation(ray_start_2_cpus): # noqa: F811 def bad_func(a, b, c): return 1 - t_cls = DistributedTrainableCreator(bad_func) with pytest.raises(ValueError): - t_cls() + DistributedTrainableCreator(bad_func) + + +def test_colocated(ray_4_node): # noqa: F811 + assert ray.available_resources()["CPU"] == 4 + trainable_cls = DistributedTrainableCreator( + _train_check_global, num_workers=4, num_workers_per_host=1) + trainable = trainable_cls() + assert ray.available_resources().get("CPU", 0) == 0 + trainable.train() + trainable.stop() + + +def test_colocated_gpu(ray_4_node_gpu): # noqa: F811 + assert ray.available_resources()["GPU"] == 8 + trainable_cls = DistributedTrainableCreator( + _train_check_global, + num_workers=4, + num_gpus_per_worker=2, + num_workers_per_host=1) + trainable = trainable_cls() + assert ray.available_resources().get("GPU", 0) == 0 + trainable.train() + trainable.stop() + + +def test_colocated_gpu_double(ray_4_node_gpu): # noqa: F811 + assert ray.available_resources()["GPU"] == 8 + trainable_cls = DistributedTrainableCreator( + _train_check_global, + num_workers=8, + num_gpus_per_worker=1, + num_cpus_per_worker=1, + num_workers_per_host=2) + trainable = trainable_cls() + assert ray.available_resources().get("GPU", 0) == 0 + trainable.train() + trainable.stop() if __name__ == "__main__": diff --git a/python/ray/tune/tests/test_torch_trainable.py b/python/ray/tune/tests/test_torch_trainable.py index 0ab43e54a..2e388fab9 100644 --- a/python/ray/tune/tests/test_torch_trainable.py +++ b/python/ray/tune/tests/test_torch_trainable.py @@ -6,6 +6,7 @@ import torch.distributed as dist import ray from ray import tune +from ray.cluster_utils import Cluster from ray.tune.integration.torch import (DistributedTrainableCreator, distributed_checkpoint_dir, _train_simple, _train_check_global) @@ -97,6 +98,79 @@ def test_checkpoint(ray_start_2_cpus, rank): # noqa: F811 assert not os.path.exists(path) +@pytest.fixture +def ray_4_node(): + cluster = Cluster() + for _ in range(4): + cluster.add_node(num_cpus=1) + + ray.init(address=cluster.address) + + yield + + ray.shutdown() + cluster.shutdown() + # Ensure that tests don't ALL fail + if dist.is_initialized(): + dist.destroy_process_group() + + +@pytest.fixture +def ray_4_node_gpu(): + cluster = Cluster() + for _ in range(4): + cluster.add_node(num_cpus=2, num_gpus=2) + + ray.init(address=cluster.address) + + yield + + ray.shutdown() + cluster.shutdown() + # Ensure that tests don't ALL fail + if dist.is_initialized(): + dist.destroy_process_group() + + +def test_colocated(ray_4_node): # noqa: F811 + assert ray.available_resources()["CPU"] == 4 + trainable_cls = DistributedTrainableCreator( + _train_check_global, num_workers=4, num_workers_per_host=1) + trainable = trainable_cls() + assert ray.available_resources().get("CPU", 0) == 0 + trainable.train() + trainable.stop() + + +def test_colocated_gpu(ray_4_node_gpu): # noqa: F811 + assert ray.available_resources()["GPU"] == 8 + trainable_cls = DistributedTrainableCreator( + _train_check_global, + num_workers=4, + num_gpus_per_worker=2, + num_workers_per_host=1) + trainable = trainable_cls() + assert ray.available_resources().get("GPU", 0) == 0 + trainable.train() + trainable.stop() + + +def test_colocated_gpu_double(ray_4_node_gpu): # noqa: F811 + assert ray.available_resources()["GPU"] == 8 + trainable_cls = DistributedTrainableCreator( + _train_check_global, + num_workers=8, + num_gpus_per_worker=1, + num_workers_per_host=2, + timeout_s=30) + trainable = trainable_cls() + print("?????") + print(ray.available_resources().get("GPU")) + assert ray.available_resources().get("GPU", 0) == 0 + trainable.train() + trainable.stop() + + if __name__ == "__main__": import pytest import sys diff --git a/python/ray/tune/utils/trainable.py b/python/ray/tune/utils/trainable.py index b7509f028..bb9299793 100644 --- a/python/ray/tune/utils/trainable.py +++ b/python/ray/tune/utils/trainable.py @@ -2,11 +2,14 @@ import glob import io import logging import shutil +from typing import Dict, Any import pandas as pd import pickle import os +import ray +from ray.util import placement_group from six import string_types logger = logging.getLogger(__name__) @@ -159,3 +162,51 @@ class TrainableUtil: chkpt_df = pd.DataFrame( iter_chkpt_pairs, columns=["training_iteration", "chkpt_path"]) return chkpt_df + + +class PlacementGroupUtil: + @staticmethod + def get_remote_worker_options( + num_workers, num_cpus_per_worker, num_gpus_per_worker, + num_workers_per_host, + timeout_s) -> (Dict[str, Any], placement_group): + """ Returns the option for remote workers. + + Args: + num_workers (int): Number of training workers to include in + world. + num_cpus_per_worker (int): Number of CPU resources to reserve + per training worker. + num_gpus_per_worker (int): Number of GPU resources to reserve + per training worker. + num_workers_per_host: Optional[int]: Number of workers to + colocate per host. + timeout_s (Optional[int]): Seconds before the torch process group + times out. Useful when machines are unreliable. Defaults + to 60 seconds. This value is also reused for triggering + placement timeouts if forcing colocation. + + + Returns: + type(Dict[Str, Any]): option that contains CPU/GPU count of + the remote worker and the placement group information. + pg(placement_group): return a reference to the placement group + """ + pg = None + options = dict( + num_cpus=num_cpus_per_worker, num_gpus=num_gpus_per_worker) + if num_workers_per_host: + num_hosts = int(num_workers / num_workers_per_host) + cpus_per_node = num_cpus_per_worker * num_workers_per_host + gpus_per_node = \ + num_gpus_per_worker * num_workers_per_host + bundle = {"CPU": cpus_per_node, "GPU": gpus_per_node} + + all_bundles = [bundle] * num_hosts + pg = placement_group(all_bundles, strategy="STRICT_SPREAD") + logger.debug("Waiting for placement_group to start.") + ray.get(pg.ready(), timeout=timeout_s) + logger.debug("Placement_group started.") + options["placement_group"] = pg + + return options, pg diff --git a/python/ray/tune/utils/util.py b/python/ray/tune/utils/util.py index 1048f0e0e..d4bc669c5 100644 --- a/python/ray/tune/utils/util.py +++ b/python/ray/tune/utils/util.py @@ -223,7 +223,7 @@ def deep_update(original, if isinstance(original.get(k), dict) and isinstance(value, dict): # Check old type vs old one. If different, override entire value. if k in override_all_if_type_changes and \ - "type" in value and "type" in original[k] and \ + "type" in value and "type" in original[k] and \ value["type"] != original[k]["type"]: original[k] = value # Allowed key -> ok to add new subkeys.