From 95bf81aeb8712114f30714160c2c4fafaffcc6e5 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Sat, 4 Mar 2017 23:06:02 -0800 Subject: [PATCH] Add actor tutorial. (#335) --- doc/source/actors.rst | 231 ++++++++++++++++++++++++++++++++++++++++++ doc/source/index.rst | 15 +-- 2 files changed, 239 insertions(+), 7 deletions(-) create mode 100644 doc/source/actors.rst diff --git a/doc/source/actors.rst b/doc/source/actors.rst new file mode 100644 index 000000000..6961f4d95 --- /dev/null +++ b/doc/source/actors.rst @@ -0,0 +1,231 @@ +Actors +====== + +Remote functions in Ray should be thought of as functional and side-effect free. +Restricting ourselves only to remote functions gives us distributed functional +programming, which is great for many use cases, but in practice is a bit +limited. + +Ray extends the dataflow model with **actors**. An actor is essentially a +stateful worker (or a service). When a new actor is instantiated, a new worker +is created, and methods of the actor are scheduled on that specific worker and +can access and mutate the state of that worker. + +Suppose we've already started Ray. + +.. code-block:: python + + import ray + ray.init() + +Defining and creating an actor +------------------------------ + +An actor can be defined as follows. + +.. code-block:: python + + import gym + + @ray.actor + class GymEnvironment(object): + def __init__(self, name): + self.env = gym.make(name) + def step(self, action): + return self.env.step(action) + def reset(self): + self.env.reset() + +Two copies of the actor can be created as follows. + +.. code-block:: python + + a1 = GymEnvironment("Pong-v0") + a2 = GymEnvironment("Pong-v0") + +When the first line is run, the following happens. + +- Some node in the cluster will be chosen, and a worker will be created on that + node (by the local scheduler on that node) for the purpose of running methods + called on the actor. +- A ``GymEnvironment`` object will be created on that worker and the + ``GymEnvironment`` constructor will run. + +When the second line is run, another node (possibly the same one) is chosen, +another worker is created on that node for the purpose of running methods called +on the second actor, and another ``GymEnvironment`` object is constructed on +the newly-created worker. + +Using an actor +-------------- + +We can use the actor by calling one of its methods. + +.. code-block:: python + + a1.step(0) + a2.step(0) + +When ``a1.step(0)`` is called, a task is created and scheduled on the first +actor. This scheduling procedure bypasses the global scheduler, and is assigned +directly to the local scheduler responsible for the actor by the driver's local +scheduler. Since the method call is a task, ``a1.step(0)`` returns an object ID. +We can call `ray.get` on the object ID to retrieve the actual value. + +The call to ``a2.step(0)`` generates a task which is scheduled on the second +actor. Since these two tasks run on different actors, they can be executed in +parallel (note that only actor methods will be scheduled on actor workers, not +regular remote functions). + +On the other hand, methods called on the same actor are executed serially and +share in the order that they are called and share state with one another. We +illustrate this with a simple example. + +.. code-block:: python + + @ray.actor + class Counter(object): + def __init__(self): + self.value = 0 + def increment(self): + self.value += 1 + return self.value + + # Create ten actors. + counters = [Counter() for _ in range(10)] + + # Increment each counter once and get the results. These tasks all happen in + # parallel. + results = ray.get([c.increment() for c in counters]) + print(results) # prints [1, 1, 1, 1, 1, 1, 1, 1, 1, 1] + + # Increment the first counter five times. These tasks are executed serially + # and share state. + results = ray.get([counters[0].increment() for _ in range(5)]) + print(results) # prints [2, 3, 4, 5, 6] + +Using GPUs on actors +-------------------- + +A common use case is for an actor to contain a neural network. For example, +suppose we have a method for constructing a neural net. + +.. code-block:: python + + import tensorflow as tf + + def construct_network(): + x = tf.placeholder(tf.float32, [None, 784]) + y_ = tf.placeholder(tf.float32, [None, 10]) + + W = tf.Variable(tf.zeros([784, 10])) + b = tf.Variable(tf.zeros([10])) + y = tf.nn.softmax(tf.matmul(x, W) + b) + + cross_entropy = tf.reduce_mean(-tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1])) + train_step = tf.train.GradientDescentOptimizer(0.5).minimize(cross_entropy) + correct_prediction = tf.equal(tf.argmax(y,1), tf.argmax(y_,1)) + accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32)) + + return x, y_, train_step, accuracy + +We can then define an actor for this network as follows. + +.. code-block:: python + + import os + + # Define an actor that runs on GPUs. If there are no GPUs, then simply use + # ray.actor without any arguments and no parentheses. + @ray.actor(num_gpus=1) + class NeuralNetOnGPU(object): + def __init__(self): + # Set an environment variable to tell TensorFlow which GPUs to use. Note + # that this must be done before the call to tf.Session. + os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in ray.get_gpu_ids()]) + with tf.Graph().as_default(): + with tf.device("/gpu:0"): + self.x, self.y_, self.train_step, self.accuracy = construct_network() + # Allow this to run on CPUs if there aren't any GPUs. + config = tf.ConfigProto(allow_soft_placement=True) + self.sess = tf.Session(config=config) + # Initialize the network. + init = tf.global_variables_initializer() + self.sess.run(init) + +To indicate that an actor requires one GPU, we pass in ``num_gpus=1`` to +``ray.actor``. Note that in order for this to work, Ray must have been started +with some GPUs, e.g., via ``ray.init(num_gpus=2)``. Otherwise, when you try to +instantiate the GPU version with ``NeuralNetOnGPU()``, an exception will be +thrown saying that there aren't enough GPUs in the system. + +When the actor is created, it will have access to a list of the IDs of the GPUs +that it is allowed to use via ``ray.get_gpu_ids()``. This is a list of integers, +like ``[]``, or ``[1]``, or ``[2, 5, 6]``. Since we passed in +``ray.actor(num_gpus=1)``, this list will have length one. + +We can put this all together as follows. + +.. code-block:: python + + import os + import ray + import tensorflow as tf + from tensorflow.examples.tutorials.mnist import input_data + + ray.init(num_gpus=8) + + def construct_network(): + x = tf.placeholder(tf.float32, [None, 784]) + y_ = tf.placeholder(tf.float32, [None, 10]) + + W = tf.Variable(tf.zeros([784, 10])) + b = tf.Variable(tf.zeros([10])) + y = tf.nn.softmax(tf.matmul(x, W) + b) + + cross_entropy = tf.reduce_mean(-tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1])) + train_step = tf.train.GradientDescentOptimizer(0.5).minimize(cross_entropy) + correct_prediction = tf.equal(tf.argmax(y,1), tf.argmax(y_,1)) + accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32)) + + return x, y_, train_step, accuracy + + @ray.actor(num_gpus=1) + class NeuralNetOnGPU(object): + def __init__(self, mnist_data): + self.mnist = mnist_data + # Set an environment variable to tell TensorFlow which GPUs to use. Note + # that this must be done before the call to tf.Session. + os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in ray.get_gpu_ids()]) + with tf.Graph().as_default(): + with tf.device("/gpu:0"): + self.x, self.y_, self.train_step, self.accuracy = construct_network() + # Allow this to run on CPUs if there aren't any GPUs. + config = tf.ConfigProto(allow_soft_placement=True) + self.sess = tf.Session(config=config) + # Initialize the network. + init = tf.global_variables_initializer() + self.sess.run(init) + + def train(self, num_steps): + for _ in range(num_steps): + batch_xs, batch_ys = self.mnist.train.next_batch(100) + self.sess.run(self.train_step, feed_dict={self.x: batch_xs, self.y_: batch_ys}) + + def get_accuracy(self): + return self.sess.run(self.accuracy, feed_dict={self.x: self.mnist.test.images, + self.y_: self.mnist.test.labels}) + + + # Load the MNIST dataset and tell Ray how to serialize the custom classes. + mnist = input_data.read_data_sets("MNIST_data", one_hot=True) + ray.register_class(type(mnist)) + ray.register_class(type(mnist.train)) + + # Create the actor. + nn = NeuralNetOnGPU(mnist) + + # Run a few steps of training and print the accuracy. + nn.train(100) + accuracy = ray.get(nn.get_accuracy()) + print("Accuracy is {}.".format(accuracy)) diff --git a/doc/source/index.rst b/doc/source/index.rst index 4b269fc8f..b085d7698 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -14,6 +14,14 @@ learning and reinforcement learning applications.* install-on-docker.md installation-troubleshooting.md +.. toctree:: + :maxdepth: 1 + :caption: Getting Started + + tutorial.md + actors.rst + api.rst + .. toctree:: :maxdepth: 1 :caption: Examples @@ -23,13 +31,6 @@ learning and reinforcement learning applications.* example-rl-pong.md using-ray-with-tensorflow.md -.. toctree:: - :maxdepth: 1 - :caption: Getting Started - - api.rst - tutorial.md - .. toctree:: :maxdepth: 1 :caption: Design