[tune/placement group] dist. training placement group support (#11934)

Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
This commit is contained in:
Keqiu Hu
2020-11-16 01:11:39 -08:00
committed by GitHub
parent 8fb926565c
commit a50128079d
8 changed files with 337 additions and 67 deletions
+10 -2
View File
@@ -53,7 +53,7 @@ if __name__ == "__main__":
default=2,
help="Sets number of workers for training.")
parser.add_argument(
"--use-gpu",
"--num-gpus-per-worker",
action="store_true",
default=False,
help="enables CUDA training")
@@ -62,6 +62,10 @@ if __name__ == "__main__":
action="store_true",
default=False,
help="enables multi-node tuning")
parser.add_argument(
"--workers-per-node",
type=int,
help="Forces workers to be colocated on machines if set.")
args = parser.parse_args()
@@ -71,7 +75,11 @@ if __name__ == "__main__":
options = dict(num_cpus=2)
ray.init(**options)
trainable_cls = DistributedTrainableCreator(
train_mnist, num_workers=args.num_workers, use_gpu=args.use_gpu)
train_mnist,
num_workers=args.num_workers,
num_gpus_per_worker=args.num_gpus_per_worker,
num_workers_per_host=args.workers_per_node)
analysis = tune.run(
trainable_cls,
num_samples=4,
@@ -5,6 +5,7 @@ https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras
import argparse
import tensorflow as tf
import numpy as np
import ray
from ray import tune
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.integration.keras import TuneReportCheckpointCallback
@@ -49,16 +50,14 @@ def train_mnist(config, checkpoint_dir=None):
multi_worker_dataset = mnist_dataset(global_batch_size)
with strategy.scope():
multi_worker_model = build_and_compile_cnn_model(config)
multi_worker_model.fit(
multi_worker_dataset,
epochs=2,
steps_per_epoch=70,
callbacks=[
TuneReportCheckpointCallback(
{
"mean_accuracy": "accuracy"
}, filename="checkpoint")
TuneReportCheckpointCallback({
"mean_accuracy": "accuracy"
})
])
@@ -71,20 +70,39 @@ if __name__ == "__main__":
default=2,
help="Sets number of workers for training.")
parser.add_argument(
"--use-gpu",
action="store_true",
default=False,
help="enables CUDA training")
"--num-workers-per-host",
"-w",
type=int,
help="Sets number of workers for training.")
parser.add_argument(
"--num-cpus-per-worker",
"-c",
type=int,
default=2,
help="number of CPUs for this worker")
parser.add_argument(
"--num-gpus-per-worker",
"-g",
type=int,
default=0,
help="number of GPUs for this worker")
parser.add_argument(
"--cluster",
action="store_true",
default=False,
help="enables multi-node tuning")
args = parser.parse_args()
if args.cluster:
options = dict(address="auto")
else:
options = dict(num_cpus=4)
ray.init(**options)
tf_trainable = DistributedTrainableCreator(
train_mnist,
use_gpu=args.use_gpu,
num_workers=2,
num_workers=args.num_workers,
num_workers_per_host=args.num_workers_per_host,
num_cpus_per_worker=args.num_cpus_per_worker,
num_gpus_per_worker=args.num_gpus_per_worker,
)
sched = AsyncHyperBandScheduler(max_t=400, grace_period=20)
+47 -21
View File
@@ -1,13 +1,19 @@
import json
import logging
import ray
import os
from ray import tune
from ray.tune.result import RESULT_DUPLICATE
from ray.tune.function_runner import wrap_function
from ray.tune.resources import Resources
from ray.tune.utils.trainable import TrainableUtil
from ray.util.sgd.utils import find_free_port
from typing import Callable, Dict, Type
from ray.util.placement_group import remove_placement_group
from ray.tune.utils.trainable import PlacementGroupUtil, TrainableUtil
from ray.tune.utils.util import detect_checkpoint_function
from typing import Callable, Dict, Type, Optional
logger = logging.getLogger(__name__)
def setup_process_group(worker_addresses, index):
@@ -39,16 +45,17 @@ class _TensorFlowTrainable(tune.Trainable):
"""Base class for distributed training on Tune."""
_function = None
_num_workers = None
_use_gpu = None
_num_cpus_per_worker = None
_num_gpus_per_worker = None
_num_workers_per_host = None
_placement_group = None
_timeout_s = None
__slots__ = ["workers", "_finished"]
@classmethod
def get_remote_worker_options(self) -> Dict[str, int]:
num_gpus = 1 if self._use_gpu else 0
num_cpus = int(self._num_cpus_per_worker or 1)
return dict(num_cpus=num_cpus, num_gpus=num_gpus)
@property
def should_colocate(self) -> bool:
    """Whether workers were requested to be colocated per host.

    Truthiness is used instead of an ``is not None`` check so this agrees
    with the other places that inspect ``num_workers_per_host``: both
    ``DistributedTrainableCreator`` and
    ``PlacementGroupUtil.get_remote_worker_options`` test it with a plain
    ``if``. A value of ``0`` therefore creates no placement group, and
    ``stop()`` must not try to remove one in that case.
    """
    return bool(self._num_workers_per_host)
