[sgd] fp16 (apex) and scheduler support + move examples page (#7061)

* Init fp16

* fp16 and schedulers

* scheduler linking and fp16

* to fp16

* loss scaling and documentation

* more documentation

* add tests, refactor config

* moredocs

* more docs

* fix logo, add test mode, add fp16 flag

* fix tests

* fix scheduler

* fix apex

* improve safety

* fix tests

* fix tests

* remove pin memory default

* rm

* fix

* Update doc/examples/doc_code/raysgd_torch_signatures.py

* fix

* migrate changes from other PR

* ok thanks

* pass

* signatures

* lint'

* Update python/ray/experimental/sgd/pytorch/utils.py

* Apply suggestions from code review

Co-Authored-By: Edward Oakes <ed.nmi.oakes@gmail.com>

* should address most comments

* comments

* fix this ci

* fix tests'

* testmode

Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
This commit is contained in:
Richard Liaw
2020-02-16 19:04:08 -08:00
committed by GitHub
parent f0e62d733f
commit 94e2fcea2e
17 changed files with 874 additions and 364 deletions
@@ -13,16 +13,17 @@ logger = logging.getLogger(__name__)
class DistributedPyTorchRunner(PyTorchRunner):
"""Manages a distributed PyTorch model replica."""
"""Manages a distributed PyTorch model replica.
Args:
args: Arguments for PyTorchRunner.
backend (string): backend used by distributed PyTorch.
kwargs: Keyword arguments for PyTorchRunner.
"""
def __init__(self, *args, backend="gloo", **kwargs):
"""Initializes the runner.
Args:
args: Arguments for the PyTorchRunner.
kwargs: Keyword arguments for the PyTorchRunner.
backend (string): backend used by distributed PyTorch.
"""
super(DistributedPyTorchRunner, self).__init__(*args, **kwargs)
self.backend = backend
@@ -59,7 +60,6 @@ class DistributedPyTorchRunner(PyTorchRunner):
"All models must be PyTorch models: {}.".format(self.models))
if torch.cuda.is_available():
self.models = [model.cuda() for model in self.models]
self.models = [DistributedDataParallel(model) for model in self.models]
logger.debug("Creating optimizer.")
self.optimizers = self.optimizer_creator(self.given_models,
@@ -67,6 +67,13 @@ class DistributedPyTorchRunner(PyTorchRunner):
if not isinstance(self.optimizers, collections.Iterable):
self.optimizers = [self.optimizers]
self._create_schedulers_if_available()
self._try_setup_apex()
# This needs to happen after apex
self.models = [DistributedDataParallel(model) for model in self.models]
logger.debug("Creating loss.")
self._create_loss()
@@ -98,37 +105,27 @@ class DistributedPyTorchRunner(PyTorchRunner):
self.train_loader.sampler.set_epoch(self.epoch)
return super(DistributedPyTorchRunner, self).step()
def get_state(self):
"""Returns the state of the runner."""
# This is so that we create a duplicate of weights into CPU rather than
# move the model weights entirely out of the GPU, so that we can
# resume training while saving intermediate checkpoints.
def _get_model_state_dicts(self):
"""Fetch state from ``model.module`` instead of ``model``.
This is needed for PyTorch DistributedDataParallel models.
"""
cpu_state_dicts = []
for model in self.models:
state_dict = model.module.state_dict()
for k, v in state_dict.items():
state_dict[k] = v.cpu()
cpu_state_dicts += [state_dict]
return {
"epoch": self.epoch,
"models": cpu_state_dicts,
"optimizers": [opt.state_dict() for opt in self.optimizers],
"stats": self.stats()
}
# This is so that we create a duplicate of weights into CPU rather
# than move the model weights out of the GPU so that we can
# resume training while saving intermediate checkpoints.
cpu_state_dicts += [{k: v.cpu() for k, v in state_dict.items()}]
return cpu_state_dicts
def set_state(self, state):
"""Sets the state of the model."""
# TODO: restore timer stats
for model, model_state_dict in zip(self.models, state["models"]):
def _set_model_state_dicts(self, model_state_dicts):
for model, model_state_dict in zip(self.models, model_state_dicts):
model.module.load_state_dict(model_state_dict)
for optimizer, opt_state_dict in zip(self.optimizers,
state["optimizers"]):
optimizer.load_state_dict(opt_state_dict)
self.epoch = state["stats"]["epoch"]
def shutdown(self):
# def shutdown(self):
"""Attempts to shut down the worker."""
super(DistributedPyTorchRunner, self).shutdown()
# super(DistributedPyTorchRunner, self).shutdown()
# TODO: Temporarily removing since it causes hangs on MacOSX.
# However, it seems to be harmless to remove permanently
# since the processes are shutdown anyways. This comment can be
@@ -10,6 +10,7 @@ import torchvision.transforms as transforms
import ray
from ray.experimental.sgd.pytorch import (PyTorchTrainer, PyTorchTrainable)
from ray.experimental.sgd.pytorch.resnet import ResNet18
from ray.experimental.sgd.pytorch.utils import TEST_MODE
def initialization_hook(runner):
@@ -20,55 +21,6 @@ def initialization_hook(runner):
os.environ["NCCL_DEBUG"] = "INFO"
def train(model, train_iterator, criterion, optimizer, config):
model.train()
train_loss, total_num, correct = 0, 0, 0
for batch_idx, (data, target) in enumerate(train_iterator):
if config.get("test_mode") and batch_idx > 0:
break
# get small model update
if torch.cuda.is_available():
data, target = data.cuda(), target.cuda()
output = model(data)
loss = criterion(output, target)
loss.backward()
train_loss += loss.item() * target.size(0)
total_num += target.size(0)
_, predicted = output.max(1)
correct += predicted.eq(target).sum().item()
optimizer.step()
optimizer.zero_grad()
stats = {
"train_loss": train_loss / total_num,
"train_acc": correct / total_num
}
return stats
def validate(model, val_iterator, criterion, config):
# switch to evaluate mode
model.eval()
correct = 0
total = 0
total_loss = 0
with torch.no_grad():
for batch_idx, (features, target) in enumerate(val_iterator):
if config.get("test_mode") and batch_idx > 10:
break
if torch.cuda.is_available():
features = features.cuda(non_blocking=True)
target = target.cuda(non_blocking=True)
# compute output
output = model(features)
loss = criterion(output, target)
total_loss += loss.item() * target.size(0)
_, predicted = torch.max(output.data, 1)
total += target.size(0)
correct += (predicted == target).sum().item()
stats = {"mean_accuracy": correct / total, "mean_loss": total_loss / total}
return stats
def cifar_creator(config):
transform_train = transforms.Compose([
transforms.RandomCrop(32, padding=4),
@@ -96,23 +48,34 @@ def optimizer_creator(model, config):
return torch.optim.SGD(model.parameters(), lr=config.get("lr", 0.1))
def train_example(num_replicas=1, use_gpu=False, test_mode=False):
config = {"test_mode": test_mode}
def scheduler_creator(optimizer, config):
    """Returns a MultiStepLR scheduler wrapping the given optimizer.

    The learning rate is scaled by 0.1 at epochs 150, 250 and 350.
    ``config`` is accepted but not read.
    """
    lr_drop_epochs = [150, 250, 350]
    multi_step = torch.optim.lr_scheduler.MultiStepLR
    return multi_step(optimizer, milestones=lr_drop_epochs, gamma=0.1)
def train_example(num_replicas=1,
num_epochs=5,
use_gpu=False,
use_fp16=False,
test_mode=False):
config = {TEST_MODE: test_mode}
trainer1 = PyTorchTrainer(
ResNet18,
cifar_creator,
optimizer_creator,
nn.CrossEntropyLoss,
scheduler_creator=scheduler_creator,
initialization_hook=initialization_hook,
train_function=train,
validation_function=validate,
num_replicas=num_replicas,
config=config,
use_gpu=use_gpu,
batch_size=16 if test_mode else 512,
backend="nccl" if use_gpu else "gloo")
for i in range(5):
stats = trainer1.train()
backend="nccl" if use_gpu else "gloo",
scheduler_step_freq="epoch",
use_fp16=use_fp16)
for i in range(num_epochs):
# Increase `max_retries` to turn on fault tolerance.
stats = trainer1.train(max_retries=0)
print(stats)
print(trainer1.validate())
@@ -126,15 +89,13 @@ def tune_example(num_replicas=1, use_gpu=False, test_mode=False):
"data_creator": cifar_creator,
"optimizer_creator": optimizer_creator,
"loss_creator": lambda config: nn.CrossEntropyLoss(),
"train_function": train,
"validation_function": validate,
"num_replicas": num_replicas,
"initialization_hook": initialization_hook,
"use_gpu": use_gpu,
"batch_size": 16 if test_mode else 512,
"config": {
"lr": tune.choice([1e-4, 1e-3, 5e-3, 1e-2]),
"test_mode": test_mode
TEST_MODE: test_mode
},
"backend": "nccl" if use_gpu else "gloo"
}
@@ -152,7 +113,7 @@ def tune_example(num_replicas=1, use_gpu=False, test_mode=False):
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--ray-redis-address",
"--address",
required=False,
type=str,
help="the address to use for Redis")
@@ -162,11 +123,18 @@ if __name__ == "__main__":
type=int,
default=1,
help="Sets number of replicas for training.")
parser.add_argument(
"--num-epochs", type=int, default=5, help="Number of epochs to train.")
parser.add_argument(
"--use-gpu",
action="store_true",
default=False,
help="Enables GPU training")
parser.add_argument(
"--fp16",
action="store_true",
default=False,
help="Enables FP16 training with apex. Requires `use-gpu`.")
parser.add_argument(
"--smoke-test",
action="store_true",
@@ -177,7 +145,7 @@ if __name__ == "__main__":
args, _ = parser.parse_known_args()
ray.init(address=args.ray_redis_address, log_to_driver=False)
ray.init(address=args.address, log_to_driver=True)
if args.tune:
tune_example(
@@ -187,5 +155,7 @@ if __name__ == "__main__":
else:
train_example(
num_replicas=args.num_replicas,
num_epochs=args.num_epochs,
use_gpu=args.use_gpu,
use_fp16=args.fp16,
test_mode=args.smoke_test)
@@ -16,6 +16,7 @@ from scipy.stats import entropy
import ray
from ray.experimental.sgd import PyTorchTrainer
from ray.experimental.sgd.pytorch.utils import TEST_MODE
# Training parameters
TRAIN_BATCHES = 5
@@ -157,7 +158,7 @@ def model_creator(config):
return netD, netG
def train(models, dataloader, criterion, optimizers, config):
def train(config, models, dataloader, criterion, optimizers, **kwargs):
netD, netG = models
optimD, optimG = optimizers
real_label = 1
@@ -165,7 +166,7 @@ def train(models, dataloader, criterion, optimizers, config):
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
for i, data in enumerate(dataloader, 0):
if i >= TRAIN_BATCHES and config.get("test_mode"):
if i >= TRAIN_BATCHES and config.get(TEST_MODE):
break
netD.zero_grad()
@@ -211,7 +212,7 @@ def optimizer_creator(models, config):
def train_example(num_replicas=1, use_gpu=False, test_mode=False):
config = {"test_mode": test_mode}
config = {TEST_MODE: test_mode}
trainer = PyTorchTrainer(
model_creator,
data_creator,
@@ -19,7 +19,6 @@ idle_timeout_minutes: 20
provider:
type: aws
region: us-east-1
availability_zone: us-east-1f
# How Ray will authenticate with newly launched nodes.
auth:
@@ -37,18 +36,18 @@ head_node:
worker_nodes:
InstanceType: p3.8xlarge
ImageId: ami-0757fc5a639fe7666
# Run workers on spot by default. Comment this out to use on-demand.
InstanceMarketOptions:
MarketType: spot
# SpotOptions:
# MaxPrice: "9.0"
# # Run workers on spot by default. Comment this out to use on-demand.
# InstanceMarketOptions:
# MarketType: spot
setup_commands:
- ray || pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.9.0.dev0-cp36-cp36m-manylinux1_x86_64.whl
- pip install -U ipdb ray[rllib] torch torchvision
# Install apex.
# - rm -rf apex || true
# - git clone https://github.com/NVIDIA/apex && cd apex && pip install -v --no-cache-dir ./ || true
file_mounts: {
@@ -32,12 +32,25 @@ class LinearDataset(torch.utils.data.Dataset):
def model_creator(config):
return nn.Linear(1, 1)
"""Returns a torch.nn.Module object."""
return nn.Linear(1, config.get("hidden_size", 1))
def optimizer_creator(model, config):
"""Returns optimizer."""
return torch.optim.SGD(model.parameters(), lr=1e-2)
"""Returns optimizer defined upon the model parameters."""
return torch.optim.SGD(model.parameters(), lr=config.get("lr", 1e-2))
def scheduler_creator(optimizer, config):
    """Builds the learning rate scheduler for the given optimizer.

    Set ``PyTorchTrainer(scheduler_step_freq="epoch")`` so that the
    scheduler is incremented correctly.

    When the scheduler depends on the validation loss, remember to call
    ``trainer.update_scheduler(validation_loss)``.
    """
    step_lr = torch.optim.lr_scheduler.StepLR
    return step_lr(optimizer, step_size=5, gamma=0.9)
def data_creator(config):
@@ -51,10 +64,13 @@ def train_example(num_replicas=1, use_gpu=False):
data_creator,
optimizer_creator,
loss_creator=nn.MSELoss,
scheduler_creator=scheduler_creator,
num_replicas=num_replicas,
use_gpu=use_gpu,
batch_size=num_replicas * 4,
backend="gloo")
config={"lr": 1e-2, "hidden_size": 1},
backend="gloo",
scheduler_step_freq="epoch")
for i in range(5):
stats = trainer1.train()
print(stats)
@@ -12,44 +12,57 @@ from ray.experimental.sgd.pytorch import utils as pytorch_utils
from ray.experimental.sgd import utils
logger = logging.getLogger(__name__)
# apex is an optional dependency: ``amp`` stays None when it is missing so
# that later code can test ``amp`` for truthiness before using it.
try:
    from apex import amp
except ImportError:
    amp = None
    logger.debug("apex is not installed.")
class PyTorchRunner:
"""Manages a PyTorch model for training."""
"""Manages a PyTorch model for training.
Args:
model_creator (dict -> *): see pytorch_trainer.py
data_creator (dict -> Dataset, Dataset): see pytorch_trainer.py.
optimizer_creator (models, dict -> optimizers): see pytorch_trainer.py.
loss_creator (dict -> loss | Loss class): see pytorch_trainer.py.
scheduler_creator (optimizers, dict -> schedulers): see
pytorch_trainer.py.
train_function: see pytorch_trainer.py
validation_function: see pytorch_trainer.py
config (dict): see pytorch_trainer.py.
dataloader_config (dict): See pytorch_trainer.py.
batch_size (int): see pytorch_trainer.py.
use_fp16 (bool): see pytorch_trainer.py.
apex_args (dict|None): see pytorch_trainer.py.
scheduler_step_freq (str): see pytorch_trainer.py.
"""
def __init__(self,
model_creator,
data_creator,
optimizer_creator,
loss_creator,
scheduler_creator=None,
train_function=None,
validation_function=None,
config=None,
dataloader_config=None,
batch_size=16):
"""Initializes the runner.
Args:
model_creator (dict -> torch.nn.Module): see pytorch_trainer.py
data_creator (int, dict -> Dataset, Dataset): see
pytorch_trainer.py.
optimizer_creator (torch.nn.Module, dict -> loss, optimizer):
see pytorch_trainer.py.
loss_creator (dict -> loss | Loss class): see pytorch_trainer.py.
train_function: see pytorch_trainer.py
validation_function: see pytorch_trainer.py
config (dict): see pytorch_trainer.py.
dataloader_config (dict): See pytorch_trainer.py.
batch_size (int): see pytorch_trainer.py.
"""
batch_size=16,
use_fp16=False,
apex_args=None,
scheduler_step_freq="batch"):
self.model_creator = model_creator
self.data_creator = data_creator
self.optimizer_creator = optimizer_creator
self.loss_creator = loss_creator
self.scheduler_creator = scheduler_creator
self.config = {} if config is None else config
self.dataloader_config = {
"num_workers": 2,
"pin_memory": True
"num_workers": 2
} if dataloader_config is None else dataloader_config
self.train_function = train_function or pytorch_utils.train
self.validation_function = (validation_function
@@ -65,12 +78,19 @@ class PyTorchRunner:
"validation", "training"
]
}
self.models = None
self.optimizers = None
self.criterion = None
self.schedulers = None
self.train_loader = None
self.validation_loader = None
self.use_fp16 = use_fp16
self.apex_args = apex_args or {}
if use_fp16 and not amp:
raise ImportError(
"Please install apex from "
"https://www.github.com/nvidia/apex to use fp16 training.")
self.scheduler_step_freq = scheduler_step_freq
def _validate_datasets(self, dataset):
assert dataset, "Datasets need to be returned in data_creator."
@@ -91,6 +111,22 @@ class PyTorchRunner:
if torch.cuda.is_available():
self.criterion = self.criterion.cuda()
def _create_schedulers_if_available(self):
# Learning rate schedules are optional.
if not self.scheduler_creator:
return
self.schedulers = self.scheduler_creator(self.given_optimizers,
self.config)
if not isinstance(self.schedulers, collections.Iterable):
self.schedulers = [self.schedulers]
def _try_setup_apex(self):
"""Sets up the model for fp16 training via apex if available."""
if self.use_fp16 and amp:
self.models, self.optimizers = amp.initialize(
self.models, self.optimizers, **self.apex_args)
def setup(self):
"""Initializes the model."""
logger.debug("Creating model")
@@ -105,7 +141,8 @@ class PyTorchRunner:
self.config)
if not isinstance(self.optimizers, collections.Iterable):
self.optimizers = [self.optimizers]
self._create_schedulers_if_available()
self._try_setup_apex()
self._create_loss()
logger.debug("Creating dataset")
@@ -134,10 +171,19 @@ class PyTorchRunner:
def step(self):
"""Runs a training epoch and updates the model parameters."""
logger.debug("Begin Training Epoch {}".format(self.epoch + 1))
training_config = self.config.copy()
training_config.update({
pytorch_utils.USE_FP16: self.use_fp16,
pytorch_utils.SCHEDULER_STEP: self.scheduler_step_freq
})
with self._timers["training"]:
train_stats = self.train_function(
self.given_models, self.train_loader, self.criterion,
self.given_optimizers, self.config)
training_config,
self.given_models,
self.train_loader,
self.criterion,
self.given_optimizers,
scheduler=self.given_schedulers)
train_stats["epoch"] = self.epoch
self.epoch += 1
@@ -151,8 +197,11 @@ class PyTorchRunner:
raise ValueError("No validation dataloader provided.")
with self._timers["validation"]:
validation_stats = self.validation_function(
self.given_models, self.validation_loader, self.criterion,
self.config)
self.config,
self.given_models,
self.validation_loader,
self.criterion,
scheduler=self.given_schedulers)
validation_stats.update(self.stats())
return validation_stats
@@ -166,31 +215,53 @@ class PyTorchRunner:
t.reset()
return stats
def get_state(self):
"""Returns the state of the runner."""
def _get_model_state_dicts(self):
# This is so that we create a duplicate of weights into CPU rather than
# move the model weights entirely out of the GPU, so that we can
# resume training while saving intermediate checkpoints.
cpu_state_dicts = []
for model in self.models:
state_dict = model.state_dict()
for k, v in state_dict.items():
state_dict[k] = v.cpu()
cpu_state_dicts += [state_dict]
return {
cpu_state_dicts += [{k: v.cpu() for k, v in state_dict.items()}]
return cpu_state_dicts
def _set_model_state_dicts(self, models_state_dicts):
for model, state_dict in zip(self.models, models_state_dicts):
model.load_state_dict(state_dict)
def get_state(self):
"""Returns the state of the runner."""
state = {
"epoch": self.epoch,
"models": cpu_state_dicts,
"models": self._get_model_state_dicts(),
"optimizers": [opt.state_dict() for opt in self.optimizers],
"stats": self.stats()
}
if self.schedulers:
state.update({
"schedulers": [
scheduler.state_dict() for scheduler in self.schedulers
]
})
# Check if fp16 is True and if NVIDIA Apex is imported.
if self.use_fp16 and amp:
state.update({"amp": amp.state_dict()})
return state
def set_state(self, state):
"""Sets the state of the model."""
# TODO: restore timer stats
for model, state_dict in zip(self.models, state["models"]):
model.load_state_dict(state_dict)
self._set_model_state_dicts(state["models"])
for optimizer, state_dict in zip(self.optimizers, state["optimizers"]):
optimizer.load_state_dict(state_dict)
if self.schedulers:
for scheduler, state_dict in zip(self.schedulers,
state["schedulers"]):
scheduler.load_state_dict(state_dict)
if self.use_fp16 and "amp" in state and amp:
amp.load_state_dict(state["amp"])
self.epoch = state["stats"]["epoch"]
def apply_fn(self, fn):
@@ -206,6 +277,13 @@ class PyTorchRunner:
if torch.cuda.is_available():
torch.cuda.empty_cache()
@property
def given_models(self):
    """Models in the shape handed to user functions.

    Returns the full ``self.models`` collection when it holds more than
    one model, otherwise the single model itself.
    """
    has_several = len(self.models) > 1
    return self.models if has_several else self.models[0]
@property
def given_optimizers(self):
if len(self.optimizers) > 1:
@@ -214,8 +292,10 @@ class PyTorchRunner:
return self.optimizers[0]
@property
def given_models(self):
if len(self.models) > 1:
return self.models
def given_schedulers(self):
if not self.schedulers:
return self.schedulers
if len(self.schedulers) > 1:
return self.schedulers
else:
return self.models[0]
return self.schedulers[0]
@@ -15,6 +15,7 @@ from ray.experimental.sgd.pytorch.distributed_pytorch_runner import (
DistributedPyTorchRunner)
from ray.experimental.sgd import utils
from ray.experimental.sgd.pytorch.pytorch_runner import PyTorchRunner
from ray.experimental.sgd.pytorch import utils as pytorch_utils
logger = logging.getLogger(__name__)
RESIZE_COOLDOWN_S = 10
@@ -26,55 +27,59 @@ class PyTorchTrainer:
Launches a set of actors which connect via distributed PyTorch and
coordinate gradient updates to train the provided model.
.. code-block:: python
.. code-block:: python
def model_creator(config):
return nn.Linear(1, 1)
def model_creator(config):
return nn.Linear(1, 1)
def optimizer_creator(model, config):
return torch.optim.SGD(
model.parameters(), lr=config.get("lr", 1e-4))
def optimizer_creator(model, config):
return torch.optim.SGD(
model.parameters(), lr=config.get("lr", 1e-4))
def data_creator(config):
return LinearDataset(2, 5), LinearDataset(2, 5, size=400)
def data_creator(config):
return LinearDataset(2, 5), LinearDataset(2, 5, size=400)
trainer = PyTorchTrainer(
model_creator,
data_creator,
optimizer_creator,
loss_creator=nn.MSELoss,
use_gpu=True
)
trainer.train()
trainer = PyTorchTrainer(
model_creator,
data_creator,
optimizer_creator,
loss_creator=nn.MSELoss,
use_gpu=True
)
trainer.train()
Args:
model_creator (dict -> *): Constructor function that takes in
model_creator (dict -> Model(s)): Constructor function that takes in
config and returns the model(s) to be optimized. These must be
``torch.nn.Module`` objects. Note that if multiple models
are returned, the same number of optimizers must be returned
by the optimizer_creator. If multiple models are returned,
``torch.nn.Module`` objects. If multiple models are returned,
a ``train_function`` must be specified. You do not need to
handle GPU/devices in this function;
RaySGD will do that under the hood.
data_creator (dict -> Dataset, Dataset): Constructor function
handle GPU/devices in this function; RaySGD will do that under
the hood.
data_creator (dict -> Dataset(s)): Constructor function
that takes in the passed config and returns one or
two ``torch.utils.data.Dataset`` objects.
Note that even though two Dataset objects can be returned,
only one dataset will be used for training. RaySGD
will automatically wrap the objects with a ``DataLoader``.
optimizer_creator (models, dict -> optimizers): Constructor
optimizer_creator ((models, dict) -> optimizers): Constructor
function that takes in the return values from
``model_creator`` and the passed config and returns One or
more Torch optimizer objects. You must return as many
optimizers as you have models. You do not need to handle
more Torch optimizer objects. You do not need to handle
GPU/devices in this function; ``RaySGD`` will do that for you.
loss_creator (dict -> loss or torch.nn.*Loss): A constructor function
for the training loss. This can be either a function that
loss_creator (torch.nn.*Loss class | dict -> loss): A constructor
function for the training loss. This can be either a function that
takes in the provided config for customization or a subclass
of ``torch.nn.modules.loss._Loss``, which is most Pytorch
loss classes. For example, ``loss_creator=torch.nn.BCELoss``.
scheduler_creator (optimizers, dict -> schedulers):
A constructor function for the learning rate scheduler(s). This is
a function that takes in the generated optimizers (from
``optimizer_creator``) and the provided config for customization.
Be sure to set ``scheduler_step_freq`` to increment the
scheduler correctly.
train_function: Custom function for training. This function
will be executed in parallel across all workers at once. The
function needs to take in (models, train_dataloader, criterion,
@@ -104,6 +109,19 @@ class PyTorchTrainer:
support "nccl", "gloo", and "auto". If "auto", RaySGD will
automatically use "nccl" if `use_gpu` is True, and "gloo"
otherwise.
use_fp16 (bool): Enables mixed precision training via apex if apex
is installed. This is automatically done after the model and
optimizers are constructed and will work for multi-model training.
Please see https://github.com/NVIDIA/apex for more details.
apex_args (dict|None): Dict containing keyword args for amp.initialize.
See https://nvidia.github.io/apex/amp.html#module-apex.amp. By
default, the models and optimizers are passed in. Consider using
"num_losses" if operating over multiple models and optimizers.
scheduler_step_freq: "batch", "epoch", or None. This will
determine when ``scheduler.step`` is called. If "batch",
``step`` will be called after every optimizer step. If "epoch",
``step`` will be called after one pass of the DataLoader.
"""
def __init__(self,
@@ -111,6 +129,7 @@ class PyTorchTrainer:
data_creator,
optimizer_creator,
loss_creator,
scheduler_creator=None,
train_function=None,
validation_function=None,
initialization_hook=None,
@@ -119,8 +138,10 @@ class PyTorchTrainer:
num_replicas=1,
use_gpu=False,
batch_size=16,
backend="auto"):
# TODO: add support for mixed precision
backend="auto",
use_fp16=False,
apex_args=None,
scheduler_step_freq="batch"):
if num_replicas > 1 and not dist.is_available():
raise ValueError(
("Distributed PyTorch is not supported on macOS. "
@@ -133,6 +154,7 @@ class PyTorchTrainer:
self.train_function = train_function
self.optimizer_creator = optimizer_creator
self.loss_creator = loss_creator
self.scheduler_creator = scheduler_creator
self.validation_function = validation_function
self.initialization_hook = initialization_hook
self.config = {} if config is None else config
@@ -147,9 +169,25 @@ class PyTorchTrainer:
self.use_gpu = use_gpu
self.batch_size = batch_size
self.max_replicas = num_replicas
self.use_fp16 = use_fp16
if apex_args and not isinstance(apex_args, dict):
raise ValueError("apex_args needs to be a dict object.")
self.apex_args = apex_args
self.temp_dir = tempfile.mkdtemp(prefix="raysgd")
self._num_failures = 0
self._last_resize = float("-inf")
if scheduler_step_freq and (
scheduler_step_freq not in pytorch_utils.VALID_SCHEDULER_STEP):
raise ValueError(
"Scheduler step freq must be in {}. Got {}".format(
pytorch_utils.VALID_SCHEDULER_STEP, scheduler_step_freq))
self.scheduler_step_freq = scheduler_step_freq
self._start_workers(self.max_replicas)
def _start_workers(self, num_replicas):
@@ -165,11 +203,16 @@ class PyTorchTrainer:
self.data_creator,
self.optimizer_creator,
self.loss_creator,
self.scheduler_creator,
train_function=self.train_function,
validation_function=self.validation_function,
config=self.config,
dataloader_config=self.dataloader_config,
batch_size=self.batch_size)
batch_size=self.batch_size,
use_fp16=self.use_fp16,
apex_args=self.apex_args,
scheduler_step_freq=self.scheduler_step_freq,
)
]
if self.initialization_hook:
self.apply_all_workers(self.initialization_hook)
@@ -198,12 +241,16 @@ class PyTorchTrainer:
self.data_creator,
self.optimizer_creator,
self.loss_creator,
self.scheduler_creator,
backend=self.backend,
train_function=self.train_function,
validation_function=self.validation_function,
config=self.config,
dataloader_config=self.dataloader_config,
batch_size=batch_size_per_replica)
batch_size=batch_size_per_replica,
use_fp16=self.use_fp16,
apex_args=self.apex_args,
scheduler_step_freq=self.scheduler_step_freq)
for i in range(num_replicas)
]
if self.initialization_hook:
@@ -219,7 +266,7 @@ class PyTorchTrainer:
for i, worker in enumerate(self.workers)
])
def train(self, max_retries=10, checkpoint="auto"):
def train(self, max_retries=0, checkpoint="auto"):
"""Runs a training epoch.
Runs an average over all values returned from workers. Set
@@ -294,6 +341,14 @@ class PyTorchTrainer:
[s.get(stat_key, np.nan) for s in worker_stats])
return validation_stats
def update_scheduler(self, metric):
    """Calls ``scheduler.step(metric)`` on all schedulers.

    This is useful for lr_schedulers such as ``ReduceLROnPlateau``.
    Workers whose runner holds no schedulers are left untouched.

    Args:
        metric: Value forwarded to every ``scheduler.step`` call.
    """

    def _step_schedulers(runner):
        # ``runner.schedulers`` is None when no scheduler_creator was
        # given; iterating it directly would raise a TypeError.
        return [sched.step(metric) for sched in runner.schedulers or []]

    self.apply_all_workers(_step_schedulers)
def get_model(self):
"""Returns the learned model(s)."""
models = self.model_creator(self.config)
+122 -16
View File
@@ -4,32 +4,87 @@ import torch
from ray.experimental.sgd.utils import TimerStat
amp = None
def train(model, train_iterator, criterion, optimizer, config):
"""Runs 1 training epoch"""
try:
from apex import amp
except ImportError:
# Apex library is not installed, so we cannot enable mixed precision.
# We don't log here because logging happens in the pytorch_runner,
# where amp is initialized.
pass
# Config key read by ``train`` to decide whether the backward pass goes
# through ``amp.scale_loss``.
USE_FP16 = "__use_fp16__"
# Config key; when set, ``train`` stops after the first batch.
TEST_MODE = "__test_mode__"
# Key under which ``train`` reports the number of batches it processed.
BATCH_COUNT = "batch_processed"
# Config key holding the scheduler step frequency (one of the values below).
SCHEDULER_STEP = "scheduler_step"
# Step the scheduler after every optimizer step.
SCHEDULER_STEP_BATCH = "batch"
# Step the scheduler once after the pass over the train iterator.
SCHEDULER_STEP_EPOCH = "epoch"
# Accepted values for the scheduler step frequency.
VALID_SCHEDULER_STEP = {SCHEDULER_STEP_BATCH, SCHEDULER_STEP_EPOCH}
def train(config, model, train_iterator, criterion, optimizer, scheduler=None):
"""Runs one standard training pass over the train_iterator.
This function automatically measures timing for various operations such
as host to device transfer, gradient calculation, and gradient application.
It also automatically detects and places the data on the given GPU device
if available.
The scheduler will only be called at a batch or epoch frequency, depending
on the user parameter. Be sure to set ``scheduler_step_freq`` in
``PyTorchTrainer`` to either "batch" or "epoch" to increment the scheduler
correctly during training. If using a learning rate scheduler
that depends on validation loss, you can use ``trainer.update_scheduler``.
Raises:
ValueError if multiple models/optimizers/schedulers are provided. You
are expected to have a custom training function if you wish
to use multiple models/optimizers/schedulers.
Args:
config: (dict): A user configuration provided into the Trainer
constructor.
model: The model as created by the model_creator.
train_iterator: An iterator created from the DataLoader which
wraps the provided Dataset.
criterion: The loss object created by the loss_creator.
optimizer: The torch.optim.Optimizer object as created by the
optimizer_creator.
scheduler (optional): The torch.optim.lr_scheduler object
as created by the scheduler_creator. Be sure to set
``scheduler_step_freq`` in ``PyTorchTrainer``
to increment the scheduler correctly.
Returns:
A dict of metrics from training.
"""
if isinstance(model, collections.Iterable) or isinstance(
optimizer, collections.Iterable):
optimizer, collections.Iterable) or isinstance(
scheduler, collections.Iterable):
raise ValueError(
"Need to provide custom training function if using multi-model "
"or multi-optimizer training.")
"or multi-scheduler or multi-optimizer training.")
batch_time = AverageMeter()
data_time = AverageMeter()
losses = AverageMeter()
timers = {k: TimerStat() for k in ["d2h", "fwd", "grad", "apply"]}
timers = {k: TimerStat() for k in ["h2d", "fwd", "grad", "apply"]}
# switch to train mode
model.train()
end = time.time()
for i, (features, target) in enumerate(train_iterator):
for batch_idx, (features, target) in enumerate(train_iterator):
# measure data loading time
data_time.update(time.time() - end)
# Create non_blocking tensors for distributed training
with timers["d2h"]:
with timers["h2d"]:
if torch.cuda.is_available():
features = features.cuda(non_blocking=True)
target = target.cuda(non_blocking=True)
@@ -45,19 +100,33 @@ def train(model, train_iterator, criterion, optimizer, config):
with timers["grad"]:
# compute gradients in a backward pass
optimizer.zero_grad()
loss.backward()
if config.get(USE_FP16):
with amp.scale_loss(loss, optimizer) as scaled_loss:
scaled_loss.backward()
else:
loss.backward()
with timers["apply"]:
# Call step of optimizer to update model params
optimizer.step()
if scheduler and config.get(SCHEDULER_STEP) == SCHEDULER_STEP_BATCH:
scheduler.step()
# measure elapsed time
batch_time.update(time.time() - end)
end = time.time()
if config.get(TEST_MODE) and batch_idx == 0:
break
if scheduler and config.get(SCHEDULER_STEP) == SCHEDULER_STEP_EPOCH:
scheduler.step()
stats = {
"batch_time": batch_time.avg,
"batch_processed": losses.count,
BATCH_COUNT: batch_idx + 1,
"train_loss": losses.avg,
"data_time": data_time.avg,
}
@@ -65,11 +134,40 @@ def train(model, train_iterator, criterion, optimizer, config):
return stats
def validate(model, val_iterator, criterion, config):
if isinstance(model, collections.Iterable):
def validate(config, model, val_iterator, criterion, scheduler=None):
"""Runs one standard validation pass over the val_iterator.
This function automatically measures timing for various operations such
as host to device transfer and processing time for the batch.
It also automatically detects and places the data on the given GPU device
if available.
Raises:
ValueError if multiple models/schedulers are provided. You
are expected to have a custom validation function if you wish
to use multiple models/schedulers.
Args:
config (dict): A user configuration provided into the Trainer
constructor.
model: The model as created by the model_creator.
val_iterator: An iterator created from the DataLoader which
wraps the provided Dataset.
criterion: The loss object created by the loss_creator.
scheduler (optional): The torch.optim.lr_scheduler object
as created by the scheduler_creator. By default,
this is not used in this function.
Returns:
A dict of metrics from the evaluation.
"""
if isinstance(model, collections.Iterable) or isinstance(
scheduler, collections.Iterable):
raise ValueError(
"Need to provide custom validation function if using multi-model "
"training.")
"or multi-scheduler training.")
batch_time = AverageMeter()
losses = AverageMeter()
@@ -77,10 +175,10 @@ def validate(model, val_iterator, criterion, config):
model.eval()
correct = 0
total = 0
batch_idx = 0
with torch.no_grad():
end = time.time()
for i, (features, target) in enumerate(val_iterator):
for batch_idx, (features, target) in enumerate(val_iterator):
if torch.cuda.is_available():
features = features.cuda(non_blocking=True)
target = target.cuda(non_blocking=True)
@@ -99,8 +197,16 @@ def validate(model, val_iterator, criterion, config):
batch_time.update(time.time() - end)
end = time.time()
stats = {"batch_time": batch_time.avg, "validation_loss": losses.avg}
stats.update(mean_accuracy=correct / total)
if config.get(TEST_MODE) and batch_idx == 0:
break
stats = {
BATCH_COUNT: batch_idx + 1,
"batch_time": batch_time.avg,
"validation_loss": losses.avg,
"mean_accuracy": correct / total,
"mean_loss": losses.sum / total,
}
return stats
@@ -12,13 +12,29 @@ import ray
from ray import tune
from ray.tests.conftest import ray_start_2_cpus # noqa: F401
from ray.experimental.sgd.pytorch import PyTorchTrainer, PyTorchTrainable
from ray.experimental.sgd.pytorch.utils import train
from ray.experimental.sgd.pytorch.utils import (train, BATCH_COUNT, TEST_MODE,
SCHEDULER_STEP)
from ray.experimental.sgd.utils import check_for_failure
from ray.experimental.sgd.pytorch.examples.train_example import (
model_creator, optimizer_creator, data_creator, LinearDataset)
def test_test_mode(ray_start_2_cpus):  # noqa: F811
    """Check that TEST_MODE makes train and validate report one batch.

    The trainer is built with ``config={TEST_MODE: True}`` and both the
    training and validation metrics are expected to report a
    ``BATCH_COUNT`` of exactly 1.
    """
    trainer = PyTorchTrainer(
        model_creator,
        data_creator,
        optimizer_creator,
        loss_creator=lambda config: nn.MSELoss(),
        config={TEST_MODE: True},
        num_replicas=1)
    try:
        metrics = trainer.train()
        assert metrics[BATCH_COUNT] == 1

        val_metrics = trainer.validate()
        assert val_metrics[BATCH_COUNT] == 1
    finally:
        # Other tests in this file shut their trainer down explicitly; do
        # the same here, and do it even when an assertion above fails.
        trainer.shutdown()
