From 037aa2b961e82feaf563679e6074ed50ff9c60ff Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Wed, 29 Jan 2020 08:51:01 -0800 Subject: [PATCH] [sgd] Refactor PyTorch SGD Documentation. (#6910) * Refactor documentation and directory structurre * update loss * ,ore examples * fix comments * more code * svgs * formatting * more_docs * more writing * comments ready * move * whitespace * examples * fix * bold * pytorch * batch * fix * fix test * Apply suggestions from code review * quarantinegp * tests/ * fix missing --- ci/jenkins_tests/run_tune_tests.sh | 37 +- doc/source/index.rst | 5 + doc/source/raysgd/raysgd-actors.svg | 1 + doc/source/raysgd/raysgd-pytorch.svg | 1 + doc/source/raysgd/raysgd.rst | 61 ++- doc/source/raysgd/raysgd_ft.rst | 33 -- doc/source/raysgd/raysgd_pytorch.rst | 424 +++++++++++++++--- doc/source/raysgd/raysgd_pytorch_examples.rst | 38 ++ doc/source/raysgd/raysgd_ref.rst | 27 ++ doc/source/raysgd/raysgd_tensorflow.rst | 10 +- python/ray/experimental/sgd/__init__.py | 4 + .../sgd/pytorch/distributed_pytorch_runner.py | 31 +- .../examples/cifar_pytorch_example.py | 30 +- .../sgd/pytorch/examples/dcgan.py | 23 +- .../{ => pytorch}/examples/example-sgd.yaml | 0 .../{ => pytorch}/examples/train_example.py | 31 +- .../{ => pytorch}/examples/tune_example.py | 28 +- .../sgd/pytorch/pytorch_runner.py | 70 ++- .../sgd/pytorch/pytorch_trainer.py | 122 +++-- .../experimental/sgd/tests/test_pytorch.py | 25 +- .../sgd/tests/test_pytorch_runner.py | 32 +- .../experimental/sgd/tests/test_tensorflow.py | 2 +- .../sgd/{ => tf}/examples/cifar_tf_example.py | 0 .../examples/tensorflow_train_example.py | 0 .../sgd/{ => tf}/examples/tf-example-sgd.yaml | 0 25 files changed, 708 insertions(+), 327 deletions(-) create mode 100644 doc/source/raysgd/raysgd-actors.svg create mode 100644 doc/source/raysgd/raysgd-pytorch.svg delete mode 100644 doc/source/raysgd/raysgd_ft.rst create mode 100644 doc/source/raysgd/raysgd_pytorch_examples.rst create mode 100644 
doc/source/raysgd/raysgd_ref.rst rename python/ray/experimental/sgd/{ => pytorch}/examples/cifar_pytorch_example.py (86%) rename python/ray/experimental/sgd/{ => pytorch}/examples/example-sgd.yaml (100%) rename python/ray/experimental/sgd/{ => pytorch}/examples/train_example.py (69%) rename python/ray/experimental/sgd/{ => pytorch}/examples/tune_example.py (72%) rename python/ray/experimental/sgd/{ => tf}/examples/cifar_tf_example.py (100%) rename python/ray/experimental/sgd/{ => tf}/examples/tensorflow_train_example.py (100%) rename python/ray/experimental/sgd/{ => tf}/examples/tf-example-sgd.yaml (100%) diff --git a/ci/jenkins_tests/run_tune_tests.sh b/ci/jenkins_tests/run_tune_tests.sh index 22b04450b..b5be73013 100755 --- a/ci/jenkins_tests/run_tune_tests.sh +++ b/ci/jenkins_tests/run_tune_tests.sh @@ -122,9 +122,10 @@ $SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} python /ray/python/ray/tune/examples/skopt_example.py \ --smoke-test -$SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - python /ray/python/ray/tune/examples/pbt_memnn_example.py \ - --smoke-test +# Commenting out because flaky +# $SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ +# python /ray/python/ray/tune/examples/pbt_memnn_example.py \ +# --smoke-test $SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ python /ray/python/ray/tune/examples/pbt_convnet_example.py \ @@ -146,43 +147,43 @@ $SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} python -m pytest /ray/python/ray/experimental/sgd/tests $SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - python /ray/python/ray/experimental/sgd/examples/train_example.py + python /ray/python/ray/experimental/sgd/pytorch/examples/train_example.py $SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} 
$DOCKER_SHA \ - python /ray/python/ray/experimental/sgd/examples/train_example.py --num-replicas=2 + python /ray/python/ray/experimental/sgd/pytorch/examples/train_example.py --num-replicas=2 $SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - python /ray/python/ray/experimental/sgd/examples/tune_example.py + python /ray/python/ray/experimental/sgd/pytorch/examples/tune_example.py $SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - python /ray/python/ray/experimental/sgd/examples/tune_example.py --num-replicas=2 + python /ray/python/ray/experimental/sgd/pytorch/examples/tune_example.py --num-replicas=2 $SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - python /ray/python/ray/experimental/sgd/examples/cifar_pytorch_example.py --smoke-test + python /ray/python/ray/experimental/sgd/pytorch/examples/cifar_pytorch_example.py --smoke-test $SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - python /ray/python/ray/experimental/sgd/examples/cifar_pytorch_example.py --smoke-test --num-replicas=2 + python /ray/python/ray/experimental/sgd/pytorch/examples/cifar_pytorch_example.py --smoke-test --num-replicas=2 + +$SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ + python /ray/python/ray/experimental/sgd/pytorch/examples/cifar_pytorch_example.py --smoke-test --tune $SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ python /ray/python/ray/experimental/sgd/pytorch/examples/dcgan.py --smoke-test --num-replicas=2 $SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - python /ray/python/ray/experimental/sgd/examples/cifar_pytorch_example.py --smoke-test --tune + python /ray/python/ray/experimental/sgd/tf/examples/tensorflow_train_example.py $SUPPRESS_OUTPUT docker run --rm 
--shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - python /ray/python/ray/experimental/sgd/examples/tensorflow_train_example.py + python /ray/python/ray/experimental/sgd/tf/examples/tensorflow_train_example.py --num-replicas=2 $SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - python /ray/python/ray/experimental/sgd/examples/tensorflow_train_example.py --num-replicas=2 + python /ray/python/ray/experimental/sgd/tf/examples/tensorflow_train_example.py --tune $SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - python /ray/python/ray/experimental/sgd/examples/tensorflow_train_example.py --tune + python /ray/python/ray/experimental/sgd/tf/examples/cifar_tf_example.py --smoke-test $SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - python /ray/python/ray/experimental/sgd/examples/cifar_tf_example.py --smoke-test + python /ray/python/ray/experimental/sgd/tf/examples/cifar_tf_example.py --num-replicas 2 --smoke-test $SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - python /ray/python/ray/experimental/sgd/examples/cifar_tf_example.py --num-replicas 2 --smoke-test - -$SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ - python /ray/python/ray/experimental/sgd/examples/cifar_tf_example.py --num-replicas 2 --smoke-test --augment-data + python /ray/python/ray/experimental/sgd/tf/examples/cifar_tf_example.py --num-replicas 2 --smoke-test --augment-data diff --git a/doc/source/index.rst b/doc/source/index.rst index 8bd73c12f..e3b8c0284 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -280,6 +280,11 @@ Getting Involved :caption: RaySGD raysgd/raysgd.rst + raysgd/raysgd_pytorch.rst + raysgd/raysgd_pytorch_examples.rst + raysgd/raysgd_tensorflow.rst + raysgd/raysgd_ref.rst + .. 
toctree:: :maxdepth: -1 diff --git a/doc/source/raysgd/raysgd-actors.svg b/doc/source/raysgd/raysgd-actors.svg new file mode 100644 index 000000000..7b4537ccf --- /dev/null +++ b/doc/source/raysgd/raysgd-actors.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/doc/source/raysgd/raysgd-pytorch.svg b/doc/source/raysgd/raysgd-pytorch.svg new file mode 100644 index 000000000..a19ec8d7f --- /dev/null +++ b/doc/source/raysgd/raysgd-pytorch.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/doc/source/raysgd/raysgd.rst b/doc/source/raysgd/raysgd.rst index 7e5478691..b54f4adea 100644 --- a/doc/source/raysgd/raysgd.rst +++ b/doc/source/raysgd/raysgd.rst @@ -1,24 +1,65 @@ RaySGD: Distributed Deep Learning ================================= +.. tip:: Get in touch with us if you're using or considering using `RaySGD `_! +.. warning:: This is still an experimental API and is subject to change in the near future. + + .. image:: raysgdlogo.png :scale: 20% :align: center -RaySGD is a lightweight library for distributed deep learning, providing thin wrappers around framework-native modules for data parallel training. - -.. tip:: Help us make RaySGD better; take this 1 minute `User Survey `_! +RaySGD is a lightweight library for distributed deep learning, providing thin wrappers around PyTorch and TensorFlow native modules for data parallel training. The main features are: - - Ease of use: Scale Pytorch's native ``DistributedDataParallel`` and TensorFlow's ``tf.distribute.MirroredStrategy`` without needing to monitor individual nodes. - - Composibility: RaySGD is built on top of the Ray Actor API, enabling seamless integration with existing Ray applications such as RLlib, Tune, and Ray.Serve. - - Scale up and down: Start on single CPU. Scale up to multi-node, multi-gpu by changing 2 lines of code. + - **Ease of use**: Scale Pytorch's native ``DistributedDataParallel`` and TensorFlow's ``tf.distribute.MirroredStrategy`` without needing to monitor individual nodes. 
+ - **Composability**: RaySGD is built on top of the Ray Actor API, enabling seamless integration with existing Ray applications such as RLlib, Tune, and Ray.Serve. + - **Scale up and down**: Start on single CPU. Scale up to multi-node, multi-CPU, or multi-GPU clusters by changing 2 lines of code. +Getting Started +--------------- -.. toctree:: +You can start a ``PyTorchTrainer`` with the following: - raysgd_pytorch.rst - raysgd_tensorflow.rst - raysgd_ft.rst +.. code-block:: python + + import numpy as np + import torch + import torch.nn as nn + from torch import distributed + + from ray.experimental.sgd import PyTorchTrainer + from ray.experimental.sgd.pytorch.examples.train_example import LinearDataset + + + def model_creator(config): + return nn.Linear(1, 1) + + + def optimizer_creator(model, config): + """Returns optimizer.""" + return torch.optim.SGD(model.parameters(), lr=1e-2) + + + def data_creator(config): + """Returns training dataset, validation dataset.""" + return LinearDataset(2, 5), LinearDataset(2, 5, size=400) + + ray.init() + + trainer1 = PyTorchTrainer( + model_creator, + data_creator, + optimizer_creator, + loss_creator=nn.MSELoss, + num_replicas=2, + use_gpu=True, + batch_size=512, + backend="nccl") + + stats = trainer1.train() + print(stats) + trainer1.shutdown() + print("success!") diff --git a/doc/source/raysgd/raysgd_ft.rst b/doc/source/raysgd/raysgd_ft.rst deleted file mode 100644 index eec8733e4..000000000 --- a/doc/source/raysgd/raysgd_ft.rst +++ /dev/null @@ -1,33 +0,0 @@ -RaySGD Fault Tolerance -====================== - -.. note:: Fault tolerance is currently only enabled for the PyTorchTrainer. - -For distributed deep learning, jobs are often run on infrastructure where nodes can be pre-empted frequently (i.e., spot instances in the cloud). To overcome this, RaySGD provides **fault tolerance** features that enable training to continue regardless of node failures. - -.. 
code-block:: bash - - trainer.train(max_retries=N) - - -How does it work? ------------------ - -During each ``train`` method, each parallel worker iterates through the dataset, synchronizing gradients and parameters at each batch. These synchronization primitives can hang when one or more of the parallel workers becomes unresponsive (i.e., when a node is lost). To address this, we've implemented the following protocol. - - 1. If any worker node is lost, Ray will mark the training task as complete (``ray.wait`` will return). - 2. Ray will throw ``RayActorException`` when fetching the result for any worker, so the Trainer class will call ``ray.get`` on the "finished" training task. - 3. Upon catching this exception, the Trainer class will kill all of its workers. - 4. The Trainer will then detect the quantity of available resources (either CPUs or GPUs). It will then restart as many workers as it can, each resuming from the last checkpoint. Note that this may result in fewer workers than initially specified. - 5. If there are no available resources, the Trainer will apply an exponential backoff before retrying to create workers. - 6. If there are available resources and the Trainer has fewer workers than initially specified, then it will scale up its worker pool until it reaches the initially specified ``num_workers``. - -Note that we assume the Trainer itself is not on a pre-emptible node. It is currently not possible to recover from a Trainer node failure. - -Users can set ``checkpoint="auto"`` to always checkpoint the current model before executing a pass over the training dataset. - -.. code-block:: bash - - trainer.train(max_retries=N, checkpoint="auto") - - diff --git a/doc/source/raysgd/raysgd_pytorch.rst b/doc/source/raysgd/raysgd_pytorch.rst index 3f8ace1d2..a8c724aec 100644 --- a/doc/source/raysgd/raysgd_pytorch.rst +++ b/doc/source/raysgd/raysgd_pytorch.rst @@ -3,114 +3,404 @@ RaySGD Pytorch .. 
warning:: This is still an experimental API and is subject to change in the near future. -.. tip:: Help us make RaySGD better; take this 1 minute `User Survey `_! +.. tip:: Get in touch with us if you're using or considering using `RaySGD `_! -Ray's ``PyTorchTrainer`` simplifies distributed model training for PyTorch. The ``PyTorchTrainer`` is a wrapper around ``torch.distributed.launch`` with a Python API to easily incorporate distributed training into a larger Python application, as opposed to needing to execute training outside of Python. +.. image:: raysgd-pytorch.svg + :align: center ----------- +The RaySGD ``PyTorchTrainer`` simplifies distributed model training for PyTorch. The ``PyTorchTrainer`` is a wrapper around ``torch.distributed.launch`` with a Python API to easily incorporate distributed training into a larger Python application, as opposed to needing to execute training outside of Python. For end-to-end examples, see :ref:`raysgd-pytorch-example`. -**With Ray**: +Under the hood, ``PyTorchTrainer`` will create *replicas* of your model (controlled by ``num_replicas``), each of which is managed by a Ray actor. -Wrap your training with this: +.. image:: raysgd-actors.svg + :align: center + + + +Setting up training +------------------- + +The ``PyTorchTrainer`` can be constructed with functions that wrap components of the training script. Specifically, it needs constructors for the Model, Data, Optimizer, and Loss to create replicated copies across different devices and machines. + +For example: .. code-block:: python - ray.init(args.address) + import numpy as np + import torch + import torch.nn as nn + from torch import distributed - trainer1 = PyTorchTrainer( + from ray.experimental.sgd import PyTorchTrainer + from ray.experimental.sgd.pytorch.examples.train_example import LinearDataset + + def model_creator(config): + """Constructor function for the model(s) to be optimized. 
+ + Note that if multiple models are returned, the same number of optimizers + must be returned. You will also need to provide a custom training + function to specify the optimization procedure for multiple models. + + Args: + config (dict): Configuration dictionary passed into ``PyTorchTrainer``. + + Returns: + One or more torch.nn.Module objects. + """ + return nn.Linear(1, 1) + + + def optimizer_creator(models, config): + """Constructor of the optimizers. + + Args: + models: The return values from ``model_creator``. This can be one + or more torch nn modules. + config (dict): Configuration dictionary passed into ``PyTorchTrainer``. + + Returns: + One or more Torch optimizer objects. You must return as many optimizers + as you have models. + """ + return torch.optim.SGD(models.parameters(), lr=config.get("lr", 1e-4)) + + + def data_creator(config): + """Constructs torch.utils.data.Dataset objects. + + Note that even though two Dataset objects can be returned, + only one dataset will be used for training. + + Args: + config: Configuration dictionary passed into ``PyTorchTrainer``. + + Returns: + One or two Dataset objects. If only one Dataset object is provided, + ``trainer.validate()`` will throw a ValueError. + """ + return LinearDataset(2, 5), LinearDataset(2, 5, size=400) + + def loss_creator(config): + """Constructs the Torch Loss object. + + Note that optionally, you can pass in a Torch Loss constructor directly + into the PyTorchTrainer (i.e., ``PyTorchTrainer(loss_creator=nn.BCELoss, ...)``). + + Returns: + Torch Loss object. + """ + return torch.nn.BCELoss() + + + +Before instantiating the trainer, you'll have to start or connect to a Ray cluster: + +.. code-block:: python + + ray.init() + # or ray.init(address="auto") if a cluster has been started. + +Instantiate the trainer object: + +.. 
code-block:: python + + from ray.experimental.sgd import PyTorchTrainer + + trainer = PyTorchTrainer( model_creator, data_creator, optimizer_creator, - loss_creator, - num_replicas= * , - use_gpu=True, - batch_size=512, - backend="nccl") - - stats = trainer1.train() - print(stats) - trainer1.shutdown() - print("success!") + loss_creator=nn.MSELoss, + config={"lr": 0.001}) +You can also set the number of workers and whether the workers will use GPUs: -Then, start a Ray cluster `via autoscaler `_ or `manually `_. +.. code-block:: python + + trainer = PyTorchTrainer( + model_creator, + data_creator, + optimizer_creator, + loss_creator=nn.MSELoss, + config={"lr": 0.001}, + num_replicas=100, + use_gpu=True) + + +See the documentation on the PyTorchTrainer here: :ref:`ref-pytorch-trainer`. +We'll look at the training APIs next. + +Training APIs +------------- + +Now that the trainer is constructed, you'll naturally want to train the model. + +.. code-block:: python + + trainer.train() + +This takes one pass over the training data. + +To run the model on the validation data passed in by the ``data_creator``, you can simply call: + +.. code-block:: python + + trainer.validate() + +You can customize the exact function that is called by using a customized training function (see :ref:`raysgd-custom-training`). + + +Shutting down training +---------------------- + +After training, you may want to reappropriate the Ray cluster. To release Ray resources obtained by the trainer: + +.. code-block:: python + + trainer.shutdown() + +.. note:: Be sure to call ``save`` or ``get_model`` before shutting down. + +Initialization Functions +------------------------ + +You may want to run some initializers on each worker when they are started. This may be something like setting an environment variable or downloading some data. You can do this via the ``initialization_hook`` parameter: + +.. 
code-block:: python + + + def initialization_hook(runner): + print("NCCL DEBUG SET") + # Need this for avoiding a connection restart issue + os.environ["NCCL_SOCKET_IFNAME"] = "^docker0,lo" + os.environ["NCCL_LL_THRESHOLD"] = "0" + os.environ["NCCL_DEBUG"] = "INFO" + + trainer = PyTorchTrainer( + model_creator, + data_creator, + optimizer_creator, + loss_creator=nn.MSELoss, + initialization_hook=initialization_hook, + config={"lr": 0.001}, + num_replicas=100, + use_gpu=True) + +Save and Load +------------- + +If you want to save or reload the training procedure, you can use ``trainer.save`` +and ``trainer.restore``, which wrap the relevant ``torch.save`` and ``torch.load`` calls. This should work across a distributed cluster even without an NFS because it takes advantage of Ray's distributed object store. + +.. code-block:: + + trainer_1 = PyTorchTrainer( + model_creator, + data_creator, + optimizer_creator, + loss_creator=nn.MSELoss, + num_replicas=num_replicas) + trainer_1.train() + + checkpoint_path = os.path.join(tempfile.mkdtemp(), "checkpoint") + trainer_1.save(checkpoint_path) + + trainer_2 = PyTorchTrainer( + model_creator, + data_creator, + optimizer_creator, + loss_creator=lambda config: nn.MSELoss(), + num_replicas=num_replicas) + trainer_2.restore(checkpoint_path) + + +Exporting a model for inference +------------------------------- + +The trained torch model can be extracted for use within the same Python program with ``trainer.get_model()``. This will load the state dictionary of the model(s). + +.. code-block:: + + trainer.train() + model = trainer.get_model() + + +Distributed Multi-node Training +------------------------------- + +You can scale out your training onto multiple nodes without making any modifications to your training code. To train across a cluster, simply make sure that the Ray cluster is started. + +You can start a Ray cluster `via the Ray cluster launcher `_ or `manually `_. .. 
code-block:: bash ray up CLUSTER.yaml - python train.py --address="localhost:" + python train.py --address="auto" - ----------- - -**Before, with Pytorch**: - -In your training program, insert the following: +Then, you'll be able to scale up the number of workers seamlessly across multiple nodes: .. code-block:: python - torch.distributed.init_process_group(backend='YOUR BACKEND', - init_method='env://') + trainer = PyTorchTrainer( + model_creator, + data_creator, + optimizer_creator, + loss_creator=lambda config: nn.MSELoss(), + num_replicas=100) - model = torch.nn.parallel.DistributedDataParallel(model, - device_ids=[arg.local_rank], - output_device=arg.local_rank) -Then, separately, on each machine: +Advanced: Fault Tolerance +------------------------- + +For distributed deep learning, jobs are often run on infrastructure where nodes can be pre-empted frequently (i.e., spot instances in the cloud). To overcome this, RaySGD provides **fault tolerance** features that enable training to continue regardless of node failures. .. 
code-block:: bash - # Node 1: *(IP: 192.168.1.1, and has a free port: 1234)* - $ python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE - --nnodes=4 --node_rank=0 --master_addr="192.168.1.1" - --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 - and all other arguments of your training script) - # Node 2: - $ python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE - --nnodes=4 --node_rank=1 --master_addr="192.168.1.1" - --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 - and all other arguments of your training script) - # Node 3: - $ python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE - --nnodes=4 --node_rank=2 --master_addr="192.168.1.1" - --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 - and all other arguments of your training script) - # Node 4: - $ python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE - --nnodes=4 --node_rank=3 --master_addr="192.168.1.1" - --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 - and all other arguments of your training script) + trainer.train(max_retries=N) -PyTorchTrainer Example ----------------------- +During each ``train`` method, each parallel worker iterates through the dataset, synchronizing gradients and parameters at each batch. These synchronization primitives can hang when one or more of the parallel workers becomes unresponsive (i.e., when a node is lost). To address this, we've implemented the following protocol. -Below is an example of using Ray's PyTorchTrainer. Under the hood, ``PytorchTrainer`` will create *replicas* of your model (controlled by ``num_replicas``) which are each managed by a worker. + 1. If any worker node is lost, Ray will mark the training task as complete (``ray.wait`` will return). + 2. Ray will throw ``RayActorException`` when fetching the result for any worker, so the Trainer class will call ``ray.get`` on the "finished" training task. + 3. 
Upon catching this exception, the Trainer class will kill all of its workers. + 4. The Trainer will then detect the quantity of available resources (either CPUs or GPUs). It will then restart as many workers as it can, each resuming from the last checkpoint. Note that this may result in fewer workers than initially specified. + 5. If there are no available resources, the Trainer will apply an exponential backoff before retrying to create workers. + 6. If there are available resources and the Trainer has fewer workers than initially specified, then it will scale up its worker pool until it reaches the initially specified ``num_workers``. -.. literalinclude:: ../../../python/ray/experimental/sgd/examples/train_example.py - :language: python - :start-after: __torch_train_example__ +Note that we assume the Trainer itself is not on a pre-emptible node. It is currently not possible to recover from a Trainer node failure. + +Users can set ``checkpoint="auto"`` to always checkpoint the current model before executing a pass over the training dataset. + +.. code-block:: bash + + trainer.train(max_retries=N, checkpoint="auto") -Hyperparameter Optimization on Distributed Pytorch --------------------------------------------------- +Advanced: Hyperparameter Tuning +------------------------------- ``PyTorchTrainer`` naturally integrates with Tune via the ``PyTorchTrainable`` interface. The same arguments to ``PyTorchTrainer`` should be passed into the ``tune.run(config=...)`` as shown below. -.. literalinclude:: ../../../python/ray/experimental/sgd/examples/tune_example.py +.. literalinclude:: ../../../python/ray/experimental/sgd/pytorch/examples/tune_example.py :language: python :start-after: __torch_tune_example__ -Package Reference ------------------ +Simultaneous Multi-model training +--------------------------------- -.. 
autoclass:: ray.experimental.sgd.pytorch.PyTorchTrainer - :members: +In certain scenarios such as training GANs, you may want to use multiple models in the training loop. You can do this in the ``PyTorchTrainer`` by allowing the ``model_creator`` and the ``optimizer_creator`` to return multiple values. - .. automethod:: __init__ +If multiple models are returned, you will need to provide a custom training function (and custom validation function if you plan to call ``validate``). + +You can see the `DCGAN script `_ for an end-to-end example. + +.. code-block:: python + + def model_creator(config): + netD = Discriminator() + netD.apply(weights_init) + + netG = Generator() + netG.apply(weights_init) + return netD, netG -.. autoclass:: ray.experimental.sgd.pytorch.PyTorchTrainable - :members: + def optimizer_creator(models, config): + net_d, net_g = models + discriminator_opt = optim.Adam( + net_d.parameters(), lr=config.get("lr", 0.01), betas=(0.5, 0.999)) + generator_opt = optim.Adam( + net_g.parameters(), lr=config.get("lr", 0.01), betas=(0.5, 0.999)) + return discriminator_opt, generator_opt + + + def custom_train(models, dataloader, criterion, optimizers, config): + result = {} + for i, (model, optimizer) in enumerate(zip(models, optimizers)): + result["model_{}".format(i)] = train(model, dataloader, criterion, + optimizer, config) + return result + + trainer = PyTorchTrainer( + model_creator, + data_creator, + optimizer_creator, + loss_creator=nn.BCELoss, + train_function=custom_train) + +.. _raysgd-custom-training: + +Custom Training and Validation Functions +---------------------------------------- + +``PyTorchTrainer`` allows you to run a custom training and validation step in parallel on each worker, providing a flexibility similar to using PyTorch natively. This is done via the ``train_function`` and ``validation_function`` parameters. + +Note that this is needed if the model creator returns multiple models. + +.. 
code-block:: python + + def train(models, dataloader, criterion, optimizers, config): + """A custom training function. + + Args: + models: Output of the model_creator passed into PyTorchTrainer. + dataloader: A dataloader wrapping the training dataset created by the ``data_creator`` passed into PyTorchTrainer. + criterion: The instantiation of the ``loss_creator``. + optimizers: Output of the optimizer_creator passed into PyTorchTrainer. + config: The configuration dictionary passed into PyTorchTrainer. + + Returns: + A dictionary of values/metrics. + """ + + netD, netG = models + optimD, optimG = optimizers + real_label = 1 + fake_label = 0 + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + + for i, data in enumerate(dataloader, 0): + netD.zero_grad() + real_cpu = data[0].to(device) + b_size = real_cpu.size(0) + label = torch.full((b_size, ), real_label, device=device) + output = netD(real_cpu).view(-1) + errD_real = criterion(output, label) + errD_real.backward() + + noise = torch.randn(b_size, latent_vector_size, 1, 1, device=device) + fake = netG(noise) + label.fill_(fake_label) + output = netD(fake.detach()).view(-1) + errD_fake = criterion(output, label) + errD_fake.backward() + errD = errD_real + errD_fake + optimD.step() + + netG.zero_grad() + label.fill_(real_label) + output = netD(fake).view(-1) + errG = criterion(output, label) + errG.backward() + optimG.step() + + is_score, is_std = inception_score(fake) + + return { + "loss_g": errG.item(), + "loss_d": errD.item(), + "inception": is_score + } + + + trainer = PyTorchTrainer( + model_creator, + data_creator, + optimizer_creator, + nn.BCELoss, + train_function=train, + ... + ) diff --git a/doc/source/raysgd/raysgd_pytorch_examples.rst b/doc/source/raysgd/raysgd_pytorch_examples.rst new file mode 100644 index 000000000..d8546d801 --- /dev/null +++ b/doc/source/raysgd/raysgd_pytorch_examples.rst @@ -0,0 +1,38 @@ +.. 
_raysgd-pytorch-example: + +RaySGD PyTorch Examples +======================= + +Here are some examples of using RaySGD for training PyTorch models. If you'd like +to contribute an example, feel free to create a `pull request here `_. + + +Toy Example +----------- + +Below is an example of using Ray's PyTorchTrainer. + + +.. literalinclude:: ../../../python/ray/experimental/sgd/pytorch/examples/train_example.py + :language: python + :start-after: __torch_train_example__ + + +CIFAR10 Example +--------------- + +Below is an example of training a ResNet18 model on CIFAR10. It uses a custom training +function, a custom validation function, and custom initialization code for each worker. + +.. literalinclude:: ../../../python/ray/experimental/sgd/pytorch/examples/cifar_pytorch_example.py + :language: python + + +DCGAN Example +------------- + +Below is an example of training a Deep Convolutional GAN on MNIST. It constructs +two models and two optimizers and uses a custom training and validation function. + +.. literalinclude:: ../../../python/ray/experimental/sgd/pytorch/examples/dcgan.py + :language: python diff --git a/doc/source/raysgd/raysgd_ref.rst b/doc/source/raysgd/raysgd_ref.rst new file mode 100644 index 000000000..13c313c1e --- /dev/null +++ b/doc/source/raysgd/raysgd_ref.rst @@ -0,0 +1,27 @@ +Package Reference +================= + +.. _ref-pytorch-trainer: + +PyTorchTrainer +-------------- + +.. autoclass:: ray.experimental.sgd.pytorch.PyTorchTrainer + :members: + + .. automethod:: __init__ + + +PyTorchTrainable +---------------- + +.. autoclass:: ray.experimental.sgd.pytorch.PyTorchTrainable + :members: + +TFTrainer +--------- + +.. autoclass:: ray.experimental.sgd.tf.TFTrainer + :members: + + .. 
automethod:: __init__ diff --git a/doc/source/raysgd/raysgd_tensorflow.rst b/doc/source/raysgd/raysgd_tensorflow.rst index dde5e92f2..c42277376 100644 --- a/doc/source/raysgd/raysgd_tensorflow.rst +++ b/doc/source/raysgd/raysgd_tensorflow.rst @@ -71,14 +71,6 @@ TFTrainer Example Below is an example of using Ray's TFTrainer. Under the hood, ``TFTrainer`` will create *replicas* of your model (controlled by ``num_replicas``) which are each managed by a worker. -.. literalinclude:: ../../../python/ray/experimental/sgd/examples/tensorflow_train_example.py +.. literalinclude:: ../../../python/ray/experimental/sgd/tf/examples/tensorflow_train_example.py :language: python - -Package Reference ------------------ - -.. autoclass:: ray.experimental.sgd.tf.TFTrainer - :members: - - .. automethod:: __init__ diff --git a/python/ray/experimental/sgd/__init__.py b/python/ray/experimental/sgd/__init__.py index e69de29bb..9223335cc 100644 --- a/python/ray/experimental/sgd/__init__.py +++ b/python/ray/experimental/sgd/__init__.py @@ -0,0 +1,4 @@ +from ray.experimental.sgd.pytorch import PyTorchTrainer +from ray.experimental.sgd.tf import TFTrainer + +__all__ = ["PyTorchTrainer", "TFTrainer"] diff --git a/python/ray/experimental/sgd/pytorch/distributed_pytorch_runner.py b/python/ray/experimental/sgd/pytorch/distributed_pytorch_runner.py index 2c38529a5..4a9a4ff78 100644 --- a/python/ray/experimental/sgd/pytorch/distributed_pytorch_runner.py +++ b/python/ray/experimental/sgd/pytorch/distributed_pytorch_runner.py @@ -66,15 +66,27 @@ class DistributedPyTorchRunner(PyTorchRunner): self.config) if not isinstance(self.optimizers, collections.Iterable): self.optimizers = [self.optimizers] - self.criterion = self.loss_creator(self.config) - if torch.cuda.is_available(): - self.criterion = self.criterion.cuda() + + logger.debug("Creating loss.") + self._create_loss() logger.debug("Creating dataset.") with FileLock(os.path.expanduser("~/.ray_data.lock")): - data_loaders = 
self.data_creator(self.batch_size, self.config) - self.train_loader, self.validation_loader = self._validate_loaders( - data_loaders) + datasets = self.data_creator(self.config) + train_set, val_set = self._validate_datasets(datasets) + + train_loader_config = self.dataloader_config.copy() + train_loader_config.update( + sampler=torch.utils.data.distributed.DistributedSampler(train_set), + shuffle=False) + + self.train_loader = torch.utils.data.DataLoader( + train_set, batch_size=self.batch_size, **train_loader_config) + + self.validation_loader = None + if val_set: + self.validation_loader = torch.utils.data.DataLoader( + val_set, batch_size=self.batch_size, **self.dataloader_config) def step(self): """Runs a training epoch and updates the model parameters. @@ -117,4 +129,9 @@ class DistributedPyTorchRunner(PyTorchRunner): def shutdown(self): """Attempts to shut down the worker.""" super(DistributedPyTorchRunner, self).shutdown() - dist.destroy_process_group() + # TODO: Temporarily removing since it causes hangs on MacOSX. + # However, it seems to be harmless to remove permanently + # since the processes are shut down anyway. This comment can be + # removed in a future release if it is still not documented + # in the stable PyTorch docs. 
+ # dist.destroy_process_group() diff --git a/python/ray/experimental/sgd/examples/cifar_pytorch_example.py b/python/ray/experimental/sgd/pytorch/examples/cifar_pytorch_example.py similarity index 86% rename from python/ray/experimental/sgd/examples/cifar_pytorch_example.py rename to python/ray/experimental/sgd/pytorch/examples/cifar_pytorch_example.py index 1e55c9b91..1d454e019 100644 --- a/python/ray/experimental/sgd/examples/cifar_pytorch_example.py +++ b/python/ray/experimental/sgd/pytorch/examples/cifar_pytorch_example.py @@ -4,8 +4,6 @@ import torch.nn as nn import argparse from ray import tune import torch.utils.data -from torch import distributed -from torch.utils.data.distributed import DistributedSampler import torchvision import torchvision.transforms as transforms @@ -71,7 +69,7 @@ def validate(model, val_iterator, criterion, config): return stats -def cifar_creator(batch_size, config): +def cifar_creator(config): transform_train = transforms.Compose([ transforms.RandomCrop(32, padding=4), transforms.RandomHorizontalFlip(), @@ -90,29 +88,7 @@ def cifar_creator(batch_size, config): validation_dataset = torchvision.datasets.CIFAR10( root="~/data", train=False, download=False, transform=transform_test) - train_sampler = None - if distributed.is_initialized(): - train_sampler = DistributedSampler(train_dataset) - train_loader = torch.utils.data.DataLoader( - train_dataset, - batch_size=batch_size, - shuffle=(train_sampler is None), - num_workers=2, - pin_memory=False, - sampler=train_sampler) - - validation_sampler = None - if distributed.is_initialized(): - validation_sampler = DistributedSampler(validation_dataset) - validation_loader = torch.utils.data.DataLoader( - validation_dataset, - batch_size=batch_size, - shuffle=(validation_sampler is None), - num_workers=2, - pin_memory=False, - sampler=validation_sampler) - - return train_loader, validation_loader + return train_dataset, validation_dataset def optimizer_creator(model, config): @@ -126,7 +102,7 
@@ def train_example(num_replicas=1, use_gpu=False, test_mode=False): ResNet18, cifar_creator, optimizer_creator, - lambda config: nn.CrossEntropyLoss(), + nn.CrossEntropyLoss, initialization_hook=initialization_hook, train_function=train, validation_function=validate, diff --git a/python/ray/experimental/sgd/pytorch/examples/dcgan.py b/python/ray/experimental/sgd/pytorch/examples/dcgan.py index 7afc83f61..031e89e61 100644 --- a/python/ray/experimental/sgd/pytorch/examples/dcgan.py +++ b/python/ray/experimental/sgd/pytorch/examples/dcgan.py @@ -4,7 +4,6 @@ import argparse import os import torch import torch.nn as nn -from torch import distributed import torch.optim as optim import torch.utils.data import torchvision.datasets as dset @@ -13,11 +12,10 @@ import numpy as np from torch.autograd import Variable from torch.nn import functional as F -from torch.utils.data.distributed import DistributedSampler from scipy.stats import entropy import ray -from ray.experimental.sgd.pytorch import PyTorchTrainer +from ray.experimental.sgd import PyTorchTrainer # Training parameters TRAIN_BATCHES = 5 @@ -34,8 +32,8 @@ features_g = 32 features_d = 32 -def data_creator(batch_size, config): - dataset = dset.MNIST( +def data_creator(config): + return dset.MNIST( root="~/mnist/", download=True, transform=transforms.Compose([ @@ -44,19 +42,6 @@ def data_creator(batch_size, config): transforms.Normalize((0.5, ), (0.5, )), ])) - # Create the dataloader - train_sampler = None - if distributed.is_initialized(): - train_sampler = DistributedSampler(dataset) - dataloader = torch.utils.data.DataLoader( - dataset, - batch_size=batch_size, - num_workers=3, - shuffle=(train_sampler is None), - sampler=train_sampler) - - return dataloader, None - def weights_init(m): classname = m.__class__.__name__ @@ -231,7 +216,7 @@ def train_example(num_replicas=1, use_gpu=False, test_mode=False): model_creator, data_creator, optimizer_creator, - lambda config: nn.BCELoss(), + nn.BCELoss, 
train_function=train, validation_function=False, num_replicas=num_replicas, diff --git a/python/ray/experimental/sgd/examples/example-sgd.yaml b/python/ray/experimental/sgd/pytorch/examples/example-sgd.yaml similarity index 100% rename from python/ray/experimental/sgd/examples/example-sgd.yaml rename to python/ray/experimental/sgd/pytorch/examples/example-sgd.yaml diff --git a/python/ray/experimental/sgd/examples/train_example.py b/python/ray/experimental/sgd/pytorch/examples/train_example.py similarity index 69% rename from python/ray/experimental/sgd/examples/train_example.py rename to python/ray/experimental/sgd/pytorch/examples/train_example.py index a527e8318..5c9e68f7e 100644 --- a/python/ray/experimental/sgd/examples/train_example.py +++ b/python/ray/experimental/sgd/pytorch/examples/train_example.py @@ -12,10 +12,8 @@ import argparse import numpy as np import torch import torch.nn as nn -from torch import distributed -from torch.utils.data.distributed import DistributedSampler -from ray.experimental.sgd.pytorch.pytorch_trainer import PyTorchTrainer +from ray.experimental.sgd import PyTorchTrainer class LinearDataset(torch.utils.data.Dataset): @@ -42,30 +40,9 @@ def optimizer_creator(model, config): return torch.optim.SGD(model.parameters(), lr=1e-2) -def data_creator(batch_size, config): +def data_creator(config): """Returns training dataloader, validation dataloader.""" - train_dataset = LinearDataset(2, 5) - validation_dataset = LinearDataset(2, 5, size=400) - - train_sampler = None - if distributed.is_initialized(): - train_sampler = DistributedSampler(train_dataset) - train_loader = torch.utils.data.DataLoader( - train_dataset, - batch_size=batch_size, - shuffle=(train_sampler is None), - sampler=train_sampler) - - validation_sampler = None - if distributed.is_initialized(): - validation_sampler = DistributedSampler(validation_dataset) - validation_loader = torch.utils.data.DataLoader( - validation_dataset, - batch_size=batch_size, - 
shuffle=(validation_sampler is None), - sampler=validation_sampler) - - return train_loader, validation_loader + return LinearDataset(2, 5), LinearDataset(2, 5, size=400) def train_example(num_replicas=1, use_gpu=False): @@ -73,7 +50,7 @@ def train_example(num_replicas=1, use_gpu=False): model_creator, data_creator, optimizer_creator, - loss_creator=lambda config: nn.MSELoss(), + loss_creator=nn.MSELoss, num_replicas=num_replicas, use_gpu=use_gpu, batch_size=num_replicas * 4, diff --git a/python/ray/experimental/sgd/examples/tune_example.py b/python/ray/experimental/sgd/pytorch/examples/tune_example.py similarity index 72% rename from python/ray/experimental/sgd/examples/tune_example.py rename to python/ray/experimental/sgd/pytorch/examples/tune_example.py index 1fd8264e4..bacfa8eac 100644 --- a/python/ray/experimental/sgd/examples/tune_example.py +++ b/python/ray/experimental/sgd/pytorch/examples/tune_example.py @@ -11,8 +11,6 @@ in the documentation. import numpy as np import torch import torch.nn as nn -from torch import distributed -from torch.utils.data.distributed import DistributedSampler import ray from ray import tune @@ -44,29 +42,9 @@ def optimizer_creator(model, config): return torch.optim.SGD(model.parameters(), lr=config.get("lr", 1e-4)) -def data_creator(batch_size, config): +def data_creator(config): """Returns training dataloader, validation dataloader.""" - train_dataset = LinearDataset(2, 5) - validation_dataset = LinearDataset(2, 5, size=400) - - train_sampler = None - if distributed.is_initialized(): - train_sampler = DistributedSampler(train_dataset) - train_loader = torch.utils.data.DataLoader( - train_dataset, - batch_size=batch_size, - shuffle=(train_sampler is None), - sampler=train_sampler) - - validation_sampler = None - if distributed.is_initialized(): - validation_sampler = DistributedSampler(validation_dataset) - validation_loader = torch.utils.data.DataLoader( - validation_dataset, - batch_size=batch_size, - 
shuffle=(validation_sampler is None), - sampler=validation_sampler) - return train_loader, validation_loader + return LinearDataset(2, 5), LinearDataset(2, 5, size=400) def tune_example(num_replicas=1, use_gpu=False): @@ -74,7 +52,7 @@ def tune_example(num_replicas=1, use_gpu=False): "model_creator": tune.function(model_creator), "data_creator": tune.function(data_creator), "optimizer_creator": tune.function(optimizer_creator), - "loss_creator": tune.function(lambda config: nn.MSELoss()), + "loss_creator": tune.function(nn.MSELoss), "num_replicas": num_replicas, "use_gpu": use_gpu, "batch_size": 512, diff --git a/python/ray/experimental/sgd/pytorch/pytorch_runner.py b/python/ray/experimental/sgd/pytorch/pytorch_runner.py index a0c861d86..e5319a8a5 100644 --- a/python/ray/experimental/sgd/pytorch/pytorch_runner.py +++ b/python/ray/experimental/sgd/pytorch/pytorch_runner.py @@ -1,10 +1,11 @@ import collections from filelock import FileLock import logging +import inspect import os import torch import torch.utils.data -from torch.utils.data import DataLoader +from torch.utils.data import Dataset import ray from ray.experimental.sgd.pytorch import utils as pytorch_utils @@ -24,19 +25,21 @@ class PyTorchRunner: train_function=None, validation_function=None, config=None, + dataloader_config=None, batch_size=16): """Initializes the runner. Args: model_creator (dict -> torch.nn.Module): see pytorch_trainer.py - data_creator (int, dict -> DataLoader, DataLoader): see + data_creator (dict -> Dataset, Dataset): see pytorch_trainer.py. optimizer_creator (torch.nn.Module, dict -> loss, optimizer): see pytorch_trainer.py. - loss_creator (dict -> loss): see pytorch_trainer.py. + loss_creator (dict -> loss | Loss class): see pytorch_trainer.py. train_function: see pytorch_trainer.py validation_function: see pytorch_trainer.py config (dict): see pytorch_trainer.py. + dataloader_config (dict): see pytorch_trainer.py. batch_size (int): see pytorch_trainer.py. 
""" self.model_creator = model_creator @@ -44,6 +47,10 @@ class PyTorchRunner: self.optimizer_creator = optimizer_creator self.loss_creator = loss_creator self.config = {} if config is None else config + self.dataloader_config = { + "num_workers": 2, + "pin_memory": True + } if dataloader_config is None else dataloader_config self.train_function = train_function or pytorch_utils.train self.validation_function = (validation_function or pytorch_utils.validate) @@ -65,16 +72,24 @@ class PyTorchRunner: self.train_loader = None self.validation_loader = None - def _validate_loaders(self, data_loaders): - assert data_loaders, "Dataloaders need to be returned in data_creator." - if isinstance(data_loaders, DataLoader): - return data_loaders, None - elif len(data_loaders) == 2 and isinstance(data_loaders[0], - DataLoader): - return data_loaders + def _validate_datasets(self, dataset): + assert dataset, "Datasets need to be returned in data_creator." + if issubclass(type(dataset), Dataset): + return dataset, None + elif len(dataset) == 2 and issubclass(type(dataset[0]), Dataset): + return dataset else: - raise ValueError( - "Dataloaders must be <= 2. Got {}".format(data_loaders)) + raise ValueError("Datasets must be <= 2. 
Got {}".format(dataset)) + + def _create_loss(self): + if inspect.isclass(self.loss_creator) and issubclass( + self.loss_creator, torch.nn.modules.loss._Loss): + self.criterion = self.loss_creator() + else: + self.criterion = self.loss_creator(self.config) + + if torch.cuda.is_available(): + self.criterion = self.criterion.cuda() def setup(self): """Initializes the model.""" @@ -90,15 +105,23 @@ class PyTorchRunner: self.config) if not isinstance(self.optimizers, collections.Iterable): self.optimizers = [self.optimizers] - self.criterion = self.loss_creator(self.config) - if torch.cuda.is_available(): - self.criterion = self.criterion.cuda() + + self._create_loss() logger.debug("Creating dataset") + # When creating datasets, a filelock will be used to ensure no + # race conditions in data downloading among different workers. with FileLock(os.path.expanduser("~/.ray_data.lock")): - dataloaders = self.data_creator(self.batch_size, self.config) - self.train_loader, self.validation_loader = self._validate_loaders( - dataloaders) + datasets = self.data_creator(self.config) + train_set, val_set = self._validate_datasets(datasets) + + self.train_loader = torch.utils.data.DataLoader( + train_set, batch_size=self.batch_size, **self.dataloader_config) + + self.validation_loader = None + if val_set: + self.validation_loader = torch.utils.data.DataLoader( + val_set, batch_size=self.batch_size, **self.dataloader_config) def get_node_ip(self): """Returns the IP address of the current node.""" @@ -145,9 +168,18 @@ class PyTorchRunner: def get_state(self): """Returns the state of the runner.""" + # This is so that we create a duplicate of weights into CPU rather than + # move the model weights entirely out of the GPU, so that we can + # resume training while saving intermediate checkpoints. 
+ cpu_state_dicts = [] + for model in self.models: + state_dict = model.state_dict() + for k, v in state_dict.items(): + state_dict[k] = v.cpu() + cpu_state_dicts += [state_dict] return { "epoch": self.epoch, - "models": [model.cpu().state_dict() for model in self.models], + "models": cpu_state_dicts, "optimizers": [opt.state_dict() for opt in self.optimizers], "stats": self.stats() } diff --git a/python/ray/experimental/sgd/pytorch/pytorch_trainer.py b/python/ray/experimental/sgd/pytorch/pytorch_trainer.py index 02a25193a..5a03175b3 100644 --- a/python/ray/experimental/sgd/pytorch/pytorch_trainer.py +++ b/python/ray/experimental/sgd/pytorch/pytorch_trainer.py @@ -25,6 +25,85 @@ class PyTorchTrainer: Launches a set of actors which connect via distributed PyTorch and coordinate gradient updates to train the provided model. + + .. code-block:: python + + def model_creator(config): + return nn.Linear(1, 1) + + + def optimizer_creator(model, config): + return torch.optim.SGD( + model.parameters(), lr=config.get("lr", 1e-4)) + + + def data_creator(config): + return LinearDataset(2, 5), LinearDataset(2, 5, size=400) + + trainer = PyTorchTrainer( + model_creator, + data_creator, + optimizer_creator, + loss_creator=nn.MSELoss, + use_gpu=True + ) + trainer.train() + + Args: + model_creator (dict -> *): Constructor function that takes in + config and returns the model(s) to be optimized. These must be + ``torch.nn.Module`` objects. Note that if multiple models + are returned, the same number of optimizers must be returned + by the optimizer_creator. If multiple models are returned, + a ``train_function`` must be specified. You do not need to + handle GPU/devices in this function; + RaySGD will do that under the hood. + data_creator (dict -> Dataset, Dataset): Constructor function + that takes in the passed config and returns one or + two ``torch.utils.data.Dataset`` objects. 
+ Note that even though two Dataset objects can be returned, + only one dataset will be used for training. RaySGD + will automatically wrap the objects with a ``DataLoader``. + optimizer_creator (models, dict -> optimizers): Constructor + function that takes in the return values from + ``model_creator`` and the passed config and returns one or + more Torch optimizer objects. You must return as many + optimizers as you have models. You do not need to handle + GPU/devices in this function; ``RaySGD`` will do that for you. + loss_creator (dict -> loss or torch.nn.*Loss): A constructor function + for the training loss. This can be either a function that + takes in the provided config for customization or a subclass + of ``torch.nn.modules.loss._Loss``, which covers most PyTorch + loss classes. For example, ``loss_creator=torch.nn.BCELoss``. + train_function: Custom function for training. This function + will be executed in parallel across all workers at once. The + function needs to take in (models, train_dataloader, criterion, + optimizers, config), and return a dict of training stats. + validation_function: Custom function for validation. This function + will be executed in parallel across all workers at once. + This takes in (model, val_dataloader, criterion, config) + and returns a dict of validation stats. + config (dict): Custom configuration values to be passed to + "model_creator", "data_creator", "optimizer_creator", and + "loss_creator". + dataloader_config (dict): Configuration values to be passed into + the ``torch.utils.data.DataLoader`` object that wraps + the dataset on each parallel worker for both training + and validation. Note that if ``num_replicas`` + is greater than 1, ``shuffle`` and ``sampler`` will be + automatically set. See the available arguments + here https://pytorch.org/docs/stable/data.html. + num_replicas (int): the number of workers used in distributed + training. 
+ use_gpu (bool): Sets resource allocation for workers to 1 GPU + if true, and automatically moves both the model and optimizer + to the available CUDA device. + batch_size (int): Total batch size for each minibatch. This + value is divided among all workers and rounded. + backend (string): backend used by distributed PyTorch. Currently + support "nccl", "gloo", and "auto". If "auto", RaySGD will + automatically use "nccl" if `use_gpu` is True, and "gloo" + otherwise. """ def __init__(self, @@ -36,39 +115,12 @@ class PyTorchTrainer: validation_function=None, initialization_hook=None, config=None, + dataloader_config=None, num_replicas=1, use_gpu=False, batch_size=16, backend="auto"): - """Sets up the PyTorch trainer. - - Args: - model_creator (dict -> torch.nn.Module): creates the model - using the config. - data_creator (int, dict -> DataLoader, DataLoader): Function that - takes in (batch_size, config) and returns two Torch DataLoader - objects. - optimizer_creator (torch.nn.Module, dict -> optimizer): - creates the loss and optimizer using the model and the config. - loss_creator (dict -> loss): Creates the loss function/criterion - using the config. - train_function: Trains a model for a epoch. This takes in ( - model, train_dataloader, criterion, optimizer, config), and - returns a dict of training stats. - validation_function: Runs validation. This takes in ( - model, val_dataloader, criterion, config) and returns a dict of - validation stats. - config (dict): configuration passed to "model_creator", - "data_creator", "optimizer_creator", and "loss_creator". - num_replicas (int): the number of workers used in distributed - training. - use_gpu (bool): Sets resource allocation for workers to 1 GPU - if true. - batch_size (int): batch size for an update. - backend (string): backend used by distributed PyTorch. 
- """ # TODO: add support for mixed precision - # TODO: add support for callbacks if num_replicas > 1 and not dist.is_available(): raise ValueError( ("Distributed PyTorch is not supported on macOS. " @@ -84,6 +136,7 @@ class PyTorchTrainer: self.validation_function = validation_function self.initialization_hook = initialization_hook self.config = {} if config is None else config + self.dataloader_config = dataloader_config self.optimizer_timer = utils.TimerStat(window_size=1) if backend == "auto": @@ -115,6 +168,7 @@ class PyTorchTrainer: train_function=self.train_function, validation_function=self.validation_function, config=self.config, + dataloader_config=self.dataloader_config, batch_size=self.batch_size) ] if self.initialization_hook: @@ -148,6 +202,7 @@ class PyTorchTrainer: train_function=self.train_function, validation_function=self.validation_function, config=self.config, + dataloader_config=self.dataloader_config, batch_size=batch_size_per_replica) for i in range(num_replicas) ] @@ -274,11 +329,12 @@ class PyTorchTrainer: def shutdown(self, force=False): """Shuts down workers and releases resources.""" - for worker in self.workers: - if not force: - worker.shutdown.remote() - worker.__ray_terminate__.remote() - else: + if not force: + cleanup = [worker.shutdown.remote() for worker in self.workers] + ray.get(cleanup) + [worker.__ray_terminate__.remote() for worker in self.workers] + else: + for worker in self.workers: logger.warning("Killing worker {}.".format(worker)) worker.__ray_kill__() diff --git a/python/ray/experimental/sgd/tests/test_pytorch.py b/python/ray/experimental/sgd/tests/test_pytorch.py index 3fc24db46..f87ab37cc 100644 --- a/python/ray/experimental/sgd/tests/test_pytorch.py +++ b/python/ray/experimental/sgd/tests/test_pytorch.py @@ -15,7 +15,7 @@ from ray.experimental.sgd.pytorch import PyTorchTrainer, PyTorchTrainable from ray.experimental.sgd.pytorch.utils import train from ray.experimental.sgd.utils import check_for_failure -from 
ray.experimental.sgd.examples.train_example import ( +from ray.experimental.sgd.pytorch.examples.train_example import ( model_creator, optimizer_creator, data_creator, LinearDataset) @@ -98,6 +98,8 @@ def test_multi_model(ray_start_2_cpus, num_replicas): # noqa: F811 for k in model1_state_dict: assert torch.equal(model1_state_dict[k], model2_state_dict[k]) + trainer2.shutdown() + @pytest.mark.parametrize("num_replicas", [1, 2] if dist.is_available() else [1]) @@ -175,11 +177,8 @@ def test_fail_with_recover(ray_start_2_cpus): # noqa: F811 if not dist.is_available(): return - def single_loader(batch_size, config): - train_dataset = LinearDataset(2, 5, size=1000000) - train_loader = torch.utils.data.DataLoader( - train_dataset, batch_size=batch_size) - return train_loader + def single_loader(config): + return LinearDataset(2, 5, size=1000000) def step_with_fail(self): worker_stats = [w.step.remote() for w in self.workers] @@ -206,11 +205,8 @@ def test_resize(ray_start_2_cpus): # noqa: F811 if not dist.is_available(): return - def single_loader(batch_size, config): - train_dataset = LinearDataset(2, 5, size=1000000) - train_loader = torch.utils.data.DataLoader( - train_dataset, batch_size=batch_size) - return train_loader + def single_loader(config): + return LinearDataset(2, 5, size=1000000) def step_with_fail(self): worker_stats = [w.step.remote() for w in self.workers] @@ -243,11 +239,8 @@ def test_fail_twice(ray_start_2_cpus): # noqa: F811 if not dist.is_available(): return - def single_loader(batch_size, config): - train_dataset = LinearDataset(2, 5, size=1000000) - train_loader = torch.utils.data.DataLoader( - train_dataset, batch_size=batch_size) - return train_loader + def single_loader(config): + return LinearDataset(2, 5, size=1000000) def step_with_fail(self): worker_stats = [w.step.remote() for w in self.workers] diff --git a/python/ray/experimental/sgd/tests/test_pytorch_runner.py b/python/ray/experimental/sgd/tests/test_pytorch_runner.py index 
926101ad1..f3036fe5e 100644 --- a/python/ray/experimental/sgd/tests/test_pytorch_runner.py +++ b/python/ray/experimental/sgd/tests/test_pytorch_runner.py @@ -36,18 +36,12 @@ def loss_creator(config): return nn.MSELoss() -def single_loader(batch_size, config): - train_dataset = LinearDataset(2, 5) - train_loader = torch.utils.data.DataLoader(train_dataset) - return train_loader +def single_loader(config): + return LinearDataset(2, 5) -def create_dataloaders(batch_size, config): - train_dataset = LinearDataset(2, 5) - validation_dataset = LinearDataset(2, 5, size=400) - train_loader = torch.utils.data.DataLoader(train_dataset) - validation_loader = torch.utils.data.DataLoader(validation_dataset) - return train_loader, validation_loader +def create_dataloaders(config): + return LinearDataset(2, 5), LinearDataset(2, 5, size=400) class TestPyTorchRunner(unittest.TestCase): @@ -109,12 +103,9 @@ class TestPyTorchRunner(unittest.TestCase): self.assertNotEqual(runner2.given_optimizers, runner2.optimizers) def testMultiLoaders(self): - def three_data_loader(batch_size, config): - train_dataset = LinearDataset(2, 5) - validation_dataset = LinearDataset(2, 5, size=400) - train_loader = torch.utils.data.DataLoader(train_dataset) - validation_loader = torch.utils.data.DataLoader(validation_dataset) - return train_loader, validation_loader, validation_loader + def three_data_loader(config): + return (LinearDataset(2, 5), LinearDataset(2, 5, size=400), + LinearDataset(2, 5, size=400)) runner = PyTorchRunner(model_creator, three_data_loader, optimizer_creator, loss_creator) @@ -134,6 +125,15 @@ class TestPyTorchRunner(unittest.TestCase): with self.assertRaises(ValueError): runner.validate() + def testNativeLoss(self): + runner = PyTorchRunner( + model_creator, + single_loader, + optimizer_creator, + loss_creator=nn.MSELoss) + runner.setup() + runner.step() + def testMultiModel(self): def multi_model_creator(config): return nn.Linear(1, 1), nn.Linear(1, 1), nn.Linear(1, 1) diff 
--git a/python/ray/experimental/sgd/tests/test_tensorflow.py b/python/ray/experimental/sgd/tests/test_tensorflow.py index ce5f921fd..7e2014772 100644 --- a/python/ray/experimental/sgd/tests/test_tensorflow.py +++ b/python/ray/experimental/sgd/tests/test_tensorflow.py @@ -8,7 +8,7 @@ from ray import tune from ray.tests.conftest import ray_start_2_cpus # noqa: F401 from ray.experimental.sgd.tf import TFTrainer, TFTrainable -from ray.experimental.sgd.examples.tensorflow_train_example import ( +from ray.experimental.sgd.tf.examples.tensorflow_train_example import ( simple_model, simple_dataset) SIMPLE_CONFIG = { diff --git a/python/ray/experimental/sgd/examples/cifar_tf_example.py b/python/ray/experimental/sgd/tf/examples/cifar_tf_example.py similarity index 100% rename from python/ray/experimental/sgd/examples/cifar_tf_example.py rename to python/ray/experimental/sgd/tf/examples/cifar_tf_example.py diff --git a/python/ray/experimental/sgd/examples/tensorflow_train_example.py b/python/ray/experimental/sgd/tf/examples/tensorflow_train_example.py similarity index 100% rename from python/ray/experimental/sgd/examples/tensorflow_train_example.py rename to python/ray/experimental/sgd/tf/examples/tensorflow_train_example.py diff --git a/python/ray/experimental/sgd/examples/tf-example-sgd.yaml b/python/ray/experimental/sgd/tf/examples/tf-example-sgd.yaml similarity index 100% rename from python/ray/experimental/sgd/examples/tf-example-sgd.yaml rename to python/ray/experimental/sgd/tf/examples/tf-example-sgd.yaml