diff --git a/ci/jenkins_tests/run_tune_tests.sh b/ci/jenkins_tests/run_tune_tests.sh index b5be73013..579b9d5c4 100755 --- a/ci/jenkins_tests/run_tune_tests.sh +++ b/ci/jenkins_tests/run_tune_tests.sh @@ -146,6 +146,9 @@ $SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ python -m pytest /ray/python/ray/experimental/sgd/tests +$SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ + python /ray/doc/examples/doc_code/raysgd_torch_signatures.py + $SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ python /ray/python/ray/experimental/sgd/pytorch/examples/train_example.py diff --git a/doc/examples/doc_code/raysgd_torch_signatures.py b/doc/examples/doc_code/raysgd_torch_signatures.py new file mode 100644 index 000000000..b8fe06180 --- /dev/null +++ b/doc/examples/doc_code/raysgd_torch_signatures.py @@ -0,0 +1,121 @@ +# flake8: noqa +""" +This file holds code for the Pytorch Trainer creator signatures. + +It ignores yapf because yapf doesn't allow comments right after code blocks, +but we put comments right after code blocks to prevent large white spaces +in the documentation. +""" +# yapf: disable + +# __torch_model_start__ +import torch.nn as nn + +def model_creator(config): + """Constructor function for the model(s) to be optimized. + + You will also need to provide a custom training + function to specify the optimization procedure for multiple models. + + Args: + config (dict): Configuration dictionary passed into ``PyTorchTrainer``. + + Returns: + One or more torch.nn.Module objects. + """ + return nn.Linear(1, 1) +# __torch_model_end__ + + +# __torch_optimizer_start__ +import torch + +def optimizer_creator(model, config): + """Constructor of one or more Torch optimizers. + + Args: + models: The return values from ``model_creator``. 
This can be one + or more torch nn modules. + config (dict): Configuration dictionary passed into ``PyTorchTrainer``. + + Returns: + One or more Torch optimizer objects. + """ + return torch.optim.SGD(model.parameters(), lr=config.get("lr", 1e-4)) +# __torch_optimizer_end__ + + +# __torch_data_start__ +from ray.experimental.sgd.pytorch.examples.train_example import LinearDataset + +def data_creator(config): + """Constructs torch.utils.data.Dataset objects. + + Note that even though two Dataset objects can be returned, + only one dataset will be used for training. + + Args: + config: Configuration dictionary passed into ``PyTorchTrainer`` + + Returns: + One or Two Dataset objects. If only one Dataset object is provided, + ``trainer.validate()`` will throw a ValueError. + """ + return LinearDataset(2, 5), LinearDataset(2, 5, size=400) +# __torch_data_end__ + +# __torch_loss_start__ +import torch + +def loss_creator(config): + """Constructs the Torch Loss object. + + Note that optionally, you can pass in a Torch Loss constructor directly + into the PyTorchTrainer (i.e., ``PyTorchTrainer(loss_creator=nn.BCELoss, ...)``). + + Args: + config: Configuration dictionary passed into ``PyTorchTrainer`` + + Returns: + Torch Loss object. + """ + return torch.nn.BCELoss() +# __torch_loss_end__ + +# __torch_scheduler_start__ +import torch + +def scheduler_creator(optimizer, config): + """Constructor of one or more Torch optimizer schedulers. + + Args: + optimizers: The return values from ``optimizer_creator``. + This can be one or more torch optimizer objects. + config: Configuration dictionary passed into ``PyTorchTrainer`` + + Returns: + One or more Torch scheduler objects. + """ + return torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.9) + +# __torch_scheduler_end__ + +# __torch_ray_start__ +import ray + +ray.init() +# or ray.init(address="auto") to connect to a running cluster. 
+# __torch_ray_end__ + +# __torch_trainer_start__ +from ray.experimental.sgd import PyTorchTrainer + +trainer = PyTorchTrainer( + model_creator, + data_creator, + optimizer_creator, + loss_creator=nn.MSELoss, + scheduler_creator=scheduler_creator, + config={"lr": 0.001}) + +# __torch_trainer_end__ diff --git a/doc/source/index.rst b/doc/source/index.rst index 1edb4972e..98b25d82d 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -281,7 +281,6 @@ Getting Involved raysgd/raysgd.rst raysgd/raysgd_pytorch.rst - raysgd/raysgd_pytorch_examples.rst raysgd/raysgd_tensorflow.rst raysgd/raysgd_ref.rst diff --git a/doc/source/raysgd/raysgd.rst b/doc/source/raysgd/raysgd.rst index b54f4adea..426464641 100644 --- a/doc/source/raysgd/raysgd.rst +++ b/doc/source/raysgd/raysgd.rst @@ -1,10 +1,6 @@ RaySGD: Distributed Deep Learning ================================= -.. tip:: Get in touch with us if you're using or considering using `RaySGD `_! -.. warning:: This is still an experimental API and is subject to change in the near future. - - .. image:: raysgdlogo.png :scale: 20% :align: center @@ -17,6 +13,8 @@ The main features are: - **Composability**: RaySGD is built on top of the Ray Actor API, enabling seamless integration with existing Ray applications such as RLlib, Tune, and Ray.Serve. - **Scale up and down**: Start on single CPU. Scale up to multi-node, multi-CPU, or multi-GPU clusters by changing 2 lines of code. +.. tip:: We need your feedback! RaySGD is currently early in its development, and we're hoping to get feedback from people using or considering it. We'd love `to get in touch `_! 
+ Getting Started --------------- @@ -53,7 +51,7 @@ You can start a ``PyTorchTrainer`` with the following: model_creator, data_creator, optimizer_creator, - loss_creator, + loss_creator=nn.MSELoss, num_replicas=2, use_gpu=True, batch_size=512, diff --git a/doc/source/raysgd/raysgd_pytorch.rst b/doc/source/raysgd/raysgd_pytorch.rst index a8c724aec..7dcbf6a69 100644 --- a/doc/source/raysgd/raysgd_pytorch.rst +++ b/doc/source/raysgd/raysgd_pytorch.rst @@ -1,135 +1,124 @@ RaySGD Pytorch ============== -.. warning:: This is still an experimental API and is subject to change in the near future. - -.. tip:: Get in touch with us if you're using or considering using `RaySGD `_! - .. image:: raysgd-pytorch.svg :align: center -The RaySGD ``PyTorchTrainer`` simplifies distributed model training for PyTorch. The ``PyTorchTrainer`` is a wrapper around ``torch.distributed.launch`` with a Python API to easily incorporate distributed training into a larger Python application, as opposed to needing to execute training outside of Python. For end to end examples, see :ref:`raysgd-pytorch-example`. +The RaySGD ``PyTorchTrainer`` simplifies distributed model training for PyTorch. The ``PyTorchTrainer`` is a wrapper around ``torch.distributed.launch`` with a Python API to easily incorporate distributed training into a larger Python application, as opposed to needing to wrap your training code in bash scripts. Under the hood, ``PytorchTrainer`` will create *replicas* of your model (controlled by ``num_replicas``), each of which is managed by a Ray actor. .. image:: raysgd-actors.svg :align: center - +For end to end examples leveraging RaySGD PyTorchTrainer, jump to :ref:`raysgd-pytorch-examples`. Setting up training ------------------- -The ``PyTorchTrainer`` can be constructed with functions that wrap components of the training script. Specifically, it needs constructors for the Model, Data, Optimizer, and Loss to create replicated copies across different devices and machines. +.. 
tip:: We need your feedback! RaySGD is currently early in its development, and we're hoping to get feedback from people using or considering it. We'd love `to get in touch `_! -For example: +The ``PyTorchTrainer`` can be constructed with functions that wrap components of the training script. Specifically, it requires constructors for the Model, Data, Optimizer, and Loss, and optionally one for the ``lr_scheduler``, to create replicated copies across different devices and machines. -.. code-block:: python +.. literalinclude:: ../../examples/doc_code/raysgd_torch_signatures.py + :language: python + :start-after: __torch_trainer_start__ + :end-before: __torch_trainer_end__ - import numpy as np - import torch - import torch.nn as nn - from torch import distributed +The section below covers the expected signatures of creator functions. Jump to :ref:`starting-pytorch-trainer`. - from ray.experimental.sgd import PyTorchTrainer - from ray.experimental.sgd.examples.train_example import LinearDataset +Model Creator +~~~~~~~~~~~~~ - def model_creator(config): - """Constructor function for the model(s) to be optimized. +This is the signature needed for ``PyTorchTrainer(model_creator=...)``. - Note that if multiple models are returned, the same number of optimizers - must be returned. You will also need to provide a custom training - function to specify the optimization procedure for multiple models. - - Args: - config (dict): Configuration dictionary passed into ``PyTorchTrainer``. - - Returns: - One or more torch.nn.Module objects. - """ - return nn.Linear(1, 1) +.. literalinclude:: ../../examples/doc_code/raysgd_torch_signatures.py + :language: python + :start-after: __torch_model_start__ + :end-before: __torch_model_end__ - def optimizer_creator(models, config): - """Constructor of the optimizers. +Optimizer Creator +~~~~~~~~~~~~~~~~~ - Args: - models: The return values from ``model_creator``. This can be one - or more torch nn modules. 
- config (dict): Configuration dictionary passed into ``PyTorchTrainer``. +This is the signature needed for ``PyTorchTrainer(optimizer_creator=...)``. - Returns: - One or more Torch optimizer objects. You must return as many optimizers - as you have models. - """ - return torch.optim.SGD(model.parameters(), lr=config.get("lr", 1e-4)) - - - def data_creator(config): - """Constructs torch.utils.data.Dataset objects. - - Note that even though two Dataset objects can be returned, - only one dataset will be used for training. - - Args: - config: Configuration dictionary passed into ``PyTorchTrainer`` - - Returns: - One or Two Dataset objects. If only one Dataset object is provided, - ``trainer.validate()`` will throw a ValueError. - """ - return LinearDataset(2, 5), LinearDataset(2, 5, size=400) - - def loss_creator(config): - """Constructs the Torch Loss object. - - Note that optionally, you can pass in a Torch Loss constructor directly - into the PyTorchTrainer (i.e., ``PyTorchTrainer(loss_creator=nn.BCELoss, ...))``). - - Returns: - Torch Loss object. - """ - return torch.nn.BCELoss() +.. literalinclude:: ../../examples/doc_code/raysgd_torch_signatures.py + :language: python + :start-after: __torch_optimizer_start__ + :end-before: __torch_optimizer_end__ -Before instantiating the trainer, you'll have to start or connect to a Ray cluster: +Data Creator +~~~~~~~~~~~~ -.. code-block:: python +This is the signature needed for ``PyTorchTrainer(data_creator=...)``. - ray.init() - # or ray.init(address="auto") if a cluster has been started. +.. literalinclude:: ../../examples/doc_code/raysgd_torch_signatures.py + :language: python + :start-after: __torch_data_start__ + :end-before: __torch_data_end__ + + + +Loss Creator +~~~~~~~~~~~~ + +This is the signature needed for ``PyTorchTrainer(loss_creator=...)``. + +.. 
literalinclude:: ../../examples/doc_code/raysgd_torch_signatures.py + :language: python + :start-after: __torch_loss_start__ + :end-before: __torch_loss_end__ + + +Scheduler Creator +~~~~~~~~~~~~~~~~~ + +Optionally, you can provide a creator function for the learning rate scheduler. This is the signature needed +for ``PyTorchTrainer(scheduler_creator=...)``. + +.. literalinclude:: ../../examples/doc_code/raysgd_torch_signatures.py + :language: python + :start-after: __torch_scheduler_start__ + :end-before: __torch_scheduler_end__ + +.. _starting-pytorch-trainer: + +Putting things together +~~~~~~~~~~~~~~~~~~~~~~~ + +Before instantiating the trainer, first start or connect to a Ray cluster: + +.. literalinclude:: ../../examples/doc_code/raysgd_torch_signatures.py + :language: python + :start-after: __torch_ray_start__ + :end-before: __torch_ray_end__ Instantiate the trainer object: -.. code-block:: python - - from ray.experimental.sgd import PyTorchTrainer - - trainer = PyTorchTrainer( - model_creator, - data_creator, - optimizer_creator, - loss_creator=nn.MSELoss, - config={"lr": 0.001}) - +.. literalinclude:: ../../examples/doc_code/raysgd_torch_signatures.py + :language: python + :start-after: __torch_trainer_start__ + :end-before: __torch_trainer_end__ You can also set the number of workers and whether the workers will use GPUs: .. code-block:: python + :emphasize-lines: 8,9 trainer = PyTorchTrainer( model_creator, data_creator, optimizer_creator, loss_creator=nn.MSELoss, + scheduler_creator=scheduler_creator, config={"lr": 0.001}, num_replicas=100, use_gpu=True) - -See the documentation on the PyTorchTrainer here: :ref:`ref-pytorch-trainer`. -We'll look at the training APIs next. +See the documentation on the PyTorchTrainer here: :ref:`ref-pytorch-trainer`. We'll look at the training APIs next. 
Training APIs ------------- @@ -154,17 +143,19 @@ You can customize the exact function that is called by using a customized traini Shutting down training ---------------------- -After training, you may want to reappropriate the Ray cluster. To release Ray resources obtained by the trainer: +After training, you may want to reappropriate the Ray cluster. To release Ray resources obtained by the Trainer: .. code-block:: python trainer.shutdown() -.. note:: Be sure to call ``save`` or ``get_model`` before shutting down. +.. note:: Be sure to call ``trainer.save()`` or ``trainer.get_model()`` before shutting down. Initialization Functions ------------------------ +.. warning:: This is still an experimental API and is subject to change without warning. + You may want to run some initializers on each worker when they are started. This may be something like setting an environment variable or downloading some data. You can do this via the ``initialization_hook`` parameter: .. code-block:: python @@ -193,15 +184,7 @@ Save and Load If you want to save or reload the training procedure, you can use ``trainer.save`` and ``trainer.load``, which wraps the relevant ``torch.save`` and ``torch.load`` calls. This should work across a distributed cluster even without a NFS because it takes advantage of Ray's distributed object store. -.. code-block:: - - trainer_1 = PyTorchTrainer( - model_creator, - data_creator, - optimizer_creator, - loss_creator=nn.MSELoss, - num_replicas=num_replicas) - trainer_1.train() +.. 
code-block:: python checkpoint_path = os.path.join(tempfile.mkdtemp(), "checkpoint") trainer_1.save(checkpoint_path) @@ -210,7 +193,7 @@ and ``trainer.load``, which wraps the relevant ``torch.save`` and ``torch.load`` model_creator, data_creator, optimizer_creator, - loss_creator=lambda config: nn.MSELoss(), + loss_creator=nn.MSELoss, num_replicas=num_replicas) trainer_2.restore(checkpoint_path) @@ -220,10 +203,51 @@ Exporting a model for inference The trained torch model can be extracted for use within the same Python program with ``trainer.get_model()``. This will load the state dictionary of the model(s). -.. code-block:: +.. code-block:: python trainer.train() - model = trainer.get_model() + model = trainer.get_model() # Returns multiple models if the model_creator does. + +Mixed Precision (FP16) Training +------------------------------- + +You can enable mixed precision training for PyTorch with the ``use_fp16`` flag. This automatically converts the model(s) and optimizer(s) to train using mixed-precision. This requires NVIDIA ``Apex``, which can be installed from `the NVIDIA/Apex repository `_: + +.. code-block:: python + :emphasize-lines: 7 + + trainer = PyTorchTrainer( + model_creator, + data_creator, + optimizer_creator, + loss_creator=nn.MSELoss, + num_replicas=4, + use_fp16=True + ) + +``Apex`` is a PyTorch extension with NVIDIA-maintained utilities to streamline mixed precision and distributed training. When ``use_fp16=True``, +you should not manually cast your model or data to ``.half()``. The flag informs the Trainer to call ``amp.initialize`` on the created models and optimizers and optimize using the scaled loss: ``amp.scale_loss(loss, optimizer)``. + +To specify particular parameters for ``amp.initialize``, you can use the ``apex_args`` field for the PyTorchTrainer constructor. Valid arguments can be found in the `Apex documentation `_: + +.. 
code-block:: python + :emphasize-lines: 7-12 + + trainer = PyTorchTrainer( + model_creator, + data_creator, + optimizer_creator, + loss_creator=nn.MSELoss, + num_replicas=4, + use_fp16=True, + apex_args={ + "opt_level": "O3", + "num_losses": 2, + "verbosity": 0 + } + ) + +Note that if using a custom training function, you will need to manage loss scaling manually. Distributed Multi-node Training @@ -236,9 +260,9 @@ You can start a Ray cluster `via the Ray cluster launcher `_ o .. code-block:: bash ray up CLUSTER.yaml - python train.py --address="auto" + ray submit CLUSTER.yaml train.py --args="--address='auto'" -Then, you'll be able to scale up the number of workers seamlessly across multiple nodes: +Then, within ``train.py`` you can scale up the number of workers seamlessly across multiple nodes: .. code-block:: python @@ -246,7 +270,7 @@ Then, you'll be able to scale up the number of workers seamlessly across multipl model_creator, data_creator, optimizer_creator, - loss_creator=lambda config: nn.MSELoss(), + loss_creator=nn.MSELoss, num_replicas=100) @@ -255,7 +279,7 @@ Advanced: Fault Tolerance For distributed deep learning, jobs are often run on infrastructure where nodes can be pre-empted frequently (i.e., spot instances in the cloud). To overcome this, RaySGD provides **fault tolerance** features that enable training to continue regardless of node failures. -.. code-block:: bash +.. code-block:: python trainer.train(max_retries=N) @@ -273,7 +297,7 @@ Note that we assume the Trainer itself is not on a pre-emptible node. It is curr Users can set ``checkpoint="auto"`` to always checkpoint the current model before executing a pass over the training dataset. -.. code-block:: bash +.. code-block:: python trainer.train(max_retries=N, checkpoint="auto") @@ -281,6 +305,8 @@ Users can set ``checkpoint="auto"`` to always checkpoint the current model befor Advanced: Hyperparameter Tuning ------------------------------- +.. 
warning:: This is still an experimental API and is subject to change without warning. + ``PyTorchTrainer`` naturally integrates with Tune via the ``PyTorchTrainable`` interface. The same arguments to ``PyTorchTrainer`` should be passed into the ``tune.run(config=...)`` as shown below. .. literalinclude:: ../../../python/ray/experimental/sgd/pytorch/examples/tune_example.py @@ -288,12 +314,12 @@ Advanced: Hyperparameter Tuning :start-after: __torch_tune_example__ -Simultaneous Multi-model training +Simultaneous Multi-model Training --------------------------------- -In certain scenarios such as training GANs, you may want to use multiple models in the training loop. You can do this in the ``PyTorchTrainer`` by allowing the ``model_creator`` and the ``optimizer_creator`` to return multiple values. +In certain scenarios such as training GANs, you may want to use multiple models in the training loop. You can do this in the ``PyTorchTrainer`` by allowing the ``model_creator``, ``optimizer_creator``, and ``scheduler_creator`` to return multiple values. -If multiple models are returned, you will need to provide a custom training function (and custom validation function if you plan to call ``validate``). +If multiple models, optimizers, or schedulers are returned, you will need to provide a custom training function (and custom validation function if you plan to call ``validate``). You can see the `DCGAN script `_ for an end-to-end example. @@ -336,24 +362,36 @@ You can see the `DCGAN script `_. + +.. _raysgd-pytorch-examples: + +PyTorchTrainer Examples +----------------------- + +Here are some examples of using RaySGD for training PyTorch models. If you'd like +to contribute an example, feel free to create a `pull request here `_. + +- `PyTorch training example `__: + Simple example of using Ray's PyTorchTrainer. + +- `CIFAR10 example `__: + Training a ResNet18 model on CIFAR10. 
It uses a custom training + function, a custom validation function, and custom initialization code for each worker. + +- `DCGAN example `__: + Training a Deep Convolutional GAN on MNIST. It constructs + two models and two optimizers and uses a custom training and validation function. diff --git a/doc/source/raysgd/raysgd_pytorch_examples.rst b/doc/source/raysgd/raysgd_pytorch_examples.rst deleted file mode 100644 index d8546d801..000000000 --- a/doc/source/raysgd/raysgd_pytorch_examples.rst +++ /dev/null @@ -1,38 +0,0 @@ -.. _raysgd-pytorch-example: - -RaySGD PyTorch Examples -======================= - -Here are some examples of using RaySGD for training PyTorch models. If you'd like -to contribute an example, feel free to create a `pull request here `_. - - -Toy Example ------------ - -Below is an example of using Ray's PyTorchTrainer. - - -.. literalinclude:: ../../../python/ray/experimental/sgd/pytorch/examples/train_example.py - :language: python - :start-after: __torch_train_example__ - - -CIFAR10 Example ---------------- - -Below is an example of training a ResNet18 model on CIFAR10. It uses a custom training -function, a custom validation function, and custom initialization code for each worker. - -.. literalinclude:: ../../../python/ray/experimental/sgd/pytorch/examples/cifar_pytorch_example.py - :language: python - - -DCGAN Example -------------- - -Below is an example of training a Deep Convolutional GAN on MNIST. It constructs -two models and two optimizers and uses a custom training and validation function. - -.. literalinclude:: ../../../python/ray/experimental/sgd/pytorch/examples/dcgan.py - :language: python diff --git a/doc/source/raysgd/raysgd_tensorflow.rst b/doc/source/raysgd/raysgd_tensorflow.rst index c42277376..b031153bb 100644 --- a/doc/source/raysgd/raysgd_tensorflow.rst +++ b/doc/source/raysgd/raysgd_tensorflow.rst @@ -1,14 +1,12 @@ RaySGD TensorFlow ================= -.. 
warning:: This is still an experimental API and is subject to change in the near future. - -.. tip:: Help us make RaySGD better; take this 1 minute `User Survey `_! - RaySGD's ``TFTrainer`` simplifies distributed model training for Tensorflow. The ``TFTrainer`` is a wrapper around ``MultiWorkerMirroredStrategy`` with a Python API to easily incorporate distributed training into a larger Python application, as opposed to write custom logic of setting environments and starting separate processes. .. important:: This API has only been tested with TensorFlow2.0rc and is still highly experimental. Please file bug reports if you run into any - thanks! +.. tip:: We need your feedback! RaySGD is currently early in its development, and we're hoping to get feedback from people using or considering it. We'd love `to get in touch `_! + ---------- **With Ray**: diff --git a/doc/source/raysgd/raysgdlogo.png b/doc/source/raysgd/raysgdlogo.png index fc281cafc..20c056150 100644 Binary files a/doc/source/raysgd/raysgdlogo.png and b/doc/source/raysgd/raysgdlogo.png differ diff --git a/python/ray/experimental/sgd/pytorch/distributed_pytorch_runner.py b/python/ray/experimental/sgd/pytorch/distributed_pytorch_runner.py index 4a9a4ff78..0917eef65 100644 --- a/python/ray/experimental/sgd/pytorch/distributed_pytorch_runner.py +++ b/python/ray/experimental/sgd/pytorch/distributed_pytorch_runner.py @@ -13,16 +13,17 @@ logger = logging.getLogger(__name__) class DistributedPyTorchRunner(PyTorchRunner): - """Manages a distributed PyTorch model replica.""" + """Manages a distributed PyTorch model replica. + + + Args: + args: Arguments for PyTorchRunner. + backend (string): backend used by distributed PyTorch. + kwargs: Keyword arguments for PyTorchRunner. + + """ def __init__(self, *args, backend="gloo", **kwargs): - """Initializes the runner. - - Args: - args: Arguments for the PyTorchRunner. - kwargs: Keyword arguments for the PyTorchRunner. 
- backend (string): backend used by distributed PyTorch. - """ super(DistributedPyTorchRunner, self).__init__(*args, **kwargs) self.backend = backend @@ -59,7 +60,6 @@ class DistributedPyTorchRunner(PyTorchRunner): "All models must be PyTorch models: {}.".format(self.models)) if torch.cuda.is_available(): self.models = [model.cuda() for model in self.models] - self.models = [DistributedDataParallel(model) for model in self.models] logger.debug("Creating optimizer.") self.optimizers = self.optimizer_creator(self.given_models, @@ -67,6 +67,13 @@ class DistributedPyTorchRunner(PyTorchRunner): if not isinstance(self.optimizers, collections.Iterable): self.optimizers = [self.optimizers] + self._create_schedulers_if_available() + + self._try_setup_apex() + + # This needs to happen after apex + self.models = [DistributedDataParallel(model) for model in self.models] + logger.debug("Creating loss.") self._create_loss() @@ -98,37 +105,27 @@ class DistributedPyTorchRunner(PyTorchRunner): self.train_loader.sampler.set_epoch(self.epoch) return super(DistributedPyTorchRunner, self).step() - def get_state(self): - """Returns the state of the runner.""" - # This is so that we create a duplicate of weights into CPU rather than - # move the model weights entirely out of the GPU, so that we can - # resume training while saving intermediate checkpoints. + def _get_model_state_dicts(self): + """Fetch state from ``model.module`` instead of ``model``. + + This is needed for PyTorch DistributedDataParallel models. 
+ """ cpu_state_dicts = [] for model in self.models: state_dict = model.module.state_dict() - for k, v in state_dict.items(): - state_dict[k] = v.cpu() - cpu_state_dicts += [state_dict] - return { - "epoch": self.epoch, - "models": cpu_state_dicts, - "optimizers": [opt.state_dict() for opt in self.optimizers], - "stats": self.stats() - } + # This is so that we create a duplicate of weights into CPU rather + # than move the model weights out of the GPU so that we can + # resume training while saving intermediate checkpoints. + cpu_state_dicts += [{k: v.cpu() for k, v in state_dict.items()}] + return cpu_state_dicts - def set_state(self, state): - """Sets the state of the model.""" - # TODO: restore timer stats - for model, model_state_dict in zip(self.models, state["models"]): + def _set_model_state_dicts(self, model_state_dicts): + for model, model_state_dict in zip(self.models, model_state_dicts): model.module.load_state_dict(model_state_dict) - for optimizer, opt_state_dict in zip(self.optimizers, - state["optimizers"]): - optimizer.load_state_dict(opt_state_dict) - self.epoch = state["stats"]["epoch"] - def shutdown(self): + # def shutdown(self): """Attempts to shut down the worker.""" - super(DistributedPyTorchRunner, self).shutdown() + # super(DistributedPyTorchRunner, self).shutdown() # TODO: Temporarily removing since it causes hangs on MacOSX. # However, it seems to be harmless to remove permanently # since the processes are shutdown anyways. 
This comment can be diff --git a/python/ray/experimental/sgd/pytorch/examples/cifar_pytorch_example.py b/python/ray/experimental/sgd/pytorch/examples/cifar_pytorch_example.py index 1d454e019..531740649 100644 --- a/python/ray/experimental/sgd/pytorch/examples/cifar_pytorch_example.py +++ b/python/ray/experimental/sgd/pytorch/examples/cifar_pytorch_example.py @@ -10,6 +10,7 @@ import torchvision.transforms as transforms import ray from ray.experimental.sgd.pytorch import (PyTorchTrainer, PyTorchTrainable) from ray.experimental.sgd.pytorch.resnet import ResNet18 +from ray.experimental.sgd.pytorch.utils import TEST_MODE def initialization_hook(runner): @@ -20,55 +21,6 @@ def initialization_hook(runner): os.environ["NCCL_DEBUG"] = "INFO" -def train(model, train_iterator, criterion, optimizer, config): - model.train() - train_loss, total_num, correct = 0, 0, 0 - for batch_idx, (data, target) in enumerate(train_iterator): - if config.get("test_mode") and batch_idx > 0: - break - # get small model update - if torch.cuda.is_available(): - data, target = data.cuda(), target.cuda() - output = model(data) - loss = criterion(output, target) - loss.backward() - train_loss += loss.item() * target.size(0) - total_num += target.size(0) - _, predicted = output.max(1) - correct += predicted.eq(target).sum().item() - optimizer.step() - optimizer.zero_grad() - stats = { - "train_loss": train_loss / total_num, - "train_acc": correct / total_num - } - return stats - - -def validate(model, val_iterator, criterion, config): - # switch to evaluate mode - model.eval() - correct = 0 - total = 0 - total_loss = 0 - with torch.no_grad(): - for batch_idx, (features, target) in enumerate(val_iterator): - if config.get("test_mode") and batch_idx > 10: - break - if torch.cuda.is_available(): - features = features.cuda(non_blocking=True) - target = target.cuda(non_blocking=True) - # compute output - output = model(features) - loss = criterion(output, target) - total_loss += loss.item() * 
target.size(0) - _, predicted = torch.max(output.data, 1) - total += target.size(0) - correct += (predicted == target).sum().item() - stats = {"mean_accuracy": correct / total, "mean_loss": total_loss / total} - return stats - - def cifar_creator(config): transform_train = transforms.Compose([ transforms.RandomCrop(32, padding=4), @@ -96,23 +48,34 @@ def optimizer_creator(model, config): return torch.optim.SGD(model.parameters(), lr=config.get("lr", 0.1)) -def train_example(num_replicas=1, use_gpu=False, test_mode=False): - config = {"test_mode": test_mode} +def scheduler_creator(optimizer, config): + return torch.optim.lr_scheduler.MultiStepLR( + optimizer, milestones=[150, 250, 350], gamma=0.1) + + +def train_example(num_replicas=1, + num_epochs=5, + use_gpu=False, + use_fp16=False, + test_mode=False): + config = {TEST_MODE: test_mode} trainer1 = PyTorchTrainer( ResNet18, cifar_creator, optimizer_creator, nn.CrossEntropyLoss, + scheduler_creator=scheduler_creator, initialization_hook=initialization_hook, - train_function=train, - validation_function=validate, num_replicas=num_replicas, config=config, use_gpu=use_gpu, batch_size=16 if test_mode else 512, - backend="nccl" if use_gpu else "gloo") - for i in range(5): - stats = trainer1.train() + backend="nccl" if use_gpu else "gloo", + scheduler_step_freq="epoch", + use_fp16=use_fp16) + for i in range(num_epochs): + # Increase `max_retries` to turn on fault tolerance. 
+ stats = trainer1.train(max_retries=0) print(stats) print(trainer1.validate()) @@ -126,15 +89,13 @@ def tune_example(num_replicas=1, use_gpu=False, test_mode=False): "data_creator": cifar_creator, "optimizer_creator": optimizer_creator, "loss_creator": lambda config: nn.CrossEntropyLoss(), - "train_function": train, - "validation_function": validate, "num_replicas": num_replicas, "initialization_hook": initialization_hook, "use_gpu": use_gpu, "batch_size": 16 if test_mode else 512, "config": { "lr": tune.choice([1e-4, 1e-3, 5e-3, 1e-2]), - "test_mode": test_mode + TEST_MODE: test_mode }, "backend": "nccl" if use_gpu else "gloo" } @@ -152,7 +113,7 @@ def tune_example(num_replicas=1, use_gpu=False, test_mode=False): if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( - "--ray-redis-address", + "--address", required=False, type=str, help="the address to use for Redis") @@ -162,11 +123,18 @@ if __name__ == "__main__": type=int, default=1, help="Sets number of replicas for training.") + parser.add_argument( + "--num-epochs", type=int, default=5, help="Number of epochs to train.") parser.add_argument( "--use-gpu", action="store_true", default=False, help="Enables GPU training") + parser.add_argument( + "--fp16", + action="store_true", + default=False, + help="Enables FP16 training with apex. 
Requires `use-gpu`.") parser.add_argument( "--smoke-test", action="store_true", @@ -177,7 +145,7 @@ if __name__ == "__main__": args, _ = parser.parse_known_args() - ray.init(address=args.ray_redis_address, log_to_driver=False) + ray.init(address=args.address, log_to_driver=True) if args.tune: tune_example( @@ -187,5 +155,7 @@ if __name__ == "__main__": else: train_example( num_replicas=args.num_replicas, + num_epochs=args.num_epochs, use_gpu=args.use_gpu, + use_fp16=args.fp16, test_mode=args.smoke_test) diff --git a/python/ray/experimental/sgd/pytorch/examples/dcgan.py b/python/ray/experimental/sgd/pytorch/examples/dcgan.py index 031e89e61..91f41e265 100644 --- a/python/ray/experimental/sgd/pytorch/examples/dcgan.py +++ b/python/ray/experimental/sgd/pytorch/examples/dcgan.py @@ -16,6 +16,7 @@ from scipy.stats import entropy import ray from ray.experimental.sgd import PyTorchTrainer +from ray.experimental.sgd.pytorch.utils import TEST_MODE # Training parameters TRAIN_BATCHES = 5 @@ -157,7 +158,7 @@ def model_creator(config): return netD, netG -def train(models, dataloader, criterion, optimizers, config): +def train(config, models, dataloader, criterion, optimizers, **kwargs): netD, netG = models optimD, optimG = optimizers real_label = 1 @@ -165,7 +166,7 @@ def train(models, dataloader, criterion, optimizers, config): device = torch.device("cuda" if torch.cuda.is_available() else "cpu") for i, data in enumerate(dataloader, 0): - if i >= TRAIN_BATCHES and config.get("test_mode"): + if i >= TRAIN_BATCHES and config.get(TEST_MODE): break netD.zero_grad() @@ -211,7 +212,7 @@ def optimizer_creator(models, config): def train_example(num_replicas=1, use_gpu=False, test_mode=False): - config = {"test_mode": test_mode} + config = {TEST_MODE: test_mode} trainer = PyTorchTrainer( model_creator, data_creator, diff --git a/python/ray/experimental/sgd/pytorch/examples/example-sgd.yaml b/python/ray/experimental/sgd/pytorch/examples/example-sgd.yaml index 32a7bbf86..c72c16d1b 
100644 --- a/python/ray/experimental/sgd/pytorch/examples/example-sgd.yaml +++ b/python/ray/experimental/sgd/pytorch/examples/example-sgd.yaml @@ -19,7 +19,6 @@ idle_timeout_minutes: 20 provider: type: aws region: us-east-1 - availability_zone: us-east-1f # How Ray will authenticate with newly launched nodes. auth: @@ -37,18 +36,18 @@ head_node: worker_nodes: InstanceType: p3.8xlarge ImageId: ami-0757fc5a639fe7666 + # Run workers on spot by default. Comment this out to use on-demand. InstanceMarketOptions: MarketType: spot # SpotOptions: # MaxPrice: "9.0" - # # Run workers on spot by default. Comment this out to use on-demand. - # InstanceMarketOptions: - # MarketType: spot - setup_commands: - ray || pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.9.0.dev0-cp36-cp36m-manylinux1_x86_64.whl - pip install -U ipdb ray[rllib] torch torchvision + # Install apex. + # - rm -rf apex || true + # - git clone https://github.com/NVIDIA/apex && cd apex && pip install -v --no-cache-dir ./ || true file_mounts: { diff --git a/python/ray/experimental/sgd/pytorch/examples/train_example.py b/python/ray/experimental/sgd/pytorch/examples/train_example.py index 5c9e68f7e..4b1f94bdb 100644 --- a/python/ray/experimental/sgd/pytorch/examples/train_example.py +++ b/python/ray/experimental/sgd/pytorch/examples/train_example.py @@ -32,12 +32,25 @@ class LinearDataset(torch.utils.data.Dataset): def model_creator(config): - return nn.Linear(1, 1) + """Returns a torch.nn.Module object.""" + return nn.Linear(1, config.get("hidden_size", 1)) def optimizer_creator(model, config): - """Returns optimizer.""" - return torch.optim.SGD(model.parameters(), lr=1e-2) + """Returns optimizer defined upon the model parameters.""" + return torch.optim.SGD(model.parameters(), lr=config.get("lr", 1e-2)) + + +def scheduler_creator(optimizer, config): + """Returns a learning rate scheduler wrapping the optimizer. 
+ + You will need to set ``PyTorchTrainer(scheduler_step_freq="epoch")`` + for the scheduler to be incremented correctly. + + If using a scheduler for validation loss, be sure to call + ``trainer.update_scheduler(validation_loss)``. + """ + return torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.9) def data_creator(config): @@ -51,10 +64,13 @@ def train_example(num_replicas=1, use_gpu=False): data_creator, optimizer_creator, loss_creator=nn.MSELoss, + scheduler_creator=scheduler_creator, num_replicas=num_replicas, use_gpu=use_gpu, batch_size=num_replicas * 4, - backend="gloo") + config={"lr": 1e-2, "hidden_size": 1}, + backend="gloo", + scheduler_step_freq="epoch") for i in range(5): stats = trainer1.train() print(stats) diff --git a/python/ray/experimental/sgd/pytorch/pytorch_runner.py b/python/ray/experimental/sgd/pytorch/pytorch_runner.py index e5319a8a5..5587574d0 100644 --- a/python/ray/experimental/sgd/pytorch/pytorch_runner.py +++ b/python/ray/experimental/sgd/pytorch/pytorch_runner.py @@ -12,44 +12,57 @@ from ray.experimental.sgd.pytorch import utils as pytorch_utils from ray.experimental.sgd import utils logger = logging.getLogger(__name__) +amp = None + +try: + from apex import amp +except ImportError: + logger.debug("apex is not installed.") + pass class PyTorchRunner: - """Manages a PyTorch model for training.""" + """Manages a PyTorch model for training. + + Args: + model_creator (dict -> *): see pytorch_trainer.py + data_creator (dict -> Dataset, Dataset): see pytorch_trainer.py. + optimizer_creator (models, dict -> optimizers): see pytorch_trainer.py. + loss_creator (dict -> loss | Loss class): see pytorch_trainer.py. + scheduler_creator (optimizers, dict -> schedulers): see + pytorch_trainer.py. + train_function: see pytorch_trainer.py + validation_function: see pytorch_trainer.py + config (dict): see pytorch_trainer.py. + dataloader_config (dict): See pytorch_trainer.py. + batch_size (int): see pytorch_trainer.py. 
+ use_fp16 (bool): see pytorch_trainer.py. + apex_args (dict|None): see pytorch_trainer.py. + scheduler_step_freq (str): see pytorch_trainer.py. + """ def __init__(self, model_creator, data_creator, optimizer_creator, loss_creator, + scheduler_creator=None, train_function=None, validation_function=None, config=None, dataloader_config=None, - batch_size=16): - """Initializes the runner. - - Args: - model_creator (dict -> torch.nn.Module): see pytorch_trainer.py - data_creator (int, dict -> Dataset, Dataset): see - pytorch_trainer.py. - optimizer_creator (torch.nn.Module, dict -> loss, optimizer): - see pytorch_trainer.py. - loss_creator (dict -> loss | Loss class): see pytorch_trainer.py. - train_function: see pytorch_trainer.py - validation_function: see pytorch_trainer.py - config (dict): see pytorch_trainer.py. - dataloader_config (dict): See pytorch_trainer.py. - batch_size (int): see pytorch_trainer.py. - """ + batch_size=16, + use_fp16=False, + apex_args=None, + scheduler_step_freq="batch"): self.model_creator = model_creator self.data_creator = data_creator self.optimizer_creator = optimizer_creator self.loss_creator = loss_creator + self.scheduler_creator = scheduler_creator self.config = {} if config is None else config self.dataloader_config = { - "num_workers": 2, - "pin_memory": True + "num_workers": 2 } if dataloader_config is None else dataloader_config self.train_function = train_function or pytorch_utils.train self.validation_function = (validation_function @@ -65,12 +78,19 @@ class PyTorchRunner: "validation", "training" ] } - self.models = None self.optimizers = None self.criterion = None + self.schedulers = None self.train_loader = None self.validation_loader = None + self.use_fp16 = use_fp16 + self.apex_args = apex_args or {} + if use_fp16 and not amp: + raise ImportError( + "Please install apex from " + "https://www.github.com/nvidia/apex to use fp16 training.") + self.scheduler_step_freq = scheduler_step_freq def _validate_datasets(self, 
dataset): assert dataset, "Datasets need to be returned in data_creator." @@ -91,6 +111,22 @@ class PyTorchRunner: if torch.cuda.is_available(): self.criterion = self.criterion.cuda() + def _create_schedulers_if_available(self): + # Learning rate schedules are optional. + if not self.scheduler_creator: + return + self.schedulers = self.scheduler_creator(self.given_optimizers, + self.config) + + if not isinstance(self.schedulers, collections.Iterable): + self.schedulers = [self.schedulers] + + def _try_setup_apex(self): + """Sets up the model for fp16 training via apex if available.""" + if self.use_fp16 and amp: + self.models, self.optimizers = amp.initialize( + self.models, self.optimizers, **self.apex_args) + def setup(self): """Initializes the model.""" logger.debug("Creating model") @@ -105,7 +141,8 @@ class PyTorchRunner: self.config) if not isinstance(self.optimizers, collections.Iterable): self.optimizers = [self.optimizers] - + self._create_schedulers_if_available() + self._try_setup_apex() self._create_loss() logger.debug("Creating dataset") @@ -134,10 +171,19 @@ class PyTorchRunner: def step(self): """Runs a training epoch and updates the model parameters.""" logger.debug("Begin Training Epoch {}".format(self.epoch + 1)) + training_config = self.config.copy() + training_config.update({ + pytorch_utils.USE_FP16: self.use_fp16, + pytorch_utils.SCHEDULER_STEP: self.scheduler_step_freq + }) with self._timers["training"]: train_stats = self.train_function( - self.given_models, self.train_loader, self.criterion, - self.given_optimizers, self.config) + training_config, + self.given_models, + self.train_loader, + self.criterion, + self.given_optimizers, + scheduler=self.given_schedulers) train_stats["epoch"] = self.epoch self.epoch += 1 @@ -151,8 +197,11 @@ class PyTorchRunner: raise ValueError("No validation dataloader provided.") with self._timers["validation"]: validation_stats = self.validation_function( - self.given_models, self.validation_loader, 
self.criterion, - self.config) + self.config, + self.given_models, + self.validation_loader, + self.criterion, + scheduler=self.given_schedulers) validation_stats.update(self.stats()) return validation_stats @@ -166,31 +215,53 @@ class PyTorchRunner: t.reset() return stats - def get_state(self): - """Returns the state of the runner.""" + def _get_model_state_dicts(self): # This is so that we create a duplicate of weights into CPU rather than # move the model weights entirely out of the GPU, so that we can # resume training while saving intermediate checkpoints. cpu_state_dicts = [] for model in self.models: state_dict = model.state_dict() - for k, v in state_dict.items(): - state_dict[k] = v.cpu() - cpu_state_dicts += [state_dict] - return { + cpu_state_dicts += [{k: v.cpu() for k, v in state_dict.items()}] + return cpu_state_dicts + + def _set_model_state_dicts(self, models_state_dicts): + for model, state_dict in zip(self.models, models_state_dicts): + model.load_state_dict(state_dict) + + def get_state(self): + """Returns the state of the runner.""" + + state = { "epoch": self.epoch, - "models": cpu_state_dicts, + "models": self._get_model_state_dicts(), "optimizers": [opt.state_dict() for opt in self.optimizers], "stats": self.stats() } + if self.schedulers: + state.update({ + "schedulers": [ + scheduler.state_dict() for scheduler in self.schedulers + ] + }) + # Check if fp16 is True and if NVIDIA Apex is imported. 
+ if self.use_fp16 and amp: + state.update({"amp": amp.state_dict()}) + return state def set_state(self, state): """Sets the state of the model.""" # TODO: restore timer stats - for model, state_dict in zip(self.models, state["models"]): - model.load_state_dict(state_dict) + self._set_model_state_dicts(state["models"]) for optimizer, state_dict in zip(self.optimizers, state["optimizers"]): optimizer.load_state_dict(state_dict) + if self.schedulers: + for scheduler, state_dict in zip(self.schedulers, + state["schedulers"]): + scheduler.load_state_dict(state_dict) + + if self.use_fp16 and "amp" in state and amp: + amp.load_state_dict(state["amp"]) self.epoch = state["stats"]["epoch"] def apply_fn(self, fn): @@ -206,6 +277,13 @@ class PyTorchRunner: if torch.cuda.is_available(): torch.cuda.empty_cache() + @property + def given_models(self): + if len(self.models) > 1: + return self.models + else: + return self.models[0] + @property def given_optimizers(self): if len(self.optimizers) > 1: @@ -214,8 +292,10 @@ class PyTorchRunner: return self.optimizers[0] @property - def given_models(self): - if len(self.models) > 1: - return self.models + def given_schedulers(self): + if not self.schedulers: + return self.schedulers + if len(self.schedulers) > 1: + return self.schedulers else: - return self.models[0] + return self.schedulers[0] diff --git a/python/ray/experimental/sgd/pytorch/pytorch_trainer.py b/python/ray/experimental/sgd/pytorch/pytorch_trainer.py index ed8ce40ce..a10933532 100644 --- a/python/ray/experimental/sgd/pytorch/pytorch_trainer.py +++ b/python/ray/experimental/sgd/pytorch/pytorch_trainer.py @@ -15,6 +15,7 @@ from ray.experimental.sgd.pytorch.distributed_pytorch_runner import ( DistributedPyTorchRunner) from ray.experimental.sgd import utils from ray.experimental.sgd.pytorch.pytorch_runner import PyTorchRunner +from ray.experimental.sgd.pytorch import utils as pytorch_utils logger = logging.getLogger(__name__) RESIZE_COOLDOWN_S = 10 @@ -26,55 +27,59 @@ 
class PyTorchTrainer: Launches a set of actors which connect via distributed PyTorch and coordinate gradient updates to train the provided model. - .. code-block:: python + .. code-block:: python - def model_creator(config): - return nn.Linear(1, 1) + def model_creator(config): + return nn.Linear(1, 1) - def optimizer_creator(model, config): - return torch.optim.SGD( - model.parameters(), lr=config.get("lr", 1e-4)) + def optimizer_creator(model, config): + return torch.optim.SGD( + model.parameters(), lr=config.get("lr", 1e-4)) - def data_creator(config): - return LinearDataset(2, 5), LinearDataset(2, 5, size=400) + def data_creator(config): + return LinearDataset(2, 5), LinearDataset(2, 5, size=400) + + trainer = PyTorchTrainer( + model_creator, + data_creator, + optimizer_creator, + loss_creator=nn.MSELoss, + use_gpu=True + ) + trainer.train() - trainer = PyTorchTrainer( - model_creator, - data_creator, - optimizer_creator, - loss_creator=nn.MSELoss, - use_gpu=True - ) - trainer.train() Args: - model_creator (dict -> *): Constructor function that takes in + model_creator (dict -> Model(s)): Constructor function that takes in config and returns the model(s) to be optimized. These must be - ``torch.nn.Module`` objects. Note that if multiple models - are returned, the same number of optimizers must be returned - by the optimizer_creator. If multiple models are returned, + ``torch.nn.Module`` objects. If multiple models are returned, a ``train_function`` must be specified. You do not need to - handle GPU/devices in this function; - RaySGD will do that under the hood. - data_creator (dict -> Dataset, Dataset): Constructor function + handle GPU/devices in this function; RaySGD will do that under + the hood. + data_creator (dict -> Dataset(s)): Constructor function that takes in the passed config and returns one or two ``torch.utils.data.Dataset`` objects. Note that even though two Dataset objects can be returned, only one dataset will be used for training. 
RaySGD will automatically wrap the objects with a ``DataLoader``. - optimizer_creator (models, dict -> optimizers): Constructor + optimizer_creator ((models, dict) -> optimizers): Constructor function that takes in the return values from ``model_creator`` and the passed config and returns One or - more Torch optimizer objects. You must return as many - optimizers as you have models. You do not need to handle + more Torch optimizer objects. You do not need to handle GPU/devices in this function; ``RaySGD`` will do that for you. - loss_creator (dict -> loss or torch.nn.*Loss): A constructor function - for the training loss. This can be either a function that + loss_creator (torch.nn.*Loss class | dict -> loss): A constructor + function for the training loss. This can be either a function that takes in the provided config for customization or a subclass of ``torch.nn.modules.loss._Loss``, which is most Pytorch loss classes. For example, ``loss_creator=torch.nn.BCELoss``. + scheduler_creator (optimizers, dict -> schedulers): + A constructor function for the learning rate scheduler(s). This is + a function that takes in the generated optimizers (from + ``optimizer_creator``) and the provided config for customization. + Be sure to set ``scheduler_step_freq`` to increment the + scheduler correctly. train_function: Custom function for training. This function will be executed in parallel across all workers at once. The function needs to take in (models, train_dataloader, criterion, @@ -104,6 +109,19 @@ class PyTorchTrainer: support "nccl", "gloo", and "auto". If "auto", RaySGD will automatically use "nccl" if `use_gpu` is True, and "gloo" otherwise. + use_fp16 (bool): Enables mixed precision training via apex if apex + is installed. This is automatically done after the model and + optimizers are constructed and will work for multi-model training. + Please see https://github.com/NVIDIA/apex for more details. + apex_args (dict|None): Dict containing keyword args for amp.initialize.
+ See https://nvidia.github.io/apex/amp.html#module-apex.amp. By + default, the models and optimizers are passed in. Consider using + "num_losses" if operating over multiple models and optimizers. + scheduler_step_freq: "batch", "epoch", or None. This will + determine when ``scheduler.step`` is called. If "batch", + ``step`` will be called after every optimizer step. If "epoch", + ``step`` will be called after one pass of the DataLoader. + """ def __init__(self, @@ -111,6 +129,7 @@ class PyTorchTrainer: data_creator, optimizer_creator, loss_creator, + scheduler_creator=None, train_function=None, validation_function=None, initialization_hook=None, @@ -119,8 +138,10 @@ class PyTorchTrainer: num_replicas=1, use_gpu=False, batch_size=16, - backend="auto"): - # TODO: add support for mixed precision + backend="auto", + use_fp16=False, + apex_args=None, + scheduler_step_freq="batch"): if num_replicas > 1 and not dist.is_available(): raise ValueError( ("Distributed PyTorch is not supported on macOS. " @@ -133,6 +154,7 @@ class PyTorchTrainer: self.train_function = train_function self.optimizer_creator = optimizer_creator self.loss_creator = loss_creator + self.scheduler_creator = scheduler_creator self.validation_function = validation_function self.initialization_hook = initialization_hook self.config = {} if config is None else config @@ -147,9 +169,25 @@ class PyTorchTrainer: self.use_gpu = use_gpu self.batch_size = batch_size self.max_replicas = num_replicas + + self.use_fp16 = use_fp16 + + if apex_args and not isinstance(apex_args, dict): + raise ValueError("apex_args needs to be a dict object.") + + self.apex_args = apex_args self.temp_dir = tempfile.mkdtemp(prefix="raysgd") self._num_failures = 0 self._last_resize = float("-inf") + + if scheduler_step_freq and ( + scheduler_step_freq not in pytorch_utils.VALID_SCHEDULER_STEP): + raise ValueError( + "Scheduler step freq must be in {}. 
Got {}".format( + pytorch_utils.VALID_SCHEDULER_STEP, scheduler_step_freq)) + + self.scheduler_step_freq = scheduler_step_freq + self._start_workers(self.max_replicas) def _start_workers(self, num_replicas): @@ -165,11 +203,16 @@ class PyTorchTrainer: self.data_creator, self.optimizer_creator, self.loss_creator, + self.scheduler_creator, train_function=self.train_function, validation_function=self.validation_function, config=self.config, dataloader_config=self.dataloader_config, - batch_size=self.batch_size) + batch_size=self.batch_size, + use_fp16=self.use_fp16, + apex_args=self.apex_args, + scheduler_step_freq=self.scheduler_step_freq, + ) ] if self.initialization_hook: self.apply_all_workers(self.initialization_hook) @@ -198,12 +241,16 @@ class PyTorchTrainer: self.data_creator, self.optimizer_creator, self.loss_creator, + self.scheduler_creator, backend=self.backend, train_function=self.train_function, validation_function=self.validation_function, config=self.config, dataloader_config=self.dataloader_config, - batch_size=batch_size_per_replica) + batch_size=batch_size_per_replica, + use_fp16=self.use_fp16, + apex_args=self.apex_args, + scheduler_step_freq=self.scheduler_step_freq) for i in range(num_replicas) ] if self.initialization_hook: @@ -219,7 +266,7 @@ class PyTorchTrainer: for i, worker in enumerate(self.workers) ]) - def train(self, max_retries=10, checkpoint="auto"): + def train(self, max_retries=0, checkpoint="auto"): """Runs a training epoch. Runs an average over all values returned from workers. Set @@ -294,6 +341,14 @@ class PyTorchTrainer: [s.get(stat_key, np.nan) for s in worker_stats]) return validation_stats + def update_scheduler(self, metric): + """Calls ``scheduler.step(metric)`` on all schedulers. + + This is useful for lr_schedulers such as ``ReduceLROnPlateau``. 
+ """ + self.apply_all_workers( + lambda runner: [sched.step(metric) for sched in runner.schedulers]) + def get_model(self): """Returns the learned model(s).""" models = self.model_creator(self.config) diff --git a/python/ray/experimental/sgd/pytorch/utils.py b/python/ray/experimental/sgd/pytorch/utils.py index 84ad160af..295499d19 100644 --- a/python/ray/experimental/sgd/pytorch/utils.py +++ b/python/ray/experimental/sgd/pytorch/utils.py @@ -4,32 +4,87 @@ import torch from ray.experimental.sgd.utils import TimerStat +amp = None -def train(model, train_iterator, criterion, optimizer, config): - """Runs 1 training epoch""" +try: + from apex import amp +except ImportError: + # Apex library is not installed, so we cannot enable mixed precision. + # We don't log here because logging happens in the pytorch_runner, + # where amp is initialized. + pass + +USE_FP16 = "__use_fp16__" +TEST_MODE = "__test_mode__" +BATCH_COUNT = "batch_processed" +SCHEDULER_STEP = "scheduler_step" +SCHEDULER_STEP_BATCH = "batch" +SCHEDULER_STEP_EPOCH = "epoch" + +VALID_SCHEDULER_STEP = {SCHEDULER_STEP_BATCH, SCHEDULER_STEP_EPOCH} + + +def train(config, model, train_iterator, criterion, optimizer, scheduler=None): + """Runs one standard training pass over the train_iterator. + + This function automatically measures timing for various operations such + as host to device transfer, gradient calculation, and gradient application. + + It also automatically detects and places the data on the given GPU device + if available. + + The scheduler will only be called at a batch or epoch frequency, depending + on the user parameter. Be sure to set ``scheduler_step_freq`` in + ``PyTorchTrainer`` to either "batch" or "epoch" to increment the scheduler + correctly during training. If using a learning rate scheduler + that depends on validation loss, you can use ``trainer.update_scheduler``. + + Raises: + ValueError if multiple models/optimizers/schedulers are provided. 
You + are expected to have a custom training function if you wish + to use multiple models/optimizers/schedulers. + + Args: + config: (dict): A user configuration provided into the Trainer + constructor. + model: The model as created by the model_creator. + train_iterator: An iterator created from the DataLoader which + wraps the provided Dataset. + criterion: The loss object created by the loss_creator. + optimizer: The torch.optim.Optimizer object as created by the + optimizer_creator. + scheduler (optional): The torch.optim.lr_scheduler object + as created by the scheduler_creator. Be sure to set + ``scheduler_step_freq`` in ``PyTorchTrainer`` + to increment the scheduler correctly. + + Returns: + A dict of metrics from training. + """ if isinstance(model, collections.Iterable) or isinstance( - optimizer, collections.Iterable): + optimizer, collections.Iterable) or isinstance( + scheduler, collections.Iterable): raise ValueError( "Need to provide custom training function if using multi-model " - "or multi-optimizer training.") + "or multi-scheduler or multi-optimizer training.") batch_time = AverageMeter() data_time = AverageMeter() losses = AverageMeter() - timers = {k: TimerStat() for k in ["d2h", "fwd", "grad", "apply"]} + timers = {k: TimerStat() for k in ["h2d", "fwd", "grad", "apply"]} # switch to train mode model.train() end = time.time() - for i, (features, target) in enumerate(train_iterator): + for batch_idx, (features, target) in enumerate(train_iterator): # measure data loading time data_time.update(time.time() - end) # Create non_blocking tensors for distributed training - with timers["d2h"]: + with timers["h2d"]: if torch.cuda.is_available(): features = features.cuda(non_blocking=True) target = target.cuda(non_blocking=True) @@ -45,19 +100,33 @@ def train(model, train_iterator, criterion, optimizer, config): with timers["grad"]: # compute gradients in a backward pass optimizer.zero_grad() - loss.backward() + + if config.get(USE_FP16): + with 
amp.scale_loss(loss, optimizer) as scaled_loss: + scaled_loss.backward() + else: + loss.backward() with timers["apply"]: # Call step of optimizer to update model params optimizer.step() + if scheduler and config.get(SCHEDULER_STEP) == SCHEDULER_STEP_BATCH: + scheduler.step() + # measure elapsed time batch_time.update(time.time() - end) end = time.time() + if config.get(TEST_MODE) and batch_idx == 0: + break + + if scheduler and config.get(SCHEDULER_STEP) == SCHEDULER_STEP_EPOCH: + scheduler.step() + stats = { "batch_time": batch_time.avg, - "batch_processed": losses.count, + BATCH_COUNT: batch_idx + 1, "train_loss": losses.avg, "data_time": data_time.avg, } @@ -65,11 +134,40 @@ def train(model, train_iterator, criterion, optimizer, config): return stats -def validate(model, val_iterator, criterion, config): - if isinstance(model, collections.Iterable): +def validate(config, model, val_iterator, criterion, scheduler=None): + """Runs one standard validation pass over the val_iterator. + + This function automatically measures timing for various operations such + as host to device transfer and processing time for the batch. + + It also automatically detects and places the data on the given GPU device + if available. + + Raises: + ValueError if multiple models/schedulers are provided. You + are expected to have a custom validation function if you wish + to use multiple models/schedulers. + + Args: + config: (dict): A user configuration provided into the Trainer + constructor. + model: The model as created by the model_creator. + val_iterator: An iterator created from the DataLoader which + wraps the provided Dataset. + criterion: The loss object created by the loss_creator. + scheduler (optional): The torch.optim.lr_scheduler object + as created by the scheduler_creator. By default, + this is not used in this function. + + Returns: + A dict of metrics from the evaluation.
+ """ + + if isinstance(model, collections.Iterable) or isinstance( + scheduler, collections.Iterable): raise ValueError( "Need to provide custom validation function if using multi-model " - "training.") + "or multi-scheduler training.") batch_time = AverageMeter() losses = AverageMeter() @@ -77,10 +175,10 @@ def validate(model, val_iterator, criterion, config): model.eval() correct = 0 total = 0 + batch_idx = 0 with torch.no_grad(): end = time.time() - for i, (features, target) in enumerate(val_iterator): - + for batch_idx, (features, target) in enumerate(val_iterator): if torch.cuda.is_available(): features = features.cuda(non_blocking=True) target = target.cuda(non_blocking=True) @@ -99,8 +197,16 @@ def validate(model, val_iterator, criterion, config): batch_time.update(time.time() - end) end = time.time() - stats = {"batch_time": batch_time.avg, "validation_loss": losses.avg} - stats.update(mean_accuracy=correct / total) + if config.get(TEST_MODE) and batch_idx == 0: + break + + stats = { + BATCH_COUNT: batch_idx + 1, + "batch_time": batch_time.avg, + "validation_loss": losses.avg, + "mean_accuracy": correct / total, + "mean_loss": losses.sum / total, + } return stats diff --git a/python/ray/experimental/sgd/tests/test_pytorch.py b/python/ray/experimental/sgd/tests/test_pytorch.py index ce8f5b6b8..8e174200f 100644 --- a/python/ray/experimental/sgd/tests/test_pytorch.py +++ b/python/ray/experimental/sgd/tests/test_pytorch.py @@ -12,13 +12,29 @@ import ray from ray import tune from ray.tests.conftest import ray_start_2_cpus # noqa: F401 from ray.experimental.sgd.pytorch import PyTorchTrainer, PyTorchTrainable -from ray.experimental.sgd.pytorch.utils import train +from ray.experimental.sgd.pytorch.utils import (train, BATCH_COUNT, TEST_MODE, + SCHEDULER_STEP) from ray.experimental.sgd.utils import check_for_failure from ray.experimental.sgd.pytorch.examples.train_example import ( model_creator, optimizer_creator, data_creator, LinearDataset) +def 
test_test_mode(ray_start_2_cpus): # noqa: F811 + trainer = PyTorchTrainer( + model_creator, + data_creator, + optimizer_creator, + loss_creator=lambda config: nn.MSELoss(), + config={TEST_MODE: True}, + num_replicas=1) + metrics = trainer.train() + assert metrics[BATCH_COUNT] == 1 + + val_metrics = trainer.validate() + assert val_metrics[BATCH_COUNT] == 1 + + @pytest.mark.parametrize("num_replicas", [1, 2] if dist.is_available() else [1]) def test_train(ray_start_2_cpus, num_replicas): # noqa: F811 @@ -28,10 +44,12 @@ def test_train(ray_start_2_cpus, num_replicas): # noqa: F811 optimizer_creator, loss_creator=lambda config: nn.MSELoss(), num_replicas=num_replicas) - train_loss1 = trainer.train()["train_loss"] + for i in range(3): + train_loss1 = trainer.train()["train_loss"] validation_loss1 = trainer.validate()["validation_loss"] - train_loss2 = trainer.train()["train_loss"] + for i in range(3): + train_loss2 = trainer.train()["train_loss"] validation_loss2 = trainer.validate()["validation_loss"] print(train_loss1, train_loss2) @@ -44,11 +62,12 @@ def test_train(ray_start_2_cpus, num_replicas): # noqa: F811 @pytest.mark.parametrize("num_replicas", [1, 2] if dist.is_available() else [1]) def test_multi_model(ray_start_2_cpus, num_replicas): # noqa: F811 - def custom_train(models, dataloader, criterion, optimizers, config): + def custom_train(config, models, dataloader, criterion, optimizers, + **kwargs): result = {} for i, (model, optimizer) in enumerate(zip(models, optimizers)): - result["model_{}".format(i)] = train(model, dataloader, criterion, - optimizer, config) + result["model_{}".format(i)] = train(config, model, dataloader, + criterion, optimizer) return result def multi_model_creator(config): @@ -103,7 +122,107 @@ def test_multi_model(ray_start_2_cpus, num_replicas): # noqa: F811 @pytest.mark.parametrize("num_replicas", [1, 2] if dist.is_available() else [1]) -@pytest.mark.xfail +def test_multi_model_matrix(ray_start_2_cpus, num_replicas): # noqa: F811 + 
def custom_train(config, model, dataloader, criterion, optimizer, + scheduler): + if config.get("models", 1) > 1: + assert len(model) == config["models"], config + + if config.get("optimizers", 1) > 1: + assert len(optimizer) == config["optimizers"], config + + if config.get("schedulers", 1) > 1: + assert len(scheduler) == config["schedulers"], config + return {"done": 1} + + def multi_model_creator(config): + models = [] + for i in range(config.get("models", 1)): + models += [nn.Linear(1, 1)] + return models[0] if len(models) == 1 else models + + def multi_optimizer_creator(models, config): + optimizers = [] + main_model = models[0] if type(models) is list else models + for i in range(config.get("optimizers", 1)): + optimizers += [torch.optim.SGD(main_model.parameters(), lr=0.0001)] + return optimizers[0] if len(optimizers) == 1 else optimizers + + def multi_scheduler_creator(optimizer, config): + schedulers = [] + main_opt = optimizer[0] if type(optimizer) is list else optimizer + for i in range(config.get("schedulers", 1)): + schedulers += [ + torch.optim.lr_scheduler.StepLR( + main_opt, step_size=30, gamma=0.1) + ] + return schedulers[0] if len(schedulers) == 1 else schedulers + + for model_count in range(1, 3): + for optimizer_count in range(1, 3): + for scheduler_count in range(1, 3): + trainer = PyTorchTrainer( + multi_model_creator, + data_creator, + multi_optimizer_creator, + loss_creator=nn.MSELoss, + scheduler_creator=multi_scheduler_creator, + train_function=custom_train, + num_replicas=num_replicas, + config={ + "models": model_count, + "optimizers": optimizer_count, + "schedulers": scheduler_count + }) + trainer.train() + trainer.shutdown() + + +@pytest.mark.parametrize("scheduler_freq", ["epoch", "batch"]) +def test_scheduler_freq(ray_start_2_cpus, scheduler_freq): # noqa: F811 + def custom_train(config, model, dataloader, criterion, optimizer, + scheduler): + assert config[SCHEDULER_STEP] == scheduler_freq + return {"done": 1} + + def 
scheduler_creator(optimizer, config): + return torch.optim.lr_scheduler.StepLR( + optimizer, step_size=30, gamma=0.1) + + trainer = PyTorchTrainer( + model_creator, + data_creator, + optimizer_creator, + loss_creator=lambda config: nn.MSELoss(), + scheduler_creator=scheduler_creator) + + for i in range(3): + trainer.train()["train_loss"] + trainer.shutdown() + + +def test_scheduler_validate(ray_start_2_cpus): # noqa: F811 + def custom_train(config, model, dataloader, criterion, optimizer, + scheduler): + return {"done": 1} + + from torch.optim.lr_scheduler import ReduceLROnPlateau + + trainer = PyTorchTrainer( + model_creator, + data_creator, + optimizer_creator, + loss_creator=lambda config: nn.MSELoss(), + scheduler_creator=lambda optimizer, cfg: ReduceLROnPlateau(optimizer)) + trainer.update_scheduler(0.5) + trainer.update_scheduler(0.5) + assert all( + trainer.apply_all_workers(lambda r: r.schedulers[0].last_epoch == 2)) + trainer.shutdown() + + +@pytest.mark.parametrize("num_replicas", [1, 2] + if dist.is_available() else [1]) def test_tune_train(ray_start_2_cpus, num_replicas): # noqa: F811 config = { @@ -114,7 +233,10 @@ def test_tune_train(ray_start_2_cpus, num_replicas): # noqa: F811 "num_replicas": num_replicas, "use_gpu": False, "batch_size": 512, - "backend": "gloo" + "backend": "gloo", + "config": { + "lr": 0.001 + } } analysis = tune.run(