[sgd] Deprecate old distributed SGD implementation (#5160)

* Deprecate old distributed SGD implementation

* Update README
This commit is contained in:
Peter Schafhalter
2019-07-22 15:47:10 -07:00
committed by Richard Liaw
parent 80b976efcb
commit fc589050c9
20 changed files with 1 additions and 3016 deletions
-11
View File
@@ -1,11 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from ray.experimental.sgd.sgd import DistributedSGD
from ray.experimental.sgd.model import Model
__all__ = [
"DistributedSGD",
"Model",
]
@@ -1,152 +0,0 @@
#!/usr/bin/env python
"""Example of how to train a model with Ray SGD.
We use a small model here, so no speedup for distributing the computation is
expected. This example shows:
- How to set up a simple input pipeline
- How to evaluate model accuracy during training
- How to get and set model weights
- How to train with ray.experimental.sgd.DistributedSGD
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import time
from tensorflow.examples.tutorials.mnist import input_data
import tensorflow as tf
import ray
from ray.tune import run_experiments
from ray.tune.examples.tune_mnist_ray import deepnn
from ray.experimental.sgd.model import Model
from ray.experimental.sgd.sgd import DistributedSGD
import ray.experimental.tf_utils as ray_tf_utils
parser = argparse.ArgumentParser()
parser.add_argument("--redis-address", default=None, type=str)
parser.add_argument("--num-iters", default=10000, type=int)
parser.add_argument("--batch-size", default=50, type=int)
parser.add_argument("--num-workers", default=1, type=int)
parser.add_argument("--devices-per-worker", default=1, type=int)
parser.add_argument("--tune", action="store_true", help="Run in Ray Tune")
parser.add_argument(
"--strategy", default="ps", type=str, help="One of 'simple' or 'ps'")
parser.add_argument(
"--gpu", action="store_true", help="Use GPUs for optimization")
class MNISTModel(Model):
def __init__(self):
# Import data
error = None
for _ in range(10):
try:
self.mnist = input_data.read_data_sets(
"/tmp/tensorflow/mnist/input_data", one_hot=True)
error = None
break
except Exception as e:
error = e
time.sleep(5)
if error:
raise ValueError("Failed to import data", error)
# Set seed and build layers
tf.set_random_seed(0)
self.x = tf.placeholder(tf.float32, [None, 784], name="x")
self.y_ = tf.placeholder(tf.float32, [None, 10], name="y_")
y_conv, self.keep_prob = deepnn(self.x)
# Need to define loss and optimizer attributes
self.loss = tf.reduce_mean(
tf.nn.softmax_cross_entropy_with_logits(
labels=self.y_, logits=y_conv))
self.optimizer = tf.train.AdamOptimizer(1e-4)
self.variables = ray_tf_utils.TensorFlowVariables(
self.loss, tf.get_default_session())
# For evaluating test accuracy
correct_prediction = tf.equal(
tf.argmax(y_conv, 1), tf.argmax(self.y_, 1))
self.accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
def get_loss(self):
return self.loss
def get_optimizer(self):
return self.optimizer
def get_variables(self):
return self.variables
def get_feed_dict(self):
batch = self.mnist.train.next_batch(50)
return {
self.x: batch[0],
self.y_: batch[1],
self.keep_prob: 0.5,
}
def get_metrics(self):
accuracy = self.accuracy.eval(
feed_dict={
self.x: self.mnist.test.images,
self.y_: self.mnist.test.labels,
self.keep_prob: 1.0,
})
return {"accuracy": accuracy}
def get_weights(self):
return self.variables.get_flat()
def set_weights(self, weights):
self.variables.set_flat(weights)
def train_mnist(config, reporter):
args = config["args"]
sgd = DistributedSGD(
lambda w_i, d_i: MNISTModel(),
num_workers=args.num_workers,
devices_per_worker=args.devices_per_worker,
gpu=args.gpu,
strategy=args.strategy)
# Important: synchronize the initial weights of all model replicas
w0 = sgd.for_model(lambda m: m.get_variables().get_flat())
sgd.foreach_model(lambda m: m.get_variables().set_flat(w0))
for i in range(args.num_iters):
if i % 10 == 0:
start = time.time()
loss = sgd.step(fetch_stats=True)["loss"]
metrics = sgd.foreach_model(lambda model: model.get_metrics())
acc = [m["accuracy"] for m in metrics]
print("Iter", i, "loss", loss, "accuracy", acc)
print("Time per iteration", time.time() - start)
assert len(set(acc)) == 1, ("Models out of sync", acc)
reporter(timesteps_total=i, mean_loss=loss, mean_accuracy=acc[0])
else:
sgd.step()
if __name__ == "__main__":
args = parser.parse_args()
ray.init(redis_address=args.redis_address)
if args.tune:
run_experiments({
"mnist_sgd": {
"run": train_mnist,
"config": {
"args": args,
},
},
})
else:
train_mnist({"args": args}, lambda **kw: None)
-72
View File
@@ -1,72 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
class Model(object):
"""Your class must implement this interface to be used with Ray SGD.
This supports any form of input pipeline: it is up to you to define it
using TensorFlow.
For an example implementation, see tfbench/test_model.py
"""
def get_loss(self):
"""Return loss of the model
Returns:
loss
"""
raise NotImplementedError(
"get_loss of %s is not implemented" % self.__class__.__name__)
# TODO support complex way of updating gradient,
# e.g. using different optimizers
def get_optimizer(self):
"""Return optimizer for the model
Returns:
optimizer
"""
raise NotImplementedError(
"get_optimizer of %s is not implemented" % self.__class__.__name__)
def get_metrics(self):
"""Return metrics of the model
Returns:
metrics(dict): e.g. {"accuracy": accuracy(numpy data)}
"""
return {}
def get_feed_dict(self):
"""Extra values to pass in when computing gradients for the loss.
Returns:
TensorFlow feed_dict to add to the gradient operation.
"""
return {}
def get_weights(self):
"""Return weights from the model.
Implementing `get_weights` is required for checkpointing and fault
tolerance.
Returns:
Numpy array of weights from the model.
"""
raise NotImplementedError(
"get_weights of %s is not implemented" % self.__class__.__name__)
def set_weights(self, weights):
"""Sets the model weights.
Implementing `set_weights` is required for checkpointing and fault
tolerance.
Args:
weights: numpy array of weights for the model.
"""
raise NotImplementedError(
"set_weights of %s is not implemented" % self.__class__.__name__)
@@ -1,637 +0,0 @@
# This file is adapted from https://github.com/tensorflow/benchmarks
# /blob/master/scripts/tf_cnn_benchmarks/allreduce.py
#
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Utilities for allreduce."""
from __future__ import print_function
import collections as pycoll
import logging
import numpy as np
import re
from six.moves import xrange # pylint: disable=redefined-builtin
import tensorflow as tf
from tensorflow.contrib.all_reduce.python import all_reduce
logger = logging.getLogger(__name__)
AllReduceSpecTuple = pycoll.namedtuple("AllReduceSpecTuple",
"alg shards limit")
def parse_general_int(s):
"""Parse integer with power-of-2 suffix eg. 32k."""
mo = re.match(r"(\d+)([KkMGT]?)$", s)
if mo:
i, suffix = mo.group(1, 2)
v = int(i)
if suffix:
if suffix == "K" or suffix == "k":
v *= 1024
elif suffix == "M":
v *= (1024 * 1024)
elif suffix == "G":
v *= (1024 * 1024 * 1024)
elif suffix == "T":
v *= (1024 * 1024 * 1024 * 1024)
else:
raise ValueError("invalid integer string %s" % s)
return v
else:
v = int(s)
return v
def parse_all_reduce_spec(all_reduce_spec):
"""Parse all_reduce_spec.
Args:
all_reduce_spec: a string specifying a combination of all-reduce
algorithms to apply for gradient reduction.
Returns:
a list of AllReduceSpecTuple.
Raises:
ValueError: all_reduce_spec is not well-formed.
An all_reduce_spec has BNF form:
int ::= positive whole number
g_int ::= int[KkMGT]?
alg_spec ::= alg | alg#int
range_spec ::= alg_spec | alg_spec/alg_spec
spec ::= range_spec | range_spec:g_int:range_spec
Not all syntactically correct specifications are supported.
Examples of supported all_reduce_spec strings, with semantics explained:
"xring" == apply ring all-reduce to all tensors
"xring#2" == apply ring all-reduce to all tensors, using two simultaneous
transfer rings, each operating on 1/2 of each tensor.
"nccl" == apply NCCL all-reduce to all tensors (only works within
a single worker process where all devices are GPUs)
"nccl/xring" == apply NCCL all-reduce to all tensors within each worker
to produce at least one full-reduced (locally) value,
then apply ring all-reduce to one such value from each
worker, then apply NCCL broadcast to propagate those globally
reduced values back to every device within each worker.
"pscpu" == Shuffle reduce using worker CPUs as the gather devices: each
distributed tensor is reduced by copying all instances to
one of the worker CPUs, computing the reduction there, then
copying back to each participating device. Tensor reductions
are assigned to specific CPUs round-robin.
"psgpu#4" == Arrange all GPUs across all workers into groups of 4.
Each distributed tensor is shuffle reduced against one
such group of 4 GPUs, selected round-robin. That is, each
tensor is split across 4 shards for the reduction.
"pscpu:2k:pscpu#2:64k:xring" == Apply single-shard pscpu to
tensors of size <= 2048 elements, apply 2-shard pscpu to
tensors up to size 64k elements, apply xring to larger tensors.
"pscpu/pscpu#2" == Use shuffle gather to locally reduce each tensor on
the worker's CPU, then use 2-shard shuffle to reduce those
locally reduced tensors across workers (on the worker CPUs), then
scatter the globally reduced values locally from each worker CPU.
"""
range_parts = all_reduce_spec.split(":") + ["-1"]
if len(range_parts) % 2:
raise ValueError(
"all_reduce_spec not well formed: %s" % all_reduce_spec)
limit = 0
spec = []
alg = None
shards = 1
for i, range_part in enumerate(range_parts):
if i % 2 == 1:
try:
limit = parse_general_int(range_part)
spec.append(
AllReduceSpecTuple(alg=alg, shards=shards, limit=limit))
except ValueError:
raise ValueError(
"all_reduce_spec (%s) contains non-integer range %s" %
(all_reduce_spec, range_part))
else:
alg = range_part
alg_parts = range_part.split("#")
alg = alg_parts[0]
if len(alg_parts) > 1:
try:
shards = int(alg_parts[1])
except ValueError:
raise ValueError(
"all_reduce_spec (%s) contains non-integer "
"shards %s" % all_reduce_spec, alg_parts[1])
else:
shards = 1
if alg not in [
"nccl", "nccl/xring", "nccl/rechd", "nccl/pscpu", "xring",
"pscpu", "psgpu", "pscpu/pscpu"
]:
raise ValueError("all_reduce_spec (%s) contains invalid alg %s"
% (all_reduce_spec, alg))
return spec
def build_all_reduce_device_prefixes(job_name, num_tasks):
"""Build list of device prefix names for all_reduce.
Args:
job_name: "worker", "ps" or "localhost".
num_tasks: number of jobs across which device names should be generated.
Returns:
A list of device name prefix strings. Each element spells out the full
host name without adding the device.
e.g. "/job:worker/task:0"
"""
if job_name != "localhost":
return ["/job:%s/task:%d" % (job_name, d) for d in range(0, num_tasks)]
else:
assert num_tasks == 1
return ["/job:%s" % job_name]
def group_device_names(devices, group_size):
"""Group device names into groups of group_size.
Args:
devices: list of strings naming devices.
group_size: int >= 1
Returns:
list of lists of devices, where each inner list is group_size long,
and each device appears at least once in an inner list. If
len(devices) % group_size = 0 then each device will appear
exactly once.
Raises:
ValueError: group_size > len(devices)
"""
num_devices = len(devices)
if group_size > num_devices:
raise ValueError(
"only %d devices, but group_size=%d" % (num_devices, group_size))
num_groups = (
num_devices // group_size + (1 if
(num_devices % group_size != 0) else 0))
groups = [[] for i in range(num_groups)]
for i in range(0, num_groups * group_size):
groups[i % num_groups].append(devices[i % num_devices])
return groups
def split_grads_by_size(threshold_size, device_grads):
"""Break gradients into two sets according to tensor size.
Args:
threshold_size: int size cutoff for small vs large tensor.
device_grads: List of lists of (gradient, variable) tuples. The outer
list is over devices. The inner list is over individual gradients.
Returns:
small_grads: Subset of device_grads where shape is <= theshold_size
elements.
large_grads: Subset of device_grads where shape is > threshold_size
elements.
"""
small_grads = []
large_grads = []
for dl in device_grads:
small_dl = []
large_dl = []
for (g, v) in dl:
tensor_size = g.get_shape().num_elements()
if tensor_size <= threshold_size:
small_dl.append([g, v])
else:
large_dl.append([g, v])
if small_dl:
small_grads.append(small_dl)
if large_dl:
large_grads.append(large_dl)
return small_grads, large_grads
def build_reduce_sum(scaled_grads):
stacked = tf.parallel_stack(values=scaled_grads)
reduced = tf.reduce_sum(stacked, 0)
return [reduced] * len(scaled_grads)
def build_trivial_sum(scaled_grads):
return scaled_grads
def aggregate_single_gradient(grad_and_vars, use_mean, check_inf_nan):
"""Calculate the average gradient for a shared variable across all towers.
Note that this function provides a synchronization point across all towers.
Args:
grad_and_vars: A list or tuple of (gradient, variable) tuples. Each
(gradient, variable) pair within the outer list represents the gradient
of the variable calculated for a single tower, and the number of pairs
equals the number of towers.
use_mean: if True, mean is taken, else sum of gradients is taken.
check_inf_nan: check grads for nans and infs.
Returns:
The tuple ([(average_gradient, variable),], has_nan_or_inf) where the
gradient has been averaged across all towers. The variable is chosen from
the first tower. The has_nan_or_inf indicates the grads has nan or inf.
"""
grads = [g for g, _ in grad_and_vars]
grad = tf.add_n(grads)
if use_mean and len(grads) > 1:
grad = tf.multiply(grad, 1.0 / len(grads))
v = grad_and_vars[0][1]
if check_inf_nan:
has_nan_or_inf = tf.logical_not(tf.reduce_all(tf.is_finite(grads)))
return (grad, v), has_nan_or_inf
else:
return (grad, v), None
def aggregate_gradients_using_copy_with_device_selection(
tower_grads, avail_devices, use_mean=True, check_inf_nan=False):
"""Aggregate gradients, controlling device for the aggregation.
Args:
tower_grads: List of lists of (gradient, variable) tuples. The outer list
is over towers. The inner list is over individual gradients.
use_mean: if True, mean is taken, else sum of gradients is taken.
check_inf_nan: If true, check grads for nans and infs.
Returns:
The tuple ([(average_gradient, variable),], has_nan_or_inf) where the
gradient has been averaged across all towers. The variable is chosen from
the first tower. The has_nan_or_inf indicates the grads has nan or inf.
"""
agg_grads = []
has_nan_or_inf_list = []
for i, single_grads in enumerate(zip(*tower_grads)):
with tf.device(avail_devices[i % len(avail_devices)]):
grad_and_var, has_nan_or_inf = aggregate_single_gradient(
single_grads, use_mean, check_inf_nan)
agg_grads.append(grad_and_var)
has_nan_or_inf_list.append(has_nan_or_inf)
return agg_grads
def sum_grad_and_var_all_reduce(grad_and_vars,
num_workers,
alg,
gpu_indices,
aux_devices=None,
num_shards=1):
"""Apply all-reduce algorithm over specified gradient tensors."""
with tf.name_scope("allreduce"):
# Note that each grad_and_vars looks like the following:
# ((grad0_gpu0, var0_gpu0), ... , (grad0_gpuN, var0_gpuN))
scaled_grads = [g for g, _ in grad_and_vars]
if alg == "nccl":
from tensorflow.python.ops import nccl_ops
summed_grads = nccl_ops.all_sum(scaled_grads)
elif alg == "simple":
summed_grads = build_reduce_sum(scaled_grads)
elif alg == "trivial":
summed_grads = build_trivial_sum(scaled_grads)
elif alg == "xring":
summed_grads = all_reduce.build_ring_all_reduce(
scaled_grads, num_workers, num_shards, gpu_indices, tf.add)
elif alg == "nccl/xring":
summed_grads = all_reduce.build_nccl_then_ring(
scaled_grads, num_shards, tf.add)
elif alg == "nccl/rechd":
summed_grads = all_reduce.build_nccl_then_recursive_hd(
scaled_grads, tf.add)
elif alg == "nccl/pscpu":
summed_grads = all_reduce.build_nccl_then_shuffle(
scaled_grads, aux_devices, tf.add, tf.add_n)
elif alg == "pscpu/pscpu":
summed_grads = all_reduce.build_shuffle_then_shuffle(
scaled_grads,
aux_devices,
# TODO(tucker): devise a way of better specifying the device
# for the second level.
[aux_devices[0]],
tf.add_n)
elif alg in ["pscpu", "psgpu"]:
summed_grads = all_reduce.build_shuffle_all_reduce(
scaled_grads, aux_devices, tf.add_n)
else:
raise ValueError("unsupported all_reduce alg: ", alg)
result = []
for (_, v), g in zip(grad_and_vars, summed_grads):
result.append([g, v])
return result
def contains_any(haystack, needles):
"""Tests if any needle is a substring of haystack.
Args:
haystack: a string
needles: list of strings
Returns:
True if any element of needles is a substring of haystack,
False otherwise.
"""
for n in needles:
if n in haystack:
return True
return False
def sum_gradients_all_reduce(dev_prefixes,
tower_grads,
num_workers,
alg,
num_shards,
gpu_indices,
agg_small_grads_max_bytes=0):
"""Apply all-reduce algorithm over specified gradient tensors.
Args:
dev_prefixes: list of prefix strings to use to generate PS device names.
tower_grads: the gradients to reduce.
num_workers: number of worker processes across entire job.
alg: the all-reduce algorithm to apply.
num_shards: alg-specific sharding factor.
gpu_indices: indices of local GPUs in order usable for ring-reduce.
agg_small_grads_max_bytes: largest tensor eligible for aggregation,
in number of bytes.
Returns:
list of reduced tensors, packing values
"""
alg_contains_shuffle = contains_any(alg, ["pscpu", "psgpu"])
is_hierarchical = "/" in alg
if "pscpu" in alg:
aux_devices = [prefix + "/cpu:0" for prefix in dev_prefixes]
elif "psgpu" in alg:
aux_devices = [
prefix + "/gpu:%d" % i for i in range(len(gpu_indices))
for prefix in dev_prefixes
]
else:
aux_devices = ["/job:localhost/cpu:0"]
aux_device_groups = group_device_names(
aux_devices, num_shards if alg_contains_shuffle else 1)
group_index = 0
if agg_small_grads_max_bytes > 0:
tower_grads, packing = pack_small_tensors(
tower_grads, max_bytes=agg_small_grads_max_bytes)
else:
packing = None
new_tower_grads = []
if alg == "better":
raw_devices = ["/gpu:%i" % (i) for i in gpu_indices]
agg_grads = aggregate_gradients_using_copy_with_device_selection(
tower_grads, raw_devices)
for arr in tower_grads:
new_tower_grads.append(
[(g, v) for (_, v), (g, _) in zip(arr, agg_grads)])
else:
reduced_gv_list = []
for grad_and_vars in zip(*tower_grads):
reduced_gv_list.append(
sum_grad_and_var_all_reduce(
grad_and_vars, num_workers, alg, gpu_indices, aux_devices
if is_hierarchical else aux_device_groups[group_index],
num_shards))
group_index = (group_index + 1) % len(aux_device_groups)
new_tower_grads = [list(x) for x in zip(*reduced_gv_list)]
return new_tower_grads, packing
def print_stats(sizes):
def sizeof_fmt(num, suffix="B"):
for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]:
if abs(num) < 1024.0:
return "%3.1f%s%s" % (num, unit, suffix)
num /= 1024.0
return "%.1f%s%s" % (num, "Yi", suffix)
stats = {
"avg": np.mean(sizes),
"median": np.median(sizes),
"total size": np.sum(sizes)
}
logger.info("Stats " + ", ".join(
["%s: %s" % (k, sizeof_fmt(v)) for k, v in stats.items()]))
other_stats = {"len": len(sizes)}
logger.info(", ".join(["%s: %f" % (k, v) for k, v in other_stats.items()]))
def extract_ranges(index_list, range_size_limit=32):
"""Extract consecutive ranges and singles from index_list.
Args:
index_list: List of monotone increasing non-negative integers.
range_size_limit: Largest size range to return. If a larger
consecutive range exists it will be returned as multiple
ranges.
Returns:
ranges, singles where ranges is a list of [first, last] pairs of
consecutive elements in index_list, and singles is all of the
other elements, in original order.
"""
if not index_list:
return [], []
first = index_list[0]
last = first
ranges = []
singles = []
for i in index_list[1:]:
if i == last + 1 and (last - first) <= range_size_limit:
last = i
else:
if last > first:
ranges.append([first, last])
else:
singles.append(first)
first = i
last = i
if last > first:
ranges.append([first, last])
else:
singles.append(first)
return ranges, singles
GradPackTuple = pycoll.namedtuple("GradPackTuple", "indices vars shapes")
def pack_range(key, packing, grad_vars, rng):
"""Form the concatenation of a specified range of gradient tensors.
Args:
key: Value under which to store meta-data in packing that will be used
later to restore the grad_var list structure.
packing: Dict holding data describing packed ranges of small tensors.
grad_vars: List of (grad, var) pairs for one tower.
rng: A pair of integers giving the first, last indices of a consecutive
range of tensors to be packed.
Returns:
A tensor that is the concatenation of all the specified small tensors.
"""
to_pack = grad_vars[rng[0]:rng[1] + 1]
members = []
variables = []
restore_shapes = []
with tf.name_scope("pack"):
for g, v in to_pack:
variables.append(v)
restore_shapes.append(g.shape)
with tf.device(g.device):
members.append(tf.reshape(g, [-1]))
packing[key] = GradPackTuple(
indices=range(rng[0], rng[1] + 1),
vars=variables,
shapes=restore_shapes)
with tf.device(members[0].device):
return tf.concat(members, 0)
def unpack_grad_tuple(gv, gpt):
"""Unpack a previously packed collection of gradient tensors.
Args:
gv: A (grad, var) pair to be unpacked.
gpt: A GradPackTuple describing the packing operation that produced gv.
Returns:
A list of (grad, var) pairs corresponding to the values that were
originally packed into gv, maybe following subsequent operations like
reduction.
"""
elt_widths = [x.num_elements() for x in gpt.shapes]
with tf.device(gv[0][0].device):
with tf.name_scope("unpack"):
splits = tf.split(gv[0], elt_widths)
unpacked_gv = []
for idx, s in enumerate(splits):
unpacked_gv.append((tf.reshape(s, gpt.shapes[idx]),
gpt.vars[idx]))
return unpacked_gv
def pack_small_tensors(tower_grads, max_bytes=0):
"""Concatenate gradients together more intelligently.
Does binpacking
Args:
tower_grads: List of lists of (gradient, variable) tuples.
max_bytes: Int giving max number of bytes in a tensor that
may be considered small.
"""
assert max_bytes >= 0
orig_grads = [g for g, _ in tower_grads[0]]
# Check to make sure sizes are accurate; not entirely important
assert all(g.dtype == tf.float32 for g in orig_grads)
sizes = [4 * g.shape.num_elements() for g in orig_grads]
print_stats(sizes)
small_ranges = []
large_indices = []
new_sizes = []
def end_interval(indices, small_ranges, large_indices):
if len(indices) > 1:
small_ranges.insert(0, [indices[0], indices[-1]])
else:
large_indices.insert(0, indices[0])
cur_range = []
cur_size = 0
for i, s in reversed(list(enumerate(sizes))):
if cur_size > max_bytes:
end_interval(cur_range, small_ranges, large_indices)
new_sizes.insert(0, cur_size)
cur_range = []
cur_size = 0
cur_range.insert(0, i)
cur_size += s
end_interval(cur_range, small_ranges, large_indices)
new_sizes.insert(0, cur_size)
print_stats(new_sizes)
num_gv = len(orig_grads)
packing = {}
if len(small_ranges):
new_tower_grads = []
for dev_idx, gv_list in enumerate(tower_grads):
assert len(gv_list) == num_gv, (
"Possible cause: "
"Networks constructed on different workers "
"don't have the same number of variables. "
"If you use tf.GraphKeys or tf.global_variables() "
"with multiple graphs per worker during network "
"construction, you need to use "
"appropriate scopes, see "
"https://github.com/ray-project/ray/issues/3136")
new_gv_list = []
for r in small_ranges:
key = "%d:%d" % (dev_idx, len(new_gv_list))
new_gv_list.append((pack_range(key, packing, gv_list, r),
"packing_var_placeholder"))
for i in large_indices:
new_gv_list.append(gv_list[i])
new_tower_grads.append(new_gv_list)
return new_tower_grads, packing
else:
return tower_grads, None
def unpack_small_tensors(tower_grads, packing):
"""Undo the structure alterations to tower_grads done by pack_small_tensors.
Args:
tower_grads: List of List of (grad, var) tuples.
packing: A dict generated by pack_small_tensors describing the changes
it made to tower_grads.
Returns:
new_tower_grads: identical to tower_grads except that concatentations
of small tensors have been split apart and returned to their original
positions, paired with their original variables.
"""
if not packing:
return tower_grads
new_tower_grads = []
num_devices = len(tower_grads)
num_packed = len(packing.keys()) // num_devices
for dev_idx, gv_list in enumerate(tower_grads):
new_gv_list = gv_list[num_packed:]
for i in xrange(0, num_packed):
k = "%d:%d" % (dev_idx, i)
gpt = packing[k]
gv = unpack_grad_tuple(gv_list[i], gpt)
for gi, idx in enumerate(gpt.indices):
assert idx == gpt.indices[gi]
new_gv_list.insert(idx, gv[gi])
new_tower_grads.append(new_gv_list)
return new_tower_grads
@@ -1,83 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
import numpy as np
import ray
from ray.experimental.sgd.util import Timeline, fetch, warmup
logger = logging.getLogger(__name__)
@ray.remote(num_cpus=0)
class ParameterServer(object):
"""Helper class for ray.experimental.sgd.DistributedSGD."""
def __init__(self, num_workers, tid):
self.num_sgd_workers = num_workers
self.acc_counter = 0
self.timeline = Timeline(tid)
# TODO(ekl) get this to work again so we get ray events
# self.timeline.patch_ray()
def initialize(self, shard_shape):
"""Resets the gradient buffer to zeros."""
self.accumulated = np.zeros(shard_shape, dtype=np.float32)
def prefetch(self, oids):
"""Tell plasma to prefetch the given object ids over the network."""
self.timeline.reset()
self.timeline.start("prefetch")
fetch(oids)
self.timeline.end("prefetch")
def add_spinwait(self, grad_shard_ids):
"""Optimized version of add() that operates on multiple grads."""
self.timeline.start("add_spinwait")
plasma_ids = [ray.pyarrow.plasma.ObjectID(x) for x in grad_shard_ids]
while plasma_ids:
for p in plasma_ids:
if ray.worker.global_worker.plasma_client.contains(p):
self.timeline.start("get_buffers")
grads = ray.worker.global_worker.plasma_client.get(p)
self.accumulated += grads
self.acc_counter += 1
self.timeline.end("get_buffers")
plasma_ids.remove(p)
break
self.timeline.end("add_spinwait")
def add(self, grad_shard_id):
"""Add the given gradient value to the accumulated gradients."""
self.timeline.start("add")
self.timeline.start("get_buffers")
oid = ray.pyarrow.plasma.ObjectID(grad_shard_id)
grads = ray.worker.global_worker.plasma_client.get(oid)
self.timeline.end("get_buffers")
self.accumulated += grads
self.acc_counter += 1
self.timeline.end("add")
def get(self, object_id):
"""Put the accumulated gradients to the given object id."""
self.timeline.start("get")
client = ray.worker.global_worker.plasma_client
assert self.acc_counter == self.num_sgd_workers, self.acc_counter
oid = ray.pyarrow.plasma.ObjectID(object_id)
self.accumulated /= self.acc_counter
client.put(self.accumulated.flatten(), object_id=oid)
self.accumulated = np.zeros_like(self.accumulated)
self.acc_counter = 0
self.timeline.end("get")
def get_timeline(self):
return self.timeline
def ip(self):
return ray.services.get_node_ip_address()
def warmup(self):
warmup()
-274
View File
@@ -1,274 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
import os
import random
import time
import numpy as np
import ray
from ray.experimental.sgd.sgd_worker import SGDWorker
from ray.experimental.sgd.param_server import ParameterServer
logger = logging.getLogger(__name__)
class DistributedSGD(object):
"""Experimental distributed SGD implementation in Ray.
This supports two modes:
'simple': centralized gradient aggregation
'ps': sharded parameter-server implementation
To use this class, you'll have to implement model.py:Model.
Arguments:
model_creator (func): Function that returns a model given worker and
device indexes as arguments. Each model replica will be created
within its own variable scope.
num_workers (int): Number of Ray actors to use for SGD.
devices_per_worker (int): Number of GPU or CPU devices to use per
worker. One model replica will be created per device.
gpu (bool): Whether to use GPU devices.
strategy (str): Strategy to use for distributed gradient aggregation.
This only applies if num_workers > 1.
grad_shard_bytes (int): Fuse gradient tensors into chunks of at most
this size (if applicable).
all_reduce_alg (str): TensorFlow strategy to use for gradient
synchronization within the same worker (if applicable).
See modified_allreduce.py for options.
Examples:
>>> # Setup distributed SGD
>>> model_creator = (
... lambda worker_idx, device_idx: YourModelClass(...))
>>> sgd = DistributedSGD(
... model_creator, num_workers=2,
... devices_per_worker=4, gpu=True, strategy="ps")
>>> # To train
>>> for i in range(100):
... stats = sgd.step(fetch_stats=i % 10 == 0)
>>> # To access or update model state
>>> sgd.foreach_model(lambda model: ...)
>>> # To access or update worker state
>>> sgd.foreach_worker(lambda worker: ...)
"""
def __init__(self,
model_creator,
num_workers,
devices_per_worker,
gpu=True,
strategy="ps",
grad_shard_bytes=10000000,
all_reduce_alg="simple"):
if num_workers == 1 and strategy == "ps":
logger.warning(
"The parameter server strategy does not make sense for single "
"worker operation, falling back to simple mode.")
strategy = "simple"
if strategy == "ps":
use_plasma_op = True
elif strategy == "simple":
use_plasma_op = False
grad_shard_bytes = 0 # tensor fusion doesn't make sense
else:
raise ValueError("strategy must be one of 'ps', 'simple'")
self.strategy = strategy
self.model_creator = model_creator
if gpu:
requests = {"num_gpus": devices_per_worker}
else:
requests = {"num_cpus": devices_per_worker}
RemoteSGDWorker = ray.remote(**requests)(SGDWorker)
self.workers = []
logger.info(
"Creating SGD workers ({} total, {} devices per worker)".format(
num_workers, devices_per_worker))
for worker_index in range(num_workers):
self.workers.append(
RemoteSGDWorker.remote(
worker_index,
model_creator,
num_devices=devices_per_worker,
plasma_op=use_plasma_op,
gpu=gpu,
max_bytes=grad_shard_bytes,
all_reduce_alg=all_reduce_alg))
logger.info("Waiting for gradient configuration")
shard_shapes = ray.get(self.workers[0].shard_shapes.remote())
logger.info("Waiting for actors to start")
ray.get([w.shard_shapes.remote() for w in self.workers])
if strategy == "ps":
logger.info("Starting parameter servers ({} shards)".format(
len(shard_shapes)))
self.ps_list = [
ParameterServer.remote(len(self.workers), i)
for i, s in enumerate(shard_shapes)
]
ray.get([
ps.initialize.remote(s)
for ps, s in zip(self.ps_list, shard_shapes)
])
logger.info("Parameter servers started")
else:
self.ps_list = []
def foreach_worker(self, fn):
"""Apply the given function to each remote worker.
Returns:
List of results from applying the function.
"""
results = ray.get([w.foreach_worker.remote(fn) for w in self.workers])
return results
def foreach_model(self, fn):
"""Apply the given function to each model replica in each worker.
Returns:
List of results from applying the function.
"""
results = ray.get([w.foreach_model.remote(fn) for w in self.workers])
out = []
for r in results:
out.extend(r)
return out
def for_model(self, fn):
"""Apply the given function to a single model replica.
Returns:
Result from applying the function.
"""
return ray.get(self.workers[0].for_model.remote(fn))
def step(self, fetch_stats=False):
"""Run a single SGD step.
Arguments:
fetch_stats (bool): Whether to return stats from the step. This can
slow down the computation by acting as a global barrier.
"""
if self.strategy == "ps":
return _distributed_sgd_step(
self.workers,
self.ps_list,
write_timeline=False,
fetch_stats=fetch_stats)
else:
return _simple_sgd_step(self.workers)
def warmup(self):
logger.info("Warming up object store of worker actors")
ray.get([w.warmup.remote() for w in self.workers])
logger.info("Warmup complete")
def save_checkpoint(self, path):
w0 = self.for_model(lambda m: m.get_weights())
filename = os.path.join(path, "model.npy")
np.save(filename, w0)
def restore_checkpoint(self, path):
filename = os.path.join(path, "model.npy")
w0 = np.load(filename)
self.foreach_model(lambda m: m.set_weights(w0))
def _average_gradients(grads):
out = []
for grad_list in zip(*grads):
out.append(np.mean(grad_list, axis=0))
return out
def _simple_sgd_step(actors):
if len(actors) == 1:
return {"loss": ray.get(actors[0].compute_apply.remote())}
start = time.time()
fetches = ray.get([a.compute_gradients.remote() for a in actors])
losses = [f[0] for f in fetches]
grads = [f[1] for f in fetches]
logger.debug("compute all grads time {}".format(time.time() - start))
start = time.time()
if len(actors) == 1:
assert len(grads) == 1
avg_grad = grads[0]
else:
avg_grad = _average_gradients(grads)
logger.debug("grad reduce time {}".format(time.time() - start))
start = time.time()
ray.get([a.apply_gradients.remote(avg_grad) for a in actors])
logger.debug("apply all grads time {}".format(time.time() - start))
return {"loss": np.mean(losses)}
def _distributed_sgd_step(actors, ps_list, fetch_stats, write_timeline):
# Preallocate object ids that actors will write gradient shards to
grad_shard_oids_list = [[np.random.bytes(20) for _ in ps_list]
for _ in actors]
logger.debug("Generated grad oids")
# Preallocate object ids that param servers will write new weights to
accum_shard_ids = [np.random.bytes(20) for _ in ps_list]
logger.debug("Generated accum oids")
# Kick off the fused compute grad / update weights tf run for each actor
losses = []
for actor, grad_shard_oids in zip(actors, grad_shard_oids_list):
losses.append(
actor.ps_compute_apply.remote(
grad_shard_oids,
accum_shard_ids,
write_timeline=write_timeline))
logger.debug("Launched all ps_compute_applys on all actors")
# Issue prefetch ops
for j, (ps, weight_shard_oid) in list(
enumerate(zip(ps_list, accum_shard_ids)))[::-1]:
to_fetch = []
for grad_shard_oids in grad_shard_oids_list:
to_fetch.append(grad_shard_oids[j])
random.shuffle(to_fetch)
ps.prefetch.remote(to_fetch)
logger.debug("Launched all prefetch ops")
# Aggregate the gradients produced by the actors. These operations
# run concurrently with the actor methods above.
ps_gets = []
for j, (ps, weight_shard_oid) in list(
enumerate(zip(ps_list, accum_shard_ids)))[::-1]:
ps.add_spinwait.remote([gs[j] for gs in grad_shard_oids_list])
ps_gets.append(ps.get.remote(weight_shard_oid))
logger.debug("Launched all aggregate ops")
if write_timeline:
timelines = [ps.get_timeline.remote() for ps in ps_list]
logger.debug("Launched timeline gets")
timelines = ray.get(timelines)
t0 = timelines[0]
for t in timelines[1:]:
t0.merge(t)
t0.chrome_trace_format("ps_timeline.json")
else:
# Wait for at least the ps gets to finish
ray.get(ps_gets)
if fetch_stats:
return {"loss": np.mean(ray.get(losses))}
else:
return None
-265
View File
@@ -1,265 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
import time
import pyarrow.plasma as plasma
import tensorflow as tf
import ray
from ray.experimental.sgd.util import (ensure_plasma_tensorflow_op, fetch,
run_timeline, warmup)
from ray.experimental.sgd.modified_allreduce import (sum_gradients_all_reduce,
unpack_small_tensors)
logger = logging.getLogger(__name__)
class SGDWorker(object):
"""Helper class for ray.experimental.sgd.DistributedSGD."""
def __init__(self,
worker_index,
model_creator,
all_reduce_alg="simple",
num_devices=1,
gpu=False,
max_bytes=10000000,
plasma_op=False):
self.worker_index = worker_index
assert num_devices > 0
# TODO(ekl) support custom session
tf_session_args = {
"device_count": {
"CPU": num_devices
},
"log_device_placement": False,
"gpu_options": tf.GPUOptions(force_gpu_compatible=True),
"inter_op_parallelism_threads": 128,
}
config_proto = tf.ConfigProto(**tf_session_args)
self.sess = tf.Session(config=config_proto)
self.models = []
grad_ops = []
if gpu:
device_tmpl = "/gpu:%d"
else:
device_tmpl = "/cpu:%d"
with self.sess.as_default():
for device_idx in range(num_devices):
device = device_tmpl % device_idx
with tf.device(device):
with tf.variable_scope("device_%d" % device_idx):
model = model_creator(worker_index, device_idx)
self.models.append(model)
optimizer = model.get_optimizer()
loss = model.get_loss()
grads = [
t for t in optimizer.compute_gradients(loss)
if t[0] is not None
]
grad_ops.append(grads)
if num_devices == 1:
if max_bytes:
raise ValueError(
"Implementation limitation: grad_shard_bytes > 0 "
"({}) currently requires > 1 device".format(max_bytes))
self.packed_grads_and_vars = grad_ops
else:
if max_bytes:
self.packed_grads_and_vars, packing_vals = (
sum_gradients_all_reduce(
"",
grad_ops,
1,
all_reduce_alg,
1,
list(range(num_devices)),
agg_small_grads_max_bytes=max_bytes))
else:
self.packed_grads_and_vars, _ = (sum_gradients_all_reduce(
"",
grad_ops,
1,
all_reduce_alg,
1,
list(range(num_devices)),
agg_small_grads_max_bytes=0))
self.per_device_grads = [
list(zip(*dev_gv))[0] for dev_gv in self.packed_grads_and_vars
]
assert (len(self.per_device_grads) == num_devices)
self.num_grads = num_grads = len(self.packed_grads_and_vars[0])
if max_bytes:
logger.info("Packed grads => {} tensors".format(num_grads))
# Ops for reading grads with the right control deps
nccl_noops = []
for j in range(num_grads)[::-1]:
deps = nccl_noops + [
dev_grad[j] for dev_grad in self.per_device_grads
]
with tf.control_dependencies(deps):
nccl_noops = [tf.no_op()]
# You must fetch this otherwise the NCCL allreduce will hang
self.nccl_control_out = tf.group(*nccl_noops)
if plasma_op:
store_socket = (
ray.worker.global_worker.plasma_client.store_socket_name)
ensure_plasma_tensorflow_op()
# For fetching grads -> plasma
self.plasma_in_grads = []
self.plasma_in_grads_oids = [
tf.placeholder(shape=[], dtype=tf.string, name="in_grad_oids")
for _ in range(num_grads)
]
for j in range(num_grads):
grad = self.per_device_grads[0][j]
with tf.device(self.models[0].get_loss().device):
plasma_grad = plasma.tf_plasma_op.tensor_to_plasma(
[grad],
self.plasma_in_grads_oids[j],
plasma_store_socket_name=store_socket)
self.plasma_in_grads.append(plasma_grad)
# For applying grads <- plasma
unpacked_gv = []
self.plasma_out_grads_oids = [
tf.placeholder(
shape=[], dtype=tf.string, name="grad_out_oids")
for _ in range(num_grads)
]
packed_plasma_grads = []
for j in range(num_grads):
with tf.device(self.plasma_in_grads[j].device):
with tf.control_dependencies([self.plasma_in_grads[j]]):
grad_ph = plasma.tf_plasma_op.plasma_to_tensor(
self.plasma_out_grads_oids[j],
dtype=tf.float32,
plasma_store_socket_name=store_socket)
grad_ph = tf.reshape(grad_ph,
self.packed_grads_and_vars[0][j][0].shape)
logger.debug("Packed tensor {}".format(grad_ph))
packed_plasma_grads.append(grad_ph)
for i in range(num_devices):
per_device = []
for j, (g, v) in enumerate(self.packed_grads_and_vars[i]):
grad_ph = packed_plasma_grads[j]
per_device.append((grad_ph, v))
unpacked_gv.append(per_device)
if max_bytes:
unpacked_gv = unpack_small_tensors(unpacked_gv, packing_vals)
elif max_bytes:
unpacked_gv = unpack_small_tensors(self.packed_grads_and_vars,
packing_vals)
else:
unpacked_gv = self.packed_grads_and_vars
# Same shape as packed_grads_and_vars
assert len(unpacked_gv) == num_devices
assert len(unpacked_gv[0][0]) == 2
apply_ops = []
to_apply = unpacked_gv[0]
for ix, m in enumerate(self.models):
apply_ops.append(m.get_optimizer().apply_gradients([
(g, v) for ((g, _), (_, v)) in zip(to_apply, unpacked_gv[ix])
]))
self.apply_op = tf.group(*apply_ops)
init_op = tf.group(tf.global_variables_initializer(),
tf.local_variables_initializer())
self.sess.run(init_op)
def _grad_feed_dict(self):
# Aggregate feed dicts for each model on this worker.
feed_dict = {}
for model in self.models:
feed_dict.update(model.get_feed_dict())
return feed_dict
def foreach_model(self, fn):
with self.sess.as_default():
return [fn(m) for m in self.models]
def foreach_worker(self, fn):
with self.sess.as_default():
return fn(self)
def for_model(self, fn):
with self.sess.as_default():
return fn(self.models[0])
def compute_gradients(self):
start = time.time()
feed_dict = self._grad_feed_dict()
# We only need to fetch the first per_device_grad, since they are
# averaged across all devices by allreduce.
fetches = self.sess.run(
[
self.models[0].get_loss(), self.per_device_grads[0],
self.nccl_control_out
],
feed_dict=feed_dict)
logger.debug(
"Compute grad interior time {}".format(time.time() - start))
return fetches
def apply_gradients(self, avg_grads):
start = time.time()
result = {
g: avg_grads[i]
for (i, g) in enumerate(self.per_device_grads[0])
}
self.sess.run(self.apply_op, feed_dict=result)
logger.debug("Apply grad interior time {}".format(time.time() - start))
def compute_apply(self):
fetches = run_timeline(
self.sess,
[self.models[0].get_loss(), self.apply_op, self.nccl_control_out],
feed_dict=self._grad_feed_dict(),
name="compute_apply")
return fetches[0]
def ps_compute_apply(self,
out_grad_shard_oids,
agg_grad_shard_oids,
tl_name="ps_compute_apply",
write_timeline=False):
feed_dict = self._grad_feed_dict()
feed_dict.update(
dict(zip(self.plasma_in_grads_oids, out_grad_shard_oids)))
feed_dict.update(
dict(zip(self.plasma_out_grads_oids, agg_grad_shard_oids)))
fetch(agg_grad_shard_oids)
fetches = run_timeline(
self.sess, [
self.models[0].get_loss(), self.plasma_in_grads, self.apply_op,
self.nccl_control_out
],
feed_dict=feed_dict,
write_timeline=write_timeline)
return fetches[0]
def num_grad_shards(self):
return self.num_grads
def shard_shapes(self):
main_gv = self.packed_grads_and_vars[0]
return [g.shape for g, _ in main_gv]
def ip(self):
return ray.services.get_node_ip_address()
def warmup(self):
warmup()
@@ -1,77 +0,0 @@
#!/usr/bin/env python
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import os
import time
import ray
from ray.experimental.sgd.tfbench.test_model import TFBenchModel
from ray.experimental.sgd.sgd import DistributedSGD
parser = argparse.ArgumentParser()
parser.add_argument("--redis-address", default=None, type=str)
parser.add_argument("--num-iters", default=10, type=int)
parser.add_argument("--batch-size", default=1, type=int)
parser.add_argument("--num-workers", default=2, type=int)
parser.add_argument("--grad-shard-bytes", default=10000000, type=int)
parser.add_argument("--devices-per-worker", default=2, type=int)
parser.add_argument("--all-reduce-alg", default="simple", type=str)
parser.add_argument("--object-store-memory", default=None, type=int)
parser.add_argument("--checkpoint-dir", default="/tmp", type=str)
parser.add_argument(
"--strategy", default="simple", type=str, help="One of 'simple' or 'ps'")
parser.add_argument(
"--gpu", action="store_true", help="Use GPUs for optimization")
if __name__ == "__main__":
args, _ = parser.parse_known_args()
ray.init(
redis_address=args.redis_address,
object_store_memory=args.object_store_memory)
model_creator = (
lambda worker_idx, device_idx: TFBenchModel(
batch=args.batch_size, use_cpus=not args.gpu))
sgd = DistributedSGD(
model_creator,
num_workers=args.num_workers,
devices_per_worker=args.devices_per_worker,
gpu=args.gpu,
strategy=args.strategy,
grad_shard_bytes=args.grad_shard_bytes,
all_reduce_alg=args.all_reduce_alg)
if not os.path.exists(args.checkpoint_dir):
raise ValueError(
"Checkpoint directory does not exist: %s" % args.checkpoint_dir)
def step(i):
start = time.time()
print("== Step {} ==".format(i))
stats = sgd.step(fetch_stats=True)
ips = ((args.batch_size * args.num_workers * args.devices_per_worker) /
(time.time() - start))
print("Iteration time", time.time() - start, "Images per second", ips)
print("Current loss", stats)
i = 0
while i < args.num_iters:
step(i)
i += 1
print("Saving checkpoint...")
sgd.save_checkpoint(args.checkpoint_dir)
print("Done saving checkpoint")
step(i)
print("Restoring checkpoint")
sgd.restore_checkpoint(args.checkpoint_dir)
print("Done restoring checkpoint")
step(i)
-67
View File
@@ -1,67 +0,0 @@
#!/usr/bin/env python
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import time
import ray
from ray.experimental.sgd.tfbench.test_model import TFBenchModel
from ray.experimental.sgd.sgd import DistributedSGD
parser = argparse.ArgumentParser()
parser.add_argument("--redis-address", default=None, type=str)
parser.add_argument("--num-iters", default=10, type=int)
parser.add_argument("--batch-size", default=1, type=int)
parser.add_argument("--num-workers", default=2, type=int)
parser.add_argument("--grad-shard-bytes", default=10000000, type=int)
parser.add_argument("--devices-per-worker", default=2, type=int)
parser.add_argument("--stats-interval", default=10, type=int)
parser.add_argument("--all-reduce-alg", default="simple", type=str)
parser.add_argument("--object-store-memory", default=None, type=int)
parser.add_argument(
"--warmup", action="store_true", help="Warm up object store before start.")
parser.add_argument(
"--strategy", default="ps", type=str, help="One of 'simple' or 'ps'")
parser.add_argument(
"--gpu", action="store_true", help="Use GPUs for optimization")
if __name__ == "__main__":
args, _ = parser.parse_known_args()
ray.init(
redis_address=args.redis_address,
object_store_memory=args.object_store_memory)
model_creator = (
lambda worker_idx, device_idx: TFBenchModel(
batch=args.batch_size, use_cpus=not args.gpu))
sgd = DistributedSGD(
model_creator,
num_workers=args.num_workers,
devices_per_worker=args.devices_per_worker,
gpu=args.gpu,
strategy=args.strategy,
grad_shard_bytes=args.grad_shard_bytes,
all_reduce_alg=args.all_reduce_alg)
if args.warmup:
sgd.warmup()
t = []
for i in range(args.num_iters):
start = time.time()
fetch_stats = i % args.stats_interval == 0
print("== Step {} ==".format(i))
stats = sgd.step(fetch_stats=fetch_stats)
ips = ((args.batch_size * args.num_workers * args.devices_per_worker) /
(time.time() - start))
print("Iteration time", time.time() - start, "Images per second", ips)
t.append(ips)
if fetch_stats:
print("Current loss", stats)
print("Peak throughput", max(sum(t[i:i + 5]) / 5 for i in range(len(t))))
@@ -1 +0,0 @@
Files in this directory are adapted from https://github.com/tensorflow/benchmarks/tree/master/scripts/tf_cnn_benchmarks.
@@ -1,507 +0,0 @@
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================
"""CNN builder."""
from __future__ import print_function
from collections import defaultdict
import contextlib
import numpy as np
import tensorflow as tf
from tensorflow.python.layers import convolutional as conv_layers
from tensorflow.python.layers import core as core_layers
from tensorflow.python.layers import pooling as pooling_layers
from tensorflow.python.training import moving_averages
class ConvNetBuilder(object):
"""Builder of cnn net."""
def __init__(self,
input_op,
input_nchan,
phase_train,
use_tf_layers,
data_format="NCHW",
dtype=tf.float32,
variable_dtype=tf.float32):
self.top_layer = input_op
self.top_size = input_nchan
self.phase_train = phase_train
self.use_tf_layers = use_tf_layers
self.data_format = data_format
self.dtype = dtype
self.variable_dtype = variable_dtype
self.counts = defaultdict(lambda: 0)
self.use_batch_norm = False
self.batch_norm_config = {} # "decay": 0.997, "scale": True}
self.channel_pos = ("channels_last"
if data_format == "NHWC" else "channels_first")
self.aux_top_layer = None
self.aux_top_size = 0
def get_custom_getter(self):
"""Returns a custom getter that this class's methods must be called
All methods of this class must be called under a variable scope that was
passed this custom getter. Example:
```python
network = ConvNetBuilder(...)
with tf.variable_scope("cg", custom_getter=network.get_custom_getter()):
network.conv(...)
# Call more methods of network here
```
Currently, this custom getter only does anything if self.use_tf_layers is
True. In that case, it causes variables to be stored as dtype
self.variable_type, then casted to the requested dtype, instead of directly
storing the variable as the requested dtype.
"""
def inner_custom_getter(getter, *args, **kwargs):
if not self.use_tf_layers:
return getter(*args, **kwargs)
requested_dtype = kwargs["dtype"]
if not (requested_dtype == tf.float32
and self.variable_dtype == tf.float16):
kwargs["dtype"] = self.variable_dtype
var = getter(*args, **kwargs)
if var.dtype.base_dtype != requested_dtype:
var = tf.cast(var, requested_dtype)
return var
return inner_custom_getter
@contextlib.contextmanager
def switch_to_aux_top_layer(self):
"""Context that construct cnn in the auxiliary arm."""
if self.aux_top_layer is None:
raise RuntimeError("Empty auxiliary top layer in the network.")
saved_top_layer = self.top_layer
saved_top_size = self.top_size
self.top_layer = self.aux_top_layer
self.top_size = self.aux_top_size
yield
self.aux_top_layer = self.top_layer
self.aux_top_size = self.top_size
self.top_layer = saved_top_layer
self.top_size = saved_top_size
def get_variable(self, name, shape, dtype, cast_dtype, *args, **kwargs):
var = tf.get_variable(name, shape, dtype, *args, **kwargs)
return tf.cast(var, cast_dtype)
def _conv2d_impl(self, input_layer, num_channels_in, filters, kernel_size,
strides, padding, kernel_initializer):
if self.use_tf_layers:
return conv_layers.conv2d(
input_layer,
filters,
kernel_size,
strides,
padding,
self.channel_pos,
kernel_initializer=kernel_initializer,
use_bias=False)
else:
weights_shape = [
kernel_size[0], kernel_size[1], num_channels_in, filters
]
weights = self.get_variable(
"conv2d/kernel",
weights_shape,
self.variable_dtype,
self.dtype,
initializer=kernel_initializer)
if self.data_format == "NHWC":
strides = [1] + strides + [1]
else:
strides = [1, 1] + strides
return tf.nn.conv2d(
input_layer,
weights,
strides,
padding,
data_format=self.data_format)
def conv(self,
num_out_channels,
k_height,
k_width,
d_height=1,
d_width=1,
mode="SAME",
input_layer=None,
num_channels_in=None,
use_batch_norm=None,
stddev=None,
activation="relu",
bias=0.0):
"""Construct a conv2d layer on top of cnn."""
if input_layer is None:
input_layer = self.top_layer
if num_channels_in is None:
num_channels_in = self.top_size
kernel_initializer = None
if stddev is not None:
kernel_initializer = tf.truncated_normal_initializer(stddev=stddev)
name = "conv" + str(self.counts["conv"])
self.counts["conv"] += 1
with tf.variable_scope(name):
strides = [1, d_height, d_width, 1]
if self.data_format == "NCHW":
strides = [strides[0], strides[3], strides[1], strides[2]]
if mode != "SAME_RESNET":
conv = self._conv2d_impl(
input_layer,
num_channels_in,
num_out_channels,
kernel_size=[k_height, k_width],
strides=[d_height, d_width],
padding=mode,
kernel_initializer=kernel_initializer)
else: # Special padding mode for ResNet models
if d_height == 1 and d_width == 1:
conv = self._conv2d_impl(
input_layer,
num_channels_in,
num_out_channels,
kernel_size=[k_height, k_width],
strides=[d_height, d_width],
padding="SAME",
kernel_initializer=kernel_initializer)
else:
rate = 1 # Unused (for 'a trous' convolutions)
kernel_height_effective = k_height + (k_height - 1) * (
rate - 1)
pad_h_beg = (kernel_height_effective - 1) // 2
pad_h_end = kernel_height_effective - 1 - pad_h_beg
kernel_width_effective = k_width + (k_width - 1) * (
rate - 1)
pad_w_beg = (kernel_width_effective - 1) // 2
pad_w_end = kernel_width_effective - 1 - pad_w_beg
padding = [[0, 0], [pad_h_beg, pad_h_end],
[pad_w_beg, pad_w_end], [0, 0]]
if self.data_format == "NCHW":
padding = [
padding[0], padding[3], padding[1], padding[2]
]
input_layer = tf.pad(input_layer, padding)
conv = self._conv2d_impl(
input_layer,
num_channels_in,
num_out_channels,
kernel_size=[k_height, k_width],
strides=[d_height, d_width],
padding="VALID",
kernel_initializer=kernel_initializer)
if use_batch_norm is None:
use_batch_norm = self.use_batch_norm
if not use_batch_norm:
if bias is not None:
biases = self.get_variable(
"biases", [num_out_channels],
self.variable_dtype,
self.dtype,
initializer=tf.constant_initializer(bias))
biased = tf.reshape(
tf.nn.bias_add(
conv, biases, data_format=self.data_format),
conv.get_shape())
else:
biased = conv
else:
self.top_layer = conv
self.top_size = num_out_channels
biased = self.batch_norm(**self.batch_norm_config)
if activation == "relu":
conv1 = tf.nn.relu(biased)
elif activation == "linear" or activation is None:
conv1 = biased
elif activation == "tanh":
conv1 = tf.nn.tanh(biased)
else:
raise KeyError("Invalid activation type \"%s\"" % activation)
self.top_layer = conv1
self.top_size = num_out_channels
return conv1
def _pool(self, pool_name, pool_function, k_height, k_width, d_height,
d_width, mode, input_layer, num_channels_in):
"""Construct a pooling layer."""
if input_layer is None:
input_layer = self.top_layer
else:
self.top_size = num_channels_in
name = pool_name + str(self.counts[pool_name])
self.counts[pool_name] += 1
if self.use_tf_layers:
pool = pool_function(
input_layer, [k_height, k_width], [d_height, d_width],
padding=mode,
data_format=self.channel_pos,
name=name)
else:
if self.data_format == "NHWC":
ksize = [1, k_height, k_width, 1]
strides = [1, d_height, d_width, 1]
else:
ksize = [1, 1, k_height, k_width]
strides = [1, 1, d_height, d_width]
pool = tf.nn.max_pool(
input_layer,
ksize,
strides,
padding=mode,
data_format=self.data_format,
name=name)
self.top_layer = pool
return pool
def mpool(self,
k_height,
k_width,
d_height=2,
d_width=2,
mode="VALID",
input_layer=None,
num_channels_in=None):
"""Construct a max pooling layer."""
return self._pool("mpool", pooling_layers.max_pooling2d, k_height,
k_width, d_height, d_width, mode, input_layer,
num_channels_in)
def apool(self,
k_height,
k_width,
d_height=2,
d_width=2,
mode="VALID",
input_layer=None,
num_channels_in=None):
"""Construct an average pooling layer."""
return self._pool("apool", pooling_layers.average_pooling2d, k_height,
k_width, d_height, d_width, mode, input_layer,
num_channels_in)
def reshape(self, shape, input_layer=None):
if input_layer is None:
input_layer = self.top_layer
self.top_layer = tf.reshape(input_layer, shape)
self.top_size = shape[-1] # HACK This may not always work
return self.top_layer
def affine(self,
num_out_channels,
input_layer=None,
num_channels_in=None,
bias=0.0,
stddev=None,
activation="relu"):
if input_layer is None:
input_layer = self.top_layer
if num_channels_in is None:
num_channels_in = self.top_size
name = "affine" + str(self.counts["affine"])
self.counts["affine"] += 1
with tf.variable_scope(name):
init_factor = 2. if activation == "relu" else 1.
stddev = stddev or np.sqrt(init_factor / num_channels_in)
kernel = self.get_variable(
"weights", [num_channels_in, num_out_channels],
self.variable_dtype,
self.dtype,
initializer=tf.truncated_normal_initializer(stddev=stddev))
biases = self.get_variable(
"biases", [num_out_channels],
self.variable_dtype,
self.dtype,
initializer=tf.constant_initializer(bias))
logits = tf.nn.xw_plus_b(input_layer, kernel, biases)
if activation == "relu":
affine1 = tf.nn.relu(logits, name=name)
elif activation == "linear" or activation is None:
affine1 = logits
else:
raise KeyError("Invalid activation type \"%s\"" % activation)
self.top_layer = affine1
self.top_size = num_out_channels
return affine1
def inception_module(self, name, cols, input_layer=None, in_size=None):
if input_layer is None:
input_layer = self.top_layer
if in_size is None:
in_size = self.top_size
name += str(self.counts[name])
self.counts[name] += 1
with tf.variable_scope(name):
col_layers = []
col_layer_sizes = []
for c, col in enumerate(cols):
col_layers.append([])
col_layer_sizes.append([])
for lx, layer in enumerate(col):
ltype, args = layer[0], layer[1:]
kwargs = {
"input_layer": input_layer,
"num_channels_in": in_size
} if lx == 0 else {}
if ltype == "conv":
self.conv(*args, **kwargs)
elif ltype == "mpool":
self.mpool(*args, **kwargs)
elif ltype == "apool":
self.apool(*args, **kwargs)
elif ltype == "share":
self.top_layer = col_layers[c - 1][lx]
self.top_size = col_layer_sizes[c - 1][lx]
else:
raise KeyError(
"Invalid layer type for inception module: \"%s\"" %
ltype)
col_layers[c].append(self.top_layer)
col_layer_sizes[c].append(self.top_size)
catdim = 3 if self.data_format == "NHWC" else 1
self.top_layer = tf.concat([layers[-1] for layers in col_layers],
catdim)
self.top_size = sum(sizes[-1] for sizes in col_layer_sizes)
return self.top_layer
def spatial_mean(self, keep_dims=False):
name = "spatial_mean" + str(self.counts["spatial_mean"])
self.counts["spatial_mean"] += 1
axes = [1, 2] if self.data_format == "NHWC" else [2, 3]
self.top_layer = tf.reduce_mean(
self.top_layer, axes, keep_dims=keep_dims, name=name)
return self.top_layer
def dropout(self, keep_prob=0.5, input_layer=None):
if input_layer is None:
input_layer = self.top_layer
else:
self.top_size = None
name = "dropout" + str(self.counts["dropout"])
with tf.variable_scope(name):
if not self.phase_train:
keep_prob = 1.0
if self.use_tf_layers:
dropout = core_layers.dropout(input_layer, 1. - keep_prob)
else:
dropout = tf.nn.dropout(input_layer, keep_prob)
self.top_layer = dropout
return dropout
def _batch_norm_without_layers(self, input_layer, decay, use_scale,
epsilon):
"""Batch normalization on `input_layer` without tf.layers."""
shape = input_layer.shape
num_channels = shape[3] if self.data_format == "NHWC" else shape[1]
beta = self.get_variable(
"beta", [num_channels],
tf.float32,
tf.float32,
initializer=tf.zeros_initializer())
if use_scale:
gamma = self.get_variable(
"gamma", [num_channels],
tf.float32,
tf.float32,
initializer=tf.ones_initializer())
else:
gamma = tf.constant(1.0, tf.float32, [num_channels])
moving_mean = tf.get_variable(
"moving_mean", [num_channels],
tf.float32,
initializer=tf.zeros_initializer(),
trainable=False)
moving_variance = tf.get_variable(
"moving_variance", [num_channels],
tf.float32,
initializer=tf.ones_initializer(),
trainable=False)
if self.phase_train:
bn, batch_mean, batch_variance = tf.nn.fused_batch_norm(
input_layer,
gamma,
beta,
epsilon=epsilon,
data_format=self.data_format,
is_training=True)
mean_update = moving_averages.assign_moving_average(
moving_mean, batch_mean, decay=decay, zero_debias=False)
variance_update = moving_averages.assign_moving_average(
moving_variance,
batch_variance,
decay=decay,
zero_debias=False)
tf.add_to_collection(tf.GraphKeys.UPDATE_OPS, mean_update)
tf.add_to_collection(tf.GraphKeys.UPDATE_OPS, variance_update)
else:
bn, _, _ = tf.nn.fused_batch_norm(
input_layer,
gamma,
beta,
mean=moving_mean,
variance=moving_variance,
epsilon=epsilon,
data_format=self.data_format,
is_training=False)
return bn
def batch_norm(self,
input_layer=None,
decay=0.999,
scale=False,
epsilon=0.001):
"""Adds a Batch Normalization layer."""
if input_layer is None:
input_layer = self.top_layer
else:
self.top_size = None
name = "batchnorm" + str(self.counts["batchnorm"])
self.counts["batchnorm"] += 1
with tf.variable_scope(name) as scope:
if self.use_tf_layers:
bn = tf.contrib.layers.batch_norm(
input_layer,
decay=decay,
scale=scale,
epsilon=epsilon,
is_training=self.phase_train,
fused=True,
data_format=self.data_format,
scope=scope)
else:
bn = self._batch_norm_without_layers(input_layer, decay, scale,
epsilon)
self.top_layer = bn
self.top_size = bn.shape[
3] if self.data_format == "NHWC" else bn.shape[1]
self.top_size = int(self.top_size)
return bn
def lrn(self, depth_radius, bias, alpha, beta):
"""Adds a local response normalization layer."""
name = "lrn" + str(self.counts["lrn"])
self.counts["lrn"] += 1
self.top_layer = tf.nn.lrn(
self.top_layer, depth_radius, bias, alpha, beta, name=name)
return self.top_layer
@@ -1,116 +0,0 @@
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================
"""Base model configuration for CNN benchmarks."""
import tensorflow as tf
from . import convnet_builder
class Model(object):
"""Base model configuration for CNN benchmarks."""
def __init__(self,
model,
image_size,
batch_size,
learning_rate,
layer_counts=None,
fp16_loss_scale=128):
self.model = model
self.image_size = image_size
self.batch_size = batch_size
self.default_batch_size = batch_size
self.learning_rate = learning_rate
self.layer_counts = layer_counts
self.fp16_loss_scale = fp16_loss_scale
def get_model(self):
return self.model
def get_image_size(self):
return self.image_size
def get_batch_size(self):
return self.batch_size
def set_batch_size(self, batch_size):
self.batch_size = batch_size
def get_default_batch_size(self):
return self.default_batch_size
def get_layer_counts(self):
return self.layer_counts
def get_fp16_loss_scale(self):
return self.fp16_loss_scale
def get_learning_rate(self, global_step, batch_size):
del global_step
del batch_size
return self.learning_rate
def add_inference(self, unused_cnn):
raise ValueError("Must be implemented in derived classes")
def skip_final_affine_layer(self):
"""Returns if the caller of this class should skip the final affine
Normally, this class adds a final affine layer to the model after calling
self.add_inference(), to generate the logits. If a subclass override this
method to return True, the caller should not add the final affine layer.
This is useful for tests.
"""
return False
def build_network(self,
images,
phase_train=True,
nclass=1001,
image_depth=3,
data_type=tf.float32,
data_format="NCHW",
use_tf_layers=True,
fp16_vars=False):
"""Returns logits and aux_logits from images."""
if data_format == "NCHW":
images = tf.transpose(images, [0, 3, 1, 2])
var_type = tf.float32
if data_type == tf.float16 and fp16_vars:
var_type = tf.float16
network = convnet_builder.ConvNetBuilder(
images, image_depth, phase_train, use_tf_layers, data_format,
data_type, var_type)
with tf.variable_scope(
"cg", custom_getter=network.get_custom_getter()):
self.add_inference(network)
# Add the final fully-connected class layer
logits = (network.affine(nclass, activation="linear")
if not self.skip_final_affine_layer() else
network.top_layer)
aux_logits = None
if network.aux_top_layer is not None:
with network.switch_to_aux_top_layer():
aux_logits = network.affine(
nclass, activation="linear", stddev=0.001)
if data_type == tf.float16:
# TODO(reedwm): Determine if we should do this cast here.
logits = tf.cast(logits, tf.float32)
if aux_logits is not None:
aux_logits = tf.cast(aux_logits, tf.float32)
return logits, aux_logits
loss_function = None
@@ -1,57 +0,0 @@
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Model configurations for CNN benchmarks.
"""
from . import resnet_model
_model_name_to_imagenet_model = {
"resnet50": resnet_model.create_resnet50_model,
"resnet50_v2": resnet_model.create_resnet50_v2_model,
"resnet101": resnet_model.create_resnet101_model,
"resnet101_v2": resnet_model.create_resnet101_v2_model,
"resnet152": resnet_model.create_resnet152_model,
"resnet152_v2": resnet_model.create_resnet152_v2_model,
}
_model_name_to_cifar_model = {}
def _get_model_map(dataset_name):
if "cifar10" == dataset_name:
return _model_name_to_cifar_model
elif dataset_name in ("imagenet", "synthetic"):
return _model_name_to_imagenet_model
else:
raise ValueError("Invalid dataset name: %s" % dataset_name)
def get_model_config(model_name, dataset):
"""Map model name to model network configuration."""
model_map = _get_model_map(dataset.name)
if model_name not in model_map:
raise ValueError("Invalid model name \"%s\" for dataset \"%s\"" %
(model_name, dataset.name))
else:
return model_map[model_name]()
def register_model(model_name, dataset_name, model_func):
"""Register a new model that can be obtained with `get_model_config`."""
model_map = _get_model_map(dataset_name)
if model_name in model_map:
raise ValueError("Model \"%s\" is already registered for dataset"
"\"%s\"" % (model_name, dataset_name))
model_map[model_name] = model_func
@@ -1,422 +0,0 @@
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Resnet model configuration.
References:
Kaiming He, Xiangyu Zhang, Shaoqing Ren, Jian Sun
Deep Residual Learning for Image Recognition
arXiv:1512.03385 (2015)
Kaiming He, Xiangyu Zhang, Shaoqing Ren, Jian Sun
Identity Mappings in Deep Residual Networks
arXiv:1603.05027 (2016)
Liang-Chieh Chen, George Papandreou, Iasonas Kokkinos, Kevin Murphy,
Alan L. Yuille
DeepLab: Semantic Image Segmentation with Deep Convolutional Nets,
Atrous Convolution, and Fully Connected CRFs
arXiv:1606.00915 (2016)
"""
import numpy as np
from six.moves import xrange # pylint: disable=redefined-builtin
import tensorflow as tf
from . import model as model_lib
def bottleneck_block_v1(cnn, depth, depth_bottleneck, stride):
"""Bottleneck block with identity short-cut for ResNet v1.
Args:
cnn: the network to append bottleneck blocks.
depth: the number of output filters for this bottleneck block.
depth_bottleneck: the number of bottleneck filters for this block.
stride: Stride used in the first layer of the bottleneck block.
"""
input_layer = cnn.top_layer
in_size = cnn.top_size
name_key = "resnet_v1"
name = name_key + str(cnn.counts[name_key])
cnn.counts[name_key] += 1
with tf.variable_scope(name):
if depth == in_size:
if stride == 1:
shortcut = input_layer
else:
shortcut = cnn.apool(
1,
1,
stride,
stride,
input_layer=input_layer,
num_channels_in=in_size)
else:
shortcut = cnn.conv(
depth,
1,
1,
stride,
stride,
activation=None,
use_batch_norm=True,
input_layer=input_layer,
num_channels_in=in_size,
bias=None)
cnn.conv(
depth_bottleneck,
1,
1,
stride,
stride,
input_layer=input_layer,
num_channels_in=in_size,
use_batch_norm=True,
bias=None)
cnn.conv(
depth_bottleneck,
3,
3,
1,
1,
mode="SAME_RESNET",
use_batch_norm=True,
bias=None)
res = cnn.conv(
depth, 1, 1, 1, 1, activation=None, use_batch_norm=True, bias=None)
output = tf.nn.relu(shortcut + res)
cnn.top_layer = output
cnn.top_size = depth
def bottleneck_block_v2(cnn, depth, depth_bottleneck, stride):
"""Bottleneck block with identity short-cut for ResNet v2.
The main difference from v1 is that a batch norm and relu are done at the
start of the block, instead of the end. This initial batch norm and relu is
collectively called a pre-activation.
Args:
cnn: the network to append bottleneck blocks.
depth: the number of output filters for this bottleneck block.
depth_bottleneck: the number of bottleneck filters for this block.
stride: Stride used in the first layer of the bottleneck block.
"""
input_layer = cnn.top_layer
in_size = cnn.top_size
name_key = "resnet_v2"
name = name_key + str(cnn.counts[name_key])
cnn.counts[name_key] += 1
preact = cnn.batch_norm()
preact = tf.nn.relu(preact)
with tf.variable_scope(name):
if depth == in_size:
if stride == 1:
shortcut = input_layer
else:
shortcut = cnn.apool(
1,
1,
stride,
stride,
input_layer=input_layer,
num_channels_in=in_size)
else:
shortcut = cnn.conv(
depth,
1,
1,
stride,
stride,
activation=None,
use_batch_norm=False,
input_layer=preact,
num_channels_in=in_size,
bias=None)
cnn.conv(
depth_bottleneck,
1,
1,
stride,
stride,
input_layer=preact,
num_channels_in=in_size,
use_batch_norm=True,
bias=None)
cnn.conv(
depth_bottleneck,
3,
3,
1,
1,
mode="SAME_RESNET",
use_batch_norm=True,
bias=None)
res = cnn.conv(
depth,
1,
1,
1,
1,
activation=None,
use_batch_norm=False,
bias=None)
output = shortcut + res
cnn.top_layer = output
cnn.top_size = depth
def bottleneck_block(cnn, depth, depth_bottleneck, stride, pre_activation):
"""Bottleneck block with identity short-cut.
Args:
cnn: the network to append bottleneck blocks.
depth: the number of output filters for this bottleneck block.
depth_bottleneck: the number of bottleneck filters for this block.
stride: Stride used in the first layer of the bottleneck block.
pre_activation: use pre_activation structure used in v2 or not.
"""
if pre_activation:
bottleneck_block_v2(cnn, depth, depth_bottleneck, stride)
else:
bottleneck_block_v1(cnn, depth, depth_bottleneck, stride)
def residual_block(cnn, depth, stride, pre_activation):
"""Residual block with identity short-cut.
Args:
cnn: the network to append residual blocks.
depth: the number of output filters for this residual block.
stride: Stride used in the first layer of the residual block.
pre_activation: use pre_activation structure or not.
"""
input_layer = cnn.top_layer
in_size = cnn.top_size
if in_size != depth:
# Plan A of shortcut.
shortcut = cnn.apool(
1,
1,
stride,
stride,
input_layer=input_layer,
num_channels_in=in_size)
padding = (depth - in_size) // 2
if cnn.channel_pos == "channels_last":
shortcut = tf.pad(shortcut,
[[0, 0], [0, 0], [0, 0], [padding, padding]])
else:
shortcut = tf.pad(shortcut,
[[0, 0], [padding, padding], [0, 0], [0, 0]])
else:
shortcut = input_layer
if pre_activation:
res = cnn.batch_norm(input_layer)
res = tf.nn.relu(res)
else:
res = input_layer
cnn.conv(
depth,
3,
3,
stride,
stride,
input_layer=res,
num_channels_in=in_size,
use_batch_norm=True,
bias=None)
if pre_activation:
res = cnn.conv(
depth,
3,
3,
1,
1,
activation=None,
use_batch_norm=False,
bias=None)
output = shortcut + res
else:
res = cnn.conv(
depth, 3, 3, 1, 1, activation=None, use_batch_norm=True, bias=None)
output = tf.nn.relu(shortcut + res)
cnn.top_layer = output
cnn.top_size = depth
class ResnetModel(model_lib.Model):
"""Resnet cnn network configuration."""
def __init__(self, model, layer_counts):
default_batch_sizes = {
"resnet50": 64,
"resnet101": 32,
"resnet152": 32,
"resnet50_v2": 64,
"resnet101_v2": 32,
"resnet152_v2": 32,
}
batch_size = default_batch_sizes.get(model, 32)
super(ResnetModel, self).__init__(model, 224, batch_size, 0.005,
layer_counts)
self.pre_activation = "v2" in model
def add_inference(self, cnn):
if self.layer_counts is None:
raise ValueError(
"Layer counts not specified for %s" % self.get_model())
cnn.use_batch_norm = True
cnn.batch_norm_config = {
"decay": 0.997,
"epsilon": 1e-5,
"scale": True
}
cnn.conv(64, 7, 7, 2, 2, mode="SAME_RESNET", use_batch_norm=True)
cnn.mpool(3, 3, 2, 2, mode="SAME")
for _ in xrange(self.layer_counts[0]):
bottleneck_block(cnn, 256, 64, 1, self.pre_activation)
for i in xrange(self.layer_counts[1]):
stride = 2 if i == 0 else 1
bottleneck_block(cnn, 512, 128, stride, self.pre_activation)
for i in xrange(self.layer_counts[2]):
stride = 2 if i == 0 else 1
bottleneck_block(cnn, 1024, 256, stride, self.pre_activation)
for i in xrange(self.layer_counts[3]):
stride = 2 if i == 0 else 1
bottleneck_block(cnn, 2048, 512, stride, self.pre_activation)
if self.pre_activation:
cnn.batch_norm()
cnn.top_layer = tf.nn.relu(cnn.top_layer)
cnn.spatial_mean()
def get_learning_rate(self, global_step, batch_size):
raise NotImplementedError
def create_resnet50_model():
return ResnetModel("resnet50", (3, 4, 6, 3))
def create_resnet50_v2_model():
return ResnetModel("resnet50_v2", (3, 4, 6, 3))
def create_resnet101_model():
return ResnetModel("resnet101", (3, 4, 23, 3))
def create_resnet101_v2_model():
return ResnetModel("resnet101_v2", (3, 4, 23, 3))
def create_resnet152_model():
return ResnetModel("resnet152", (3, 8, 36, 3))
def create_resnet152_v2_model():
return ResnetModel("resnet152_v2", (3, 8, 36, 3))
class ResnetCifar10Model(model_lib.Model):
"""Resnet cnn network configuration for Cifar 10 dataset.
V1 model architecture follows the one defined in the paper:
https://arxiv.org/pdf/1512.03385.pdf.
V2 model architecture follows the one defined in the paper:
https://arxiv.org/pdf/1603.05027.pdf.
"""
def __init__(self, model, layer_counts):
self.pre_activation = "v2" in model
super(ResnetCifar10Model, self).__init__(model, 32, 128, 0.1,
layer_counts)
def add_inference(self, cnn):
if self.layer_counts is None:
raise ValueError(
"Layer counts not specified for %s" % self.get_model())
cnn.use_batch_norm = True
cnn.batch_norm_config = {"decay": 0.9, "epsilon": 1e-5, "scale": True}
if self.pre_activation:
cnn.conv(16, 3, 3, 1, 1, use_batch_norm=True)
else:
cnn.conv(16, 3, 3, 1, 1, activation=None, use_batch_norm=True)
for i in xrange(self.layer_counts[0]):
# reshape to batch_size x 16 x 32 x 32
residual_block(cnn, 16, 1, self.pre_activation)
for i in xrange(self.layer_counts[1]):
stride = 2 if i == 0 else 1
# reshape to batch_size x 32 x 16 x 16
residual_block(cnn, 32, stride, self.pre_activation)
for i in xrange(self.layer_counts[2]):
stride = 2 if i == 0 else 1
# reshape to batch_size x 64 x 8 x 8
residual_block(cnn, 64, stride, self.pre_activation)
if self.pre_activation:
cnn.batch_norm()
cnn.top_layer = tf.nn.relu(cnn.top_layer)
cnn.spatial_mean()
def get_learning_rate(self, global_step, batch_size):
num_batches_per_epoch = int(50000 / batch_size)
boundaries = num_batches_per_epoch * np.array(
[82, 123, 300], dtype=np.int64)
boundaries = [x for x in boundaries]
values = [0.1, 0.01, 0.001, 0.0002]
return tf.train.piecewise_constant(global_step, boundaries, values)
def create_resnet20_cifar_model():
return ResnetCifar10Model("resnet20", (3, 3, 3))
def create_resnet20_v2_cifar_model():
return ResnetCifar10Model("resnet20_v2", (3, 3, 3))
def create_resnet32_cifar_model():
return ResnetCifar10Model("resnet32_v2", (5, 5, 5))
def create_resnet32_v2_cifar_model():
return ResnetCifar10Model("resnet32_v2", (5, 5, 5))
def create_resnet44_cifar_model():
return ResnetCifar10Model("resnet44", (7, 7, 7))
def create_resnet44_v2_cifar_model():
return ResnetCifar10Model("resnet44_v2", (7, 7, 7))
def create_resnet56_cifar_model():
return ResnetCifar10Model("resnet56", (9, 9, 9))
def create_resnet56_v2_cifar_model():
return ResnetCifar10Model("resnet56_v2", (9, 9, 9))
def create_resnet110_cifar_model():
return ResnetCifar10Model("resnet110", (18, 18, 18))
def create_resnet110_v2_cifar_model():
return ResnetCifar10Model("resnet110_v2", (18, 18, 18))
@@ -1,66 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow as tf
from tfbench import model_config
from ray.experimental.sgd.model import Model
import ray.experimental.tf_utils as ray_tf_utils
class MockDataset():
name = "synthetic"
class TFBenchModel(Model):
def __init__(self, batch=64, use_cpus=False):
image_shape = [batch, 224, 224, 3]
labels_shape = [batch]
# Synthetic image should be within [0, 255].
images = tf.truncated_normal(
image_shape,
dtype=tf.float32,
mean=127,
stddev=60,
name="synthetic_images")
# Minor hack to avoid H2D copy when using synthetic data
inputs = tf.contrib.framework.local_variable(
images, name="gpu_cached_images")
labels = tf.random_uniform(
labels_shape,
minval=0,
maxval=999,
dtype=tf.int32,
name="synthetic_labels")
model = model_config.get_model_config("resnet101", MockDataset())
logits, aux = model.build_network(
inputs, data_format=use_cpus and "NHWC" or "NCHW")
loss = tf.nn.sparse_softmax_cross_entropy_with_logits(
logits=logits, labels=labels)
# Implement model interface
self.loss = tf.reduce_mean(loss, name="xentropy-loss")
self.optimizer = tf.train.GradientDescentOptimizer(1e-6)
self.variables = ray_tf_utils.TensorFlowVariables(
self.loss, tf.get_default_session())
def get_loss(self):
return self.loss
def get_optimizer(self):
return self.optimizer
def get_feed_dict(self):
return {}
def get_weights(self):
return self.variables.get_flat()
def set_weights(self, weights):
self.variables.set_flat(weights)
-151
View File
@@ -1,151 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import filelock
import json
import logging
import numpy as np
import os
import pyarrow
import pyarrow.plasma as plasma
import time
import tensorflow as tf
import ray
logger = logging.getLogger(__name__)
def warmup():
logger.info("Warming up object store")
zeros = np.zeros(int(100e6 / 8), dtype=np.float64)
start = time.time()
for _ in range(10):
ray.put(zeros)
logger.info("Initial latency for 100MB put {}".format(
(time.time() - start) / 10))
for _ in range(5):
for _ in range(100):
ray.put(zeros)
start = time.time()
for _ in range(10):
ray.put(zeros)
logger.info("Warmed up latency for 100MB put {}".format(
(time.time() - start) / 10))
def fetch(oids):
raylet_client = ray.worker.global_worker.raylet_client
for o in oids:
ray_obj_id = ray.ObjectID(o)
raylet_client.fetch_or_reconstruct([ray_obj_id], True)
def run_timeline(sess, ops, feed_dict=None, write_timeline=False, name=""):
feed_dict = feed_dict or {}
if write_timeline:
run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE)
run_metadata = tf.RunMetadata()
fetches = sess.run(
ops,
options=run_options,
run_metadata=run_metadata,
feed_dict=feed_dict)
trace = Timeline(step_stats=run_metadata.step_stats)
outf = "timeline-{}-{}.json".format(name, os.getpid())
trace_file = open(outf, "w")
logger.info("wrote tf timeline to", os.path.abspath(outf))
trace_file.write(trace.generate_chrome_trace_format())
else:
fetches = sess.run(ops, feed_dict=feed_dict)
return fetches
class Timeline(object):
def __init__(self, tid):
self.events = []
self.offset = 0
self.start_time = self.time()
self.tid = tid
def patch_ray(self):
orig_log = ray.worker.log
def custom_log(event_type, kind, *args, **kwargs):
orig_log(event_type, kind, *args, **kwargs)
if kind == ray.worker.LOG_SPAN_START:
self.start(event_type)
elif kind == ray.worker.LOG_SPAN_END:
self.end(event_type)
elif kind == ray.worker.LOG_SPAN_POINT:
self.event(event_type)
ray.worker.log = custom_log
def time(self):
return time.time() + self.offset
def reset(self):
self.events = []
self.start_time = self.time()
def start(self, name):
self.events.append((self.tid, "B", name, self.time()))
def end(self, name):
self.events.append((self.tid, "E", name, self.time()))
def event(self, name):
now = self.time()
self.events.append((self.tid, "B", name, now))
self.events.append((self.tid, "E", name, now + .0001))
def merge(self, other):
if other.start_time < self.start_time:
self.start_time = other.start_time
self.events.extend(other.events)
self.events.sort(key=lambda e: e[3])
def chrome_trace_format(self, filename):
out = []
for tid, ph, name, t in self.events:
ts = int((t - self.start_time) * 1000000)
out.append({
"name": name,
"tid": tid,
"pid": tid,
"ph": ph,
"ts": ts,
})
with open(filename, "w") as f:
f.write(json.dumps(out))
logger.info("Wrote chrome timeline to", filename)
def ensure_plasma_tensorflow_op():
base_path = os.path.join(pyarrow.__path__[0], "tensorflow")
lock_path = os.path.join(base_path, "compile_op.lock")
with filelock.FileLock(lock_path):
if not os.path.exists(os.path.join(base_path, "plasma_op.so")):
plasma.build_plasma_tensorflow_op()
else:
plasma.load_plasma_tensorflow_op()
if __name__ == "__main__":
a = Timeline(1)
b = Timeline(2)
a.start("hi")
time.sleep(.1)
b.start("bye")
a.start("hi3")
time.sleep(.1)
a.end("hi3")
b.end("bye")
time.sleep(.1)
a.end("hi")
b.start("b1")
b.end("b1")
a.merge(b)
a.chrome_trace_format("test.json")