From c66178bcd7a27dd9f5cf726cfe8293a49eb12587 Mon Sep 17 00:00:00 2001 From: Wapaul1 Date: Tue, 7 Mar 2017 01:07:32 -0800 Subject: [PATCH] Resnet Adapted to Ray (#229) * Initial conversion * Further changes * fixes * some changes * Fixes * Added data pipeline * Added updates to cifar * Currently borken need sep pr * Added test for retriving variables from an optimizer * Removed FlAG ref in environment variables * Added comments to test * Addressed comments * Added updates * Made further changes for tfutils * Fixed finalized bug * Removed ipython * Added accuracy printing * Temp commit * added fixes * changes * Added writing to file * Fixes for gpus * Cleaned up code * Temp commit * Gpu support fully implemented * Updated to use num_gpus for actors * Finished testing gpus implementation * Changed to be more in line with origin implementation * Updated test to use actors * Added support for cpu only systems * Now works with no cpus * Minor changes and some documentation. --- doc/source/example-resnet.rst | 71 ++++++++ doc/source/index.rst | 1 + examples/resnet/cifar_input.py | 106 +++++++++++ examples/resnet/resnet_main.py | 169 +++++++++++++++++ examples/resnet/resnet_model.py | 282 +++++++++++++++++++++++++++++ python/ray/experimental/tfutils.py | 2 +- test/tensorflow_test.py | 163 ++++++++--------- 7 files changed, 706 insertions(+), 88 deletions(-) create mode 100644 doc/source/example-resnet.rst create mode 100644 examples/resnet/cifar_input.py create mode 100644 examples/resnet/resnet_main.py create mode 100644 examples/resnet/resnet_model.py diff --git a/doc/source/example-resnet.rst b/doc/source/example-resnet.rst new file mode 100644 index 000000000..426effccd --- /dev/null +++ b/doc/source/example-resnet.rst @@ -0,0 +1,71 @@ +ResNet +====== + +This code adapts the `TensorFlow ResNet example`_ to do data parallel training +across multiple GPUs using Ray. + +To run the example, you will need to install `TensorFlow with GPU support`_ (at +least version ``1.0.0``). Then you can run the example as follows. + +First download the CIFAR-10 dataset. + +.. code-block:: bash + + curl -o cifar-10-binary.tar.gz https://www.cs.toronto.edu/~kriz/cifar-10-binary.tar.gz + + tar -xvf cifar-10-binary.tar.gz + + +Then run the training script. + +.. code-block:: bash + + python ray/examples/resnet/resnet_main.py \ + --train_data_path=cifar-10-batches-bin/data_batch* \ + --eval_data_path=cifar-10-batches-bin/test_batch.bin \ + --num_gpus=1 + +The core of the script is the actor definition. + +.. code-block:: python + + @ray.actor(num_gpus=1) + class ResNetTrainActor(object): + def __init__(self, path, num_gpus): + # Set the CUDA_VISIBLE_DEVICES environment variable in order to restrict + # which GPUs TensorFlow uses. Note that this only works if it is done before + # the call to tf.Session. + os.environ['CUDA_VISIBLE_DEVICES'] = ','.join([str(i) for i in ray.get_gpu_ids()]) + with tf.Graph().as_default(): + with tf.device('/gpu:0'): + # We omit the code here that actually constructs the residual network + # and initializes it. + + def compute_steps(self, weights): + # This method sets the weights in the network, runs some training steps, + # and returns the new weights. + steps = 10 + self.model.variables.set_weights(weights) + for i in range(steps): + self.model.variables.sess.run(self.model.train_op) + return self.model.variables.get_weights() + +The main script first creates one actor for each GPU. + +.. code-block:: python + + train_actors = [ResNetTrainActor(train_data, num_gpus) for _ in range(num_gpus)] + +Then after initializing the actors with the same weights, the main loop performs +updates on each model, averages the updates, and puts the new weights in the +object store. + +.. code-block:: python + + while True: + all_weights = ray.get([actor.compute_steps(weight_id) for actor in train_actors]) + mean_weights = {k: sum([weights[k] for weights in all_weights]) / num_gpus for k in all_weights[0]} + weight_id = ray.put(mean_weights) + +.. _`TensorFlow ResNet example`: https://github.com/tensorflow/models/tree/master/resnet +.. _`TensorFlow with GPU support`: https://www.tensorflow.org/install/ diff --git a/doc/source/index.rst b/doc/source/index.rst index b085d7698..08fe213c5 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -27,6 +27,7 @@ learning and reinforcement learning applications.* :caption: Examples example-hyperopt.rst + example-resnet.rst example-lbfgs.md example-rl-pong.md using-ray-with-tensorflow.md diff --git a/examples/resnet/cifar_input.py b/examples/resnet/cifar_input.py new file mode 100644 index 000000000..7d0e04c91 --- /dev/null +++ b/examples/resnet/cifar_input.py @@ -0,0 +1,106 @@ +"""CIFAR dataset input module, with the majority taken from +https://github.com/tensorflow/models/tree/master/resnet. +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import numpy as np +import tensorflow as tf + +def build_data(data_path, size): + image_size = 32 + label_bytes = 1 + label_offset = 0 + num_classes = 10 + + depth = 3 + image_bytes = image_size * image_size * depth + record_bytes = label_bytes + label_offset + image_bytes + + data_files = tf.gfile.Glob(data_path) + file_queue = tf.train.string_input_producer(data_files, shuffle=True) + # Read examples from files in the filename queue. + reader = tf.FixedLengthRecordReader(record_bytes=record_bytes) + _, value = reader.read(file_queue) + + # Convert these examples to dense labels and processed images. + record = tf.reshape(tf.decode_raw(value, tf.uint8), [record_bytes]) + label = tf.cast(tf.slice(record, [label_offset], [label_bytes]), tf.int32) + # Convert from string to [depth * height * width] to [depth, height, width]. + depth_major = tf.reshape(tf.slice(record, [label_bytes], [image_bytes]), + [depth, image_size, image_size]) + # Convert from [depth, height, width] to [height, width, depth]. + image = tf.cast(tf.transpose(depth_major, [1, 2, 0]), tf.float32) + queue = tf.train.shuffle_batch([image, label], size, size, 0, num_threads=16) + return queue + +def build_input(data, batch_size, train): + """Build CIFAR image and labels. + + Args: + data_path: Filename for cifar10 data. + batch_size: Input batch size. + train: True if we are training and false if we are testing. + + Returns: + images: Batches of images of size [batch_size, image_size, image_size, 3]. + labels: Batches of labels of size [batch_size, num_classes]. + + Raises: + ValueError: When the specified dataset is not supported. + """ + images_constant = tf.constant(data[0]) + labels_constant = tf.constant(data[1]) + image_size = 32 + depth = 3 + num_classes = 10 + image, label = tf.train.slice_input_producer([images_constant, labels_constant]) + if train: + image = tf.image.resize_image_with_crop_or_pad( + image, image_size+4, image_size+4) + image = tf.random_crop(image, [image_size, image_size, 3]) + image = tf.image.random_flip_left_right(image) + # Brightness/saturation/constrast provides small gains .2%~.5% on cifar. + # image = tf.image.random_brightness(image, max_delta=63. / 255.) + # image = tf.image.random_saturation(image, lower=0.5, upper=1.5) + # image = tf.image.random_contrast(image, lower=0.2, upper=1.8) + image = tf.image.per_image_standardization(image) + example_queue = tf.RandomShuffleQueue( + capacity=16 * batch_size, + min_after_dequeue=8 * batch_size, + dtypes=[tf.float32, tf.int32], + shapes=[[image_size, image_size, depth], [1]]) + num_threads = 16 + else: + image = tf.image.resize_image_with_crop_or_pad( + image, image_size, image_size) + image = tf.image.per_image_standardization(image) + example_queue = tf.FIFOQueue( + 3 * batch_size, + dtypes=[tf.float32, tf.int32], + shapes=[[image_size, image_size, depth], [1]]) + num_threads = 1 + + + example_enqueue_op = example_queue.enqueue([image, label]) + tf.train.add_queue_runner(tf.train.queue_runner.QueueRunner( + example_queue, [example_enqueue_op] * num_threads)) + + # Read 'batch' labels + images from the example queue. + images, labels = example_queue.dequeue_many(batch_size) + labels = tf.reshape(labels, [batch_size, 1]) + indices = tf.reshape(tf.range(0, batch_size, 1), [batch_size, 1]) + labels = tf.sparse_to_dense( + tf.concat([indices, labels], 1), + [batch_size, num_classes], 1.0, 0.0) + + assert len(images.get_shape()) == 4 + assert images.get_shape()[0] == batch_size + assert images.get_shape()[-1] == 3 + assert len(labels.get_shape()) == 2 + assert labels.get_shape()[0] == batch_size + assert labels.get_shape()[1] == num_classes + + return images, labels diff --git a/examples/resnet/resnet_main.py b/examples/resnet/resnet_main.py new file mode 100644 index 000000000..2ffe46793 --- /dev/null +++ b/examples/resnet/resnet_main.py @@ -0,0 +1,169 @@ +"""ResNet training script, with some code from +https://github.com/tensorflow/models/tree/master/resnet. +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import os +import numpy as np +import ray +import tensorflow as tf + +import cifar_input +import resnet_model + +FLAGS = tf.app.flags.FLAGS +tf.app.flags.DEFINE_string('train_data_path', '', + 'Filepattern for training data.') +tf.app.flags.DEFINE_string('eval_data_path', '', + 'Filepattern for eval data') +tf.app.flags.DEFINE_string('num_gpus', 0, 'Number of gpus to run with') +use_gpu = 1 if int(FLAGS.num_gpus) > 0 else 0 +@ray.remote(num_return_vals=4) +def get_data(path, size): + os.environ['CUDA_VISIBLE_DEVICES'] = '' + with tf.device('/cpu:0'): + queue = cifar_input.build_data(path, size) + sess = tf.Session() + coord = tf.train.Coordinator() + tf.train.start_queue_runners(sess, coord=coord) + images, labels = sess.run(queue) + coord.request_stop() + sess.close() + return (images[:int(size / 3), :], + images[int(size / 3):int(2 * size / 3), :], + images[int(2 * size / 3):, :], + labels) + +@ray.actor(num_gpus=use_gpu) +class ResNetTrainActor(object): + def __init__(self, data, num_gpus): + if num_gpus > 0: + os.environ['CUDA_VISIBLE_DEVICES'] = ','.join([str(i) for i in ray.get_gpu_ids()]) + hps = resnet_model.HParams(batch_size=128, + num_classes=10, + min_lrn_rate=0.0001, + lrn_rate=0.1, + num_residual_units=5, + use_bottleneck=False, + weight_decay_rate=0.0002, + relu_leakiness=0.1, + optimizer='mom', + num_gpus=num_gpus) + data = ray.get(data) + total_images = np.concatenate([data[0], data[1], data[2]]) + with tf.Graph().as_default(): + if num_gpus > 0: + tf.set_random_seed(ray.get_gpu_ids()[0] + 1) + else: + tf.set_random_seed(1) + + with tf.device('/gpu:0' if num_gpus > 0 else '/cpu:0'): + images, labels = cifar_input.build_input([total_images, data[3]], hps.batch_size, True) + self.model = resnet_model.ResNet(hps, images, labels, 'train') + self.model.build_graph() + config = tf.ConfigProto(allow_soft_placement=True) + sess = tf.Session(config=config) + self.model.variables.set_session(sess) + self.coord = tf.train.Coordinator() + tf.train.start_queue_runners(sess, coord=self.coord) + init = tf.global_variables_initializer() + sess.run(init) + + def compute_steps(self, weights): + # This method sets the weights in the network, runs some training steps, + # and returns the new weights. + steps = 10 + self.model.variables.set_weights(weights) + for i in range(steps): + self.model.variables.sess.run(self.model.train_op) + return self.model.variables.get_weights() + + def get_weights(self): + return self.model.variables.get_weights() + +@ray.actor +class ResNetTestActor(object): + def __init__(self, data, eval_batch_count): + hps = resnet_model.HParams(batch_size=100, + num_classes=10, + min_lrn_rate=0.0001, + lrn_rate=0.1, + num_residual_units=5, + use_bottleneck=False, + weight_decay_rate=0.0002, + relu_leakiness=0.1, + optimizer='mom', + num_gpus=0) + data = ray.get(data) + total_images = np.concatenate([data[0], data[1], data[2]]) + with tf.Graph().as_default(): + with tf.device('/cpu:0'): + images, labels = cifar_input.build_input([total_images, data[3]], hps.batch_size, False) + self.model = resnet_model.ResNet(hps, images, labels, 'eval') + self.model.build_graph() + config = tf.ConfigProto(allow_soft_placement=True) + sess = tf.Session(config=config) + self.model.variables.set_session(sess) + self.coord = tf.train.Coordinator() + tf.train.start_queue_runners(sess, coord=self.coord) + init = tf.global_variables_initializer() + sess.run(init) + self.best_precision = 0.0 + self.eval_batch_count = eval_batch_count + + def accuracy(self, weights): + self.model.variables.set_weights(weights) + total_prediction, correct_prediction = 0, 0 + model = self.model + sess = self.model.variables.sess + for _ in range(self.eval_batch_count): + loss, predictions, truth, train_step = sess.run( + [model.cost, model.predictions, + model.labels, model.global_step]) + + truth = np.argmax(truth, axis=1) + predictions = np.argmax(predictions, axis=1) + correct_prediction += np.sum(truth == predictions) + total_prediction += predictions.shape[0] + + precision = 1.0 * correct_prediction / total_prediction + self.best_precision = max(precision, self.best_precision) + return precision + +def train(): + """Training loop.""" + num_gpus = int(FLAGS.num_gpus) + ray.init(num_workers=2, num_gpus=num_gpus) + train_data = get_data.remote(FLAGS.train_data_path, 50000) + test_data = get_data.remote(FLAGS.eval_data_path, 10000) + if num_gpus > 0: + train_actors = [ResNetTrainActor(train_data, num_gpus) for _ in range(num_gpus)] + else: + train_actors = [ResNetTrainActor(train_data, num_gpus)] + test_actor = ResNetTestActor(test_data, 50) + step = 0 + weight_id = train_actors[0].get_weights() + acc_id = test_actor.accuracy(weight_id) + if num_gpus == 0: + num_gpus = 1 + while True: + with open('results.txt', 'a') as results: + print('Computing steps') + all_weights = ray.get([actor.compute_steps(weight_id) for actor in train_actors]) + mean_weights = {k: sum([weights[k] for weights in all_weights]) / num_gpus for k in all_weights[0]} + weight_id = ray.put(mean_weights) + step += 10 + if step % 200 == 0: + acc = ray.get(acc_id) + acc_id = test_actor.accuracy(weight_id) + print('Step {0}: {1:.6f}'.format(step - 200, acc)) + results.write(str(step - 200) + ' ' + str(acc) + '\n') + +def main(_): + train() + +if __name__ == '__main__': + tf.app.run() diff --git a/examples/resnet/resnet_model.py b/examples/resnet/resnet_model.py new file mode 100644 index 000000000..c4c23f527 --- /dev/null +++ b/examples/resnet/resnet_model.py @@ -0,0 +1,282 @@ +"""ResNet model with most of the code taken from +https://github.com/tensorflow/models/tree/master/resnet. + +Related papers: +https://arxiv.org/pdf/1603.05027v2.pdf +https://arxiv.org/pdf/1512.03385v1.pdf +https://arxiv.org/pdf/1605.07146v1.pdf +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from collections import namedtuple +import numpy as np +import ray +import tensorflow as tf +from tensorflow.python.training import moving_averages + +HParams = namedtuple('HParams', + 'batch_size, num_classes, min_lrn_rate, lrn_rate, ' + 'num_residual_units, use_bottleneck, weight_decay_rate, ' + 'relu_leakiness, optimizer, num_gpus') + + +class ResNet(object): + """ResNet model.""" + + def __init__(self, hps, images, labels, mode): + """ResNet constructor. + + Args: + hps: Hyperparameters. + images: Batches of images of size [batch_size, image_size, image_size, 3]. + labels: Batches of labels of size [batch_size, num_classes]. + mode: One of 'train' and 'eval'. + """ + self.hps = hps + self._images = images + self.labels = labels + self.mode = mode + + self._extra_train_ops = [] + + def build_graph(self): + """Build a whole graph for the model.""" + self.global_step = tf.Variable(0, trainable=False) + self._build_model() + if self.mode == 'train': + self._build_train_op() + else: + self.variables = ray.experimental.TensorFlowVariables(self.cost) + + def _stride_arr(self, stride): + """Map a stride scalar to the stride array for tf.nn.conv2d.""" + return [1, stride, stride, 1] + + def _build_model(self): + """Build the core model within the graph.""" + + with tf.variable_scope('init'): + x = self._images + x = self._conv('init_conv', x, 3, 3, 16, self._stride_arr(1)) + + strides = [1, 2, 2] + activate_before_residual = [True, False, False] + if self.hps.use_bottleneck: + res_func = self._bottleneck_residual + filters = [16, 64, 128, 256] + else: + res_func = self._residual + filters = [16, 16, 32, 64] + # Uncomment the following codes to use w28-10 wide residual network. + # It is more memory efficient than very deep residual network and has + # comparably good performance. + # https://arxiv.org/pdf/1605.07146v1.pdf + # filters = [16, 160, 320, 640] + # Update hps.num_residual_units to 9 + + with tf.variable_scope('unit_1_0'): + x = res_func(x, filters[0], filters[1], self._stride_arr(strides[0]), + activate_before_residual[0]) + for i in range(1, self.hps.num_residual_units): + with tf.variable_scope('unit_1_%d' % i): + x = res_func(x, filters[1], filters[1], self._stride_arr(1), False) + + with tf.variable_scope('unit_2_0'): + x = res_func(x, filters[1], filters[2], self._stride_arr(strides[1]), + activate_before_residual[1]) + for i in range(1, self.hps.num_residual_units): + with tf.variable_scope('unit_2_%d' % i): + x = res_func(x, filters[2], filters[2], self._stride_arr(1), False) + + with tf.variable_scope('unit_3_0'): + x = res_func(x, filters[2], filters[3], self._stride_arr(strides[2]), + activate_before_residual[2]) + for i in range(1, self.hps.num_residual_units): + with tf.variable_scope('unit_3_%d' % i): + x = res_func(x, filters[3], filters[3], self._stride_arr(1), False) + with tf.variable_scope('unit_last'): + x = self._batch_norm('final_bn', x) + x = self._relu(x, self.hps.relu_leakiness) + x = self._global_avg_pool(x) + + with tf.variable_scope('logit'): + logits = self._fully_connected(x, self.hps.num_classes) + self.predictions = tf.nn.softmax(logits) + + with tf.variable_scope('costs'): + xent = tf.nn.softmax_cross_entropy_with_logits( + logits=logits, labels=self.labels) + self.cost = tf.reduce_mean(xent, name='xent') + self.cost += self._decay() + + truth = tf.argmax(self.labels, axis=1) + predictions = tf.argmax(self.predictions, axis=1) + self.precision = tf.reduce_mean(tf.to_float(tf.equal(predictions, truth))) + + def _build_train_op(self): + """Build training specific ops for the graph.""" + rate = self.hps.lrn_rate + num_gpus = self.hps.num_gpus if self.hps.num_gpus != 0 else 1 + # The learning rate schedule is dependent on the number of gpus. + boundaries = [int(20000 * i / np.sqrt(num_gpus)) for i in range(2, 5)] + values = [0.1, 0.01, 0.001, 0.0001] + self.lrn_rate = tf.train.piecewise_constant(self.global_step, boundaries, values) + + if self.hps.optimizer == 'sgd': + optimizer = tf.train.GradientDescentOptimizer(self.lrn_rate) + elif self.hps.optimizer == 'mom': + optimizer = tf.train.MomentumOptimizer(self.lrn_rate, 0.9) + + apply_op = optimizer.minimize(self.cost, global_step=self.global_step) + train_ops = [apply_op] + self._extra_train_ops + self.train_op = tf.group(*train_ops) + self.variables = ray.experimental.TensorFlowVariables(self.train_op) + + def _batch_norm(self, name, x): + """Batch normalization.""" + with tf.variable_scope(name): + params_shape = [x.get_shape()[-1]] + + beta = tf.get_variable( + 'beta', params_shape, tf.float32, + initializer=tf.constant_initializer(0.0, tf.float32)) + gamma = tf.get_variable( + 'gamma', params_shape, tf.float32, + initializer=tf.constant_initializer(1.0, tf.float32)) + + if self.mode == 'train': + mean, variance = tf.nn.moments(x, [0, 1, 2], name='moments') + + moving_mean = tf.get_variable( + 'moving_mean', params_shape, tf.float32, + initializer=tf.constant_initializer(0.0, tf.float32), + trainable=False) + moving_variance = tf.get_variable( + 'moving_variance', params_shape, tf.float32, + initializer=tf.constant_initializer(1.0, tf.float32), + trainable=False) + + self._extra_train_ops.append(moving_averages.assign_moving_average( + moving_mean, mean, 0.9)) + self._extra_train_ops.append(moving_averages.assign_moving_average( + moving_variance, variance, 0.9)) + else: + mean = tf.get_variable( + 'moving_mean', params_shape, tf.float32, + initializer=tf.constant_initializer(0.0, tf.float32), + trainable=False) + variance = tf.get_variable( + 'moving_variance', params_shape, tf.float32, + initializer=tf.constant_initializer(1.0, tf.float32), + trainable=False) + # elipson used to be 1e-5. Maybe 0.001 solves NaN problem in deeper net. + y = tf.nn.batch_normalization( + x, mean, variance, beta, gamma, 0.001) + y.set_shape(x.get_shape()) + return y + + def _residual(self, x, in_filter, out_filter, stride, + activate_before_residual=False): + """Residual unit with 2 sub layers.""" + if activate_before_residual: + with tf.variable_scope('shared_activation'): + x = self._batch_norm('init_bn', x) + x = self._relu(x, self.hps.relu_leakiness) + orig_x = x + else: + with tf.variable_scope('residual_only_activation'): + orig_x = x + x = self._batch_norm('init_bn', x) + x = self._relu(x, self.hps.relu_leakiness) + + with tf.variable_scope('sub1'): + x = self._conv('conv1', x, 3, in_filter, out_filter, stride) + + with tf.variable_scope('sub2'): + x = self._batch_norm('bn2', x) + x = self._relu(x, self.hps.relu_leakiness) + x = self._conv('conv2', x, 3, out_filter, out_filter, [1, 1, 1, 1]) + + with tf.variable_scope('sub_add'): + if in_filter != out_filter: + orig_x = tf.nn.avg_pool(orig_x, stride, stride, 'VALID') + orig_x = tf.pad( + orig_x, [[0, 0], [0, 0], [0, 0], + [(out_filter-in_filter) // 2, (out_filter-in_filter) // 2]]) + x += orig_x + + return x + + def _bottleneck_residual(self, x, in_filter, out_filter, stride, + activate_before_residual=False): + """Bottleneck residual unit with 3 sub layers.""" + if activate_before_residual: + with tf.variable_scope('common_bn_relu'): + x = self._batch_norm('init_bn', x) + x = self._relu(x, self.hps.relu_leakiness) + orig_x = x + else: + with tf.variable_scope('residual_bn_relu'): + orig_x = x + x = self._batch_norm('init_bn', x) + x = self._relu(x, self.hps.relu_leakiness) + + with tf.variable_scope('sub1'): + x = self._conv('conv1', x, 1, in_filter, out_filter / 4, stride) + + with tf.variable_scope('sub2'): + x = self._batch_norm('bn2', x) + x = self._relu(x, self.hps.relu_leakiness) + x = self._conv('conv2', x, 3, out_filter / 4, out_filter / 4, [1, 1, 1, 1]) + + with tf.variable_scope('sub3'): + x = self._batch_norm('bn3', x) + x = self._relu(x, self.hps.relu_leakiness) + x = self._conv('conv3', x, 1, out_filter / 4, out_filter, [1, 1, 1, 1]) + + with tf.variable_scope('sub_add'): + if in_filter != out_filter: + orig_x = self._conv('project', orig_x, 1, in_filter, out_filter, stride) + x += orig_x + + return x + + def _decay(self): + """L2 weight decay loss.""" + costs = [] + for var in tf.trainable_variables(): + if var.op.name.find(r'DW') > 0: + costs.append(tf.nn.l2_loss(var)) + + return tf.multiply(self.hps.weight_decay_rate, tf.add_n(costs)) + + def _conv(self, name, x, filter_size, in_filters, out_filters, strides): + """Convolution.""" + with tf.variable_scope(name): + n = filter_size * filter_size * out_filters + kernel = tf.get_variable( + 'DW', [filter_size, filter_size, in_filters, out_filters], + tf.float32, initializer=tf.random_normal_initializer( + stddev=np.sqrt(2.0 / n))) + return tf.nn.conv2d(x, kernel, strides, padding='SAME') + + def _relu(self, x, leakiness=0.0): + """Relu, with optional leaky support.""" + return tf.where(tf.less(x, 0.0), leakiness * x, x, name='leaky_relu') + + def _fully_connected(self, x, out_dim): + """FullyConnected layer for final output.""" + x = tf.reshape(x, [self.hps.batch_size, -1]) + w = tf.get_variable( + 'DW', [x.get_shape()[1], out_dim], + initializer=tf.uniform_unit_scaling_initializer(factor=1.0)) + b = tf.get_variable('biases', [out_dim], + initializer=tf.constant_initializer()) + return tf.nn.xw_plus_b(x, w, b) + + def _global_avg_pool(self, x): + assert x.get_shape().ndims == 4 + return tf.reduce_mean(x, [1, 2]) diff --git a/python/ray/experimental/tfutils.py b/python/ray/experimental/tfutils.py index a8299b922..7ce146c7c 100644 --- a/python/ray/experimental/tfutils.py +++ b/python/ray/experimental/tfutils.py @@ -103,4 +103,4 @@ class TensorFlowVariables(object): def set_weights(self, new_weights): """Sets the weights to new_weights.""" self._check_sess() - self.sess.run(self.assignment_nodes, feed_dict={self.placeholders[name]: value for (name, value) in new_weights.items()}) + self.sess.run(self.assignment_nodes, feed_dict={self.placeholders[name]: value for (name, value) in new_weights.items() if name in self.placeholders}) diff --git a/test/tensorflow_test.py b/test/tensorflow_test.py index 0d9b9fd27..f5c0b05e4 100644 --- a/test/tensorflow_test.py +++ b/test/tensorflow_test.py @@ -19,31 +19,48 @@ def make_linear_network(w_name=None, b_name=None): # Return the loss and weight initializer. return tf.reduce_mean(tf.square(y - y_data)), tf.global_variables_initializer(), x_data, y_data -def net_vars_initializer(): - # Uses a separate graph for each network. - with tf.Graph().as_default(): - # Create the network. - loss, init, _, _ = make_linear_network() - sess = tf.Session() - # Additional code for setting and getting the weights. - variables = ray.experimental.TensorFlowVariables(loss, sess) - # Return all of the data needed to use the network. - return variables, init, sess +class NetActor(object): -def net_vars_reinitializer(net_vars): - return net_vars + def __init__(self): + # Uses a separate graph for each network. + with tf.Graph().as_default(): + # Create the network. + loss, init, _, _ = make_linear_network() + sess = tf.Session() + # Additional code for setting and getting the weights. + variables = ray.experimental.TensorFlowVariables(loss, sess) + # Return all of the data needed to use the network. + self.values = [variables, init, sess] + sess.run(init) -def train_vars_initializer(): - # Almost the same as above, but now returns the placeholders and gradient. - with tf.Graph().as_default(): - loss, init, x_data, y_data = make_linear_network() - sess = tf.Session() - variables = ray.experimental.TensorFlowVariables(loss, sess) - optimizer = tf.train.GradientDescentOptimizer(0.9) - grads = optimizer.compute_gradients(loss) - train = optimizer.apply_gradients(grads) - return loss, variables, init, sess, grads, train, [x_data, y_data] + def set_and_get_weights(self, weights): + self.values[0].set_weights(weights) + return self.values[0].get_weights() + def get_weights(self): + return self.values[0].get_weights() + +class TrainActor(object): + + def __init__(self): + # Almost the same as above, but now returns the placeholders and gradient. + with tf.Graph().as_default(): + loss, init, x_data, y_data = make_linear_network() + sess = tf.Session() + variables = ray.experimental.TensorFlowVariables(loss, sess) + optimizer = tf.train.GradientDescentOptimizer(0.9) + grads = optimizer.compute_gradients(loss) + train = optimizer.apply_gradients(grads) + self.values = [loss, variables, init, sess, grads, train, [x_data, y_data]] + sess.run(init) + + def training_step(self, weights): + _, variables, _, sess, grads, _, placeholders = self.values + variables.set_weights(weights) + return sess.run([grad[0] for grad in grads], feed_dict=dict(zip(placeholders, [[1]*100, [2]*100]))) + + def get_weights(self): + return self.values[1].get_weights() class TensorFlowTest(unittest.TestCase): @@ -93,19 +110,15 @@ class TensorFlowTest(unittest.TestCase): def testVariableNameCollision(self): ray.init(num_workers=2) - ray.env.net1 = ray.EnvironmentVariable(net_vars_initializer, net_vars_reinitializer) - ray.env.net2 = ray.EnvironmentVariable(net_vars_initializer, net_vars_reinitializer) + net1 = NetActor() + net2 = NetActor() - net_vars1, init1, sess1 = ray.env.net1 - net_vars2, init2, sess2 = ray.env.net2 - - # Initialize the networks - sess1.run(init1) - sess2.run(init2) + net_vars1, init1, sess1 = net1.values + net_vars2, init2, sess2 = net2.values # This is checking that the variable names of the two nets are the same, # i.e. that the names in the weight dictionaries are the same - ray.env.net1[0].set_weights(ray.env.net2[0].get_weights()) + net1.values[0].set_weights(net2.values[0].get_weights()) ray.worker.cleanup() @@ -114,37 +127,25 @@ class TensorFlowTest(unittest.TestCase): def testNetworksIndependent(self): # Note we use only one worker to ensure that all of the remote functions run on the same worker. ray.init(num_workers=1) - - ray.env.net1 = ray.EnvironmentVariable(net_vars_initializer, net_vars_reinitializer) - ray.env.net2 = ray.EnvironmentVariable(net_vars_initializer, net_vars_reinitializer) - - net_vars1, init1, sess1 = ray.env.net1 - net_vars2, init2, sess2 = ray.env.net2 - - # Initialize the networks - sess1.run(init1) - sess2.run(init2) - - @ray.remote - def set_and_get_weights(weights1, weights2): - ray.env.net1[0].set_weights(weights1) - ray.env.net2[0].set_weights(weights2) - return ray.env.net1[0].get_weights(), ray.env.net2[0].get_weights() + net1 = NetActor() + net2 = NetActor() # Make sure the two networks have different weights. TODO(rkn): Note that # equality comparisons of numpy arrays normally does not work. This only # works because at the moment they have size 1. - weights1 = net_vars1.get_weights() - weights2 = net_vars2.get_weights() + weights1 = net1.get_weights() + weights2 = net2.get_weights() self.assertNotEqual(weights1, weights2) # Set the weights and get the weights, and make sure they are unchanged. - new_weights1, new_weights2 = ray.get(set_and_get_weights.remote(weights1, weights2)) + new_weights1 = net1.set_and_get_weights(weights1) + new_weights2 = net2.set_and_get_weights(weights2) self.assertEqual(weights1, new_weights1) self.assertEqual(weights2, new_weights2) # Swap the weights. - new_weights2, new_weights1 = ray.get(set_and_get_weights.remote(weights2, weights1)) + new_weights1 = net2.set_and_get_weights(weights1) + new_weights2 = net1.set_and_get_weights(weights2) self.assertEqual(weights1, new_weights1) self.assertEqual(weights2, new_weights2) @@ -161,20 +162,10 @@ class TensorFlowTest(unittest.TestCase): net_vars1 = ray.experimental.TensorFlowVariables(loss1, sess1) sess1.run(init1) - # Create a network on the driver via an environment variable. - ray.env.net = ray.EnvironmentVariable(net_vars_initializer, net_vars_reinitializer) + net2 = ray.actor(NetActor)() + weights2 = ray.get(net2.get_weights()) - net_vars2, init2, sess2 = ray.env.net - sess2.run(init2) - - weights2 = net_vars2.get_weights() - - @ray.remote - def set_and_get_weights(weights): - ray.env.net[0].set_weights(weights) - return ray.env.net[0].get_weights() - - new_weights2 = ray.get(set_and_get_weights.remote(net_vars2.get_weights())) + new_weights2 = ray.get(net2.set_and_get_weights(net2.get_weights())) self.assertEqual(weights2, new_weights2) ray.worker.cleanup() @@ -198,18 +189,8 @@ class TensorFlowTest(unittest.TestCase): def testRemoteTrainingStep(self): ray.init(num_workers=1) - ray.env.net = ray.EnvironmentVariable(train_vars_initializer, net_vars_reinitializer) - - @ray.remote - def training_step(weights): - _, variables, _, sess, grads, _, placeholders = ray.env.net - variables.set_weights(weights) - return sess.run([grad[0] for grad in grads], feed_dict=dict(zip(placeholders, [[1]*100]*2))) - - _, variables, init, sess, _, _, _ = ray.env.net - - sess.run(init) - ray.get(training_step.remote(variables.get_weights())) + net = ray.actor(TrainActor)() + ray.get(net.training_step(net.get_weights())) ray.worker.cleanup() @@ -217,21 +198,13 @@ class TensorFlowTest(unittest.TestCase): def testRemoteTrainingLoss(self): ray.init(num_workers=2) - ray.env.net = ray.EnvironmentVariable(train_vars_initializer, net_vars_reinitializer) + net = ray.actor(TrainActor)() + loss, variables, _, sess, grads, train, placeholders = TrainActor().values - @ray.remote - def training_step(weights): - _, variables, _, sess, grads, _, placeholders = ray.env.net - variables.set_weights(weights) - return sess.run([grad[0] for grad in grads], feed_dict=dict(zip(placeholders, [[1]*100, [2]*100]))) - - loss, variables, init, sess, grads, train, placeholders = ray.env.net - - sess.run(init) before_acc = sess.run(loss, feed_dict=dict(zip(placeholders, [[2]*100, [4]*100]))) for _ in range(3): - gradients_list = ray.get([training_step.remote(variables.get_weights()) for _ in range(2)]) + gradients_list = ray.get([net.training_step(variables.get_weights()) for _ in range(2)]) mean_grads = [sum([gradients[i] for gradients in gradients_list]) / len(gradients_list) for i in range(len(gradients_list[0]))] feed_dict = {grad[0]: mean_grad for (grad, mean_grad) in zip(grads, mean_grads)} sess.run(train, feed_dict=feed_dict) @@ -239,5 +212,21 @@ class TensorFlowTest(unittest.TestCase): self.assertTrue(before_acc < after_acc) ray.worker.cleanup() + def testVariablesControlDependencies(self): + ray.init(num_workers=1) + + # Creates a network and appends a momentum optimizer. + sess = tf.Session() + loss, init, _, _ = make_linear_network() + minimizer = tf.train.MomentumOptimizer(0.9, 0.9).minimize(loss) + net_vars = ray.experimental.TensorFlowVariables(minimizer, sess) + sess.run(init) + + # Tests if all variables are properly retrieved, 2 variables and 2 momentum + # variables. + self.assertEqual(len(net_vars.variables.items()), 4) + + ray.worker.cleanup() + if __name__ == "__main__": unittest.main(verbosity=2)