@pytest.mark.parametrize("num_replicas", [1, 2]
if dist.is_available() else [1])
def test_train(ray_start_2_cpus, num_replicas): # noqa: F811
@@ -28,10 +44,12 @@ def test_train(ray_start_2_cpus, num_replicas): # noqa: F811
optimizer_creator,
loss_creator=lambda config: nn.MSELoss(),
num_replicas=num_replicas)
train_loss1 = trainer.train()["train_loss"]
for i in range(3):
train_loss1 = trainer.train()["train_loss"]
validation_loss1 = trainer.validate()["validation_loss"]
train_loss2 = trainer.train()["train_loss"]
for i in range(3):
train_loss2 = trainer.train()["train_loss"]
validation_loss2 = trainer.validate()["validation_loss"]
print(train_loss1, train_loss2)
@@ -44,11 +62,12 @@ def test_train(ray_start_2_cpus, num_replicas): # noqa: F811
@pytest.mark.parametrize("num_replicas", [1, 2]
if dist.is_available() else [1])
def test_multi_model(ray_start_2_cpus, num_replicas): # noqa: F811
def custom_train(models, dataloader, criterion, optimizers, config):
def custom_train(config, models, dataloader, criterion, optimizers,
**kwargs):
result = {}
for i, (model, optimizer) in enumerate(zip(models, optimizers)):
result["model_{}".format(i)] = train(model, dataloader, criterion,
optimizer, config)
result["model_{}".format(i)] = train(config, model, dataloader,
criterion, optimizer)
return result
def multi_model_creator(config):
@@ -103,7 +122,107 @@ def test_multi_model(ray_start_2_cpus, num_replicas): # noqa: F811
@pytest.mark.parametrize("num_replicas", [1, 2]
                         if dist.is_available() else [1])
