From ba8933e10feae43aeda7cbf4bf0bfa27591905b3 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Tue, 10 Jan 2017 23:52:38 -0800 Subject: [PATCH] Update tutorial. (#196) * Update tutorial. * Small updates to documentation and code. --- README.md | 4 +- doc/serialization.md | 15 +- doc/tutorial.md | 267 +++++++++++++------------------ doc/using-ray-with-tensorflow.md | 9 +- examples/hyperopt/README.md | 1 - examples/lbfgs/README.md | 1 - examples/rl_pong/README.md | 1 - lib/python/ray/worker.py | 2 +- 8 files changed, 128 insertions(+), 172 deletions(-) diff --git a/README.md b/README.md index def9113e2..2289f0438 100644 --- a/README.md +++ b/README.md @@ -2,8 +2,8 @@ [![Build Status](https://travis-ci.org/ray-project/ray.svg?branch=master)](https://travis-ci.org/ray-project/ray) -Ray is an experimental distributed extension of Python. It is under development -and not ready to be used. +Ray is an experimental distributed execution engine. It is under development and +not ready to be used. The goal of Ray is to make it easy to write machine learning applications that run on a cluster while providing the development and debugging experience of diff --git a/doc/serialization.md b/doc/serialization.md index a00b4b38b..bd471763f 100644 --- a/doc/serialization.md +++ b/doc/serialization.md @@ -16,8 +16,8 @@ A normal Python object may have pointers all over the place, so to place an object in the object store or send it between processes, it must first be converted to a contiguous string of bytes. This process is known as serialization. The process of turning the string of bytes back into a Python -object is known as deserialization. The processes of serialization and -deserialization are often bottlenecks in distributed computing. +object is known as deserialization. Serialization and deserialization are often +bottlenecks in distributed computing. Pickle is one example of a library for serialization and deserialization in Python. @@ -25,8 +25,8 @@ Python. ```python import pickle -pickle.dumps([1, 2, 3]) # prints '(lp0\nI1\naI2\naI3\na.' -pickle.loads("(lp0\nI1\naI2\naI3\na.") # prints [1, 2, 3] +pickle.dumps([1, 2, 3]) # prints b'\x80\x03]q\x00(K\x01K\x02K\x03e.' +pickle.loads(b'\x80\x03]q\x00(K\x01K\x02K\x03e.') # prints [1, 2, 3] ``` Pickle (and its variants) are pretty general. They can successfully serialize a @@ -35,8 +35,8 @@ unpickling can be inefficient. For example, when unpickling a list of numpy arrays, pickle will create completely new arrays in memory. In Ray, when we deserialize a list of numpy arrays from the object store, we will create a list of numpy array objects in Python, but each numpy array object is essentially -just a pointer to the relevant location in the object store's memory. There are -some advantages to this form of serialization. +just a pointer to the relevant location in shared memory. There are some +advantages to this form of serialization. - Deserialization can be very fast. - Memory is shared between processes so worker processes can all read the same @@ -126,6 +126,9 @@ below. ```python l = [] l.append(l) + +# Try to put this list that recursively contains itself in the object store. +ray.put(l) ``` It will throw an exception with a message like the following. diff --git a/doc/tutorial.md b/doc/tutorial.md index 22a91cab3..85c2a97b1 100644 --- a/doc/tutorial.md +++ b/doc/tutorial.md @@ -3,18 +3,18 @@ To use Ray, you need to understand the following: - How Ray uses object IDs to represent immutable remote objects. -- How Ray constructs a computation graph using remote functions. +- How Ray executes tasks asynchronously to achieve parallelism. ## Overview Ray is a Python-based distributed execution engine. It can be used on a single -machine to achieve effective multiprocessing, and it can be used on a cluster +machine to achieve efficient multiprocessing, and it can be used on a cluster for large computations. When using Ray, several processes are involved. -- Multiple **worker** processes execute tasks and store results in object stores. -Each worker is a separate process. +- Multiple **worker** processes execute tasks and store results in object +stores. Each worker is a separate process. - One **object store** per node stores immutable objects in shared memory and allows workers to efficiently share objects on the same node with minimal copying and deserialization. @@ -28,37 +28,32 @@ that it can submit tasks to its local scheduler and get objects from the object store, but it is different in that the local scheduler will not assign tasks to the driver to be executed. - A **Redis server** maintains much of the system's state. For example, it keeps -track of which objects live on which machines and of the task specifications. It -can also be queried directly for debugging purposes. +track of which objects live on which machines and of the task specifications +(but not data). It can also be queried directly for debugging purposes. ## Starting Ray -To start Ray, start Python, and run the following commands. +To start Ray, start Python and run the following commands. ```python import ray ray.init(num_workers=10) ``` -That command starts a scheduler, one object store, and ten workers. Each of -these are distinct processes. They will be killed when you exit the Python -interpreter. They can also be killed manually with the following command. - -``` -killall scheduler objstore python -``` +This starts Ray with ten workers. Each of these are distinct processes. They +will be killed when you exit the Python interpreter. ## Immutable remote objects In Ray, we can create and manipulate objects. We refer to these objects as -**remote objects**, and we use **object IDs** to refer to them. Remote -objects are stored in **object stores**, and there is one object store per node -in the cluster. In the cluster setting, we may not actually know which machine -each object lives on. +**remote objects**, and we use **object IDs** to refer to them. Remote objects +are stored in **object stores**, and there is one object store per node in the +cluster. In the cluster setting, we may not actually know which machine each +object lives on. -An **object ID** is essentially a unique ID that can be used to refer to -a remote object. If you're familiar with Futures, our object IDs are -conceptually similar. +An **object ID** is essentially a unique ID that can be used to refer to a +remote object. If you're familiar with Futures, our object IDs are conceptually +similar. We assume that remote objects are immutable. That is, their values cannot be changed after creation. This allows remote objects to be replicated in multiple @@ -71,7 +66,7 @@ objects and object IDs, as shown in the example below. ```python x = [1, 2, 3] -ray.put(x) # prints +ray.put(x) # prints ObjectID(b49a32d72057bdcfc4dda35584b3d838aad89f5d) ``` The command `ray.put(x)` would be run by a worker process or by the driver @@ -80,29 +75,28 @@ object and copies it to the local object store (here *local* means *on the same node*). Once the object has been stored in the object store, its value cannot be changed. -In addition, `ray.put(x)` returns an object ID, which is essentially an -ID that can be used to refer to the newly created remote object. If we save the -object ID in a variable with `x_id = ray.put(x)`, then we can pass `x_id` -into remote functions, and those remote functions will operate on the -corresponding remote object. +In addition, `ray.put(x)` returns an object ID, which is essentially an ID that +can be used to refer to the newly created remote object. If we save the object +ID in a variable with `x_id = ray.put(x)`, then we can pass `x_id` into remote +functions, and those remote functions will operate on the corresponding remote +object. -The command `ray.get(x_id)` takes an object ID and creates a Python object -from the corresponding remote object. For some objects like arrays, we can use -shared memory and avoid copying the object. For other objects, this currently -copies the object from the object store into the memory of the worker process. -If the remote object corresponding to the object ID `x_id` does not live -on the same node as the worker that calls `ray.get(x_id)`, then the remote object -will first be copied from an object store that has it to the object store that -needs it. +The command `ray.get(x_id)` takes an object ID and creates a Python object from +the corresponding remote object. For some objects like arrays, we can use shared +memory and avoid copying the object. For other objects, this copies the object +from the object store to the worker process's heap. If the remote object +corresponding to the object ID `x_id` does not live on the same node as the +worker that calls `ray.get(x_id)`, then the remote object will first be +transferred from an object store that has it to the object store that needs it. ```python x_id = ray.put([1, 2, 3]) ray.get(x_id) # prints [1, 2, 3] ``` -If the remote object corresponding to the object ID `x_id` has not been -created yet, *the command `ray.get(x_id)` will wait until the remote object has -been created.* +If the remote object corresponding to the object ID `x_id` has not been created +yet, *the command `ray.get(x_id)` will wait until the remote object has been +created.* A very common use case of `ray.get` is to get a list of object IDs. In this case, you can call `ray.get(object_ids)` where `object_ids` is a list of object @@ -113,179 +107,138 @@ result_ids = [ray.put(i) for i in range(10)] ray.get(result_ids) # prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] ``` -## Computation graphs in Ray +## Asynchronous Computation in Ray -Ray represents computation with a directed acyclic graph of tasks. Tasks are -added to this graph by calling **remote functions**. +Ray enables arbitrary Python functions to be executed asynchronously. This is +done by designating a Python function as a **remote function**. For example, a normal Python function looks like this. ```python -def add(a, b): +def add1(a, b): return a + b ``` -A remote function in Ray looks like this. +A remote function looks like this. ```python @ray.remote -def add(a, b): +def add2(a, b): return a + b ``` ### Remote functions -Whereas in regular Python, calling `add(1, 2)` would return `3`, in Ray, calling -`add.remote(1, 2)` does not actually execute the task. Instead, it adds a task to the -computation graph and immediately returns the object ID for the output of -the computation. +Whereas calling `add1(1, 2)` returns `3` and causes the Python interpreter to +block until the computation has finished, calling `add2.remote(1, 2)` +immediately returns an object ID and creates a **task**. The task will be +scheduled by the system and executed asynchronously (potentially on a different +machine). When the task finishes executing, its return value will be stored in +the object store. ```python -x_id = add.remote(1, 2) +x_id = add2.remote(1, 2) ray.get(x_id) # prints 3 ``` +The following simple example demonstrates how asynchronous tasks can be used +to parallelize computation. + +```python +import time + +def f1(): + time.sleep(1) + +@ray.remote +def f2(): + time.sleep(1) + +# The following takes ten seconds. +[f1() for _ in range(10)] +``` + +```python +# The following takes one second (assuming the system has at least ten workers). +ray.get([f2.remote() for _ in range(10)]) +``` + There is a sharp distinction between *submitting a task* and *executing the task*. When a remote function is called, the task of executing that function is -submitted to the scheduler, and the scheduler immediately returns object -IDs for the outputs of the task. However, the task will not be executed -until the scheduler actually schedules the task on a worker. +submitted to a local scheduler, and object IDs for the outputs of the task are +immediately returned. However, the task will not be executed until the system +actually schedules the task on a worker. Task execution is **not** done lazily. -When a task is submitted, each argument may be passed in by value or by object -ID. For example, these lines have the same behavior. +**When a task is submitted, each argument may be passed in by value or by object +ID.** For example, these lines have the same behavior. ```python -add.remote(1, 2) -add.remote(1, ray.put(2)) -add.remote(ray.put(1), ray.put(2)) +add2.remote(1, 2) +add2.remote(1, ray.put(2)) +add2.remote(ray.put(1), ray.put(2)) ``` Remote functions never return actual values, they always return object IDs. When the remote function is actually executed, it operates on Python objects. -That is, if the remote function was called with any object IDs, the -Python objects corresponding to those object IDs will be retrieved and -passed into the actual execution of the remote function. +That is, if the remote function was called with any object IDs, the Python +objects corresponding to those object IDs will be retrieved and passed into the +actual execution of the remote function. Note that a remote function can return multiple object IDs. ```python @ray.remote(num_return_vals=3) def return_multiple(): - return 0, 0.0, "zero" + return 1, 2, 3 a_id, b_id, c_id = return_multiple.remote() ``` -### Blocking computation +### Expressing dependencies between tasks -In a regular Python script, the specification of a computation is intimately -linked to the actual execution of the code. For example, consider the following -code. +Programmers can express dependencies between tasks by passing the object ID +output of one task as an argument to another task. For example, we can launch +three tasks as follows, each of which depends on the previous task. ```python -import time - -# This takes 20 seconds. -for i in range(10): - time.sleep(2) -``` - -At the core of the above script, there are ten separate tasks, each of which -sleeps for two seconds (this is a toy example, but you could imagine replacing -the call to `sleep` with some computationally intensive task). These tasks do -not depend on each other, so in principle, they could be executed in parallel. -However, in the above implementation, they will be executed serially, which will -take twenty seconds. - -Ray gets around this by representing computation as a graph of tasks, where some -tasks depend on the outputs of other tasks and where tasks can be executed once -their dependencies are done. - -For example, suppose we define the remote function `sleep` to be a wrapper -around `time.sleep`. - -```python -import time - @ray.remote -def sleep(n): - time.sleep(n) - return 0 +def f(x): + return x + 1 + +x = f.remote(0) +y = f.remote(x) +z = f.remote(y) +ray.get(z) # prints 3 ``` -Then we can write +The second task above will not execute until the first has finished, and the +third will not execute until the second has finished. In this example, there are +no opportunities for parallelism. -```python -# Submit ten tasks to the scheduler. This finishes almost immediately. -result_ids = [] -for i in range(10): - result_ids.append(sleep.remote(2)) - -# Wait for the results. If we have at least ten workers, this takes 2 seconds. -ray.get(result_ids) # prints [0, 0, 0, 0, 0, 0, 0, 0, 0, 0] -``` - -The for loop simply adds ten tasks to the computation graph, with no -dependencies between the tasks. It executes almost instantaneously. Afterwards, -we use `ray.get` to wait for the tasks to finish. If we have at least ten -workers, then all ten tasks can be executed in parallel, and so the overall -script should take two seconds. - -### Visualizing the Computation Graph - -The computation graph can be viewed as follows. - -```python -ray.visualize_computation_graph(view=True) -``` - -

- -

- -In this figure, boxes are tasks and ovals are objects. - -The box that says `op-root` in it just refers to the overall script itself. The -dotted lines indicate that the script launched 10 tasks (tasks are denoted by -rectangular boxes). The solid lines indicate that each task produces one output -(represented by an oval). - -It is clear from the computation graph that these ten tasks can be executed in -parallel. - -Computation graphs encode dependencies. For example, suppose we define +The ability to compose tasks makes it easy to express interesting dependencies. +Consider the following implementation of a tree reduce. ```python import numpy as np @ray.remote -def zeros(shape): - return np.zeros(shape) +def generate_data(): + return np.random.normal(size=1000) @ray.remote -def dot(a, b): - return np.dot(a, b) +def aggregate_data(x, y): + return x + y + +# Generate some random data. This launches 100 tasks that will be scheduled on +# various nodes. The resulting data will be distributed around the cluster. +data = [generate_data.remote() for _ in range(100)] + +# Perform a tree reduce. +while len(data) > 1: + data.append(aggregate_data.remote(data.pop(0), data.pop(0))) + +# Fetch the result. +ray.get(data) ``` -Then we run - -```python -a_id = zeros.remote([10, 10]) -b_id = zeros.remote([10, 10]) -c_id = dot.remote(a_id, b_id) -``` - -The corresponding computation graph looks like this. - -

- -

- -The three dashed lines indicate that the script launched three tasks (the two -`zeros` tasks and the one `dot` task). Each task produces a single output, and -the `dot` task depends on the outputs of the two `zeros` tasks. - -This makes it clear that the two `zeros` tasks can execute in parallel but that -the `dot` task must wait until the two `zeros` tasks have finished. - ### Remote Functions Within Remote Functions So far, we have been calling remote functions only from the driver. But worker diff --git a/doc/using-ray-with-tensorflow.md b/doc/using-ray-with-tensorflow.md index 94f52e2ee..d56ea4916 100644 --- a/doc/using-ray-with-tensorflow.md +++ b/doc/using-ray-with-tensorflow.md @@ -33,17 +33,20 @@ loss = tf.reduce_mean(tf.square(y - y_data)) optimizer = tf.train.GradientDescentOptimizer(0.5) train = optimizer.minimize(loss) -init = tf.initialize_all_variables() +init = tf.global_variables_initializer() sess = tf.Session() ``` -To extract the weights and set the weights, you can call +To extract the weights and set the weights, you can use the following helper +method. ```python +import ray variables = ray.experimental.TensorFlowVariables(loss, sess) ``` -which gives you methods to set and get the weights as well as collecting all of the variables in the model. +The `TensorFlowVariables` object provides methods for getting and setting the +weights as well as collecting all of the variables in the model. Now we can use these methods to extract the weights, and place them back in the network as follows. diff --git a/examples/hyperopt/README.md b/examples/hyperopt/README.md index 91972fad3..de376aade 100644 --- a/examples/hyperopt/README.md +++ b/examples/hyperopt/README.md @@ -8,7 +8,6 @@ To run the application, first install this dependency. Then from the directory `ray/examples/hyperopt/` run the following. ``` -source ../../setup-env.sh python driver.py ``` diff --git a/examples/lbfgs/README.md b/examples/lbfgs/README.md index 5c6d9791b..b9659d148 100644 --- a/examples/lbfgs/README.md +++ b/examples/lbfgs/README.md @@ -9,7 +9,6 @@ application, first install these dependencies. Then from the directory `ray/examples/lbfgs/` run the following. ``` -source ../../setup-env.sh python driver.py ``` diff --git a/examples/rl_pong/README.md b/examples/rl_pong/README.md index f6019c7bc..6bf3de5a8 100644 --- a/examples/rl_pong/README.md +++ b/examples/rl_pong/README.md @@ -12,7 +12,6 @@ the application, first install this dependency. Then from the directory `ray/examples/rl_pong/` run the following. ``` -source ../../setup-env.sh python driver.py ``` diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index fb7869831..2f5d80b70 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -553,7 +553,7 @@ def check_connected(worker=global_worker): Exception: An exception is raised if the worker is not connected. """ if not worker.connected: - raise RayConnectionError("This command cannot be called before Ray has been started. You can start Ray with 'ray.init()'.") + raise RayConnectionError("This command cannot be called before Ray has been started. You can start Ray with 'ray.init(num_workers=10)'.") def print_failed_task(task_status): """Print information about failed tasks.