diff --git a/doc/using-ray-with-tensorflow.md b/doc/using-ray-with-tensorflow.md index d56ea4916..4ee376919 100644 --- a/doc/using-ray-with-tensorflow.md +++ b/doc/using-ray-with-tensorflow.md @@ -72,15 +72,21 @@ b.assign(np.zeros(1)) # This adds a node to the graph every time you call it. ## Complete Example Putting this all together, we would first create the graph on each worker using -environment variables. Within the environment variables, we would define -`get_weights` and `set_weights` methods. We would then use those methods to ship -the weights (as lists of numpy arrays) between the processes without shipping -the actual TensorFlow graphs, which are much more complex Python objects. +environment variables. Within the environment variables, we would use the +`get_weights` and `set_weights` methods of the `TensorFlowVariables` class. We +would then use those methods to ship the weights (as a dictionary mapping +variable names to numpy arrays) between the processes without shipping the +actual TensorFlow graphs, which are much more complex Python objects. Note that +to avoid namespace collisions with variables already created on the workers, we +use a `variable_scope` with a random prefix in the environment variables and then +pass `prefix=True` to `TensorFlowVariables` so that it can strip the prefix from +the variable names. ```python import tensorflow as tf import numpy as np import ray +import uuid ray.init(num_workers=5) @@ -89,25 +95,31 @@ NUM_BATCHES = 1 NUM_ITERS = 201 def net_vars_initializer(): - # Seed TensorFlow to make the script deterministic. - tf.set_random_seed(0) - # Define the inputs. - x_data = tf.placeholder(tf.float32, shape=[BATCH_SIZE]) - y_data = tf.placeholder(tf.float32, shape=[BATCH_SIZE]) - # Define the weights and computation. - w = tf.Variable(tf.random_uniform([1], -1.0, 1.0)) - b = tf.Variable(tf.zeros([1])) - y = w * x_data + b - # Define the loss. 
- loss = tf.reduce_mean(tf.square(y - y_data)) - optimizer = tf.train.GradientDescentOptimizer(0.5) - train = optimizer.minimize(loss) - # Define the weight initializer and session. - init = tf.global_variables_initializer() - sess = tf.Session() - # Additional code for setting and getting the weights. - variables = ray.experimental.TensorFlowVariables(loss, sess) - # Return all of the data needed to use the network. + # Prefix should be random so that there is no conflict with variable names in + # the cluster setting. + prefix = str(uuid.uuid1().hex) + # Use the tensorflow variable_scope to prefix all of the variables + with tf.variable_scope(prefix): + # Seed TensorFlow to make the script deterministic. + tf.set_random_seed(0) + # Define the inputs. + x_data = tf.placeholder(tf.float32, shape=[BATCH_SIZE]) + y_data = tf.placeholder(tf.float32, shape=[BATCH_SIZE]) + # Define the weights and computation. + w = tf.Variable(tf.random_uniform([1], -1.0, 1.0)) + b = tf.Variable(tf.zeros([1])) + y = w * x_data + b + # Define the loss. + loss = tf.reduce_mean(tf.square(y - y_data)) + optimizer = tf.train.GradientDescentOptimizer(0.5) + train = optimizer.minimize(loss) + # Define the weight initializer and session. + init = tf.global_variables_initializer() + sess = tf.Session() + # Additional code for setting and getting the weights, and use a prefix + # so that the variable names can be converted between workers. + variables = ray.experimental.TensorFlowVariables(loss, sess, prefix=True) + # Return all of the data needed to use the network. return variables, sess, train, loss, x_data, y_data, init def net_vars_reinitializer(net_vars): @@ -131,7 +143,7 @@ def step(weights, x, y): variables, sess, _, loss, x_data, y_data, init = ray.env.net_vars # Initialize the network weights. sess.run(init) -# Get the weights as a list of numpy arrays. +# Get the weights as a dictionary of numpy arrays. 
weights = variables.get_weights() # Define a remote function for generating fake data. diff --git a/python/ray/experimental/tfutils.py b/python/ray/experimental/tfutils.py index 6f51e3a0d..eb7da888a 100644 --- a/python/ray/experimental/tfutils.py +++ b/python/ray/experimental/tfutils.py @@ -2,6 +2,7 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function import numpy as np +from collections import deque, OrderedDict def unflatten(vector, shapes): i = 0 @@ -27,28 +28,42 @@ class TensorFlowVariables(object): assignment_placeholders (List[tf.placeholders]): The nodes that weights get passed to. assignment_nodes (List[tf.Tensor]): The nodes that assign the weights. + prefix (Bool): Boolean for if there is a prefix on the variable names. """ - def __init__(self, loss, sess=None): + def __init__(self, loss, sess=None, prefix=False): """Creates a TensorFlowVariables instance.""" import tensorflow as tf self.sess = sess self.loss = loss - variable_names = [op.node_def.name for op in loss.graph.get_operations() if op.node_def.op == "Variable"] - self.variables = [v for v in tf.trainable_variables() if v.op.node_def.name in variable_names] + self.prefix = prefix + queue = deque([loss]) + variable_names = [] + + # We do a BFS on the dependency graph of the input function to find + # the variables. + while len(queue) != 0: + op = queue.popleft().op + queue.extend(op.inputs) + if op.node_def.op == "Variable": + variable_names.append(op.node_def.name) + self.variables = OrderedDict() + for v in [v for v in tf.global_variables() if v.op.node_def.name in variable_names]: + name = v.op.node_def.name.split("/", 1 if prefix else 0)[-1] + self.variables[name] = v self.assignment_placeholders = dict() self.assignment_nodes = [] # Create new placeholders to put in custom weights. 
- for var in self.variables: - self.assignment_placeholders[var.op.node_def.name] = tf.placeholder(var.value().dtype, var.get_shape().as_list()) - self.assignment_nodes.append(var.assign(self.assignment_placeholders[var.op.node_def.name])) + for k, var in self.variables.items(): + self.assignment_placeholders[k] = tf.placeholder(var.value().dtype, var.get_shape().as_list()) + self.assignment_nodes.append(var.assign(self.assignment_placeholders[k])) def set_session(self, sess): """Modifies the current session used by the class.""" self.sess = sess def get_flat_size(self): - return sum([np.prod(v.get_shape().as_list()) for v in self.variables]) + return sum([np.prod(v.get_shape().as_list()) for v in self.variables.values()]) def _check_sess(self): """Checks if the session is set, and if not throw an error message.""" @@ -57,20 +72,20 @@ class TensorFlowVariables(object): def get_flat(self): """Gets the weights and returns them as a flat array.""" self._check_sess() - return np.concatenate([v.eval(session=self.sess).flatten() for v in self.variables]) + return np.concatenate([v.eval(session=self.sess).flatten() for v in self.variables.values()]) def set_flat(self, new_weights): """Sets the weights to new_weights, converting from a flat array.""" self._check_sess() - shapes = [v.get_shape().as_list() for v in self.variables] + shapes = [v.get_shape().as_list() for v in self.variables.values()] arrays = unflatten(new_weights, shapes) - placeholders = [self.assignment_placeholders[v.op.node_def.name] for v in self.variables] + placeholders = [self.assignment_placeholders[k] for k, v in self.variables.items()] self.sess.run(self.assignment_nodes, feed_dict=dict(zip(placeholders,arrays))) def get_weights(self): """Returns the weights of the variables of the loss function in a list.""" self._check_sess() - return {v.op.node_def.name: v.eval(session=self.sess) for v in self.variables} + return {k: v.eval(session=self.sess) for k, v in self.variables.items()} def 
set_weights(self, new_weights): """Sets the weights to new_weights.""" diff --git a/test/tensorflow_test.py b/test/tensorflow_test.py index 7ec5404d0..1c0a43caa 100644 --- a/test/tensorflow_test.py +++ b/test/tensorflow_test.py @@ -3,25 +3,47 @@ from __future__ import division from __future__ import print_function import unittest +import uuid import tensorflow as tf import ray from numpy.testing import assert_almost_equal +def make_linear_network(w_name=None, b_name=None): + # Define the inputs. + x_data = tf.placeholder(tf.float32, shape=[100]) + y_data = tf.placeholder(tf.float32, shape=[100]) + # Define the weights and computation. + w = tf.Variable(tf.random_uniform([1], -1.0, 1.0), name=w_name) + b = tf.Variable(tf.zeros([1]), name=b_name) + y = w * x_data + b + # Return the loss and weight initializer. + return tf.reduce_mean(tf.square(y - y_data)), tf.global_variables_initializer() + +def net_vars_initializer(): + # Random prefix so variable names do not clash if we use nets with + # the same name. + prefix = str(uuid.uuid1().hex) + # Use the tensorflow variable_scope to prefix all of the variables + with tf.variable_scope(prefix): + # Create the network. + loss, init = make_linear_network() + sess = tf.Session() + # Additional code for setting and getting the weights. + variables = ray.experimental.TensorFlowVariables(loss, sess, prefix=True) + # Return all of the data needed to use the network. 
+ return variables, init, sess + +def net_vars_reinitializer(net_vars): + return net_vars + class TensorFlowTest(unittest.TestCase): def testTensorFlowVariables(self): ray.init(num_workers=2) - x_data = tf.placeholder(tf.float32, shape=[100]) - y_data = tf.placeholder(tf.float32, shape=[100]) - - w = tf.Variable(tf.random_uniform([1], -1.0, 1.0)) - b = tf.Variable(tf.zeros([1])) - y = w * x_data + b - loss = tf.reduce_mean(tf.square(y - y_data)) - sess = tf.Session() - sess.run(tf.global_variables_initializer()) + loss, init = make_linear_network() + sess.run(init) variables = ray.experimental.TensorFlowVariables(loss, sess) weights = variables.get_weights() @@ -32,12 +54,8 @@ class TensorFlowTest(unittest.TestCase): variables.set_weights(weights) self.assertEqual(weights, variables.get_weights()) - w2 = tf.Variable(tf.random_uniform([1], -1.0, 1.0), name="w") - b2 = tf.Variable(tf.zeros([1]), name="b") - y2 = w2 * x_data + b2 - loss2 = tf.reduce_mean(tf.square(y2 - y_data)) - - sess.run(tf.global_variables_initializer()) + loss2, init2 = make_linear_network("w", "b") + sess.run(init2) variables2 = ray.experimental.TensorFlowVariables(loss2, sess) weights2 = variables2.get_weights() @@ -60,5 +78,114 @@ class TensorFlowTest(unittest.TestCase): ray.worker.cleanup() + # Test that the variable names for the two different nets are not + # modified by TensorFlow to be unique (i.e. they should already + # be unique because of the variable prefix). + def testVariableNameCollision(self): + ray.init(num_workers=2) + + ray.env.net1 = ray.EnvironmentVariable(net_vars_initializer, net_vars_reinitializer) + ray.env.net2 = ray.EnvironmentVariable(net_vars_initializer, net_vars_reinitializer) + + net_vars1, init1, sess1 = ray.env.net1 + net_vars2, init2, sess2 = ray.env.net2 + + # Initialize the networks + sess1.run(init1) + sess2.run(init2) + + # This is checking that the variable names of the two nets are the same, + # i.e. 
that the names in the weight dictionaries are the same + ray.env.net1[0].set_weights(ray.env.net2[0].get_weights()) + + ray.worker.cleanup() + + # Test that different networks on the same worker are independent and + # we can get/set their weights without any interaction. + def testNetworksIndependent(self): + # Note we use only one worker to ensure that all of the remote functions run on the same worker. + ray.init(num_workers=1) + + ray.env.net1 = ray.EnvironmentVariable(net_vars_initializer, net_vars_reinitializer) + ray.env.net2 = ray.EnvironmentVariable(net_vars_initializer, net_vars_reinitializer) + + net_vars1, init1, sess1 = ray.env.net1 + net_vars2, init2, sess2 = ray.env.net2 + + # Initialize the networks + sess1.run(init1) + sess2.run(init2) + + @ray.remote + def get_vars1(): + return ray.env.net1[0].get_weights() + + @ray.remote + def get_vars2(): + return ray.env.net2[0].get_weights() + + @ray.remote + def set_vars1(weights): + ray.env.net1[0].set_weights(weights) + + @ray.remote + def set_vars2(weights): + ray.env.net2[0].set_weights(weights) + + # Get the weights. + weights1 = net_vars1.get_weights() + weights2 = net_vars2.get_weights() + self.assertNotEqual(weights1, weights2) + + # Swap the weights. + set_vars2.remote(weights1) + set_vars1.remote(weights2) + + # Get the new weights. + new_weights1 = ray.get(get_vars1.remote()) + new_weights2 = ray.get(get_vars2.remote()) + self.assertNotEqual(new_weights1, new_weights2) + + # Check that the weights were swapped. + self.assertEqual(weights1, new_weights2) + self.assertEqual(weights2, new_weights1) + + ray.worker.cleanup() + + def testNetworkDriverWorkerIndependent(self): + ray.init(num_workers=1) + + # Create a network on the driver locally. + sess1 = tf.Session() + loss1, init1 = make_linear_network() + net_vars1 = ray.experimental.TensorFlowVariables(loss1, sess1) + sess1.run(init1) + + # Create a network on the driver via an environment variable. 
+ ray.env.net = ray.EnvironmentVariable(net_vars_initializer, net_vars_reinitializer) + + net_vars2, init2, sess2 = ray.env.net + sess2.run(init2) + + # Get the weights. + weights1 = net_vars1.get_weights() + weights2 = net_vars2.get_weights() + self.assertNotEqual(weights1, weights2) + + # Swap the weights. + net_vars1.set_weights(weights2) + net_vars2.set_weights(weights1) + + # Get the new weights. + new_weights1 = net_vars1.get_weights() + new_weights2 = net_vars2.get_weights() + self.assertNotEqual(new_weights1, new_weights2) + + # Check that the weights were swapped. + self.assertEqual(weights1, new_weights2) + self.assertEqual(weights2, new_weights1) + + ray.worker.cleanup() + if __name__ == "__main__": unittest.main(verbosity=2)