@pytest.mark.xfail
def test_multi_model_matrix(ray_start_2_cpus, num_replicas):  # noqa: F811
    """Train with every 1-or-2 combination of models/optimizers/schedulers.

    Each creator returns a single object when its configured count is 1 and
    a list otherwise. The custom training function only checks that the
    collections it receives have the configured length.

    NOTE(review): the test is marked ``xfail`` without a stated reason.
    """

    def custom_train(config, model, dataloader, criterion, optimizer,
                     scheduler):
        # Length checks only apply when more than one object was requested;
        # with a count of 1 the creators return a bare object, not a list.
        if config.get("models", 1) > 1:
            assert len(model) == config["models"], config

        if config.get("optimizers", 1) > 1:
            assert len(optimizer) == config["optimizers"], config

        if config.get("schedulers", 1) > 1:
            assert len(scheduler) == config["schedulers"], config
        return {"done": 1}

    def multi_model_creator(config):
        models = [nn.Linear(1, 1) for _ in range(config.get("models", 1))]
        return models[0] if len(models) == 1 else models

    def multi_optimizer_creator(models, config):
        # Every optimizer is bound to the first model's parameters.
        main_model = models[0] if isinstance(models, list) else models
        optimizers = [
            torch.optim.SGD(main_model.parameters(), lr=0.0001)
            for _ in range(config.get("optimizers", 1))
        ]
        return optimizers[0] if len(optimizers) == 1 else optimizers

    def multi_scheduler_creator(optimizer, config):
        # Every scheduler is bound to the first optimizer.
        main_opt = optimizer[0] if isinstance(optimizer, list) else optimizer
        schedulers = [
            torch.optim.lr_scheduler.StepLR(main_opt, step_size=30, gamma=0.1)
            for _ in range(config.get("schedulers", 1))
        ]
        return schedulers[0] if len(schedulers) == 1 else schedulers

    for model_count in range(1, 3):
        for optimizer_count in range(1, 3):
            for scheduler_count in range(1, 3):
                trainer = PyTorchTrainer(
                    multi_model_creator,
                    data_creator,
                    multi_optimizer_creator,
                    loss_creator=nn.MSELoss,
                    scheduler_creator=multi_scheduler_creator,
                    train_function=custom_train,
                    num_replicas=num_replicas,
                    config={
                        "models": model_count,
                        "optimizers": optimizer_count,
                        "schedulers": scheduler_count
                    })
                trainer.train()
                trainer.shutdown()
