From aaf3be3c53bf6b762947cf45bc04b4ac2cc25923 Mon Sep 17 00:00:00 2001 From: Wapaul1 Date: Tue, 10 Jan 2017 18:40:06 -0800 Subject: [PATCH] Fixed lbfgs for ray-cluster (#180) * Updated lbfgs example to include TensorflowVariables * Whitespace. --- examples/lbfgs/driver.py | 191 +++++++++++++------------ lib/python/ray/experimental/tfutils.py | 36 ++++- test/tensorflow_test.py | 11 ++ 3 files changed, 141 insertions(+), 97 deletions(-) diff --git a/examples/lbfgs/driver.py b/examples/lbfgs/driver.py index f74af6ddd..f81f19233 100644 --- a/examples/lbfgs/driver.py +++ b/examples/lbfgs/driver.py @@ -10,104 +10,103 @@ import tensorflow as tf from tensorflow.examples.tutorials.mnist import input_data +class LinearModel(object): + """Simple class for a one layer neural network. + + Note that this code does not initialize the network weights. Instead weights + are set via self.variables.set_weights. + + Example: + net = LinearModel([10,10]) + weights = [np.random.normal(size=[10,10]), np.random.normal(size=[10])] + variable_names = [v.name for v in net.variables] + net.variables.set_weights(dict(zip(variable_names, weights))) + + Attributes: + x (tf.placeholder): Input vector. + w (tf.Variable): Weight matrix. + b (tf.Variable): Bias vector. + y_ (tf.placeholder): Input result vector. + cross_entropy (tf.Operation): Final layer of network. + cross_entropy_grads (tf.Operation): Gradient computation. + sess (tf.Session): Session used for training. + variables (TensorFlowVariables): Extracted variables and methods to + manipulate them. + """ + def __init__(self, shape): + """Creates a LinearModel object.""" + x = tf.placeholder(tf.float32, [None, shape[0]]) + w = tf.Variable(tf.zeros(shape)) + b = tf.Variable(tf.zeros(shape[1])) + self.x = x + self.w = w + self.b = b + y = tf.nn.softmax(tf.matmul(x, w) + b) + y_ = tf.placeholder(tf.float32, [None, shape[1]]) + self.y_ = y_ + cross_entropy = tf.reduce_mean(-tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1])) + self.cross_entropy = cross_entropy + self.cross_entropy_grads = tf.gradients(cross_entropy, [w, b]) + self.sess = tf.Session() + # In order to get and set the weights, we pass in the loss function to Ray's + # TensorFlowVariables to automatically create methods to modify the weights. + self.variables = ray.experimental.TensorFlowVariables(cross_entropy, self.sess) + + def loss(self, xs, ys): + """Computes the loss of the network.""" + return float(self.sess.run(self.cross_entropy, feed_dict={self.x: xs, self.y_: ys})) + + def grad(self, xs, ys): + """Computes the gradients of the network.""" + return self.sess.run(self.cross_entropy_grads, feed_dict={self.x: xs, self.y_: ys}) + +def net_initialization(): + return LinearModel([784,10]) + +# By default, when a reusable variable is used by a remote function, the +# initialization code will be rerun at the end of the remote task to ensure +# that the state of the variable is not changed by the remote task. However, +# the initialization code may be expensive. This case is one example, because +# a TensorFlow network is constructed. In this case, we pass in a special +# reinitialization function which gets run instead of the original +# initialization code. As users, if we pass in custom reinitialization code, +# we must ensure that no state is leaked between tasks. +def net_reinitialization(net): + return net + +# Register the network with Ray and create a reusable variable for it. +ray.reusables.net = ray.Reusable(net_initialization, net_reinitialization) + +# Compute the loss on a batch of data. +@ray.remote +def loss(theta, xs, ys): + net = ray.reusables.net + net.variables.set_flat(theta) + return net.loss(xs,ys) + +# Compute the gradient of the loss on a batch of data. +@ray.remote +def grad(theta, xs, ys): + net = ray.reusables.net + net.variables.set_flat(theta) + gradients = net.grad(xs, ys) + return np.concatenate([g.flatten() for g in gradients]) + +# Compute the loss on the entire dataset. +def full_loss(theta): + theta_id = ray.put(theta) + loss_ids = [loss.remote(theta_id, xs_id, ys_id) for (xs_id, ys_id) in batch_ids] + return sum(ray.get(loss_ids)) + +# Compute the gradient of the loss on the entire dataset. +def full_grad(theta): + theta_id = ray.put(theta) + grad_ids = [grad.remote(theta_id, xs_id, ys_id) for (xs_id, ys_id) in batch_ids] + return sum(ray.get(grad_ids)).astype("float64") # This conversion is necessary for use with fmin_l_bfgs_b. + if __name__ == "__main__": ray.init(num_workers=10) - # Define the dimensions of the data and of the model. - image_dimension = 784 - label_dimension = 10 - w_shape = [image_dimension, label_dimension] - w_size = np.prod(w_shape) - b_shape = [label_dimension] - b_size = np.prod(b_shape) - dim = w_size + b_size - - # Define a function for initializing the network. Note that this code does not - # call initialize the network weights. If it did, the weights would be - # randomly initialized on each worker and would differ from worker to worker. - # We pass the weights into the remote functions loss and grad so that the - # weights are the same on each worker. - def net_initialization(): - x = tf.placeholder(tf.float32, [None, image_dimension]) - w = tf.Variable(tf.zeros(w_shape)) - b = tf.Variable(tf.zeros(b_shape)) - y = tf.nn.softmax(tf.matmul(x, w) + b) - y_ = tf.placeholder(tf.float32, [None, label_dimension]) - cross_entropy = tf.reduce_mean(-tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1])) - cross_entropy_grads = tf.gradients(cross_entropy, [w, b]) - - sess = tf.Session() - - # In order to set the weights of the TensorFlow graph on a worker, we add - # assignment nodes. To get the network weights (as a list of numpy arrays) - # and to set the network weights (from a list of numpy arrays), use the - # methods get_weights and set_weights. This can be done from within a remote - # function or on the driver. - def get_and_set_weights_methods(): - assignment_placeholders = [] - assignment_nodes = [] - for var in tf.trainable_variables(): - assignment_placeholders.append(tf.placeholder(var.value().dtype, var.get_shape().as_list())) - assignment_nodes.append(var.assign(assignment_placeholders[-1])) - - def get_weights(): - return [v.eval(session=sess) for v in tf.trainable_variables()] - - def set_weights(new_weights): - sess.run(assignment_nodes, feed_dict={p: w for p, w in zip(assignment_placeholders, new_weights)}) - - return get_weights, set_weights - - get_weights, set_weights = get_and_set_weights_methods() - - return sess, cross_entropy, cross_entropy_grads, x, y_, get_weights, set_weights - - # By default, when a reusable variable is used by a remote function, the - # initialization code will be rerun at the end of the remote task to ensure - # that the state of the variable is not changed by the remote task. However, - # the initialization code may be expensive. This case is one example, because - # a TensorFlow network is constructed. In this case, we pass in a special - # reinitialization function which gets run instead of the original - # initialization code. As users, if we pass in custom reinitialization code, - # we must ensure that no state is leaked between tasks. - def net_reinitialization(net_vars): - return net_vars - - # Create a reusable variable for the network. - ray.reusables.net_vars = ray.Reusable(net_initialization, net_reinitialization) - - # Load the weights into the network. - def load_weights(theta): - sess, _, _, _, _, get_weights, set_weights = ray.reusables.net_vars - set_weights([theta[:w_size].reshape(w_shape), theta[w_size:].reshape(b_shape)]) - - # Compute the loss on a batch of data. - @ray.remote - def loss(theta, xs, ys): - sess, cross_entropy, _, x, y_, _, _ = ray.reusables.net_vars - load_weights(theta) - return float(sess.run(cross_entropy, feed_dict={x: xs, y_: ys})) - - # Compute the gradient of the loss on a batch of data. - @ray.remote - def grad(theta, xs, ys): - sess, _, cross_entropy_grads, x, y_, _, _ = ray.reusables.net_vars - load_weights(theta) - gradients = sess.run(cross_entropy_grads, feed_dict={x: xs, y_: ys}) - return np.concatenate([g.flatten() for g in gradients]) - - # Compute the loss on the entire dataset. - def full_loss(theta): - theta_id = ray.put(theta) - loss_ids = [loss.remote(theta_id, xs_id, ys_id) for (xs_id, ys_id) in batch_ids] - return sum(ray.get(loss_ids)) - - # Compute the gradient of the loss on the entire dataset. - def full_grad(theta): - theta_id = ray.put(theta) - grad_ids = [grad.remote(theta_id, xs_id, ys_id) for (xs_id, ys_id) in batch_ids] - return sum(ray.get(grad_ids)).astype("float64") # This conversion is necessary for use with fmin_l_bfgs_b. - # From the perspective of scipy.optimize.fmin_l_bfgs_b, full_loss is simply a # function which takes some parameters theta, and computes a loss. Similarly, # full_grad is a function which takes some parameters theta, and computes the @@ -128,7 +127,9 @@ if __name__ == "__main__": batch_ids = [(ray.put(xs), ray.put(ys)) for (xs, ys) in batches] # Initialize the weights for the network to the vector of all zeros. + dim = ray.reusables.net.variables.get_flat_size() theta_init = 1e-2 * np.random.normal(size=dim) + # Use L-BFGS to minimize the loss function. print("Running L-BFGS.") result = scipy.optimize.fmin_l_bfgs_b(full_loss, theta_init, maxiter=10, fprime=full_grad, disp=True) diff --git a/lib/python/ray/experimental/tfutils.py b/lib/python/ray/experimental/tfutils.py index f70ebd5c8..6f51e3a0d 100644 --- a/lib/python/ray/experimental/tfutils.py +++ b/lib/python/ray/experimental/tfutils.py @@ -1,6 +1,18 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import numpy as np + +def unflatten(vector, shapes): + i = 0 + arrays = [] + for shape in shapes: + size = np.prod(shape) + array = vector[i:(i + size)].reshape(shape) + arrays.append(array) + i += size + assert len(vector) == i, "Passed weight does not have the correct shape." + return arrays class TensorFlowVariables(object): """An object used to extract variables from a loss function. @@ -35,12 +47,32 @@ class TensorFlowVariables(object): """Modifies the current session used by the class.""" self.sess = sess + def get_flat_size(self): + return sum([np.prod(v.get_shape().as_list()) for v in self.variables]) + + def _check_sess(self): + """Checks if the session is set, and if not throw an error message.""" + assert self.sess is not None, "The session is not set. Set the session either by passing it into the TensorFlowVariables constructor or by calling set_session(sess)." + + def get_flat(self): + """Gets the weights and returns them as a flat array.""" + self._check_sess() + return np.concatenate([v.eval(session=self.sess).flatten() for v in self.variables]) + + def set_flat(self, new_weights): + """Sets the weights to new_weights, converting from a flat array.""" + self._check_sess() + shapes = [v.get_shape().as_list() for v in self.variables] + arrays = unflatten(new_weights, shapes) + placeholders = [self.assignment_placeholders[v.op.node_def.name] for v in self.variables] + self.sess.run(self.assignment_nodes, feed_dict=dict(zip(placeholders,arrays))) + def get_weights(self): """Returns the weights of the variables of the loss function in a list.""" - assert self.sess is not None, "The session is not set. Set the session either by passing it into the TensorFlowVariables constructor or by calling set_session(sess)." + self._check_sess() return {v.op.node_def.name: v.eval(session=self.sess) for v in self.variables} def set_weights(self, new_weights): """Sets the weights to new_weights.""" - assert self.sess is not None, "The session is not set. Set the session either by passing it into the TensorFlowVariables constructor or by calling set_session(sess)." + self._check_sess() self.sess.run(self.assignment_nodes, feed_dict={self.assignment_placeholders[name]: value for (name, value) in new_weights.items()}) diff --git a/test/tensorflow_test.py b/test/tensorflow_test.py index d9ad347bb..7ec5404d0 100644 --- a/test/tensorflow_test.py +++ b/test/tensorflow_test.py @@ -5,6 +5,7 @@ from __future__ import print_function import unittest import tensorflow as tf import ray +from numpy.testing import assert_almost_equal class TensorFlowTest(unittest.TestCase): @@ -47,6 +48,16 @@ class TensorFlowTest(unittest.TestCase): variables2.set_weights(weights2) self.assertEqual(weights2, variables2.get_weights()) + flat_weights = variables2.get_flat() + 2.0 + variables2.set_flat(flat_weights) + assert_almost_equal(flat_weights, variables2.get_flat()) + + variables3 = ray.experimental.TensorFlowVariables(loss2) + self.assertEqual(variables3.sess, None) + sess = tf.Session() + variables3.set_session(sess) + self.assertEqual(variables3.sess, sess) + ray.worker.cleanup() if __name__ == "__main__":