From 01d74af89d0c6b20277393e60e68044ead1e1615 Mon Sep 17 00:00:00 2001 From: Amog Kamsetty Date: Fri, 22 Jan 2021 16:30:10 -0800 Subject: [PATCH] [horovod] Horovod+Ray Pytorch Lightning Accelerator (#13458) --- .travis.yml | 1 + python/ray/tune/examples/mnist_ptl_mini.py | 3 +- python/ray/util/lightning_accelerators/BUILD | 33 +++ .../util/lightning_accelerators/__init__.py | 4 + .../examples/ptl_horovod_ray_example.py | 195 ++++++++++++++++++ .../horovod_ray_accelerator.py | 121 +++++++++++ .../tests/test_horovod_ray_accelerator.py | 191 +++++++++++++++++ 7 files changed, 547 insertions(+), 1 deletion(-) create mode 100644 python/ray/util/lightning_accelerators/BUILD create mode 100644 python/ray/util/lightning_accelerators/__init__.py create mode 100644 python/ray/util/lightning_accelerators/examples/ptl_horovod_ray_example.py create mode 100644 python/ray/util/lightning_accelerators/horovod_ray_accelerator.py create mode 100644 python/ray/util/lightning_accelerators/tests/test_horovod_ray_accelerator.py diff --git a/.travis.yml b/.travis.yml index 5170ed086..4d8f8ddd1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -420,6 +420,7 @@ matrix: script: - ./ci/keep_alive bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only --test_tag_filters=py37 python/ray/tune/... - ./ci/keep_alive bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only python/ray/util/xgboost/... + - ./ci/keep_alive bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only python/ray/util/lightning_accelerators/... # There are no python 3.7 tests for RaySGD at the moment # - ./ci/keep_alive bazel test --config=ci --build_tests_only --test_tag_filters=py37 python/ray/util/sgd/... # - ./ci/keep_alive bazel test --config=ci --build_tests_only --test_tag_filters=py37 doc/... diff --git a/python/ray/tune/examples/mnist_ptl_mini.py b/python/ray/tune/examples/mnist_ptl_mini.py index b1c2e2aa9..e3b226d44 100644 --- a/python/ray/tune/examples/mnist_ptl_mini.py +++ b/python/ray/tune/examples/mnist_ptl_mini.py @@ -1,7 +1,7 @@ import torch from torch.nn import functional as F import pytorch_lightning as pl -from pl_bolts.datamodules import MNISTDataModule +from pl_bolts.datamodules.mnist_datamodule import MNISTDataModule import os from ray.tune.integration.pytorch_lightning import TuneReportCallback @@ -16,6 +16,7 @@ class LightningMNISTClassifier(pl.LightningModule): self.data_dir = data_dir or os.getcwd() self.lr = config["lr"] layer_1, layer_2 = config["layer_1"], config["layer_2"] + self.batch_size = config["batch_size"] # mnist images are (1, 28, 28) (channels, width, height) self.layer_1 = torch.nn.Linear(28 * 28, layer_1) diff --git a/python/ray/util/lightning_accelerators/BUILD b/python/ray/util/lightning_accelerators/BUILD new file mode 100644 index 000000000..4355c6d33 --- /dev/null +++ b/python/ray/util/lightning_accelerators/BUILD @@ -0,0 +1,33 @@ +# -------------------------------------------------------------------- +# Tests from the python/ray/util/lightning_accelerators/tests directory. +# Please keep these sorted alphabetically. +# -------------------------------------------------------------------- + +py_test( + name = "test_horovod_ray_accelerator", + size = "medium", + srcs = ["tests/test_horovod_ray_accelerator.py"], + tags = ["exclusive", "pytorch-lightning", "pytorch", "horovod"], + deps = [":accelerator_lib"], +) + +# -------------------------------------------------------------------- +# Tests from the python/ray/util/lightning_accelerators/examples directory. +# Please keep these sorted alphabetically. +# -------------------------------------------------------------------- + +py_test( + name = "ptl_horovod_ray_example", + size = "medium", + srcs = ["examples/ptl_horovod_ray_example.py"], + tags = ["exclusive", "example", "pytorch-lightning", "pytorch", "horovod"], + deps = [":accelerator_lib"], + args = ["--smoke-test"] +) + +# # This is a dummy test dependency that causes the above tests to be +# # re-run if any of these files changes. +py_library( + name = "accelerator_lib", + srcs = glob(["**/*.py"], exclude=["tests/*.py"]), +) diff --git a/python/ray/util/lightning_accelerators/__init__.py b/python/ray/util/lightning_accelerators/__init__.py new file mode 100644 index 000000000..038180e01 --- /dev/null +++ b/python/ray/util/lightning_accelerators/__init__.py @@ -0,0 +1,4 @@ +from ray.util.lightning_accelerators.horovod_ray_accelerator import \ + HorovodRayAccelerator + +__all__ = ["HorovodRayAccelerator"] diff --git a/python/ray/util/lightning_accelerators/examples/ptl_horovod_ray_example.py b/python/ray/util/lightning_accelerators/examples/ptl_horovod_ray_example.py new file mode 100644 index 000000000..fffcfb01f --- /dev/null +++ b/python/ray/util/lightning_accelerators/examples/ptl_horovod_ray_example.py @@ -0,0 +1,195 @@ +"""Example using Pytorch Lightning with a Horovod on Ray Accelerator.""" +import os +import tempfile + +import pytorch_lightning as pl +import torch +from torch.utils.data import random_split, DataLoader +from torchvision.datasets import MNIST +from torchvision import transforms + +import ray +from ray import tune +from ray.tune.examples.mnist_ptl_mini import LightningMNISTClassifier +from ray.tune.integration.pytorch_lightning import TuneReportCallback +from ray.util.lightning_accelerators import HorovodRayAccelerator + + +class MNISTClassifier(LightningMNISTClassifier): + def prepare_data(self): + self.dataset = MNIST( + self.data_dir, + train=True, + download=True, + transform=transforms.ToTensor()) + + def train_dataloader(self): + dataset = self.dataset + train_length = len(dataset) + dataset_train, _ = random_split( + dataset, [train_length - 5000, 5000], + generator=torch.Generator().manual_seed(0)) + loader = DataLoader( + dataset_train, + batch_size=self.batch_size, + shuffle=True, + num_workers=1, + drop_last=True, + pin_memory=True, + ) + return loader + + def val_dataloader(self): + dataset = self.dataset + train_length = len(dataset) + _, dataset_val = random_split( + dataset, [train_length - 5000, 5000], + generator=torch.Generator().manual_seed(0)) + loader = DataLoader( + dataset_val, + batch_size=self.batch_size, + shuffle=False, + num_workers=1, + drop_last=True, + pin_memory=True, + ) + return loader + + +def train_mnist(config, + data_dir=None, + num_epochs=10, + num_hosts=1, + num_slots=4, + use_gpu=False, + callbacks=None): + model = MNISTClassifier(config, data_dir) + + callbacks = callbacks or [] + + trainer = pl.Trainer( + max_epochs=num_epochs, + gpus=int(use_gpu), + callbacks=callbacks, + accelerator=HorovodRayAccelerator( + num_hosts=num_hosts, num_slots=num_slots, use_gpu=use_gpu)) + trainer.fit(model) + + +def tune_mnist(data_dir, + num_samples=10, + num_epochs=10, + num_hosts=1, + num_slots=4, + use_gpu=False): + config = { + "layer_1": tune.choice([32, 64, 128]), + "layer_2": tune.choice([64, 128, 256]), + "lr": tune.loguniform(1e-4, 1e-1), + "batch_size": tune.choice([32, 64, 128]), + } + + # Add Tune callback. + metrics = {"loss": "ptl/val_loss", "acc": "ptl/val_accuracy"} + callbacks = [TuneReportCallback(metrics, on="validation_end")] + trainable = tune.with_parameters( + train_mnist, + data_dir=data_dir, + num_epochs=num_epochs, + num_hosts=num_hosts, + num_slots=num_slots, + use_gpu=use_gpu, + callbacks=callbacks) + analysis = tune.run( + trainable, + metric="loss", + mode="min", + config=config, + num_samples=num_samples, + resources_per_trial={ + "cpu": 1, + # Assume 1 cpu per slot. + "extra_cpu": num_hosts * num_slots, + # Assume 1 gpu per slot. + "extra_gpu": num_hosts * num_slots * int(use_gpu) + }, + name="tune_mnist") + + print("Best hyperparameters found were: ", analysis.best_config) + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument( + "--num-hosts", + type=int, + help="Number of machines to train on. If using Tune, then each " + "trial will use this many machines.", + default=1) + parser.add_argument( + "--num-slots", + type=int, + help="Number of workers to " + "place on each " + "machine. If using " + "Tune, then each trial will use this many slots per machine.", + default=1) + parser.add_argument( + "--use-gpu", action="store_true", help="Use GPU for " + "training.") + parser.add_argument( + "--tune", + action="store_true", + help="Use Ray Tune " + "for " + "hyperparameter " + "tuning.") + parser.add_argument( + "--num-samples", + type=int, + default=10, + help="Number " + "of " + "samples to tune.") + parser.add_argument( + "--num-epochs", + type=int, + default=10, + help="Number " + "of " + "epochs " + "to train for.") + parser.add_argument( + "--smoke-test", action="store_true", help="Finish quickly for testing") + parser.add_argument( + "--address", + required=False, + type=str, + help="the address to use for Ray") + args, _ = parser.parse_known_args() + + num_epochs = 1 if args.smoke_test else args.num_epochs + num_hosts = 1 if args.smoke_test else args.num_hosts + num_slots = 1 if args.smoke_test else args.num_slots + use_gpu = False if args.smoke_test else args.use_gpu + num_samples = 1 if args.smoke_test else args.num_samples + + if args.smoke_test: + ray.init(num_cpus=2) + else: + ray.init(address=args.address) + + data_dir = os.path.join(tempfile.gettempdir(), "mnist_data_") + + if args.tune: + raise NotImplementedError("Using Tune + Pytorch Lightning with " + "distributed training is currently not " + "supported.") + tune_mnist(data_dir, num_samples, num_epochs, num_hosts, num_slots, + use_gpu) + else: + config = {"layer_1": 32, "layer_2": 64, "lr": 1e-1, "batch_size": 32} + train_mnist(config, data_dir, num_epochs, num_hosts, num_slots, + use_gpu) diff --git a/python/ray/util/lightning_accelerators/horovod_ray_accelerator.py b/python/ray/util/lightning_accelerators/horovod_ray_accelerator.py new file mode 100644 index 000000000..04f73317a --- /dev/null +++ b/python/ray/util/lightning_accelerators/horovod_ray_accelerator.py @@ -0,0 +1,121 @@ +import ray +from pytorch_lightning.accelerators.horovod_accelerator import \ + HorovodAccelerator + +try: + import horovod.torch as hvd + from horovod.ray import RayExecutor +except (ModuleNotFoundError, ImportError): + HOROVOD_AVAILABLE = False +else: + HOROVOD_AVAILABLE = True + + +def get_executable_cls(): + # Only used for testing purposes, currently. + # We need to override this in tests to ensure test path is set correctly. + return None + + +class HorovodRayAccelerator(HorovodAccelerator): + """Pytorch Lightning Accelerator for Horovod training on a Ray cluster. + + This accelerator is used to manage distributed training on a Ray cluster + via the Horovod training framework. Internally, the specified number of + Ray actors are launched in the cluster and are configured as part of the + Horovod ring. The Pytorch Lightning trainer is instantiated on the + driver and sent to each of these training workers where training is + executed. The distributed training protocol is handled by Horovod. + + Each training worker is configured to reserve 1 CPU and if 1 GPU if + ``use_gpu`` is set to ``True``. + + If using this accelerator, you should run your code like a normal Python + script: ``python train.py``, and not with ``horovodrun``. + + Args: + num_hosts (int): The number of nodes/machines to execute the job on. + num_slots (int): Number of workers to be placed on each machine. + use_gpu (bool): Whether to use GPU for allocation. For GPU to be + used, you must also set the ``gpus`` arg in your Pytorch Lightning + Trainer to a value > 0. + + Example: + + .. code_block:: python + + import pytorch_lightning as ptl + from ray.util.lightning_accelerators import HorovodRayAccelerator + + ptl_model = MNISTClassifier(...) + # 2 nodes, 4 workers per node, each using 1 CPU and 1 GPU. + accelerator = HorovodRayAccelerator(num_hosts=2, num_slots=4, + use_gpu=True). + + # If using GPUs, set the ``gpus`` arg to a value > 0. + # The actual number of GPUs is determined by ``num_slots``. + trainer = pl.Trainer(..., gpus=1, accelerator=accelerator). + trainer.fit(ptl_model). + + """ + + def __init__(self, + *args, + num_hosts=1, + num_slots=1, + use_gpu=False, + **kwargs): + super().__init__(*args, trainer=None, **kwargs) + self.nickname = "horovod_ray" + self.num_hosts = num_hosts + self.num_slots = num_slots + self.use_gpu = use_gpu + + def setup(self, model): + self.trainer.use_horovod = True + settings = RayExecutor.create_settings(timeout_s=30) + self.executor = RayExecutor( + settings, + num_hosts=self.num_hosts, + num_slots=self.num_slots, + use_gpu=self.use_gpu) + self.trainer.model = model + self.executor.start(executable_cls=get_executable_cls()) + + def train(self): + trainer = self.trainer + trainer_ref = ray.put(self.trainer) + self.trainer = None + results = self.executor.run(self.train_remote, args=[trainer_ref]) + results, state_dict, best_path = results[0] + + self.trainer = trainer + self.trainer.model.load_state_dict(state_dict) + if self.trainer.checkpoint_callback: + self.trainer.checkpoint_callback.best_model_path = best_path + + return results + + def train_remote(self, trainer_ref): + self.trainer = ray.get(trainer_ref) + hvd.init() + if self.trainer.on_gpu: + # Horovod assigns one local GPU per process. + self.trainer.root_gpu = hvd.local_rank() + + # TODO: Make changes in PTL to clean this up. + super(HorovodRayAccelerator, self).setup(self.trainer.model) + results = super(HorovodRayAccelerator, self).train() + if hvd.rank() != 0: + # Only want results from the first worker. + return None + + best_model_path = None + if self.trainer.checkpoint_callback is not None: + best_model_path = self.trainer.checkpoint_callback.best_model_path + + model = self.trainer.model + return results, model.state_dict(), best_model_path + + def teardown(self): + self.executor.shutdown() diff --git a/python/ray/util/lightning_accelerators/tests/test_horovod_ray_accelerator.py b/python/ray/util/lightning_accelerators/tests/test_horovod_ray_accelerator.py new file mode 100644 index 000000000..1d8bb9d5e --- /dev/null +++ b/python/ray/util/lightning_accelerators/tests/test_horovod_ray_accelerator.py @@ -0,0 +1,191 @@ +import os + +import torch +import pytest +import ray +from pl_bolts.datamodules.mnist_datamodule import MNISTDataModule +from ray.util.sgd.tests.test_ptl import PTL_Module +from ray.tune.examples.mnist_ptl_mini import LightningMNISTClassifier +from ray.util.lightning_accelerators import HorovodRayAccelerator +import pytorch_lightning as pl + +try: + import horovod # noqa: F401 + from horovod.common.util import nccl_built +except ImportError: + HOROVOD_AVAILABLE = False +else: + HOROVOD_AVAILABLE = True + + +def _nccl_available(): + if not HOROVOD_AVAILABLE: + return False + try: + return nccl_built() + except AttributeError: + return False + + +@pytest.fixture +def ray_start_2_cpus(): + address_info = ray.init(num_cpus=2) + yield address_info + ray.shutdown() + + +@pytest.fixture +def ray_start_2_gpus(): + address_info = ray.init(num_cpus=2, num_gpus=2) + yield address_info + ray.shutdown() + # This env var is set by Pytorch Lightning. + # Make sure to reset it after each test. + # TODO: Upstream to PTL to not set this env var if using Ray. + del os.environ["CUDA_VISIBLE_DEVICES"] + + +@pytest.fixture +def seed(): + pl.seed_everything(0) + + +def get_model(lr=1e-2, hidden_size=1, data_size=10, val_size=10, batch_size=2): + config = { + "lr": lr, + "hidden_size": hidden_size, + "data_size": data_size, + "val_size": val_size, + "batch_size": batch_size + } + return PTL_Module(config) + + +def get_trainer(dir, + num_slots=2, + use_gpu=False, + max_epochs=1, + limit_train_batches=10, + limit_val_batches=10, + progress_bar_refresh_rate=0): + accelerator = HorovodRayAccelerator(num_slots=num_slots, use_gpu=use_gpu) + trainer = pl.Trainer( + default_root_dir=dir, + gpus=1 if use_gpu else 0, + max_epochs=max_epochs, + limit_train_batches=limit_train_batches, + limit_val_batches=limit_val_batches, + progress_bar_refresh_rate=progress_bar_refresh_rate, + checkpoint_callback=True, + accelerator=accelerator) + return trainer + + +def train_test(trainer, model): + initial_values = torch.tensor( + [torch.sum(torch.abs(x)) for x in model.parameters()]) + result = trainer.fit(model) + post_train_values = torch.tensor( + [torch.sum(torch.abs(x)) for x in model.parameters()]) + assert result == 1, "trainer failed" + # Check that the model is actually changed post-training. + assert torch.norm(initial_values - post_train_values) > 0.1 + + +@pytest.mark.parametrize("num_slots", [1, 2]) +def test_train(tmpdir, ray_start_2_cpus, seed, num_slots): + model = get_model() + + trainer = get_trainer(tmpdir, num_slots=num_slots) + train_test(trainer, model) + + +def load_test(trainer, model): + trainer.fit(model) + trained_model = PTL_Module.load_from_checkpoint( + trainer.checkpoint_callback.best_model_path, config=model.config) + assert trained_model is not None, "loading model failed" + + +@pytest.mark.parametrize("num_slots", [1, 2]) +def test_load(tmpdir, ray_start_2_cpus, seed, num_slots): + model = get_model() + trainer = get_trainer(tmpdir, num_slots=num_slots) + load_test(trainer, model) + + +def predict_test(trainer, model, dm): + trainer.fit(model, dm) + test_loader = dm.test_dataloader() + acc = pl.metrics.Accuracy() + for batch in test_loader: + x, y = batch + with torch.no_grad(): + y_hat = model(x) + y_hat = y_hat.cpu() + acc.update(y_hat, y) + average_acc = acc.compute() + assert average_acc >= 0.5, f"This model is expected to get > {0.5} in " \ + f"test set (it got {average_acc})" + + +@pytest.mark.parametrize("num_slots", [1, 2]) +def test_predict(tmpdir, ray_start_2_cpus, seed, num_slots): + config = { + "layer_1": 32, + "layer_2": 32, + "lr": 1e-2, + "batch_size": 32, + } + model = LightningMNISTClassifier(config, tmpdir) + dm = MNISTDataModule( + data_dir=tmpdir, num_workers=1, batch_size=config["batch_size"]) + trainer = get_trainer( + tmpdir, limit_train_batches=10, max_epochs=1, num_slots=num_slots) + predict_test(trainer, model, dm) + + +@pytest.mark.skipif( + not _nccl_available(), reason="test requires Horovod with NCCL support") +@pytest.mark.skipif( + torch.cuda.device_count() < 2, reason="test requires multi-GPU machine") +@pytest.mark.parametrize("num_slots", [1, 2]) +def test_train_gpu(tmpdir, ray_start_2_gpus, seed, num_slots): + model = get_model() + trainer = get_trainer(tmpdir, num_slots=num_slots, use_gpu=True) + train_test(trainer, model) + + +@pytest.mark.skipif( + not _nccl_available(), reason="test requires Horovod with NCCL support") +@pytest.mark.skipif( + torch.cuda.device_count() < 2, reason="test requires multi-GPU machine") +@pytest.mark.parametrize("num_slots", [1, 2]) +def test_load_gpu(tmpdir, ray_start_2_gpus, seed, num_slots): + model = get_model() + trainer = get_trainer(tmpdir, num_slots=num_slots, use_gpu=True) + load_test(trainer, model) + + +@pytest.mark.skipif( + not _nccl_available(), reason="test requires Horovod with NCCL support") +@pytest.mark.skipif( + torch.cuda.device_count() < 2, reason="test requires multi-GPU machine") +@pytest.mark.parametrize("num_slots", [1, 2]) +def test_predict_gpu(tmpdir, ray_start_2_gpus, seed, num_slots): + config = { + "layer_1": 32, + "layer_2": 32, + "lr": 1e-2, + "batch_size": 32, + } + model = LightningMNISTClassifier(config, tmpdir) + dm = MNISTDataModule( + data_dir=tmpdir, num_workers=1, batch_size=config["batch_size"]) + trainer = get_trainer( + tmpdir, + limit_train_batches=10, + max_epochs=1, + num_slots=num_slots, + use_gpu=True) + predict_test(trainer, model, dm)