@pytest.mark.parametrize("scheduler_freq", ["epoch", "batch"])
def test_scheduler_freq(ray_start_2_cpus, scheduler_freq):  # noqa: F811
    """Run three train() calls on a trainer that has a StepLR scheduler.

    NOTE(review): as written, ``custom_train`` is defined but never passed
    to the trainer, and ``scheduler_freq`` is never forwarded to it, so both
    parametrized cases execute the same code and the assertion on
    ``config[SCHEDULER_STEP]`` never runs. Presumably the trainer was meant
    to receive the step frequency and ``train_function=custom_train`` --
    confirm against the PyTorchTrainer signature before wiring it up.
    """

    def custom_train(config, model, dataloader, criterion, optimizer,
                     scheduler):
        # Intended check: the configured scheduler step frequency reaches
        # the training function. Currently unused (see note above).
        assert config[SCHEDULER_STEP] == scheduler_freq
        return {"done": 1}

    def scheduler_creator(optimizer, config):
        return torch.optim.lr_scheduler.StepLR(
            optimizer, step_size=30, gamma=0.1)

    trainer = PyTorchTrainer(
        model_creator,
        data_creator,
        optimizer_creator,
        loss_creator=lambda config: nn.MSELoss(),
        scheduler_creator=scheduler_creator)

    for i in range(3):
        # Indexing the result raises KeyError if "train_loss" is missing.
        trainer.train()["train_loss"]
    trainer.shutdown()
