From f2b6a7b58ddb12d9e1d68affad76ecbf7fd52e81 Mon Sep 17 00:00:00 2001 From: William Paul Date: Mon, 12 Feb 2018 15:38:58 -0800 Subject: [PATCH] Polished TensorFlowVariables code and documentation (#566) --- doc/source/using-ray-with-tensorflow.rst | 70 ++++++++++++- python/ray/experimental/tfutils.py | 120 +++++++++++++++++------ test/tensorflow_test.py | 63 ++++++++++-- 3 files changed, 210 insertions(+), 43 deletions(-) diff --git a/doc/source/using-ray-with-tensorflow.rst b/doc/source/using-ray-with-tensorflow.rst index 429e11461..ae8d51ac0 100644 --- a/doc/source/using-ray-with-tensorflow.rst +++ b/doc/source/using-ray-with-tensorflow.rst @@ -82,8 +82,8 @@ unmanageably large over time. w.assign(np.zeros(1)) # This adds a node to the graph every time you call it. b.assign(np.zeros(1)) # This adds a node to the graph every time you call it. -Complete Example ----------------- +Complete Example for Weight Averaging +------------------------------------- Putting this all together, we would first embed the graph in an actor. Within the actor, we would use the ``get_weights`` and ``set_weights`` methods of the @@ -185,8 +185,8 @@ complex Python objects. if iteration % 20 == 0: print("Iteration {}: weights are {}".format(iteration, weights)) -How to Train in Parallel using Ray ----------------------------------- +How to Train in Parallel using Ray and Gradients +------------------------------------------------ In some cases, you may want to do data-parallel training on your network. We use the network above to illustrate how to do this in Ray. The only differences are in the remote function @@ -320,3 +320,65 @@ For reference, the full code is below: # and 0.3 used in generate_fake_x_y_data. if iteration % 20 == 0: print("Iteration {}: weights are {}".format(iteration, weights)) + +.. 
autoclass:: ray.experimental.TensorFlowVariables + :members: + +Troubleshooting +--------------- + +Note that ``TensorFlowVariables`` uses variable names to determine what +variables to set when calling ``set_weights``. One common issue arises when two +networks are defined in the same TensorFlow graph. In this case, TensorFlow +appends an underscore and integer to the names of variables to disambiguate +them. This will cause ``TensorFlowVariables`` to fail. For example, if we have a +class definition ``Network`` with a ``TensorFlowVariables`` instance: + +.. code-block:: python + + import ray + import tensorflow as tf + + class Network(object): + def __init__(self): + a = tf.Variable(1) + b = tf.Variable(1) + c = tf.add(a, b) + sess = tf.Session() + init = tf.global_variables_initializer() + sess.run(init) + self.variables = ray.experimental.TensorFlowVariables(c, sess) + + def set_weights(self, weights): + self.variables.set_weights(weights) + + def get_weights(self): + return self.variables.get_weights() + +and run the following code: + +.. code-block:: python + + a = Network() + b = Network() + b.set_weights(a.get_weights()) + +the code would fail. If we instead defined each network in its own TensorFlow +graph, then it would work: + +.. code-block:: python + + with tf.Graph().as_default(): + a = Network() + with tf.Graph().as_default(): + b = Network() + b.set_weights(a.get_weights()) + +This issue does not occur between actors that contain a network, as each actor +is in its own process, and thus is in its own graph. This also does not occur +when using ``set_flat``. + +Another issue to keep in mind is that ``TensorFlowVariables`` needs to add new +operations to the graph. If you close the graph and make it immutable, e.g., by +creating a ``MonitoredTrainingSession``, the initialization will fail. To +resolve this, simply create the instance before you close the graph. 
diff --git a/python/ray/experimental/tfutils.py b/python/ray/experimental/tfutils.py index c6aa4ba9a..0ad07bf01 100644 --- a/python/ray/experimental/tfutils.py +++ b/python/ray/experimental/tfutils.py @@ -18,24 +18,34 @@ def unflatten(vector, shapes): class TensorFlowVariables(object): - """An object used to extract variables from a loss function. - - This object also provides methods for getting and setting the weights of - the relevant variables. + """A class used to set and get weights for Tensorflow networks. Attributes: sess (tf.Session): The tensorflow session used to run assignment. - loss: The loss function passed in by the user. - variables (List[tf.Variable]): Extracted variables from the loss. - assignment_placeholders (List[tf.placeholders]): The nodes that weights - get passed to. - assignment _nodes (List[tf.Tensor]): The nodes that assign the weights. + variables (Dict[str, tf.Variable]): Extracted variables from the loss + or additional variables that are passed in. + placeholders (Dict[str, tf.placeholders]): Placeholders for weights. + assignment_nodes (Dict[str, tf.Tensor]): Nodes that assign weights. """ - def __init__(self, loss, sess=None): - """Creates a TensorFlowVariables instance.""" + def __init__(self, loss, sess=None, input_variables=None): + """Creates TensorFlowVariables containing extracted variables. + + The variables are extracted by performing a BFS search on the + dependency graph with loss as the root node. After the tree is + traversed and those variables are collected, we append input_variables + to the collected variables. For each variable in the list, the + variable has a placeholder and assignment operation created for it. + + Args: + loss (tf.Operation): The tensorflow operation to extract all + variables from. + sess (tf.Session): Session used for running the get and set + methods. + input_variables (List[tf.Variables]): Variables to include in the + list. 
+ """ import tensorflow as tf self.sess = sess - self.loss = loss queue = deque([loss]) variable_names = [] explored_inputs = set([loss]) @@ -44,9 +54,10 @@ class TensorFlowVariables(object): # the variables. while len(queue) != 0: tf_obj = queue.popleft() - - # The object put into the queue is not necessarily an operation, so - # we want the op attribute to get the operation underlying the + if tf_obj is None: + continue + # The object put into the queue is not necessarily an operation, + # so we want the op attribute to get the operation underlying the # object. Only operations contain the inputs that we can explore. if hasattr(tf_obj, "op"): tf_obj = tf_obj.op @@ -63,23 +74,37 @@ class TensorFlowVariables(object): if "Variable" in tf_obj.node_def.op: variable_names.append(tf_obj.node_def.name) self.variables = OrderedDict() - for v in [v for v in tf.global_variables() - if v.op.node_def.name in variable_names]: + variable_list = [v for v in tf.global_variables() + if v.op.node_def.name in variable_names] + if input_variables is not None: + variable_list += input_variables + for v in variable_list: self.variables[v.op.node_def.name] = v + self.placeholders = dict() - self.assignment_nodes = [] + self.assignment_nodes = dict() # Create new placeholders to put in custom weights. for k, var in self.variables.items(): self.placeholders[k] = tf.placeholder(var.value().dtype, - var.get_shape().as_list()) - self.assignment_nodes.append(var.assign(self.placeholders[k])) + var.get_shape().as_list(), + name="Placeholder_" + k) + self.assignment_nodes[k] = var.assign(self.placeholders[k]) def set_session(self, sess): - """Modifies the current session used by the class.""" + """Sets the current session used by the class. + + Args: + sess (tf.Session): Session to set the attribute with. + """ self.sess = sess def get_flat_size(self): + """Returns the total length of all of the flattened variables. + + Returns: + The length of all flattened variables concatenated. 
+ """ return sum([np.prod(v.get_shape().as_list()) for v in self.variables.values()]) @@ -91,31 +116,64 @@ class TensorFlowVariables(object): "calling set_session(sess).") def get_flat(self): - """Gets the weights and returns them as a flat array.""" + """Gets the weights and returns them as a flat array. + + Returns: + 1D Array containing the flattened weights. + """ self._check_sess() return np.concatenate([v.eval(session=self.sess).flatten() for v in self.variables.values()]) def set_flat(self, new_weights): - """Sets the weights to new_weights, converting from a flat array.""" + """Sets the weights to new_weights, converting from a flat array. + + Note: + You can only set all weights in the network using this function, + i.e., the length of the array must match get_flat_size. + + Args: + new_weights (np.ndarray): Flat array containing weights. + """ self._check_sess() shapes = [v.get_shape().as_list() for v in self.variables.values()] arrays = unflatten(new_weights, shapes) - placeholders = [self.placeholders[k] - for k, v in self.variables.items()] - self.sess.run(self.assignment_nodes, + placeholders = [self.placeholders[k] for k, v + in self.variables.items()] + self.sess.run(list(self.assignment_nodes.values()), feed_dict=dict(zip(placeholders, arrays))) def get_weights(self): - """Returns a list of the weights of the loss function variables.""" + """Returns a dictionary containing the weights of the network. + + Returns: + Dictionary mapping variable names to their weights. + """ self._check_sess() - return {k: v.eval(session=self.sess) - for k, v in self.variables.items()} + return {k: v.eval(session=self.sess) for k, v + in self.variables.items()} def set_weights(self, new_weights): - """Sets the weights to new_weights.""" + """Sets the weights to new_weights. + + Note: + Can set subsets of variables as well, by only passing in the + variables you want to be set. + + Args: + new_weights (Dict): Dictionary mapping variable names to their + weights. 
+ """ self._check_sess() - self.sess.run(self.assignment_nodes, + assign_list = [self.assignment_nodes[name] + for name in new_weights.keys() + if name in self.assignment_nodes] + assert assign_list, ("No variables in the input matched those in the " + "network. Possible cause: Two networks were " + "defined in the same TensorFlow graph. To fix " + "this, place each network definition in its own " + "tf.Graph.") + self.sess.run(assign_list, feed_dict={self.placeholders[name]: value for (name, value) in new_weights.items() if name in self.placeholders}) diff --git a/test/tensorflow_test.py b/test/tensorflow_test.py index 4ffaa29ef..70c49f94c 100644 --- a/test/tensorflow_test.py +++ b/test/tensorflow_test.py @@ -22,6 +22,32 @@ def make_linear_network(w_name=None, b_name=None): tf.global_variables_initializer(), x_data, y_data) +class LossActor(object): + + def __init__(self, use_loss=True): + # Uses a separate graph for each network. + with tf.Graph().as_default(): + # Create the network. + var = [tf.Variable(1)] + loss, init, _, _ = make_linear_network() + sess = tf.Session() + # Additional code for setting and getting the weights. + weights = ray.experimental.TensorFlowVariables(loss if use_loss + else None, + sess, + input_variables=var) + # Return all of the data needed to use the network. 
+ self.values = [weights, init, sess] + sess.run(init) + + def set_and_get_weights(self, weights): + self.values[0].set_weights(weights) + return self.values[0].get_weights() + + def get_weights(self): + return self.values[0].get_weights() + + class NetActor(object): def __init__(self): @@ -102,7 +128,6 @@ class TensorFlowTest(unittest.TestCase): variables2.set_weights(weights2) self.assertEqual(weights2, variables2.get_weights()) - flat_weights = variables2.get_flat() + 2.0 variables2.set_flat(flat_weights) assert_almost_equal(flat_weights, variables2.get_flat()) @@ -114,7 +139,7 @@ class TensorFlowTest(unittest.TestCase): self.assertEqual(variables3.sess, sess) # Test that the variable names for the two different nets are not - # modified by TensorFlow to be unique (i.e. they should already + # modified by TensorFlow to be unique (i.e., they should already # be unique because of the variable prefix). def testVariableNameCollision(self): ray.init(num_workers=2) @@ -123,9 +148,31 @@ class TensorFlowTest(unittest.TestCase): net2 = NetActor() # This is checking that the variable names of the two nets are the - # same, i.e. that the names in the weight dictionaries are the same + # same, i.e., that the names in the weight dictionaries are the same. net1.values[0].set_weights(net2.values[0].get_weights()) + # Test that TensorFlowVariables can take in additional variables through + # input_variables arg and with no loss. + def testAdditionalVariablesNoLoss(self): + ray.init(num_workers=1) + + net = LossActor(use_loss=False) + self.assertEqual(len(net.values[0].variables.items()), 1) + self.assertEqual(len(net.values[0].placeholders.items()), 1) + + net.values[0].set_weights(net.values[0].get_weights()) + + # Test that TensorFlowVariables can take in additional variables through + # input_variables arg and with a loss. 
+ def testAdditionalVariablesWithLoss(self): + ray.init(num_workers=1) + + net = LossActor() + self.assertEqual(len(net.values[0].variables.items()), 3) + self.assertEqual(len(net.values[0].placeholders.items()), 3) + + net.values[0].set_weights(net.values[0].get_weights()) + # Test that different networks on the same worker are independent and # we can get/set their weights without any interaction. def testNetworksIndependent(self): @@ -197,12 +244,12 @@ class TensorFlowTest(unittest.TestCase): ray.init(num_workers=2) net = ray.remote(TrainActor).remote() - (loss, variables, _, sess, grads, - train, placeholders) = TrainActor().values + net_values = TrainActor().values + loss, variables, _, sess, grads, train, placeholders = net_values - before_acc = sess.run(loss, - feed_dict=dict(zip(placeholders, - [[2] * 100, [4] * 100]))) + before_acc = sess.run(loss, feed_dict=dict(zip(placeholders, + [[2] * 100, + [4] * 100]))) for _ in range(3): gradients_list = ray.get(