def setup(self, config: Dict):
self._finished = False
@@ -57,8 +64,13 @@ class _TensorFlowTrainable(tune.Trainable):
func_trainable = wrap_function(self.__class__._function)
remote_trainable = ray.remote(func_trainable)
remote_trainable = remote_trainable.options(
**self.get_remote_worker_options())
remote_option, self._placement_group =\
PlacementGroupUtil.get_remote_worker_options(
self._num_workers, self._num_cpus_per_worker,
self._num_gpus_per_worker,
self._num_workers_per_host, self._timeout_s)
remote_trainable = \
remote_trainable.options(**remote_option)
self.workers = [
remote_trainable.remote(config=config, )
for _ in range(num_workers)
@@ -99,13 +111,17 @@ class _TensorFlowTrainable(tune.Trainable):
def stop(self):
    """Stop every remote worker, then release the placement group if any."""
    stop_refs = [worker.stop.remote() for worker in self.workers]
    # Block until all workers have finished their own ``stop``.
    ray.get(stop_refs)
    if not self.should_colocate:
        return
    remove_placement_group(self._placement_group)
def DistributedTrainableCreator(
func: Callable,
use_gpu: bool = False,
num_workers: int = 2,
num_cpus_per_worker: int = 1) -> Type[_TensorFlowTrainable]:
num_gpus_per_worker: int = 0,
num_cpus_per_worker: int = 1,
num_workers_per_host: Optional[int] = None,
timeout_s: int = 15 * 60) -> Type[_TensorFlowTrainable]:
"""Converts TensorFlow MultiWorkerMirror training to be executable by Tune.
Requires TensorFlow > 2.0 to work, recommends TensorFlow > 2.2.
@@ -120,11 +136,17 @@ def DistributedTrainableCreator(
func (Callable[[dict], None]): A training function that takes in
a config dict for hyperparameters and should initialize
horovod via horovod.init.
use_gpu (bool); Whether to allocate a GPU per worker.
num_gpus_per_worker (int); Number of GPUs to request
from Ray per worker.
num_cpus_per_worker (int): Number of CPUs to request
from Ray per worker.
num_workers (int): Number of hosts that each trial is expected
to use.
num_workers_per_host (Optional[int]): Number of workers to
colocate per host. None if not specified.
timeout_s (float): Seconds before triggering placement timeouts
if forcing colocation. Default to 15 minutes.
Returns:
Trainable class that can be passed into `tune.run`.
@@ -138,29 +160,33 @@ def DistributedTrainableCreator(
# Please refer to full example in tf_distributed_keras_example.py
tf_trainable = DistributedTrainableCreator(
train_mnist,
use_gpu=args.use_gpu,
num_workers=2)
tune.run(tf_trainable,
num_samples=1)
"""
detect_checkpoint_function(func, abort=True)
if num_workers_per_host:
if num_workers % num_workers_per_host:
raise ValueError("`num_workers` must be an integer multiple "
f"of num_workers_per_host. Got: "
f"num_workers: {num_workers}, "
f"num_workers_per_host: {num_workers_per_host}")
class WrappedDistributedTensorFlowTrainable(_TensorFlowTrainable):
_function = func
_num_workers = num_workers
_num_cpus_per_worker = num_cpus_per_worker
_use_gpu = use_gpu
_num_workers_per_host = num_workers_per_host
_num_gpus_per_worker = num_gpus_per_worker
_timeout_s = timeout_s
@classmethod
def default_resource_request(cls, config: Dict) -> Resources:
num_workers_ = int(config.get("num_workers", num_workers))
num_worker_cpus = int(
config.get("num_cpus_per_worker", num_cpus_per_worker))
use_gpu_ = config.get("use_gpu", use_gpu)
return Resources(
cpu=0,
gpu=0,
extra_cpu=num_workers * num_worker_cpus,
extra_gpu=num_workers_ if use_gpu_ else 0)
extra_cpu=num_workers * num_cpus_per_worker,
extra_gpu=num_workers * num_gpus_per_worker)
return WrappedDistributedTensorFlowTrainable
+46 -29
View File
@@ -16,9 +16,10 @@ from ray.tune.result import RESULT_DUPLICATE
from ray.tune.logger import NoopLogger
from ray.tune.function_runner import wrap_function
from ray.tune.resources import Resources
from ray.tune.utils.trainable import TrainableUtil
from ray.tune.utils.trainable import PlacementGroupUtil, TrainableUtil
from ray.tune.utils import detect_checkpoint_function
from ray.util.sgd.torch.utils import setup_process_group, setup_address
from ray.util.placement_group import remove_placement_group
from ray.util.sgd.torch.constants import NCCL_TIMEOUT_S
logger = logging.getLogger(__name__)
@@ -50,20 +51,21 @@ class _TorchTrainable(tune.Trainable):
"""
_function = None
_num_workers = None
_use_gpu = None
_num_gpus_per_worker = None
_num_cpus_per_worker = None
_num_workers_per_host = None
_placement_group = None
_timeout_s = None
__slots__ = ["workers", "_finished"]
@classmethod
def default_process_group_parameters(self) -> Dict:
return dict(timeout=timedelta(NCCL_TIMEOUT_S), backend="gloo")
@property
def should_colocate(self) -> bool:
    """Whether workers were requested to be colocated per host.

    Truthiness is used instead of an ``is not None`` check so this agrees
    with the other places that inspect ``num_workers_per_host``: both
    ``DistributedTrainableCreator`` and
    ``PlacementGroupUtil.get_remote_worker_options`` test it with a plain
    ``if``. A value of ``0`` therefore creates no placement group, and
    ``stop()`` must not try to remove one in that case.
    """
    return bool(self._num_workers_per_host)
@classmethod
def get_remote_worker_options(self) -> Dict[str, int]:
num_gpus = 1 if self._use_gpu else 0
num_cpus = int(self._num_cpus_per_worker or 1)
return dict(num_cpus=num_cpus, num_gpus=num_gpus)
def default_process_group_parameters(cls) -> Dict:
    """Return the default process-group parameters.

    Returns:
        Dict with a ``timeout`` (``datetime.timedelta``) and a ``backend``
        (``"gloo"``).
    """
    # ``timedelta``'s first positional argument is *days*. The constant is
    # a number of seconds (it is the default of ``timeout_s``), so it has
    # to be passed by keyword to avoid a timeout that is 86400x too long.
    return dict(timeout=timedelta(seconds=NCCL_TIMEOUT_S), backend="gloo")
def setup(self, config: Dict):
self._finished = False
@@ -74,8 +76,13 @@ class _TorchTrainable(tune.Trainable):
func_trainable = wrap_function(self.__class__._function)
remote_trainable = ray.remote(func_trainable)
remote_trainable = remote_trainable.options(
**self.get_remote_worker_options())
remote_option, self._placement_group = \
PlacementGroupUtil.get_remote_worker_options(
self._num_workers, self._num_cpus_per_worker,
self._num_gpus_per_worker,
self._num_workers_per_host, self._timeout_s)
remote_trainable = \
remote_trainable.options(**remote_option)
self.workers = [
remote_trainable.remote(
@@ -127,15 +134,18 @@ class _TorchTrainable(tune.Trainable):
def stop(self):
    """Stop every remote worker, then release the placement group if any."""
    stop_refs = [worker.stop.remote() for worker in self.workers]
    # Block until all workers have finished their own ``stop``.
    ray.get(stop_refs)
    if not self.should_colocate:
        return
    remove_placement_group(self._placement_group)
def DistributedTrainableCreator(
func: Callable,
use_gpu: bool = False,
num_workers: int = 1,
num_cpus_per_worker: int = 1,
backend: str = "gloo",
timeout_s: int = NCCL_TIMEOUT_S) -> Type[_TorchTrainable]:
def DistributedTrainableCreator(func: Callable,
num_workers: int = 1,
num_cpus_per_worker: int = 1,
num_gpus_per_worker: int = 0,
num_workers_per_host: Optional[int] = None,
backend: str = "gloo",
timeout_s: int = NCCL_TIMEOUT_S,
use_gpu=None) -> Type[_TorchTrainable]:
"""Creates a class that executes distributed training.
Similar to running `torch.distributed.launch`.
@@ -148,17 +158,19 @@ def DistributedTrainableCreator(
This function must have 2 args in the signature, and the
latter arg must contain `checkpoint_dir`. For example:
`func(config, checkpoint_dir=None)`.
use_gpu (bool): Sets resource allocation for workers to 1 GPU
if true. Also automatically sets CUDA_VISIBLE_DEVICES
for each training worker.
num_workers (int): Number of training workers to include in
world.
num_cpus_per_worker (int): Number of CPU resources to reserve
per training worker.
num_gpus_per_worker (int): Number of GPU resources to reserve
per training worker.
num_workers_per_host: Optional[int]: Number of workers to
colocate per host.
backend (str): One of "gloo", "nccl".
timeout_s (float): Seconds before the torch process group
times out. Useful when machines are unreliable. Defaults
to 60 seconds.
to 60 seconds. This value is also reused for triggering
placement timeouts if forcing colocation.
Returns:
type(Trainable): A trainable class object that can be passed
@@ -173,13 +185,22 @@ def DistributedTrainableCreator(
train_func, num_workers=2)
analysis = tune.run(trainable_cls)
"""
if use_gpu:
raise ValueError(
"use_gpu is deprecated. Use 'num_gpus_per_worker' instead.")
detect_checkpoint_function(func, abort=True)
if num_workers_per_host:
if num_workers % num_workers_per_host:
raise ValueError("`num_workers` must be an integer multiple "
"of workers_per_node.")
class WrappedDistributedTorchTrainable(_TorchTrainable):
_function = func
_num_workers = num_workers
_use_gpu = use_gpu
_num_cpus_per_worker = num_cpus_per_worker
_num_gpus_per_worker = num_gpus_per_worker
_num_workers_per_host = num_workers_per_host
_timeout_s = timeout_s
@classmethod
def default_process_group_parameters(self) -> Dict:
@@ -187,16 +208,12 @@ def DistributedTrainableCreator(
@classmethod
def default_resource_request(cls, config: Dict) -> Resources:
num_workers_ = int(config.get("num_workers", num_workers))
num_cpus = int(
config.get("num_cpus_per_worker", num_cpus_per_worker))
use_gpu_ = config.get("use_gpu", use_gpu)
return Resources(
cpu=0,
gpu=0,
extra_cpu=num_cpus * num_workers_,
extra_gpu=num_workers_ if use_gpu_ else 0)
extra_cpu=num_cpus_per_worker * num_workers,
extra_gpu=num_gpus_per_worker * num_workers)
return WrappedDistributedTorchTrainable
@@ -1,5 +1,9 @@
from typing import Dict, Optional
import pytest
import ray
from ray import tune
from ray.cluster_utils import Cluster
from ray.tune.integration.tensorflow import DistributedTrainableCreator
from ray.tune.examples.tf_distributed_keras_example import train_mnist
@@ -20,6 +24,34 @@ def ray_start_4_cpus():
ray.shutdown()
@pytest.fixture
def ray_4_node():
    """Connect Ray to a local cluster of four nodes with one CPU each."""
    local_cluster = Cluster()
    for _node_index in range(4):
        local_cluster.add_node(num_cpus=1)
    ray.init(address=local_cluster.address)

    yield

    # Teardown: disconnect the driver, then stop the cluster itself.
    ray.shutdown()
    local_cluster.shutdown()
@pytest.fixture
def ray_4_node_gpu():
    """Connect Ray to a local cluster of four nodes (2 CPUs, 2 GPUs each)."""
    local_cluster = Cluster()
    per_node_resources = dict(num_cpus=2, num_gpus=2)
    for _node_index in range(4):
        local_cluster.add_node(**per_node_resources)
    ray.init(address=local_cluster.address)

    yield

    # Teardown: disconnect the driver, then stop the cluster itself.
    ray.shutdown()
    local_cluster.shutdown()
@pytest.fixture
def ray_connect_cluster():
try:
@@ -31,8 +63,16 @@ def ray_connect_cluster():
ray.shutdown()
def _train_check_global(config: Dict, checkpoint_dir: Optional[str] = None):
    """Test-only training function: pause briefly, then report one result.

    It is defined here because Ray has problems serializing within the
    test file.
    """
    from time import sleep

    sleep(0.1)
    tune.report(is_distributed=True)
def test_single_step(ray_start_2_cpus): # noqa: F811
trainable_cls = DistributedTrainableCreator(train_mnist)
trainable_cls = DistributedTrainableCreator(train_mnist, num_workers=2)
trainer = trainable_cls()
trainer.train()
trainer.stop()
@@ -50,9 +90,45 @@ def test_validation(ray_start_2_cpus): # noqa: F811
def bad_func(a, b, c):
return 1
t_cls = DistributedTrainableCreator(bad_func)
with pytest.raises(ValueError):
t_cls()
DistributedTrainableCreator(bad_func)
def test_colocated(ray_4_node):  # noqa: F811
    """Four workers, one per host, should claim every CPU in the cluster."""
    cpus_before = ray.available_resources()["CPU"]
    assert cpus_before == 4

    creator_kwargs = {"num_workers": 4, "num_workers_per_host": 1}
    trainable_cls = DistributedTrainableCreator(_train_check_global,
                                                **creator_kwargs)
    trainable = trainable_cls()

    # Instantiating the trainable is expected to reserve all CPUs.
    cpus_after = ray.available_resources().get("CPU", 0)
    assert cpus_after == 0

    trainable.train()
    trainable.stop()
def test_colocated_gpu(ray_4_node_gpu):  # noqa: F811
    """Four 2-GPU workers, one per host, should claim all eight GPUs."""
    gpus_before = ray.available_resources()["GPU"]
    assert gpus_before == 8

    creator_kwargs = {
        "num_workers": 4,
        "num_gpus_per_worker": 2,
        "num_workers_per_host": 1,
    }
    trainable_cls = DistributedTrainableCreator(_train_check_global,
                                                **creator_kwargs)
    trainable = trainable_cls()

    # Instantiating the trainable is expected to reserve all GPUs.
    gpus_after = ray.available_resources().get("GPU", 0)
    assert gpus_after == 0

    trainable.train()
    trainable.stop()
def test_colocated_gpu_double(ray_4_node_gpu):  # noqa: F811
    """Eight 1-GPU workers, two per host, should claim all eight GPUs."""
    gpus_before = ray.available_resources()["GPU"]
    assert gpus_before == 8

    creator_kwargs = {
        "num_workers": 8,
        "num_gpus_per_worker": 1,
        "num_cpus_per_worker": 1,
        "num_workers_per_host": 2,
    }
    trainable_cls = DistributedTrainableCreator(_train_check_global,
                                                **creator_kwargs)
    trainable = trainable_cls()

    # Instantiating the trainable is expected to reserve all GPUs.
    gpus_after = ray.available_resources().get("GPU", 0)
    assert gpus_after == 0

    trainable.train()
    trainable.stop()
if __name__ == "__main__":
@@ -6,6 +6,7 @@ import torch.distributed as dist
import ray
from ray import tune
from ray.cluster_utils import Cluster
from ray.tune.integration.torch import (DistributedTrainableCreator,
distributed_checkpoint_dir,
_train_simple, _train_check_global)
@@ -97,6 +98,79 @@ def test_checkpoint(ray_start_2_cpus, rank): # noqa: F811
assert not os.path.exists(path)
@pytest.fixture
def ray_4_node():
    """Connect Ray to a local cluster of four nodes with one CPU each."""
    local_cluster = Cluster()
    for _node_index in range(4):
        local_cluster.add_node(num_cpus=1)
    ray.init(address=local_cluster.address)

    yield

    # Teardown: disconnect the driver, then stop the cluster itself.
    ray.shutdown()
    local_cluster.shutdown()
    # Ensure that tests don't ALL fail
    process_group_alive = dist.is_initialized()
    if process_group_alive:
        dist.destroy_process_group()
@pytest.fixture
def ray_4_node_gpu():
    """Connect Ray to a local cluster of four nodes (2 CPUs, 2 GPUs each)."""
    local_cluster = Cluster()
    per_node_resources = dict(num_cpus=2, num_gpus=2)
    for _node_index in range(4):
        local_cluster.add_node(**per_node_resources)
    ray.init(address=local_cluster.address)

    yield

    # Teardown: disconnect the driver, then stop the cluster itself.
    ray.shutdown()
    local_cluster.shutdown()
    # Ensure that tests don't ALL fail
    process_group_alive = dist.is_initialized()
    if process_group_alive:
        dist.destroy_process_group()
def test_colocated(ray_4_node):  # noqa: F811
    """Four workers, one per host, should claim every CPU in the cluster."""
    cpus_before = ray.available_resources()["CPU"]
    assert cpus_before == 4

    creator_kwargs = {"num_workers": 4, "num_workers_per_host": 1}
    trainable_cls = DistributedTrainableCreator(_train_check_global,
                                                **creator_kwargs)
    trainable = trainable_cls()

    # Instantiating the trainable is expected to reserve all CPUs.
    cpus_after = ray.available_resources().get("CPU", 0)
    assert cpus_after == 0

    trainable.train()
    trainable.stop()
def test_colocated_gpu(ray_4_node_gpu):  # noqa: F811
    """Four 2-GPU workers, one per host, should claim all eight GPUs."""
    gpus_before = ray.available_resources()["GPU"]
    assert gpus_before == 8

    creator_kwargs = {
        "num_workers": 4,
        "num_gpus_per_worker": 2,
        "num_workers_per_host": 1,
    }
    trainable_cls = DistributedTrainableCreator(_train_check_global,
                                                **creator_kwargs)
    trainable = trainable_cls()

    # Instantiating the trainable is expected to reserve all GPUs.
    gpus_after = ray.available_resources().get("GPU", 0)
    assert gpus_after == 0

    trainable.train()
    trainable.stop()
def test_colocated_gpu_double(ray_4_node_gpu):  # noqa: F811
    """Eight 1-GPU workers, two per host, should claim all eight GPUs.

    ``timeout_s=30`` is passed through to the trainable creator.
    """
    assert ray.available_resources()["GPU"] == 8
    trainable_cls = DistributedTrainableCreator(
        _train_check_global,
        num_workers=8,
        num_gpus_per_worker=1,
        num_workers_per_host=2,
        timeout_s=30)
    trainable = trainable_cls()
    # Leftover debugging prints were removed here; the assertion below
    # already reports the observed value when it fails.
    assert ray.available_resources().get("GPU", 0) == 0
    trainable.train()
    trainable.stop()
if __name__ == "__main__":
import pytest
import sys
+51
View File
@@ -2,11 +2,14 @@ import glob
import io
import logging
import shutil
from typing import Dict, Any
import pandas as pd
import pickle
import os
import ray
from ray.util import placement_group
from six import string_types
logger = logging.getLogger(__name__)
@@ -159,3 +162,51 @@ class TrainableUtil:
chkpt_df = pd.DataFrame(
iter_chkpt_pairs, columns=["training_iteration", "chkpt_path"])
return chkpt_df
class PlacementGroupUtil:
    """Helpers for building Ray remote options for distributed workers."""

    @staticmethod
    def get_remote_worker_options(
            num_workers, num_cpus_per_worker, num_gpus_per_worker,
            num_workers_per_host,
            timeout_s) -> (Dict[str, Any], placement_group):
        """Returns the options for remote workers.

        When ``num_workers_per_host`` is truthy, a placement group is
        created with one bundle per host and this call blocks until the
        group is ready or ``timeout_s`` elapses.

        Args:
            num_workers (int): Number of training workers to include in
                world.
            num_cpus_per_worker (int): Number of CPU resources to reserve
                per training worker.
            num_gpus_per_worker (int): Number of GPU resources to reserve
                per training worker.
            num_workers_per_host (Optional[int]): Number of workers to
                colocate per host. If falsy, no placement group is created.
            timeout_s (Optional[int]): Seconds to wait for the placement
                group to become ready. Only used when forcing colocation.

        Returns:
            options (Dict[str, Any]): ``num_cpus`` / ``num_gpus`` for each
                remote worker, plus ``placement_group`` when one was
                created.
            pg: The created placement group, or None if colocation was
                not requested.

        Raises:
            Exception: Whatever ``ray.get`` raises while waiting for the
                placement group (e.g. on timeout) is re-raised after the
                placement group has been removed.
        """
        options = dict(
            num_cpus=num_cpus_per_worker, num_gpus=num_gpus_per_worker)
        if not num_workers_per_host:
            return options, None

        # Imported locally so this helper does not depend on a new
        # module-level import.
        from ray.util.placement_group import remove_placement_group

        # Integer division: the number of bundles must be an int and the
        # callers validate that ``num_workers`` is a multiple of
        # ``num_workers_per_host``.
        num_hosts = num_workers // num_workers_per_host
        bundle = {
            "CPU": num_cpus_per_worker * num_workers_per_host,
            "GPU": num_gpus_per_worker * num_workers_per_host,
        }
        # One independent dict per host rather than N aliases of one dict.
        all_bundles = [dict(bundle) for _ in range(num_hosts)]
        pg = placement_group(all_bundles, strategy="STRICT_SPREAD")
        logger.debug("Waiting for placement_group to start.")
        try:
            ray.get(pg.ready(), timeout=timeout_s)
        except Exception:
            # The caller never receives ``pg`` when this raises, so it
            # could not clean it up; remove it here to avoid leaking the
            # reservation.
            remove_placement_group(pg)
            raise
        logger.debug("Placement_group started.")
        options["placement_group"] = pg
        return options, pg
+1 -1
View File
@@ -223,7 +223,7 @@ def deep_update(original,
if isinstance(original.get(k), dict) and isinstance(value, dict):
# Check old type vs old one. If different, override entire value.
if k in override_all_if_type_changes and \
"type" in value and "type" in original[k] and \
"type" in value and "type" in original[k] and \
value["type"] != original[k]["type"]:
original[k] = value
# Allowed key -> ok to add new subkeys.