def test_scheduler_validate(ray_start_2_cpus):  # noqa: F811
    """Check that update_scheduler steps the scheduler on every worker.

    A ReduceLROnPlateau scheduler is created per worker; after two
    ``update_scheduler(0.5)`` calls each worker's first scheduler is
    expected to report ``last_epoch == 2``.
    """
    from torch.optim.lr_scheduler import ReduceLROnPlateau

    trainer = PyTorchTrainer(
        model_creator,
        data_creator,
        optimizer_creator,
        loss_creator=lambda config: nn.MSELoss(),
        scheduler_creator=lambda optimizer, cfg: ReduceLROnPlateau(optimizer))
    trainer.update_scheduler(0.5)
    trainer.update_scheduler(0.5)
    assert all(
        trainer.apply_all_workers(lambda r: r.schedulers[0].last_epoch == 2))
    trainer.shutdown()
@pytest.mark.parametrize("num_replicas", [1, 2]
if dist.is_available() else [1])
def test_tune_train(ray_start_2_cpus, num_replicas): # noqa: F811
config = {
@@ -114,7 +233,10 @@ def test_tune_train(ray_start_2_cpus, num_replicas): # noqa: F811
"num_replicas": num_replicas,
"use_gpu": False,
"batch_size": 512,
"backend": "gloo"
"backend": "gloo",
"config": {
"lr": 0.001
}
}
analysis = tune.run(