diff --git a/README.rst b/README.rst index bf210f4cb..f93dbf04c 100644 --- a/README.rst +++ b/README.rst @@ -41,7 +41,7 @@ Ray comes with libraries that accelerate deep learning and reinforcement learnin - `Tune`_: Hyperparameter Optimization Framework - `RLlib`_: Scalable Reinforcement Learning -- `Distributed Training `__ +- `Distributed Training `__ .. _`Tune`: http://ray.readthedocs.io/en/latest/tune.html .. _`RLlib`: http://ray.readthedocs.io/en/latest/rllib.html diff --git a/doc/source/distributed_sgd.rst b/doc/source/distributed_sgd.rst deleted file mode 100644 index e50847624..000000000 --- a/doc/source/distributed_sgd.rst +++ /dev/null @@ -1,56 +0,0 @@ -Distributed SGD (Experimental) -============================== - -Ray includes an implementation of synchronous distributed stochastic gradient descent (SGD), which is competitive in performance with implementations in Horovod and Distributed TensorFlow. - -Ray SGD is built on top of the Ray task and actor abstractions to provide seamless integration into existing Ray applications. - -Interface ---------- - -To use Ray SGD, define a `model class `__: - -.. autoclass:: ray.experimental.sgd.Model - -Then, pass a model creator function to the ``ray.experimental.sgd.DistributedSGD`` class. To drive the distributed training, ``sgd.step()`` can be called repeatedly: - -.. code-block:: python - - model_creator = lambda worker_idx, device_idx: YourModelClass() - - sgd = DistributedSGD( - model_creator, - num_workers=2, - devices_per_worker=4, - gpu=True, - strategy="ps") - - for i in range(NUM_ITERS): - sgd.step() - -Under the hood, Ray SGD will create *replicas* of your model onto each hardware device (GPU) allocated to workers (controlled by ``num_workers``). Multiple devices can be managed by each worker process (controlled by ``devices_per_worker``). Each model instance will be in a separate TF variable scope. The ``DistributedSGD`` class coordinates the distributed computation and application of gradients to improve the model. - -There are two distributed SGD strategies available for use: - - ``strategy="simple"``: Gradients are averaged centrally on the driver before being applied to each model replica. This is a reference implementation for debugging purposes. - - ``strategy="ps"``: Gradients are computed and averaged within each node. Gradients are then averaged across nodes through a number of parameter server actors. To pipeline the computation of gradients and transmission across the network, we use a custom TensorFlow op that can read and write to the Ray object store directly. - -Note that when ``num_workers=1``, only local allreduce will be used and the choice of distributed strategy is irrelevant. - -The full documentation for ``DistributedSGD`` is as follows: - -.. autoclass:: ray.experimental.sgd.DistributedSGD - -Examples --------- - -For examples of end-to-end usage, check out the `ImageNet synthetic data test `__ and also the simple `MNIST training example `__, which includes examples of how access the model weights and monitor accuracy as training progresses. - -Performance ------------ - -When using the new Ray backend (which will be enabled by default in Ray 0.6+), we `expect `__ performance competitive with other synchronous SGD implementations on 25Gbps Ethernet. - -.. figure:: sgd.png - :width: 756px - - Images per second reached when distributing the training of a ResNet-101 TensorFlow model (from the official TF benchmark). All experiments were run on p3.16xl instances connected by 25Gbps Ethernet, and workers allocated 4 GPUs per node as done in the Horovod benchmark. diff --git a/doc/source/index.rst b/doc/source/index.rst index a8efb7a53..574220ccb 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -108,7 +108,6 @@ Ray comes with libraries that accelerate deep learning and reinforcement learnin :caption: Other Libraries distributed_training.rst - distributed_sgd.rst pandas_on_ray.rst .. toctree:: diff --git a/python/ray/experimental/sgd/__init__.py b/python/ray/experimental/sgd/__init__.py deleted file mode 100644 index 005b3fff0..000000000 --- a/python/ray/experimental/sgd/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -from ray.experimental.sgd.sgd import DistributedSGD -from ray.experimental.sgd.model import Model - -__all__ = [ - "DistributedSGD", - "Model", -] diff --git a/python/ray/experimental/sgd/mnist_example.py b/python/ray/experimental/sgd/mnist_example.py deleted file mode 100755 index 69abffa3c..000000000 --- a/python/ray/experimental/sgd/mnist_example.py +++ /dev/null @@ -1,152 +0,0 @@ -#!/usr/bin/env python -"""Example of how to train a model with Ray SGD. - -We use a small model here, so no speedup for distributing the computation is -expected. This example shows: - - How to set up a simple input pipeline - - How to evaluate model accuracy during training - - How to get and set model weights - - How to train with ray.experimental.sgd.DistributedSGD -""" - -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import argparse -import time - -from tensorflow.examples.tutorials.mnist import input_data -import tensorflow as tf - -import ray -from ray.tune import run_experiments -from ray.tune.examples.tune_mnist_ray import deepnn -from ray.experimental.sgd.model import Model -from ray.experimental.sgd.sgd import DistributedSGD -import ray.experimental.tf_utils as ray_tf_utils - -parser = argparse.ArgumentParser() -parser.add_argument("--redis-address", default=None, type=str) -parser.add_argument("--num-iters", default=10000, type=int) -parser.add_argument("--batch-size", default=50, type=int) -parser.add_argument("--num-workers", default=1, type=int) -parser.add_argument("--devices-per-worker", default=1, type=int) -parser.add_argument("--tune", action="store_true", help="Run in Ray Tune") -parser.add_argument( - "--strategy", default="ps", type=str, help="One of 'simple' or 'ps'") -parser.add_argument( - "--gpu", action="store_true", help="Use GPUs for optimization") - - -class MNISTModel(Model): - def __init__(self): - # Import data - error = None - for _ in range(10): - try: - self.mnist = input_data.read_data_sets( - "/tmp/tensorflow/mnist/input_data", one_hot=True) - error = None - break - except Exception as e: - error = e - time.sleep(5) - if error: - raise ValueError("Failed to import data", error) - - # Set seed and build layers - tf.set_random_seed(0) - - self.x = tf.placeholder(tf.float32, [None, 784], name="x") - self.y_ = tf.placeholder(tf.float32, [None, 10], name="y_") - y_conv, self.keep_prob = deepnn(self.x) - - # Need to define loss and optimizer attributes - self.loss = tf.reduce_mean( - tf.nn.softmax_cross_entropy_with_logits( - labels=self.y_, logits=y_conv)) - self.optimizer = tf.train.AdamOptimizer(1e-4) - self.variables = ray_tf_utils.TensorFlowVariables( - self.loss, tf.get_default_session()) - - # For evaluating test accuracy - correct_prediction = tf.equal( - tf.argmax(y_conv, 1), tf.argmax(self.y_, 1)) - self.accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32)) - - def get_loss(self): - return self.loss - - def get_optimizer(self): - return self.optimizer - - def get_variables(self): - return self.variables - - def get_feed_dict(self): - batch = self.mnist.train.next_batch(50) - return { - self.x: batch[0], - self.y_: batch[1], - self.keep_prob: 0.5, - } - - def get_metrics(self): - accuracy = self.accuracy.eval( - feed_dict={ - self.x: self.mnist.test.images, - self.y_: self.mnist.test.labels, - self.keep_prob: 1.0, - }) - return {"accuracy": accuracy} - - def get_weights(self): - return self.variables.get_flat() - - def set_weights(self, weights): - self.variables.set_flat(weights) - - -def train_mnist(config, reporter): - args = config["args"] - sgd = DistributedSGD( - lambda w_i, d_i: MNISTModel(), - num_workers=args.num_workers, - devices_per_worker=args.devices_per_worker, - gpu=args.gpu, - strategy=args.strategy) - - # Important: synchronize the initial weights of all model replicas - w0 = sgd.for_model(lambda m: m.get_variables().get_flat()) - sgd.foreach_model(lambda m: m.get_variables().set_flat(w0)) - - for i in range(args.num_iters): - if i % 10 == 0: - start = time.time() - loss = sgd.step(fetch_stats=True)["loss"] - metrics = sgd.foreach_model(lambda model: model.get_metrics()) - acc = [m["accuracy"] for m in metrics] - print("Iter", i, "loss", loss, "accuracy", acc) - print("Time per iteration", time.time() - start) - assert len(set(acc)) == 1, ("Models out of sync", acc) - reporter(timesteps_total=i, mean_loss=loss, mean_accuracy=acc[0]) - else: - sgd.step() - - -if __name__ == "__main__": - args = parser.parse_args() - ray.init(redis_address=args.redis_address) - - if args.tune: - run_experiments({ - "mnist_sgd": { - "run": train_mnist, - "config": { - "args": args, - }, - }, - }) - else: - train_mnist({"args": args}, lambda **kw: None) diff --git a/python/ray/experimental/sgd/model.py b/python/ray/experimental/sgd/model.py deleted file mode 100644 index 2777d1cfe..000000000 --- a/python/ray/experimental/sgd/model.py +++ /dev/null @@ -1,72 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - - -class Model(object): - """Your class must implement this interface to be used with Ray SGD. - - This supports any form of input pipeline: it is up to you to define it - using TensorFlow. - For an example implementation, see tfbench/test_model.py - """ - - def get_loss(self): - """Return loss of the model - - Returns: - loss - """ - raise NotImplementedError( - "get_loss of %s is not implemented" % self.__class__.__name__) - - # TODO support complex way of updating gradient, - # e.g. using different optimizers - def get_optimizer(self): - """Return optimizer for the model - - Returns: - optimizer - """ - raise NotImplementedError( - "get_optimizer of %s is not implemented" % self.__class__.__name__) - - def get_metrics(self): - """Return metrics of the model - - Returns: - metrics(dict): e.g. {"accuracy": accuracy(numpy data)} - """ - return {} - - def get_feed_dict(self): - """Extra values to pass in when computing gradients for the loss. - - Returns: - TensorFlow feed_dict to add to the gradient operation. - """ - return {} - - def get_weights(self): - """Return weights from the model. - - Implementing `get_weights` is required for checkpointing and fault - tolerance. - - Returns: - Numpy array of weights from the model. - """ - raise NotImplementedError( - "get_weights of %s is not implemented" % self.__class__.__name__) - - def set_weights(self, weights): - """Sets the model weights. - - Implementing `set_weights` is required for checkpointing and fault - tolerance. - - Args: - weights: numpy array of weights for the model. - """ - raise NotImplementedError( - "set_weights of %s is not implemented" % self.__class__.__name__) diff --git a/python/ray/experimental/sgd/modified_allreduce.py b/python/ray/experimental/sgd/modified_allreduce.py deleted file mode 100644 index 7e224a769..000000000 --- a/python/ray/experimental/sgd/modified_allreduce.py +++ /dev/null @@ -1,637 +0,0 @@ -# This file is adapted from https://github.com/tensorflow/benchmarks -# /blob/master/scripts/tf_cnn_benchmarks/allreduce.py -# -# Copyright 2017 The TensorFlow Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============================================================================== -"""Utilities for allreduce.""" - -from __future__ import print_function - -import collections as pycoll -import logging -import numpy as np -import re - -from six.moves import xrange # pylint: disable=redefined-builtin -import tensorflow as tf - -from tensorflow.contrib.all_reduce.python import all_reduce - -logger = logging.getLogger(__name__) - -AllReduceSpecTuple = pycoll.namedtuple("AllReduceSpecTuple", - "alg shards limit") - - -def parse_general_int(s): - """Parse integer with power-of-2 suffix eg. 32k.""" - mo = re.match(r"(\d+)([KkMGT]?)$", s) - if mo: - i, suffix = mo.group(1, 2) - v = int(i) - if suffix: - if suffix == "K" or suffix == "k": - v *= 1024 - elif suffix == "M": - v *= (1024 * 1024) - elif suffix == "G": - v *= (1024 * 1024 * 1024) - elif suffix == "T": - v *= (1024 * 1024 * 1024 * 1024) - else: - raise ValueError("invalid integer string %s" % s) - return v - else: - v = int(s) - return v - - -def parse_all_reduce_spec(all_reduce_spec): - """Parse all_reduce_spec. - - Args: - all_reduce_spec: a string specifying a combination of all-reduce - algorithms to apply for gradient reduction. - - Returns: - a list of AllReduceSpecTuple. - - Raises: - ValueError: all_reduce_spec is not well-formed. - - An all_reduce_spec has BNF form: - int ::= positive whole number - g_int ::= int[KkMGT]? - alg_spec ::= alg | alg#int - range_spec ::= alg_spec | alg_spec/alg_spec - spec ::= range_spec | range_spec:g_int:range_spec - - Not all syntactically correct specifications are supported. - Examples of supported all_reduce_spec strings, with semantics explained: - - "xring" == apply ring all-reduce to all tensors - "xring#2" == apply ring all-reduce to all tensors, using two simultaneous - transfer rings, each operating on 1/2 of each tensor. - "nccl" == apply NCCL all-reduce to all tensors (only works within - a single worker process where all devices are GPUs) - "nccl/xring" == apply NCCL all-reduce to all tensors within each worker - to produce at least one full-reduced (locally) value, - then apply ring all-reduce to one such value from each - worker, then apply NCCL broadcast to propagate those globally - reduced values back to every device within each worker. - "pscpu" == Shuffle reduce using worker CPUs as the gather devices: each - distributed tensor is reduced by copying all instances to - one of the worker CPUs, computing the reduction there, then - copying back to each participating device. Tensor reductions - are assigned to specific CPUs round-robin. - "psgpu#4" == Arrange all GPUs across all workers into groups of 4. - Each distributed tensor is shuffle reduced against one - such group of 4 GPUs, selected round-robin. That is, each - tensor is split across 4 shards for the reduction. - "pscpu:2k:pscpu#2:64k:xring" == Apply single-shard pscpu to - tensors of size <= 2048 elements, apply 2-shard pscpu to - tensors up to size 64k elements, apply xring to larger tensors. - "pscpu/pscpu#2" == Use shuffle gather to locally reduce each tensor on - the worker's CPU, then use 2-shard shuffle to reduce those - locally reduced tensors across workers (on the worker CPUs), then - scatter the globally reduced values locally from each worker CPU. - """ - range_parts = all_reduce_spec.split(":") + ["-1"] - if len(range_parts) % 2: - raise ValueError( - "all_reduce_spec not well formed: %s" % all_reduce_spec) - limit = 0 - spec = [] - alg = None - shards = 1 - for i, range_part in enumerate(range_parts): - if i % 2 == 1: - try: - limit = parse_general_int(range_part) - spec.append( - AllReduceSpecTuple(alg=alg, shards=shards, limit=limit)) - except ValueError: - raise ValueError( - "all_reduce_spec (%s) contains non-integer range %s" % - (all_reduce_spec, range_part)) - else: - alg = range_part - alg_parts = range_part.split("#") - alg = alg_parts[0] - if len(alg_parts) > 1: - try: - shards = int(alg_parts[1]) - except ValueError: - raise ValueError( - "all_reduce_spec (%s) contains non-integer " - "shards %s" % all_reduce_spec, alg_parts[1]) - else: - shards = 1 - if alg not in [ - "nccl", "nccl/xring", "nccl/rechd", "nccl/pscpu", "xring", - "pscpu", "psgpu", "pscpu/pscpu" - ]: - raise ValueError("all_reduce_spec (%s) contains invalid alg %s" - % (all_reduce_spec, alg)) - return spec - - -def build_all_reduce_device_prefixes(job_name, num_tasks): - """Build list of device prefix names for all_reduce. - - Args: - job_name: "worker", "ps" or "localhost". - num_tasks: number of jobs across which device names should be generated. - - Returns: - A list of device name prefix strings. Each element spells out the full - host name without adding the device. - e.g. "/job:worker/task:0" - """ - if job_name != "localhost": - return ["/job:%s/task:%d" % (job_name, d) for d in range(0, num_tasks)] - else: - assert num_tasks == 1 - return ["/job:%s" % job_name] - - -def group_device_names(devices, group_size): - """Group device names into groups of group_size. - - Args: - devices: list of strings naming devices. - group_size: int >= 1 - - Returns: - list of lists of devices, where each inner list is group_size long, - and each device appears at least once in an inner list. If - len(devices) % group_size = 0 then each device will appear - exactly once. - - Raises: - ValueError: group_size > len(devices) - """ - num_devices = len(devices) - if group_size > num_devices: - raise ValueError( - "only %d devices, but group_size=%d" % (num_devices, group_size)) - num_groups = ( - num_devices // group_size + (1 if - (num_devices % group_size != 0) else 0)) - groups = [[] for i in range(num_groups)] - for i in range(0, num_groups * group_size): - groups[i % num_groups].append(devices[i % num_devices]) - return groups - - -def split_grads_by_size(threshold_size, device_grads): - """Break gradients into two sets according to tensor size. - - Args: - threshold_size: int size cutoff for small vs large tensor. - device_grads: List of lists of (gradient, variable) tuples. The outer - list is over devices. The inner list is over individual gradients. - - Returns: - small_grads: Subset of device_grads where shape is <= theshold_size - elements. - large_grads: Subset of device_grads where shape is > threshold_size - elements. - """ - small_grads = [] - large_grads = [] - for dl in device_grads: - small_dl = [] - large_dl = [] - for (g, v) in dl: - tensor_size = g.get_shape().num_elements() - if tensor_size <= threshold_size: - small_dl.append([g, v]) - else: - large_dl.append([g, v]) - if small_dl: - small_grads.append(small_dl) - if large_dl: - large_grads.append(large_dl) - return small_grads, large_grads - - -def build_reduce_sum(scaled_grads): - stacked = tf.parallel_stack(values=scaled_grads) - reduced = tf.reduce_sum(stacked, 0) - return [reduced] * len(scaled_grads) - - -def build_trivial_sum(scaled_grads): - return scaled_grads - - -def aggregate_single_gradient(grad_and_vars, use_mean, check_inf_nan): - """Calculate the average gradient for a shared variable across all towers. - - Note that this function provides a synchronization point across all towers. - - Args: - grad_and_vars: A list or tuple of (gradient, variable) tuples. Each - (gradient, variable) pair within the outer list represents the gradient - of the variable calculated for a single tower, and the number of pairs - equals the number of towers. - use_mean: if True, mean is taken, else sum of gradients is taken. - check_inf_nan: check grads for nans and infs. - - Returns: - The tuple ([(average_gradient, variable),], has_nan_or_inf) where the - gradient has been averaged across all towers. The variable is chosen from - the first tower. The has_nan_or_inf indicates the grads has nan or inf. - """ - grads = [g for g, _ in grad_and_vars] - grad = tf.add_n(grads) - - if use_mean and len(grads) > 1: - grad = tf.multiply(grad, 1.0 / len(grads)) - - v = grad_and_vars[0][1] - if check_inf_nan: - has_nan_or_inf = tf.logical_not(tf.reduce_all(tf.is_finite(grads))) - return (grad, v), has_nan_or_inf - else: - return (grad, v), None - - -def aggregate_gradients_using_copy_with_device_selection( - tower_grads, avail_devices, use_mean=True, check_inf_nan=False): - """Aggregate gradients, controlling device for the aggregation. - - Args: - tower_grads: List of lists of (gradient, variable) tuples. The outer list - is over towers. The inner list is over individual gradients. - use_mean: if True, mean is taken, else sum of gradients is taken. - check_inf_nan: If true, check grads for nans and infs. - - Returns: - The tuple ([(average_gradient, variable),], has_nan_or_inf) where the - gradient has been averaged across all towers. The variable is chosen from - the first tower. The has_nan_or_inf indicates the grads has nan or inf. - """ - agg_grads = [] - has_nan_or_inf_list = [] - for i, single_grads in enumerate(zip(*tower_grads)): - with tf.device(avail_devices[i % len(avail_devices)]): - grad_and_var, has_nan_or_inf = aggregate_single_gradient( - single_grads, use_mean, check_inf_nan) - agg_grads.append(grad_and_var) - has_nan_or_inf_list.append(has_nan_or_inf) - return agg_grads - - -def sum_grad_and_var_all_reduce(grad_and_vars, - num_workers, - alg, - gpu_indices, - aux_devices=None, - num_shards=1): - """Apply all-reduce algorithm over specified gradient tensors.""" - with tf.name_scope("allreduce"): - # Note that each grad_and_vars looks like the following: - # ((grad0_gpu0, var0_gpu0), ... , (grad0_gpuN, var0_gpuN)) - scaled_grads = [g for g, _ in grad_and_vars] - if alg == "nccl": - from tensorflow.python.ops import nccl_ops - summed_grads = nccl_ops.all_sum(scaled_grads) - elif alg == "simple": - summed_grads = build_reduce_sum(scaled_grads) - elif alg == "trivial": - summed_grads = build_trivial_sum(scaled_grads) - elif alg == "xring": - summed_grads = all_reduce.build_ring_all_reduce( - scaled_grads, num_workers, num_shards, gpu_indices, tf.add) - elif alg == "nccl/xring": - summed_grads = all_reduce.build_nccl_then_ring( - scaled_grads, num_shards, tf.add) - elif alg == "nccl/rechd": - summed_grads = all_reduce.build_nccl_then_recursive_hd( - scaled_grads, tf.add) - elif alg == "nccl/pscpu": - summed_grads = all_reduce.build_nccl_then_shuffle( - scaled_grads, aux_devices, tf.add, tf.add_n) - elif alg == "pscpu/pscpu": - summed_grads = all_reduce.build_shuffle_then_shuffle( - scaled_grads, - aux_devices, - # TODO(tucker): devise a way of better specifying the device - # for the second level. - [aux_devices[0]], - tf.add_n) - elif alg in ["pscpu", "psgpu"]: - summed_grads = all_reduce.build_shuffle_all_reduce( - scaled_grads, aux_devices, tf.add_n) - else: - raise ValueError("unsupported all_reduce alg: ", alg) - - result = [] - for (_, v), g in zip(grad_and_vars, summed_grads): - result.append([g, v]) - return result - - -def contains_any(haystack, needles): - """Tests if any needle is a substring of haystack. - - Args: - haystack: a string - needles: list of strings - - Returns: - True if any element of needles is a substring of haystack, - False otherwise. - """ - for n in needles: - if n in haystack: - return True - return False - - -def sum_gradients_all_reduce(dev_prefixes, - tower_grads, - num_workers, - alg, - num_shards, - gpu_indices, - agg_small_grads_max_bytes=0): - """Apply all-reduce algorithm over specified gradient tensors. - - Args: - dev_prefixes: list of prefix strings to use to generate PS device names. - tower_grads: the gradients to reduce. - num_workers: number of worker processes across entire job. - alg: the all-reduce algorithm to apply. - num_shards: alg-specific sharding factor. - gpu_indices: indices of local GPUs in order usable for ring-reduce. - agg_small_grads_max_bytes: largest tensor eligible for aggregation, - in number of bytes. - - Returns: - list of reduced tensors, packing values - """ - alg_contains_shuffle = contains_any(alg, ["pscpu", "psgpu"]) - is_hierarchical = "/" in alg - if "pscpu" in alg: - aux_devices = [prefix + "/cpu:0" for prefix in dev_prefixes] - elif "psgpu" in alg: - aux_devices = [ - prefix + "/gpu:%d" % i for i in range(len(gpu_indices)) - for prefix in dev_prefixes - ] - else: - aux_devices = ["/job:localhost/cpu:0"] - aux_device_groups = group_device_names( - aux_devices, num_shards if alg_contains_shuffle else 1) - group_index = 0 - if agg_small_grads_max_bytes > 0: - tower_grads, packing = pack_small_tensors( - tower_grads, max_bytes=agg_small_grads_max_bytes) - - else: - packing = None - new_tower_grads = [] - if alg == "better": - raw_devices = ["/gpu:%i" % (i) for i in gpu_indices] - agg_grads = aggregate_gradients_using_copy_with_device_selection( - tower_grads, raw_devices) - for arr in tower_grads: - new_tower_grads.append( - [(g, v) for (_, v), (g, _) in zip(arr, agg_grads)]) - else: - reduced_gv_list = [] - for grad_and_vars in zip(*tower_grads): - reduced_gv_list.append( - sum_grad_and_var_all_reduce( - grad_and_vars, num_workers, alg, gpu_indices, aux_devices - if is_hierarchical else aux_device_groups[group_index], - num_shards)) - group_index = (group_index + 1) % len(aux_device_groups) - new_tower_grads = [list(x) for x in zip(*reduced_gv_list)] - return new_tower_grads, packing - - -def print_stats(sizes): - def sizeof_fmt(num, suffix="B"): - for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]: - if abs(num) < 1024.0: - return "%3.1f%s%s" % (num, unit, suffix) - num /= 1024.0 - return "%.1f%s%s" % (num, "Yi", suffix) - - stats = { - "avg": np.mean(sizes), - "median": np.median(sizes), - "total size": np.sum(sizes) - } - logger.info("Stats " + ", ".join( - ["%s: %s" % (k, sizeof_fmt(v)) for k, v in stats.items()])) - other_stats = {"len": len(sizes)} - logger.info(", ".join(["%s: %f" % (k, v) for k, v in other_stats.items()])) - - -def extract_ranges(index_list, range_size_limit=32): - """Extract consecutive ranges and singles from index_list. - - Args: - index_list: List of monotone increasing non-negative integers. - range_size_limit: Largest size range to return. If a larger - consecutive range exists it will be returned as multiple - ranges. - - Returns: - ranges, singles where ranges is a list of [first, last] pairs of - consecutive elements in index_list, and singles is all of the - other elements, in original order. - """ - if not index_list: - return [], [] - first = index_list[0] - last = first - ranges = [] - singles = [] - for i in index_list[1:]: - if i == last + 1 and (last - first) <= range_size_limit: - last = i - else: - if last > first: - ranges.append([first, last]) - else: - singles.append(first) - first = i - last = i - if last > first: - ranges.append([first, last]) - else: - singles.append(first) - return ranges, singles - - -GradPackTuple = pycoll.namedtuple("GradPackTuple", "indices vars shapes") - - -def pack_range(key, packing, grad_vars, rng): - """Form the concatenation of a specified range of gradient tensors. - - Args: - key: Value under which to store meta-data in packing that will be used - later to restore the grad_var list structure. - packing: Dict holding data describing packed ranges of small tensors. - grad_vars: List of (grad, var) pairs for one tower. - rng: A pair of integers giving the first, last indices of a consecutive - range of tensors to be packed. - - Returns: - A tensor that is the concatenation of all the specified small tensors. - """ - to_pack = grad_vars[rng[0]:rng[1] + 1] - members = [] - variables = [] - restore_shapes = [] - with tf.name_scope("pack"): - for g, v in to_pack: - variables.append(v) - restore_shapes.append(g.shape) - with tf.device(g.device): - members.append(tf.reshape(g, [-1])) - packing[key] = GradPackTuple( - indices=range(rng[0], rng[1] + 1), - vars=variables, - shapes=restore_shapes) - with tf.device(members[0].device): - return tf.concat(members, 0) - - -def unpack_grad_tuple(gv, gpt): - """Unpack a previously packed collection of gradient tensors. - - Args: - gv: A (grad, var) pair to be unpacked. - gpt: A GradPackTuple describing the packing operation that produced gv. - - Returns: - A list of (grad, var) pairs corresponding to the values that were - originally packed into gv, maybe following subsequent operations like - reduction. - """ - elt_widths = [x.num_elements() for x in gpt.shapes] - with tf.device(gv[0][0].device): - with tf.name_scope("unpack"): - splits = tf.split(gv[0], elt_widths) - unpacked_gv = [] - for idx, s in enumerate(splits): - unpacked_gv.append((tf.reshape(s, gpt.shapes[idx]), - gpt.vars[idx])) - return unpacked_gv - - -def pack_small_tensors(tower_grads, max_bytes=0): - """Concatenate gradients together more intelligently. - - Does binpacking - Args: - tower_grads: List of lists of (gradient, variable) tuples. - max_bytes: Int giving max number of bytes in a tensor that - may be considered small. - """ - assert max_bytes >= 0 - orig_grads = [g for g, _ in tower_grads[0]] - # Check to make sure sizes are accurate; not entirely important - assert all(g.dtype == tf.float32 for g in orig_grads) - sizes = [4 * g.shape.num_elements() for g in orig_grads] - print_stats(sizes) - small_ranges = [] - large_indices = [] - new_sizes = [] - - def end_interval(indices, small_ranges, large_indices): - if len(indices) > 1: - small_ranges.insert(0, [indices[0], indices[-1]]) - else: - large_indices.insert(0, indices[0]) - - cur_range = [] - cur_size = 0 - for i, s in reversed(list(enumerate(sizes))): - if cur_size > max_bytes: - end_interval(cur_range, small_ranges, large_indices) - new_sizes.insert(0, cur_size) - cur_range = [] - cur_size = 0 - cur_range.insert(0, i) - cur_size += s - end_interval(cur_range, small_ranges, large_indices) - new_sizes.insert(0, cur_size) - - print_stats(new_sizes) - num_gv = len(orig_grads) - packing = {} - if len(small_ranges): - new_tower_grads = [] - for dev_idx, gv_list in enumerate(tower_grads): - assert len(gv_list) == num_gv, ( - "Possible cause: " - "Networks constructed on different workers " - "don't have the same number of variables. " - "If you use tf.GraphKeys or tf.global_variables() " - "with multiple graphs per worker during network " - "construction, you need to use " - "appropriate scopes, see " - "https://github.com/ray-project/ray/issues/3136") - new_gv_list = [] - for r in small_ranges: - key = "%d:%d" % (dev_idx, len(new_gv_list)) - new_gv_list.append((pack_range(key, packing, gv_list, r), - "packing_var_placeholder")) - for i in large_indices: - new_gv_list.append(gv_list[i]) - new_tower_grads.append(new_gv_list) - return new_tower_grads, packing - else: - return tower_grads, None - - -def unpack_small_tensors(tower_grads, packing): - """Undo the structure alterations to tower_grads done by pack_small_tensors. - - Args: - tower_grads: List of List of (grad, var) tuples. - packing: A dict generated by pack_small_tensors describing the changes - it made to tower_grads. - - Returns: - new_tower_grads: identical to tower_grads except that concatentations - of small tensors have been split apart and returned to their original - positions, paired with their original variables. - """ - if not packing: - return tower_grads - new_tower_grads = [] - num_devices = len(tower_grads) - num_packed = len(packing.keys()) // num_devices - for dev_idx, gv_list in enumerate(tower_grads): - new_gv_list = gv_list[num_packed:] - for i in xrange(0, num_packed): - k = "%d:%d" % (dev_idx, i) - gpt = packing[k] - gv = unpack_grad_tuple(gv_list[i], gpt) - for gi, idx in enumerate(gpt.indices): - assert idx == gpt.indices[gi] - new_gv_list.insert(idx, gv[gi]) - new_tower_grads.append(new_gv_list) - return new_tower_grads diff --git a/python/ray/experimental/sgd/param_server.py b/python/ray/experimental/sgd/param_server.py deleted file mode 100644 index a69727722..000000000 --- a/python/ray/experimental/sgd/param_server.py +++ /dev/null @@ -1,83 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import logging - -import numpy as np - -import ray -from ray.experimental.sgd.util import Timeline, fetch, warmup - -logger = logging.getLogger(__name__) - - -@ray.remote(num_cpus=0) -class ParameterServer(object): - """Helper class for ray.experimental.sgd.DistributedSGD.""" - - def __init__(self, num_workers, tid): - self.num_sgd_workers = num_workers - self.acc_counter = 0 - self.timeline = Timeline(tid) - # TODO(ekl) get this to work again so we get ray events - # self.timeline.patch_ray() - - def initialize(self, shard_shape): - """Resets the gradient buffer to zeros.""" - self.accumulated = np.zeros(shard_shape, dtype=np.float32) - - def prefetch(self, oids): - """Tell plasma to prefetch the given object ids over the network.""" - self.timeline.reset() - self.timeline.start("prefetch") - fetch(oids) - self.timeline.end("prefetch") - - def add_spinwait(self, grad_shard_ids): - """Optimized version of add() that operates on multiple grads.""" - self.timeline.start("add_spinwait") - plasma_ids = [ray.pyarrow.plasma.ObjectID(x) for x in grad_shard_ids] - while plasma_ids: - for p in plasma_ids: - if ray.worker.global_worker.plasma_client.contains(p): - self.timeline.start("get_buffers") - grads = ray.worker.global_worker.plasma_client.get(p) - self.accumulated += grads - self.acc_counter += 1 - self.timeline.end("get_buffers") - plasma_ids.remove(p) - break - self.timeline.end("add_spinwait") - - def add(self, grad_shard_id): - """Add the given gradient value to the accumulated gradients.""" - self.timeline.start("add") - self.timeline.start("get_buffers") - oid = ray.pyarrow.plasma.ObjectID(grad_shard_id) - grads = ray.worker.global_worker.plasma_client.get(oid) - self.timeline.end("get_buffers") - self.accumulated += grads - self.acc_counter += 1 - self.timeline.end("add") - - def get(self, object_id): - """Put the accumulated gradients to the given object id.""" - self.timeline.start("get") - client = ray.worker.global_worker.plasma_client - assert self.acc_counter == self.num_sgd_workers, self.acc_counter - oid = ray.pyarrow.plasma.ObjectID(object_id) - self.accumulated /= self.acc_counter - client.put(self.accumulated.flatten(), object_id=oid) - self.accumulated = np.zeros_like(self.accumulated) - self.acc_counter = 0 - self.timeline.end("get") - - def get_timeline(self): - return self.timeline - - def ip(self): - return ray.services.get_node_ip_address() - - def warmup(self): - warmup() diff --git a/python/ray/experimental/sgd/sgd.py b/python/ray/experimental/sgd/sgd.py deleted file mode 100644 index 15cce7152..000000000 --- a/python/ray/experimental/sgd/sgd.py +++ /dev/null @@ -1,274 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import logging -import os -import random -import time - -import numpy as np - -import ray -from ray.experimental.sgd.sgd_worker import SGDWorker -from ray.experimental.sgd.param_server import ParameterServer - -logger = logging.getLogger(__name__) - - -class DistributedSGD(object): - """Experimental distributed SGD implementation in Ray. - - This supports two modes: - 'simple': centralized gradient aggregation - 'ps': sharded parameter-server implementation - - To use this class, you'll have to implement model.py:Model. - - Arguments: - model_creator (func): Function that returns a model given worker and - device indexes as arguments. Each model replica will be created - within its own variable scope. - num_workers (int): Number of Ray actors to use for SGD. - devices_per_worker (int): Number of GPU or CPU devices to use per - worker. One model replica will be created per device. - gpu (bool): Whether to use GPU devices. - strategy (str): Strategy to use for distributed gradient aggregation. - This only applies if num_workers > 1. - grad_shard_bytes (int): Fuse gradient tensors into chunks of at most - this size (if applicable). - all_reduce_alg (str): TensorFlow strategy to use for gradient - synchronization within the same worker (if applicable). - See modified_allreduce.py for options. - - Examples: - >>> # Setup distributed SGD - >>> model_creator = ( - ... lambda worker_idx, device_idx: YourModelClass(...)) - >>> sgd = DistributedSGD( - ... model_creator, num_workers=2, - ... devices_per_worker=4, gpu=True, strategy="ps") - - >>> # To train - >>> for i in range(100): - ... stats = sgd.step(fetch_stats=i % 10 == 0) - - >>> # To access or update model state - >>> sgd.foreach_model(lambda model: ...) - - >>> # To access or update worker state - >>> sgd.foreach_worker(lambda worker: ...) - """ - - def __init__(self, - model_creator, - num_workers, - devices_per_worker, - gpu=True, - strategy="ps", - grad_shard_bytes=10000000, - all_reduce_alg="simple"): - - if num_workers == 1 and strategy == "ps": - logger.warning( - "The parameter server strategy does not make sense for single " - "worker operation, falling back to simple mode.") - strategy = "simple" - - if strategy == "ps": - use_plasma_op = True - elif strategy == "simple": - use_plasma_op = False - grad_shard_bytes = 0 # tensor fusion doesn't make sense - else: - raise ValueError("strategy must be one of 'ps', 'simple'") - self.strategy = strategy - - self.model_creator = model_creator - if gpu: - requests = {"num_gpus": devices_per_worker} - else: - requests = {"num_cpus": devices_per_worker} - - RemoteSGDWorker = ray.remote(**requests)(SGDWorker) - self.workers = [] - logger.info( - "Creating SGD workers ({} total, {} devices per worker)".format( - num_workers, devices_per_worker)) - for worker_index in range(num_workers): - self.workers.append( - RemoteSGDWorker.remote( - worker_index, - model_creator, - num_devices=devices_per_worker, - plasma_op=use_plasma_op, - gpu=gpu, - max_bytes=grad_shard_bytes, - all_reduce_alg=all_reduce_alg)) - - logger.info("Waiting for gradient configuration") - shard_shapes = ray.get(self.workers[0].shard_shapes.remote()) - - logger.info("Waiting for actors to start") - ray.get([w.shard_shapes.remote() for w in self.workers]) - - if strategy == "ps": - logger.info("Starting parameter servers ({} shards)".format( - len(shard_shapes))) - self.ps_list = [ - ParameterServer.remote(len(self.workers), i) - for i, s in enumerate(shard_shapes) - ] - ray.get([ - ps.initialize.remote(s) - for ps, s in zip(self.ps_list, shard_shapes) - ]) - logger.info("Parameter servers started") - else: - self.ps_list = [] - - def foreach_worker(self, fn): - """Apply the given function to each remote worker. - - Returns: - List of results from applying the function. - """ - results = ray.get([w.foreach_worker.remote(fn) for w in self.workers]) - return results - - def foreach_model(self, fn): - """Apply the given function to each model replica in each worker. - - Returns: - List of results from applying the function. - """ - - results = ray.get([w.foreach_model.remote(fn) for w in self.workers]) - out = [] - for r in results: - out.extend(r) - return out - - def for_model(self, fn): - """Apply the given function to a single model replica. - - Returns: - Result from applying the function. - """ - return ray.get(self.workers[0].for_model.remote(fn)) - - def step(self, fetch_stats=False): - """Run a single SGD step. - - Arguments: - fetch_stats (bool): Whether to return stats from the step. This can - slow down the computation by acting as a global barrier. - """ - if self.strategy == "ps": - return _distributed_sgd_step( - self.workers, - self.ps_list, - write_timeline=False, - fetch_stats=fetch_stats) - else: - return _simple_sgd_step(self.workers) - - def warmup(self): - logger.info("Warming up object store of worker actors") - ray.get([w.warmup.remote() for w in self.workers]) - logger.info("Warmup complete") - - def save_checkpoint(self, path): - w0 = self.for_model(lambda m: m.get_weights()) - filename = os.path.join(path, "model.npy") - np.save(filename, w0) - - def restore_checkpoint(self, path): - filename = os.path.join(path, "model.npy") - w0 = np.load(filename) - self.foreach_model(lambda m: m.set_weights(w0)) - - -def _average_gradients(grads): - out = [] - for grad_list in zip(*grads): - out.append(np.mean(grad_list, axis=0)) - return out - - -def _simple_sgd_step(actors): - if len(actors) == 1: - return {"loss": ray.get(actors[0].compute_apply.remote())} - - start = time.time() - fetches = ray.get([a.compute_gradients.remote() for a in actors]) - losses = [f[0] for f in fetches] - grads = [f[1] for f in fetches] - logger.debug("compute all grads time {}".format(time.time() - start)) - start = time.time() - if len(actors) == 1: - assert len(grads) == 1 - avg_grad = grads[0] - else: - avg_grad = _average_gradients(grads) - logger.debug("grad reduce time {}".format(time.time() - start)) - start = time.time() - ray.get([a.apply_gradients.remote(avg_grad) for a in actors]) - logger.debug("apply all grads time {}".format(time.time() - start)) - return {"loss": np.mean(losses)} - - -def _distributed_sgd_step(actors, ps_list, fetch_stats, write_timeline): - # Preallocate object ids that actors will write gradient shards to - grad_shard_oids_list = [[np.random.bytes(20) for _ in ps_list] - for _ in actors] - logger.debug("Generated grad oids") - - # Preallocate object ids that param servers will write new weights to - accum_shard_ids = [np.random.bytes(20) for _ in ps_list] - logger.debug("Generated accum oids") - - # Kick off the fused compute grad / update weights tf run for each actor - losses = [] - for actor, grad_shard_oids in zip(actors, grad_shard_oids_list): - losses.append( - actor.ps_compute_apply.remote( - grad_shard_oids, - accum_shard_ids, - write_timeline=write_timeline)) - logger.debug("Launched all ps_compute_applys on all actors") - - # Issue prefetch ops - for j, (ps, weight_shard_oid) in list( - enumerate(zip(ps_list, accum_shard_ids)))[::-1]: - to_fetch = [] - for grad_shard_oids in grad_shard_oids_list: - to_fetch.append(grad_shard_oids[j]) - random.shuffle(to_fetch) - ps.prefetch.remote(to_fetch) - logger.debug("Launched all prefetch ops") - - # Aggregate the gradients produced by the actors. These operations - # run concurrently with the actor methods above. - ps_gets = [] - for j, (ps, weight_shard_oid) in list( - enumerate(zip(ps_list, accum_shard_ids)))[::-1]: - ps.add_spinwait.remote([gs[j] for gs in grad_shard_oids_list]) - ps_gets.append(ps.get.remote(weight_shard_oid)) - logger.debug("Launched all aggregate ops") - - if write_timeline: - timelines = [ps.get_timeline.remote() for ps in ps_list] - logger.debug("Launched timeline gets") - timelines = ray.get(timelines) - t0 = timelines[0] - for t in timelines[1:]: - t0.merge(t) - t0.chrome_trace_format("ps_timeline.json") - else: - # Wait for at least the ps gets to finish - ray.get(ps_gets) - if fetch_stats: - return {"loss": np.mean(ray.get(losses))} - else: - return None diff --git a/python/ray/experimental/sgd/sgd_worker.py b/python/ray/experimental/sgd/sgd_worker.py deleted file mode 100644 index 27c14f9dd..000000000 --- a/python/ray/experimental/sgd/sgd_worker.py +++ /dev/null @@ -1,265 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import logging -import time - -import pyarrow.plasma as plasma -import tensorflow as tf - -import ray -from ray.experimental.sgd.util import (ensure_plasma_tensorflow_op, fetch, - run_timeline, warmup) -from ray.experimental.sgd.modified_allreduce import (sum_gradients_all_reduce, - unpack_small_tensors) - -logger = logging.getLogger(__name__) - - -class SGDWorker(object): - """Helper class for ray.experimental.sgd.DistributedSGD.""" - - def __init__(self, - worker_index, - model_creator, - all_reduce_alg="simple", - num_devices=1, - gpu=False, - max_bytes=10000000, - plasma_op=False): - self.worker_index = worker_index - assert num_devices > 0 - - # TODO(ekl) support custom session - tf_session_args = { - "device_count": { - "CPU": num_devices - }, - "log_device_placement": False, - "gpu_options": tf.GPUOptions(force_gpu_compatible=True), - "inter_op_parallelism_threads": 128, - } - config_proto = tf.ConfigProto(**tf_session_args) - self.sess = tf.Session(config=config_proto) - self.models = [] - grad_ops = [] - - if gpu: - device_tmpl = "/gpu:%d" - else: - device_tmpl = "/cpu:%d" - with self.sess.as_default(): - for device_idx in range(num_devices): - device = device_tmpl % device_idx - with tf.device(device): - with tf.variable_scope("device_%d" % device_idx): - model = model_creator(worker_index, device_idx) - self.models.append(model) - optimizer = model.get_optimizer() - loss = model.get_loss() - grads = [ - t for t in optimizer.compute_gradients(loss) - if t[0] is not None - ] - grad_ops.append(grads) - - if num_devices == 1: - if max_bytes: - raise ValueError( - "Implementation limitation: grad_shard_bytes > 0 " - "({}) currently requires > 1 device".format(max_bytes)) - self.packed_grads_and_vars = grad_ops - else: - if max_bytes: - self.packed_grads_and_vars, packing_vals = ( - sum_gradients_all_reduce( - "", - grad_ops, - 1, - all_reduce_alg, - 1, - list(range(num_devices)), - agg_small_grads_max_bytes=max_bytes)) - else: - self.packed_grads_and_vars, _ = (sum_gradients_all_reduce( - "", - grad_ops, - 1, - all_reduce_alg, - 1, - list(range(num_devices)), - agg_small_grads_max_bytes=0)) - self.per_device_grads = [ - list(zip(*dev_gv))[0] for dev_gv in self.packed_grads_and_vars - ] - assert (len(self.per_device_grads) == num_devices) - self.num_grads = num_grads = len(self.packed_grads_and_vars[0]) - if max_bytes: - logger.info("Packed grads => {} tensors".format(num_grads)) - - # Ops for reading grads with the right control deps - nccl_noops = [] - for j in range(num_grads)[::-1]: - deps = nccl_noops + [ - dev_grad[j] for dev_grad in self.per_device_grads - ] - with tf.control_dependencies(deps): - nccl_noops = [tf.no_op()] - - # You must fetch this otherwise the NCCL allreduce will hang - self.nccl_control_out = tf.group(*nccl_noops) - - if plasma_op: - store_socket = ( - ray.worker.global_worker.plasma_client.store_socket_name) - ensure_plasma_tensorflow_op() - - # For fetching grads -> plasma - self.plasma_in_grads = [] - self.plasma_in_grads_oids = [ - tf.placeholder(shape=[], dtype=tf.string, name="in_grad_oids") - for _ in range(num_grads) - ] - for j in range(num_grads): - grad = self.per_device_grads[0][j] - with tf.device(self.models[0].get_loss().device): - plasma_grad = plasma.tf_plasma_op.tensor_to_plasma( - [grad], - self.plasma_in_grads_oids[j], - plasma_store_socket_name=store_socket) - self.plasma_in_grads.append(plasma_grad) - - # For applying grads <- plasma - unpacked_gv = [] - self.plasma_out_grads_oids = [ - tf.placeholder( - shape=[], dtype=tf.string, name="grad_out_oids") - for _ in range(num_grads) - ] - packed_plasma_grads = [] - for j in range(num_grads): - with tf.device(self.plasma_in_grads[j].device): - with tf.control_dependencies([self.plasma_in_grads[j]]): - grad_ph = plasma.tf_plasma_op.plasma_to_tensor( - self.plasma_out_grads_oids[j], - dtype=tf.float32, - plasma_store_socket_name=store_socket) - grad_ph = tf.reshape(grad_ph, - self.packed_grads_and_vars[0][j][0].shape) - logger.debug("Packed tensor {}".format(grad_ph)) - packed_plasma_grads.append(grad_ph) - for i in range(num_devices): - per_device = [] - for j, (g, v) in enumerate(self.packed_grads_and_vars[i]): - grad_ph = packed_plasma_grads[j] - per_device.append((grad_ph, v)) - unpacked_gv.append(per_device) - - if max_bytes: - unpacked_gv = unpack_small_tensors(unpacked_gv, packing_vals) - - elif max_bytes: - unpacked_gv = unpack_small_tensors(self.packed_grads_and_vars, - packing_vals) - else: - unpacked_gv = self.packed_grads_and_vars - - # Same shape as packed_grads_and_vars - assert len(unpacked_gv) == num_devices - assert len(unpacked_gv[0][0]) == 2 - - apply_ops = [] - to_apply = unpacked_gv[0] - for ix, m in enumerate(self.models): - apply_ops.append(m.get_optimizer().apply_gradients([ - (g, v) for ((g, _), (_, v)) in zip(to_apply, unpacked_gv[ix]) - ])) - self.apply_op = tf.group(*apply_ops) - init_op = tf.group(tf.global_variables_initializer(), - tf.local_variables_initializer()) - self.sess.run(init_op) - - def _grad_feed_dict(self): - # Aggregate feed dicts for each model on this worker. - feed_dict = {} - for model in self.models: - feed_dict.update(model.get_feed_dict()) - return feed_dict - - def foreach_model(self, fn): - with self.sess.as_default(): - return [fn(m) for m in self.models] - - def foreach_worker(self, fn): - with self.sess.as_default(): - return fn(self) - - def for_model(self, fn): - with self.sess.as_default(): - return fn(self.models[0]) - - def compute_gradients(self): - start = time.time() - feed_dict = self._grad_feed_dict() - # We only need to fetch the first per_device_grad, since they are - # averaged across all devices by allreduce. - fetches = self.sess.run( - [ - self.models[0].get_loss(), self.per_device_grads[0], - self.nccl_control_out - ], - feed_dict=feed_dict) - logger.debug( - "Compute grad interior time {}".format(time.time() - start)) - return fetches - - def apply_gradients(self, avg_grads): - start = time.time() - result = { - g: avg_grads[i] - for (i, g) in enumerate(self.per_device_grads[0]) - } - self.sess.run(self.apply_op, feed_dict=result) - logger.debug("Apply grad interior time {}".format(time.time() - start)) - - def compute_apply(self): - fetches = run_timeline( - self.sess, - [self.models[0].get_loss(), self.apply_op, self.nccl_control_out], - feed_dict=self._grad_feed_dict(), - name="compute_apply") - return fetches[0] - - def ps_compute_apply(self, - out_grad_shard_oids, - agg_grad_shard_oids, - tl_name="ps_compute_apply", - write_timeline=False): - feed_dict = self._grad_feed_dict() - feed_dict.update( - dict(zip(self.plasma_in_grads_oids, out_grad_shard_oids))) - feed_dict.update( - dict(zip(self.plasma_out_grads_oids, agg_grad_shard_oids))) - fetch(agg_grad_shard_oids) - fetches = run_timeline( - self.sess, [ - self.models[0].get_loss(), self.plasma_in_grads, self.apply_op, - self.nccl_control_out - ], - feed_dict=feed_dict, - write_timeline=write_timeline) - return fetches[0] - - def num_grad_shards(self): - return self.num_grads - - def shard_shapes(self): - main_gv = self.packed_grads_and_vars[0] - return [g.shape for g, _ in main_gv] - - def ip(self): - return ray.services.get_node_ip_address() - - def warmup(self): - warmup() diff --git a/python/ray/experimental/sgd/test_save_and_restore.py b/python/ray/experimental/sgd/test_save_and_restore.py deleted file mode 100755 index 47647842f..000000000 --- a/python/ray/experimental/sgd/test_save_and_restore.py +++ /dev/null @@ -1,77 +0,0 @@ -#!/usr/bin/env python - -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import argparse -import os -import time - -import ray -from ray.experimental.sgd.tfbench.test_model import TFBenchModel -from ray.experimental.sgd.sgd import DistributedSGD - -parser = argparse.ArgumentParser() -parser.add_argument("--redis-address", default=None, type=str) -parser.add_argument("--num-iters", default=10, type=int) -parser.add_argument("--batch-size", default=1, type=int) -parser.add_argument("--num-workers", default=2, type=int) -parser.add_argument("--grad-shard-bytes", default=10000000, type=int) -parser.add_argument("--devices-per-worker", default=2, type=int) -parser.add_argument("--all-reduce-alg", default="simple", type=str) -parser.add_argument("--object-store-memory", default=None, type=int) -parser.add_argument("--checkpoint-dir", default="/tmp", type=str) -parser.add_argument( - "--strategy", default="simple", type=str, help="One of 'simple' or 'ps'") -parser.add_argument( - "--gpu", action="store_true", help="Use GPUs for optimization") - -if __name__ == "__main__": - args, _ = parser.parse_known_args() - ray.init( - redis_address=args.redis_address, - object_store_memory=args.object_store_memory) - - model_creator = ( - lambda worker_idx, device_idx: TFBenchModel( - batch=args.batch_size, use_cpus=not args.gpu)) - - sgd = DistributedSGD( - model_creator, - num_workers=args.num_workers, - devices_per_worker=args.devices_per_worker, - gpu=args.gpu, - strategy=args.strategy, - grad_shard_bytes=args.grad_shard_bytes, - all_reduce_alg=args.all_reduce_alg) - - if not os.path.exists(args.checkpoint_dir): - raise ValueError( - "Checkpoint directory does not exist: %s" % args.checkpoint_dir) - - def step(i): - start = time.time() - print("== Step {} ==".format(i)) - stats = sgd.step(fetch_stats=True) - ips = ((args.batch_size * args.num_workers * args.devices_per_worker) / - (time.time() - start)) - print("Iteration time", time.time() - start, "Images per second", ips) - print("Current loss", stats) - - i = 0 - while i < args.num_iters: - step(i) - i += 1 - - print("Saving checkpoint...") - sgd.save_checkpoint(args.checkpoint_dir) - print("Done saving checkpoint") - - step(i) - - print("Restoring checkpoint") - sgd.restore_checkpoint(args.checkpoint_dir) - print("Done restoring checkpoint") - - step(i) diff --git a/python/ray/experimental/sgd/test_sgd.py b/python/ray/experimental/sgd/test_sgd.py deleted file mode 100755 index 79e00b265..000000000 --- a/python/ray/experimental/sgd/test_sgd.py +++ /dev/null @@ -1,67 +0,0 @@ -#!/usr/bin/env python - -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import argparse -import time - -import ray -from ray.experimental.sgd.tfbench.test_model import TFBenchModel -from ray.experimental.sgd.sgd import DistributedSGD - -parser = argparse.ArgumentParser() -parser.add_argument("--redis-address", default=None, type=str) -parser.add_argument("--num-iters", default=10, type=int) -parser.add_argument("--batch-size", default=1, type=int) -parser.add_argument("--num-workers", default=2, type=int) -parser.add_argument("--grad-shard-bytes", default=10000000, type=int) -parser.add_argument("--devices-per-worker", default=2, type=int) -parser.add_argument("--stats-interval", default=10, type=int) -parser.add_argument("--all-reduce-alg", default="simple", type=str) -parser.add_argument("--object-store-memory", default=None, type=int) -parser.add_argument( - "--warmup", action="store_true", help="Warm up object store before start.") -parser.add_argument( - "--strategy", default="ps", type=str, help="One of 'simple' or 'ps'") -parser.add_argument( - "--gpu", action="store_true", help="Use GPUs for optimization") - -if __name__ == "__main__": - args, _ = parser.parse_known_args() - ray.init( - redis_address=args.redis_address, - object_store_memory=args.object_store_memory) - - model_creator = ( - lambda worker_idx, device_idx: TFBenchModel( - batch=args.batch_size, use_cpus=not args.gpu)) - - sgd = DistributedSGD( - model_creator, - num_workers=args.num_workers, - devices_per_worker=args.devices_per_worker, - gpu=args.gpu, - strategy=args.strategy, - grad_shard_bytes=args.grad_shard_bytes, - all_reduce_alg=args.all_reduce_alg) - - if args.warmup: - sgd.warmup() - - t = [] - - for i in range(args.num_iters): - start = time.time() - fetch_stats = i % args.stats_interval == 0 - print("== Step {} ==".format(i)) - stats = sgd.step(fetch_stats=fetch_stats) - ips = ((args.batch_size * args.num_workers * args.devices_per_worker) / - (time.time() - start)) - print("Iteration time", time.time() - start, "Images per second", ips) - t.append(ips) - if fetch_stats: - print("Current loss", stats) - - print("Peak throughput", max(sum(t[i:i + 5]) / 5 for i in range(len(t)))) diff --git a/python/ray/experimental/sgd/tfbench/README.txt b/python/ray/experimental/sgd/tfbench/README.txt deleted file mode 100644 index 7ef1a87f7..000000000 --- a/python/ray/experimental/sgd/tfbench/README.txt +++ /dev/null @@ -1 +0,0 @@ -Files in this directory are adapted from https://github.com/tensorflow/benchmarks/tree/master/scripts/tf_cnn_benchmarks. diff --git a/python/ray/experimental/sgd/tfbench/__init__.py b/python/ray/experimental/sgd/tfbench/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/python/ray/experimental/sgd/tfbench/convnet_builder.py b/python/ray/experimental/sgd/tfbench/convnet_builder.py deleted file mode 100644 index 4936fb10d..000000000 --- a/python/ray/experimental/sgd/tfbench/convnet_builder.py +++ /dev/null @@ -1,507 +0,0 @@ -# Copyright 2017 The TensorFlow Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============================================================================= -"""CNN builder.""" - -from __future__ import print_function - -from collections import defaultdict -import contextlib - -import numpy as np - -import tensorflow as tf - -from tensorflow.python.layers import convolutional as conv_layers -from tensorflow.python.layers import core as core_layers -from tensorflow.python.layers import pooling as pooling_layers -from tensorflow.python.training import moving_averages - - -class ConvNetBuilder(object): - """Builder of cnn net.""" - - def __init__(self, - input_op, - input_nchan, - phase_train, - use_tf_layers, - data_format="NCHW", - dtype=tf.float32, - variable_dtype=tf.float32): - self.top_layer = input_op - self.top_size = input_nchan - self.phase_train = phase_train - self.use_tf_layers = use_tf_layers - self.data_format = data_format - self.dtype = dtype - self.variable_dtype = variable_dtype - self.counts = defaultdict(lambda: 0) - self.use_batch_norm = False - self.batch_norm_config = {} # "decay": 0.997, "scale": True} - self.channel_pos = ("channels_last" - if data_format == "NHWC" else "channels_first") - self.aux_top_layer = None - self.aux_top_size = 0 - - def get_custom_getter(self): - """Returns a custom getter that this class's methods must be called - - All methods of this class must be called under a variable scope that was - passed this custom getter. Example: - - ```python - network = ConvNetBuilder(...) - with tf.variable_scope("cg", custom_getter=network.get_custom_getter()): - network.conv(...) - # Call more methods of network here - ``` - - Currently, this custom getter only does anything if self.use_tf_layers is - True. In that case, it causes variables to be stored as dtype - self.variable_type, then casted to the requested dtype, instead of directly - storing the variable as the requested dtype. - """ - - def inner_custom_getter(getter, *args, **kwargs): - if not self.use_tf_layers: - return getter(*args, **kwargs) - requested_dtype = kwargs["dtype"] - if not (requested_dtype == tf.float32 - and self.variable_dtype == tf.float16): - kwargs["dtype"] = self.variable_dtype - var = getter(*args, **kwargs) - if var.dtype.base_dtype != requested_dtype: - var = tf.cast(var, requested_dtype) - return var - - return inner_custom_getter - - @contextlib.contextmanager - def switch_to_aux_top_layer(self): - """Context that construct cnn in the auxiliary arm.""" - if self.aux_top_layer is None: - raise RuntimeError("Empty auxiliary top layer in the network.") - saved_top_layer = self.top_layer - saved_top_size = self.top_size - self.top_layer = self.aux_top_layer - self.top_size = self.aux_top_size - yield - self.aux_top_layer = self.top_layer - self.aux_top_size = self.top_size - self.top_layer = saved_top_layer - self.top_size = saved_top_size - - def get_variable(self, name, shape, dtype, cast_dtype, *args, **kwargs): - var = tf.get_variable(name, shape, dtype, *args, **kwargs) - return tf.cast(var, cast_dtype) - - def _conv2d_impl(self, input_layer, num_channels_in, filters, kernel_size, - strides, padding, kernel_initializer): - if self.use_tf_layers: - return conv_layers.conv2d( - input_layer, - filters, - kernel_size, - strides, - padding, - self.channel_pos, - kernel_initializer=kernel_initializer, - use_bias=False) - else: - weights_shape = [ - kernel_size[0], kernel_size[1], num_channels_in, filters - ] - weights = self.get_variable( - "conv2d/kernel", - weights_shape, - self.variable_dtype, - self.dtype, - initializer=kernel_initializer) - if self.data_format == "NHWC": - strides = [1] + strides + [1] - else: - strides = [1, 1] + strides - return tf.nn.conv2d( - input_layer, - weights, - strides, - padding, - data_format=self.data_format) - - def conv(self, - num_out_channels, - k_height, - k_width, - d_height=1, - d_width=1, - mode="SAME", - input_layer=None, - num_channels_in=None, - use_batch_norm=None, - stddev=None, - activation="relu", - bias=0.0): - """Construct a conv2d layer on top of cnn.""" - if input_layer is None: - input_layer = self.top_layer - if num_channels_in is None: - num_channels_in = self.top_size - kernel_initializer = None - if stddev is not None: - kernel_initializer = tf.truncated_normal_initializer(stddev=stddev) - name = "conv" + str(self.counts["conv"]) - self.counts["conv"] += 1 - with tf.variable_scope(name): - strides = [1, d_height, d_width, 1] - if self.data_format == "NCHW": - strides = [strides[0], strides[3], strides[1], strides[2]] - if mode != "SAME_RESNET": - conv = self._conv2d_impl( - input_layer, - num_channels_in, - num_out_channels, - kernel_size=[k_height, k_width], - strides=[d_height, d_width], - padding=mode, - kernel_initializer=kernel_initializer) - else: # Special padding mode for ResNet models - if d_height == 1 and d_width == 1: - conv = self._conv2d_impl( - input_layer, - num_channels_in, - num_out_channels, - kernel_size=[k_height, k_width], - strides=[d_height, d_width], - padding="SAME", - kernel_initializer=kernel_initializer) - else: - rate = 1 # Unused (for 'a trous' convolutions) - kernel_height_effective = k_height + (k_height - 1) * ( - rate - 1) - pad_h_beg = (kernel_height_effective - 1) // 2 - pad_h_end = kernel_height_effective - 1 - pad_h_beg - kernel_width_effective = k_width + (k_width - 1) * ( - rate - 1) - pad_w_beg = (kernel_width_effective - 1) // 2 - pad_w_end = kernel_width_effective - 1 - pad_w_beg - padding = [[0, 0], [pad_h_beg, pad_h_end], - [pad_w_beg, pad_w_end], [0, 0]] - if self.data_format == "NCHW": - padding = [ - padding[0], padding[3], padding[1], padding[2] - ] - input_layer = tf.pad(input_layer, padding) - conv = self._conv2d_impl( - input_layer, - num_channels_in, - num_out_channels, - kernel_size=[k_height, k_width], - strides=[d_height, d_width], - padding="VALID", - kernel_initializer=kernel_initializer) - if use_batch_norm is None: - use_batch_norm = self.use_batch_norm - if not use_batch_norm: - if bias is not None: - biases = self.get_variable( - "biases", [num_out_channels], - self.variable_dtype, - self.dtype, - initializer=tf.constant_initializer(bias)) - biased = tf.reshape( - tf.nn.bias_add( - conv, biases, data_format=self.data_format), - conv.get_shape()) - else: - biased = conv - else: - self.top_layer = conv - self.top_size = num_out_channels - biased = self.batch_norm(**self.batch_norm_config) - if activation == "relu": - conv1 = tf.nn.relu(biased) - elif activation == "linear" or activation is None: - conv1 = biased - elif activation == "tanh": - conv1 = tf.nn.tanh(biased) - else: - raise KeyError("Invalid activation type \"%s\"" % activation) - self.top_layer = conv1 - self.top_size = num_out_channels - return conv1 - - def _pool(self, pool_name, pool_function, k_height, k_width, d_height, - d_width, mode, input_layer, num_channels_in): - """Construct a pooling layer.""" - if input_layer is None: - input_layer = self.top_layer - else: - self.top_size = num_channels_in - name = pool_name + str(self.counts[pool_name]) - self.counts[pool_name] += 1 - if self.use_tf_layers: - pool = pool_function( - input_layer, [k_height, k_width], [d_height, d_width], - padding=mode, - data_format=self.channel_pos, - name=name) - else: - if self.data_format == "NHWC": - ksize = [1, k_height, k_width, 1] - strides = [1, d_height, d_width, 1] - else: - ksize = [1, 1, k_height, k_width] - strides = [1, 1, d_height, d_width] - pool = tf.nn.max_pool( - input_layer, - ksize, - strides, - padding=mode, - data_format=self.data_format, - name=name) - self.top_layer = pool - return pool - - def mpool(self, - k_height, - k_width, - d_height=2, - d_width=2, - mode="VALID", - input_layer=None, - num_channels_in=None): - """Construct a max pooling layer.""" - return self._pool("mpool", pooling_layers.max_pooling2d, k_height, - k_width, d_height, d_width, mode, input_layer, - num_channels_in) - - def apool(self, - k_height, - k_width, - d_height=2, - d_width=2, - mode="VALID", - input_layer=None, - num_channels_in=None): - """Construct an average pooling layer.""" - return self._pool("apool", pooling_layers.average_pooling2d, k_height, - k_width, d_height, d_width, mode, input_layer, - num_channels_in) - - def reshape(self, shape, input_layer=None): - if input_layer is None: - input_layer = self.top_layer - self.top_layer = tf.reshape(input_layer, shape) - self.top_size = shape[-1] # HACK This may not always work - return self.top_layer - - def affine(self, - num_out_channels, - input_layer=None, - num_channels_in=None, - bias=0.0, - stddev=None, - activation="relu"): - if input_layer is None: - input_layer = self.top_layer - if num_channels_in is None: - num_channels_in = self.top_size - name = "affine" + str(self.counts["affine"]) - self.counts["affine"] += 1 - with tf.variable_scope(name): - init_factor = 2. if activation == "relu" else 1. - stddev = stddev or np.sqrt(init_factor / num_channels_in) - kernel = self.get_variable( - "weights", [num_channels_in, num_out_channels], - self.variable_dtype, - self.dtype, - initializer=tf.truncated_normal_initializer(stddev=stddev)) - biases = self.get_variable( - "biases", [num_out_channels], - self.variable_dtype, - self.dtype, - initializer=tf.constant_initializer(bias)) - logits = tf.nn.xw_plus_b(input_layer, kernel, biases) - if activation == "relu": - affine1 = tf.nn.relu(logits, name=name) - elif activation == "linear" or activation is None: - affine1 = logits - else: - raise KeyError("Invalid activation type \"%s\"" % activation) - self.top_layer = affine1 - self.top_size = num_out_channels - return affine1 - - def inception_module(self, name, cols, input_layer=None, in_size=None): - if input_layer is None: - input_layer = self.top_layer - if in_size is None: - in_size = self.top_size - name += str(self.counts[name]) - self.counts[name] += 1 - with tf.variable_scope(name): - col_layers = [] - col_layer_sizes = [] - for c, col in enumerate(cols): - col_layers.append([]) - col_layer_sizes.append([]) - for lx, layer in enumerate(col): - ltype, args = layer[0], layer[1:] - kwargs = { - "input_layer": input_layer, - "num_channels_in": in_size - } if lx == 0 else {} - if ltype == "conv": - self.conv(*args, **kwargs) - elif ltype == "mpool": - self.mpool(*args, **kwargs) - elif ltype == "apool": - self.apool(*args, **kwargs) - elif ltype == "share": - self.top_layer = col_layers[c - 1][lx] - self.top_size = col_layer_sizes[c - 1][lx] - else: - raise KeyError( - "Invalid layer type for inception module: \"%s\"" % - ltype) - col_layers[c].append(self.top_layer) - col_layer_sizes[c].append(self.top_size) - catdim = 3 if self.data_format == "NHWC" else 1 - self.top_layer = tf.concat([layers[-1] for layers in col_layers], - catdim) - self.top_size = sum(sizes[-1] for sizes in col_layer_sizes) - return self.top_layer - - def spatial_mean(self, keep_dims=False): - name = "spatial_mean" + str(self.counts["spatial_mean"]) - self.counts["spatial_mean"] += 1 - axes = [1, 2] if self.data_format == "NHWC" else [2, 3] - self.top_layer = tf.reduce_mean( - self.top_layer, axes, keep_dims=keep_dims, name=name) - return self.top_layer - - def dropout(self, keep_prob=0.5, input_layer=None): - if input_layer is None: - input_layer = self.top_layer - else: - self.top_size = None - name = "dropout" + str(self.counts["dropout"]) - with tf.variable_scope(name): - if not self.phase_train: - keep_prob = 1.0 - if self.use_tf_layers: - dropout = core_layers.dropout(input_layer, 1. - keep_prob) - else: - dropout = tf.nn.dropout(input_layer, keep_prob) - self.top_layer = dropout - return dropout - - def _batch_norm_without_layers(self, input_layer, decay, use_scale, - epsilon): - """Batch normalization on `input_layer` without tf.layers.""" - shape = input_layer.shape - num_channels = shape[3] if self.data_format == "NHWC" else shape[1] - beta = self.get_variable( - "beta", [num_channels], - tf.float32, - tf.float32, - initializer=tf.zeros_initializer()) - if use_scale: - gamma = self.get_variable( - "gamma", [num_channels], - tf.float32, - tf.float32, - initializer=tf.ones_initializer()) - else: - gamma = tf.constant(1.0, tf.float32, [num_channels]) - moving_mean = tf.get_variable( - "moving_mean", [num_channels], - tf.float32, - initializer=tf.zeros_initializer(), - trainable=False) - moving_variance = tf.get_variable( - "moving_variance", [num_channels], - tf.float32, - initializer=tf.ones_initializer(), - trainable=False) - if self.phase_train: - bn, batch_mean, batch_variance = tf.nn.fused_batch_norm( - input_layer, - gamma, - beta, - epsilon=epsilon, - data_format=self.data_format, - is_training=True) - mean_update = moving_averages.assign_moving_average( - moving_mean, batch_mean, decay=decay, zero_debias=False) - variance_update = moving_averages.assign_moving_average( - moving_variance, - batch_variance, - decay=decay, - zero_debias=False) - tf.add_to_collection(tf.GraphKeys.UPDATE_OPS, mean_update) - tf.add_to_collection(tf.GraphKeys.UPDATE_OPS, variance_update) - else: - bn, _, _ = tf.nn.fused_batch_norm( - input_layer, - gamma, - beta, - mean=moving_mean, - variance=moving_variance, - epsilon=epsilon, - data_format=self.data_format, - is_training=False) - return bn - - def batch_norm(self, - input_layer=None, - decay=0.999, - scale=False, - epsilon=0.001): - """Adds a Batch Normalization layer.""" - if input_layer is None: - input_layer = self.top_layer - else: - self.top_size = None - name = "batchnorm" + str(self.counts["batchnorm"]) - self.counts["batchnorm"] += 1 - - with tf.variable_scope(name) as scope: - if self.use_tf_layers: - bn = tf.contrib.layers.batch_norm( - input_layer, - decay=decay, - scale=scale, - epsilon=epsilon, - is_training=self.phase_train, - fused=True, - data_format=self.data_format, - scope=scope) - else: - bn = self._batch_norm_without_layers(input_layer, decay, scale, - epsilon) - self.top_layer = bn - self.top_size = bn.shape[ - 3] if self.data_format == "NHWC" else bn.shape[1] - self.top_size = int(self.top_size) - return bn - - def lrn(self, depth_radius, bias, alpha, beta): - """Adds a local response normalization layer.""" - name = "lrn" + str(self.counts["lrn"]) - self.counts["lrn"] += 1 - self.top_layer = tf.nn.lrn( - self.top_layer, depth_radius, bias, alpha, beta, name=name) - return self.top_layer diff --git a/python/ray/experimental/sgd/tfbench/model.py b/python/ray/experimental/sgd/tfbench/model.py deleted file mode 100644 index c5ebb30d8..000000000 --- a/python/ray/experimental/sgd/tfbench/model.py +++ /dev/null @@ -1,116 +0,0 @@ -# Copyright 2017 The TensorFlow Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============================================================================= -"""Base model configuration for CNN benchmarks.""" -import tensorflow as tf - -from . import convnet_builder - - -class Model(object): - """Base model configuration for CNN benchmarks.""" - - def __init__(self, - model, - image_size, - batch_size, - learning_rate, - layer_counts=None, - fp16_loss_scale=128): - self.model = model - self.image_size = image_size - self.batch_size = batch_size - self.default_batch_size = batch_size - self.learning_rate = learning_rate - self.layer_counts = layer_counts - self.fp16_loss_scale = fp16_loss_scale - - def get_model(self): - return self.model - - def get_image_size(self): - return self.image_size - - def get_batch_size(self): - return self.batch_size - - def set_batch_size(self, batch_size): - self.batch_size = batch_size - - def get_default_batch_size(self): - return self.default_batch_size - - def get_layer_counts(self): - return self.layer_counts - - def get_fp16_loss_scale(self): - return self.fp16_loss_scale - - def get_learning_rate(self, global_step, batch_size): - del global_step - del batch_size - return self.learning_rate - - def add_inference(self, unused_cnn): - raise ValueError("Must be implemented in derived classes") - - def skip_final_affine_layer(self): - """Returns if the caller of this class should skip the final affine - - Normally, this class adds a final affine layer to the model after calling - self.add_inference(), to generate the logits. If a subclass override this - method to return True, the caller should not add the final affine layer. - - This is useful for tests. - """ - return False - - def build_network(self, - images, - phase_train=True, - nclass=1001, - image_depth=3, - data_type=tf.float32, - data_format="NCHW", - use_tf_layers=True, - fp16_vars=False): - """Returns logits and aux_logits from images.""" - if data_format == "NCHW": - images = tf.transpose(images, [0, 3, 1, 2]) - var_type = tf.float32 - if data_type == tf.float16 and fp16_vars: - var_type = tf.float16 - network = convnet_builder.ConvNetBuilder( - images, image_depth, phase_train, use_tf_layers, data_format, - data_type, var_type) - with tf.variable_scope( - "cg", custom_getter=network.get_custom_getter()): - self.add_inference(network) - # Add the final fully-connected class layer - logits = (network.affine(nclass, activation="linear") - if not self.skip_final_affine_layer() else - network.top_layer) - aux_logits = None - if network.aux_top_layer is not None: - with network.switch_to_aux_top_layer(): - aux_logits = network.affine( - nclass, activation="linear", stddev=0.001) - if data_type == tf.float16: - # TODO(reedwm): Determine if we should do this cast here. - logits = tf.cast(logits, tf.float32) - if aux_logits is not None: - aux_logits = tf.cast(aux_logits, tf.float32) - return logits, aux_logits - - loss_function = None diff --git a/python/ray/experimental/sgd/tfbench/model_config.py b/python/ray/experimental/sgd/tfbench/model_config.py deleted file mode 100644 index ffa2f9064..000000000 --- a/python/ray/experimental/sgd/tfbench/model_config.py +++ /dev/null @@ -1,57 +0,0 @@ -# Copyright 2017 The TensorFlow Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============================================================================== -"""Model configurations for CNN benchmarks. -""" - -from . import resnet_model - -_model_name_to_imagenet_model = { - "resnet50": resnet_model.create_resnet50_model, - "resnet50_v2": resnet_model.create_resnet50_v2_model, - "resnet101": resnet_model.create_resnet101_model, - "resnet101_v2": resnet_model.create_resnet101_v2_model, - "resnet152": resnet_model.create_resnet152_model, - "resnet152_v2": resnet_model.create_resnet152_v2_model, -} - -_model_name_to_cifar_model = {} - - -def _get_model_map(dataset_name): - if "cifar10" == dataset_name: - return _model_name_to_cifar_model - elif dataset_name in ("imagenet", "synthetic"): - return _model_name_to_imagenet_model - else: - raise ValueError("Invalid dataset name: %s" % dataset_name) - - -def get_model_config(model_name, dataset): - """Map model name to model network configuration.""" - model_map = _get_model_map(dataset.name) - if model_name not in model_map: - raise ValueError("Invalid model name \"%s\" for dataset \"%s\"" % - (model_name, dataset.name)) - else: - return model_map[model_name]() - - -def register_model(model_name, dataset_name, model_func): - """Register a new model that can be obtained with `get_model_config`.""" - model_map = _get_model_map(dataset_name) - if model_name in model_map: - raise ValueError("Model \"%s\" is already registered for dataset" - "\"%s\"" % (model_name, dataset_name)) - model_map[model_name] = model_func diff --git a/python/ray/experimental/sgd/tfbench/resnet_model.py b/python/ray/experimental/sgd/tfbench/resnet_model.py deleted file mode 100644 index d89f7ee7f..000000000 --- a/python/ray/experimental/sgd/tfbench/resnet_model.py +++ /dev/null @@ -1,422 +0,0 @@ -# Copyright 2017 The TensorFlow Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============================================================================== -"""Resnet model configuration. - -References: - Kaiming He, Xiangyu Zhang, Shaoqing Ren, Jian Sun - Deep Residual Learning for Image Recognition - arXiv:1512.03385 (2015) - - Kaiming He, Xiangyu Zhang, Shaoqing Ren, Jian Sun - Identity Mappings in Deep Residual Networks - arXiv:1603.05027 (2016) - - Liang-Chieh Chen, George Papandreou, Iasonas Kokkinos, Kevin Murphy, - Alan L. Yuille - DeepLab: Semantic Image Segmentation with Deep Convolutional Nets, - Atrous Convolution, and Fully Connected CRFs - arXiv:1606.00915 (2016) -""" - -import numpy as np -from six.moves import xrange # pylint: disable=redefined-builtin -import tensorflow as tf -from . import model as model_lib - - -def bottleneck_block_v1(cnn, depth, depth_bottleneck, stride): - """Bottleneck block with identity short-cut for ResNet v1. - - Args: - cnn: the network to append bottleneck blocks. - depth: the number of output filters for this bottleneck block. - depth_bottleneck: the number of bottleneck filters for this block. - stride: Stride used in the first layer of the bottleneck block. - """ - input_layer = cnn.top_layer - in_size = cnn.top_size - name_key = "resnet_v1" - name = name_key + str(cnn.counts[name_key]) - cnn.counts[name_key] += 1 - - with tf.variable_scope(name): - if depth == in_size: - if stride == 1: - shortcut = input_layer - else: - shortcut = cnn.apool( - 1, - 1, - stride, - stride, - input_layer=input_layer, - num_channels_in=in_size) - else: - shortcut = cnn.conv( - depth, - 1, - 1, - stride, - stride, - activation=None, - use_batch_norm=True, - input_layer=input_layer, - num_channels_in=in_size, - bias=None) - cnn.conv( - depth_bottleneck, - 1, - 1, - stride, - stride, - input_layer=input_layer, - num_channels_in=in_size, - use_batch_norm=True, - bias=None) - cnn.conv( - depth_bottleneck, - 3, - 3, - 1, - 1, - mode="SAME_RESNET", - use_batch_norm=True, - bias=None) - res = cnn.conv( - depth, 1, 1, 1, 1, activation=None, use_batch_norm=True, bias=None) - output = tf.nn.relu(shortcut + res) - cnn.top_layer = output - cnn.top_size = depth - - -def bottleneck_block_v2(cnn, depth, depth_bottleneck, stride): - """Bottleneck block with identity short-cut for ResNet v2. - - The main difference from v1 is that a batch norm and relu are done at the - start of the block, instead of the end. This initial batch norm and relu is - collectively called a pre-activation. - - Args: - cnn: the network to append bottleneck blocks. - depth: the number of output filters for this bottleneck block. - depth_bottleneck: the number of bottleneck filters for this block. - stride: Stride used in the first layer of the bottleneck block. - """ - input_layer = cnn.top_layer - in_size = cnn.top_size - name_key = "resnet_v2" - name = name_key + str(cnn.counts[name_key]) - cnn.counts[name_key] += 1 - - preact = cnn.batch_norm() - preact = tf.nn.relu(preact) - with tf.variable_scope(name): - if depth == in_size: - if stride == 1: - shortcut = input_layer - else: - shortcut = cnn.apool( - 1, - 1, - stride, - stride, - input_layer=input_layer, - num_channels_in=in_size) - else: - shortcut = cnn.conv( - depth, - 1, - 1, - stride, - stride, - activation=None, - use_batch_norm=False, - input_layer=preact, - num_channels_in=in_size, - bias=None) - cnn.conv( - depth_bottleneck, - 1, - 1, - stride, - stride, - input_layer=preact, - num_channels_in=in_size, - use_batch_norm=True, - bias=None) - cnn.conv( - depth_bottleneck, - 3, - 3, - 1, - 1, - mode="SAME_RESNET", - use_batch_norm=True, - bias=None) - res = cnn.conv( - depth, - 1, - 1, - 1, - 1, - activation=None, - use_batch_norm=False, - bias=None) - output = shortcut + res - cnn.top_layer = output - cnn.top_size = depth - - -def bottleneck_block(cnn, depth, depth_bottleneck, stride, pre_activation): - """Bottleneck block with identity short-cut. - - Args: - cnn: the network to append bottleneck blocks. - depth: the number of output filters for this bottleneck block. - depth_bottleneck: the number of bottleneck filters for this block. - stride: Stride used in the first layer of the bottleneck block. - pre_activation: use pre_activation structure used in v2 or not. - """ - if pre_activation: - bottleneck_block_v2(cnn, depth, depth_bottleneck, stride) - else: - bottleneck_block_v1(cnn, depth, depth_bottleneck, stride) - - -def residual_block(cnn, depth, stride, pre_activation): - """Residual block with identity short-cut. - - Args: - cnn: the network to append residual blocks. - depth: the number of output filters for this residual block. - stride: Stride used in the first layer of the residual block. - pre_activation: use pre_activation structure or not. - """ - input_layer = cnn.top_layer - in_size = cnn.top_size - if in_size != depth: - # Plan A of shortcut. - shortcut = cnn.apool( - 1, - 1, - stride, - stride, - input_layer=input_layer, - num_channels_in=in_size) - padding = (depth - in_size) // 2 - if cnn.channel_pos == "channels_last": - shortcut = tf.pad(shortcut, - [[0, 0], [0, 0], [0, 0], [padding, padding]]) - else: - shortcut = tf.pad(shortcut, - [[0, 0], [padding, padding], [0, 0], [0, 0]]) - else: - shortcut = input_layer - if pre_activation: - res = cnn.batch_norm(input_layer) - res = tf.nn.relu(res) - else: - res = input_layer - cnn.conv( - depth, - 3, - 3, - stride, - stride, - input_layer=res, - num_channels_in=in_size, - use_batch_norm=True, - bias=None) - if pre_activation: - res = cnn.conv( - depth, - 3, - 3, - 1, - 1, - activation=None, - use_batch_norm=False, - bias=None) - output = shortcut + res - else: - res = cnn.conv( - depth, 3, 3, 1, 1, activation=None, use_batch_norm=True, bias=None) - output = tf.nn.relu(shortcut + res) - cnn.top_layer = output - cnn.top_size = depth - - -class ResnetModel(model_lib.Model): - """Resnet cnn network configuration.""" - - def __init__(self, model, layer_counts): - default_batch_sizes = { - "resnet50": 64, - "resnet101": 32, - "resnet152": 32, - "resnet50_v2": 64, - "resnet101_v2": 32, - "resnet152_v2": 32, - } - batch_size = default_batch_sizes.get(model, 32) - super(ResnetModel, self).__init__(model, 224, batch_size, 0.005, - layer_counts) - self.pre_activation = "v2" in model - - def add_inference(self, cnn): - if self.layer_counts is None: - raise ValueError( - "Layer counts not specified for %s" % self.get_model()) - cnn.use_batch_norm = True - cnn.batch_norm_config = { - "decay": 0.997, - "epsilon": 1e-5, - "scale": True - } - cnn.conv(64, 7, 7, 2, 2, mode="SAME_RESNET", use_batch_norm=True) - cnn.mpool(3, 3, 2, 2, mode="SAME") - for _ in xrange(self.layer_counts[0]): - bottleneck_block(cnn, 256, 64, 1, self.pre_activation) - for i in xrange(self.layer_counts[1]): - stride = 2 if i == 0 else 1 - bottleneck_block(cnn, 512, 128, stride, self.pre_activation) - for i in xrange(self.layer_counts[2]): - stride = 2 if i == 0 else 1 - bottleneck_block(cnn, 1024, 256, stride, self.pre_activation) - for i in xrange(self.layer_counts[3]): - stride = 2 if i == 0 else 1 - bottleneck_block(cnn, 2048, 512, stride, self.pre_activation) - if self.pre_activation: - cnn.batch_norm() - cnn.top_layer = tf.nn.relu(cnn.top_layer) - cnn.spatial_mean() - - def get_learning_rate(self, global_step, batch_size): - raise NotImplementedError - - -def create_resnet50_model(): - return ResnetModel("resnet50", (3, 4, 6, 3)) - - -def create_resnet50_v2_model(): - return ResnetModel("resnet50_v2", (3, 4, 6, 3)) - - -def create_resnet101_model(): - return ResnetModel("resnet101", (3, 4, 23, 3)) - - -def create_resnet101_v2_model(): - return ResnetModel("resnet101_v2", (3, 4, 23, 3)) - - -def create_resnet152_model(): - return ResnetModel("resnet152", (3, 8, 36, 3)) - - -def create_resnet152_v2_model(): - return ResnetModel("resnet152_v2", (3, 8, 36, 3)) - - -class ResnetCifar10Model(model_lib.Model): - """Resnet cnn network configuration for Cifar 10 dataset. - - V1 model architecture follows the one defined in the paper: - https://arxiv.org/pdf/1512.03385.pdf. - - V2 model architecture follows the one defined in the paper: - https://arxiv.org/pdf/1603.05027.pdf. - """ - - def __init__(self, model, layer_counts): - self.pre_activation = "v2" in model - super(ResnetCifar10Model, self).__init__(model, 32, 128, 0.1, - layer_counts) - - def add_inference(self, cnn): - if self.layer_counts is None: - raise ValueError( - "Layer counts not specified for %s" % self.get_model()) - - cnn.use_batch_norm = True - cnn.batch_norm_config = {"decay": 0.9, "epsilon": 1e-5, "scale": True} - if self.pre_activation: - cnn.conv(16, 3, 3, 1, 1, use_batch_norm=True) - else: - cnn.conv(16, 3, 3, 1, 1, activation=None, use_batch_norm=True) - for i in xrange(self.layer_counts[0]): - # reshape to batch_size x 16 x 32 x 32 - residual_block(cnn, 16, 1, self.pre_activation) - for i in xrange(self.layer_counts[1]): - stride = 2 if i == 0 else 1 - # reshape to batch_size x 32 x 16 x 16 - residual_block(cnn, 32, stride, self.pre_activation) - for i in xrange(self.layer_counts[2]): - stride = 2 if i == 0 else 1 - # reshape to batch_size x 64 x 8 x 8 - residual_block(cnn, 64, stride, self.pre_activation) - if self.pre_activation: - cnn.batch_norm() - cnn.top_layer = tf.nn.relu(cnn.top_layer) - cnn.spatial_mean() - - def get_learning_rate(self, global_step, batch_size): - num_batches_per_epoch = int(50000 / batch_size) - boundaries = num_batches_per_epoch * np.array( - [82, 123, 300], dtype=np.int64) - boundaries = [x for x in boundaries] - values = [0.1, 0.01, 0.001, 0.0002] - return tf.train.piecewise_constant(global_step, boundaries, values) - - -def create_resnet20_cifar_model(): - return ResnetCifar10Model("resnet20", (3, 3, 3)) - - -def create_resnet20_v2_cifar_model(): - return ResnetCifar10Model("resnet20_v2", (3, 3, 3)) - - -def create_resnet32_cifar_model(): - return ResnetCifar10Model("resnet32_v2", (5, 5, 5)) - - -def create_resnet32_v2_cifar_model(): - return ResnetCifar10Model("resnet32_v2", (5, 5, 5)) - - -def create_resnet44_cifar_model(): - return ResnetCifar10Model("resnet44", (7, 7, 7)) - - -def create_resnet44_v2_cifar_model(): - return ResnetCifar10Model("resnet44_v2", (7, 7, 7)) - - -def create_resnet56_cifar_model(): - return ResnetCifar10Model("resnet56", (9, 9, 9)) - - -def create_resnet56_v2_cifar_model(): - return ResnetCifar10Model("resnet56_v2", (9, 9, 9)) - - -def create_resnet110_cifar_model(): - return ResnetCifar10Model("resnet110", (18, 18, 18)) - - -def create_resnet110_v2_cifar_model(): - return ResnetCifar10Model("resnet110_v2", (18, 18, 18)) diff --git a/python/ray/experimental/sgd/tfbench/test_model.py b/python/ray/experimental/sgd/tfbench/test_model.py deleted file mode 100644 index 99900c43b..000000000 --- a/python/ray/experimental/sgd/tfbench/test_model.py +++ /dev/null @@ -1,66 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import tensorflow as tf - -from tfbench import model_config -from ray.experimental.sgd.model import Model -import ray.experimental.tf_utils as ray_tf_utils - - -class MockDataset(): - name = "synthetic" - - -class TFBenchModel(Model): - def __init__(self, batch=64, use_cpus=False): - - image_shape = [batch, 224, 224, 3] - labels_shape = [batch] - - # Synthetic image should be within [0, 255]. - images = tf.truncated_normal( - image_shape, - dtype=tf.float32, - mean=127, - stddev=60, - name="synthetic_images") - - # Minor hack to avoid H2D copy when using synthetic data - inputs = tf.contrib.framework.local_variable( - images, name="gpu_cached_images") - labels = tf.random_uniform( - labels_shape, - minval=0, - maxval=999, - dtype=tf.int32, - name="synthetic_labels") - - model = model_config.get_model_config("resnet101", MockDataset()) - logits, aux = model.build_network( - inputs, data_format=use_cpus and "NHWC" or "NCHW") - loss = tf.nn.sparse_softmax_cross_entropy_with_logits( - logits=logits, labels=labels) - - # Implement model interface - self.loss = tf.reduce_mean(loss, name="xentropy-loss") - self.optimizer = tf.train.GradientDescentOptimizer(1e-6) - - self.variables = ray_tf_utils.TensorFlowVariables( - self.loss, tf.get_default_session()) - - def get_loss(self): - return self.loss - - def get_optimizer(self): - return self.optimizer - - def get_feed_dict(self): - return {} - - def get_weights(self): - return self.variables.get_flat() - - def set_weights(self, weights): - self.variables.set_flat(weights) diff --git a/python/ray/experimental/sgd/util.py b/python/ray/experimental/sgd/util.py deleted file mode 100644 index 57549d2df..000000000 --- a/python/ray/experimental/sgd/util.py +++ /dev/null @@ -1,151 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import filelock -import json -import logging -import numpy as np -import os -import pyarrow -import pyarrow.plasma as plasma -import time -import tensorflow as tf - -import ray - -logger = logging.getLogger(__name__) - - -def warmup(): - logger.info("Warming up object store") - zeros = np.zeros(int(100e6 / 8), dtype=np.float64) - start = time.time() - for _ in range(10): - ray.put(zeros) - logger.info("Initial latency for 100MB put {}".format( - (time.time() - start) / 10)) - for _ in range(5): - for _ in range(100): - ray.put(zeros) - start = time.time() - for _ in range(10): - ray.put(zeros) - logger.info("Warmed up latency for 100MB put {}".format( - (time.time() - start) / 10)) - - -def fetch(oids): - raylet_client = ray.worker.global_worker.raylet_client - for o in oids: - ray_obj_id = ray.ObjectID(o) - raylet_client.fetch_or_reconstruct([ray_obj_id], True) - - -def run_timeline(sess, ops, feed_dict=None, write_timeline=False, name=""): - feed_dict = feed_dict or {} - if write_timeline: - run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE) - run_metadata = tf.RunMetadata() - fetches = sess.run( - ops, - options=run_options, - run_metadata=run_metadata, - feed_dict=feed_dict) - trace = Timeline(step_stats=run_metadata.step_stats) - outf = "timeline-{}-{}.json".format(name, os.getpid()) - trace_file = open(outf, "w") - logger.info("wrote tf timeline to", os.path.abspath(outf)) - trace_file.write(trace.generate_chrome_trace_format()) - else: - fetches = sess.run(ops, feed_dict=feed_dict) - return fetches - - -class Timeline(object): - def __init__(self, tid): - self.events = [] - self.offset = 0 - self.start_time = self.time() - self.tid = tid - - def patch_ray(self): - orig_log = ray.worker.log - - def custom_log(event_type, kind, *args, **kwargs): - orig_log(event_type, kind, *args, **kwargs) - if kind == ray.worker.LOG_SPAN_START: - self.start(event_type) - elif kind == ray.worker.LOG_SPAN_END: - self.end(event_type) - elif kind == ray.worker.LOG_SPAN_POINT: - self.event(event_type) - - ray.worker.log = custom_log - - def time(self): - return time.time() + self.offset - - def reset(self): - self.events = [] - self.start_time = self.time() - - def start(self, name): - self.events.append((self.tid, "B", name, self.time())) - - def end(self, name): - self.events.append((self.tid, "E", name, self.time())) - - def event(self, name): - now = self.time() - self.events.append((self.tid, "B", name, now)) - self.events.append((self.tid, "E", name, now + .0001)) - - def merge(self, other): - if other.start_time < self.start_time: - self.start_time = other.start_time - self.events.extend(other.events) - self.events.sort(key=lambda e: e[3]) - - def chrome_trace_format(self, filename): - out = [] - for tid, ph, name, t in self.events: - ts = int((t - self.start_time) * 1000000) - out.append({ - "name": name, - "tid": tid, - "pid": tid, - "ph": ph, - "ts": ts, - }) - with open(filename, "w") as f: - f.write(json.dumps(out)) - logger.info("Wrote chrome timeline to", filename) - - -def ensure_plasma_tensorflow_op(): - base_path = os.path.join(pyarrow.__path__[0], "tensorflow") - lock_path = os.path.join(base_path, "compile_op.lock") - with filelock.FileLock(lock_path): - if not os.path.exists(os.path.join(base_path, "plasma_op.so")): - plasma.build_plasma_tensorflow_op() - else: - plasma.load_plasma_tensorflow_op() - - -if __name__ == "__main__": - a = Timeline(1) - b = Timeline(2) - a.start("hi") - time.sleep(.1) - b.start("bye") - a.start("hi3") - time.sleep(.1) - a.end("hi3") - b.end("bye") - time.sleep(.1) - a.end("hi") - b.start("b1") - b.end("b1") - a.merge(b) - a.chrome_trace_format("test.json")