Various cleanups: remove start_ray_local from ray.init, remove unused code, fix "pip install numbuf". (#193)

* Remove start_ray_local from ray.init and change default number of workers to 10.

* Remove alexnet example.

* Move array methods to experimental.

* Remove TRPO example.

* Remove old files.

* Compile plasma when we build numbuf.

* Address comments.
This commit is contained in:
Robert Nishihara
2017-01-10 17:35:27 -08:00
committed by Philipp Moritz
parent b9d6135aa1
commit be4a37bf37
32 changed files with 89 additions and 891 deletions
+2 -3
View File
@@ -17,8 +17,8 @@ machines).
import ray
import numpy as np
# Start a scheduler, an object store, and some workers.
ray.init(start_ray_local=True, num_workers=10)
# Start Ray with some workers.
ray.init(num_workers=10)
# Define a remote function for estimating pi.
@ray.remote
@@ -63,4 +63,3 @@ estimate of pi (waiting until the computation has finished if necessary).
- [Hyperparameter Optimization](examples/hyperopt/README.md)
- [Batch L-BFGS](examples/lbfgs/README.md)
- [Learning to Play Pong](examples/rl_pong/README.md)
- [Training AlexNet](examples/alexnet/README.md)
-35
View File
@@ -1,35 +0,0 @@
#!/usr/bin/env bash
# Cause the script to exit if a single command fails.
set -e
ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)
unamestr="$(uname)"
if [[ "$unamestr" == "Linux" ]]; then
platform="linux"
elif [[ "$unamestr" == "Darwin" ]]; then
platform="macosx"
else
echo "Unrecognized platform."
exit 1
fi
WEBUI_DIR="$ROOT_DIR/webui"
PYTHON_DIR="$ROOT_DIR/lib/python"
PYTHON_WEBUI_DIR="$PYTHON_DIR/webui"
pushd "$WEBUI_DIR"
npm install
if [[ $platform == "linux" ]]; then
nodejs ./node_modules/.bin/webpack -g
elif [[ $platform == "macosx" ]]; then
node ./node_modules/.bin/webpack -g
fi
pushd node_modules
rm -rf react* babel* classnames dom-helpers jsesc webpack .bin
popd
popd
cp -r $WEBUI_DIR/* $PYTHON_WEBUI_DIR/
+1 -1
View File
@@ -31,7 +31,7 @@ Python wrapper for an Atari simulator.
import gym
import ray
ray.init(start_ray_local=True, num_workers=5)
ray.init(num_workers=10)
# Define a function to create the gym environment.
def env_initializer():
+1 -1
View File
@@ -76,7 +76,7 @@ This can be addressed by calling `ray.register_class(Foo)`.
```python
import ray
ray.init(start_ray_local=True, num_workers=1)
ray.init(num_workers=10)
# Define a custom class.
class Foo(object):
-5
View File
@@ -1,5 +0,0 @@
================
The Services API
================
.. autofunction:: ray.services.start_ray_local
+23 -14
View File
@@ -7,20 +7,29 @@ To use Ray, you need to understand the following:
## Overview
Ray is a distributed extension of Python. When using Ray, several processes are
involved.
Ray is a Python-based distributed execution engine. It can be used on a single
machine to achieve effective multiprocessing, and it can be used on a cluster
for large computations.
- A **scheduler**: The scheduler assigns tasks to workers. It is its own
process.
- Multiple **workers**: Workers execute tasks and store the results in object
stores. Each worker is a separate process.
- One **object store** per node: The object store enables the sharing of Python
objects between worker processes so each worker does not have to have a separate
copy.
- A **driver**: The driver is the Python process that the user controls and
which submits tasks to the scheduler. For example, if the user is running a
script or using a Python shell, then the driver is the process that runs the
script or the shell.
When using Ray, several processes are involved.
- Multiple **worker** processes execute tasks and store results in object stores.
Each worker is a separate process.
- One **object store** per node stores immutable objects in shared memory and
allows workers to efficiently share objects on the same node with minimal
copying and deserialization.
- One **local scheduler** per node assigns tasks to workers on the same node.
- A **global scheduler** receives tasks from local schedulers and assigns them
to other local schedulers.
- A **driver** is the Python process that the user controls. For example, if the
user is running a script or using a Python shell, then the driver is the Python
process that runs the script or the shell. A driver is similar to a worker in
that it can submit tasks to its local scheduler and get objects from the object
store, but it is different in that the local scheduler will not assign tasks to
the driver to be executed.
- A **Redis server** maintains much of the system's state. For example, it keeps
track of which objects live on which machines and of the task specifications. It
can also be queried directly for debugging purposes.
## Starting Ray
@@ -28,7 +37,7 @@ To start Ray, start Python, and run the following commands.
```python
import ray
ray.init(start_ray_local=True, num_workers=10)
ray.init(num_workers=10)
```
That command starts a scheduler, one object store, and ten workers. Each of
+1 -1
View File
@@ -79,7 +79,7 @@ import tensorflow as tf
import numpy as np
import ray
ray.init(start_ray_local=True, num_workers=5)
ray.init(num_workers=5)
BATCH_SIZE = 100
NUM_BATCHES = 1
-108
View File
@@ -1,108 +0,0 @@
# Training AlexNet
WARNING: Running this application is fairly involved. In particular, it requires
you to download the ImageNet dataset and put it on S3.
This document walks through how to load the ImageNet dataset from S3 and train
AlexNet using data parallel stochastic gradient descent.
## Running the Application
The instructions in this section must be done before you can run the
application.
### Install the Dependencies
Install the following dependencies.
- [TensorFlow](https://www.tensorflow.org/)
In addition, install the following dependencies.
**On Ubuntu**
```
sudo apt-get install libjpeg8-dev awscli
sudo pip install boto3 botocore pillow
```
**On Mac OSX**
```
brew install libjpeg awscli
sudo pip install boto3 botocore pillow
```
### Put ImageNet on S3
1. To run this application, first put the ImageNet tar files on S3 (e.g., in the
directory `ILSVRC2012_img_train`). Also, put the file `train.txt` on S3. We will
use `$S3_BUCKET` to refer to the name of your S3 bucket.
2. Use `aws configure` to enable Boto to connect to S3. If you are using
multiple machines, this must be done on all machines.
### Run the Application
From the directory `ray/examples/alexnet/` run the following
```
source ../../setup-env.sh
python driver.py --s3-bucket $S3_BUCKET
```
## Parallel Data Loading
To speed up data loading, we will pull data from S3 in parallel with a number of
workers. At the core of our loading code is the remote function
`load_tarfile_from_s3`. When executed, this function connects to S3 and
retrieves the appropriate object.
```python
@ray.remote(num_return_vals=2)
def load_tarfile_from_s3(bucket, s3_key, size=[]):
# Pull the object with the given key and bucket from S3, untar the contents,
# and return it.
return images, labels
```
To load data in parallel, we simply call this function multiple times with the
keys of all the objects that we want to retrieve. This returns a list of pairs
of object IDs, where the first object ID in each pair refers to a
batch of images and the second refers to the corresponding batch of labels.
```python
batches = [load_tarfile_from_s3.remote(bucket, s3_key, size) for s3_key in s3_keys]
```
By default, this will only fetch objects whose keys have prefix
`ILSVRC2012_img_train/n015` (this is 13 tar files). To fetch all of the data,
pass in `--key-prefix ILSVRC2012_img_train/n`.
## Data Parallel Training
The other parallel component of this application is the training procedure. This
is built on top of the remote function `compute_grad`.
```python
@ray.remote
def compute_grad(X, Y, mean, weights):
# Load the weights into the network.
# Subtract the mean and crop the images.
# Compute the gradients.
return gradients
```
This function takes training inputs and outputs, the mean image (to subtract off
of the input images), and the current network weights.
We can parallelize the computation of the gradient over multiple batches by
calling `compute_grad` multiple times in parallel.
```python
gradient_ids = []
for i in range(num_workers):
# Choose a random batch and use it to compute the gradient of the loss.
x_id, y_id = batches[np.random.randint(len(batches))]
gradient_ids.append(compute_grad.remote(x_id, y_id, mean_id, weights_id))
```
-448
View File
@@ -1,448 +0,0 @@
# The code for AlexNet is copied and adapted from the TensorFlow repository
# https://github.com/tensorflow/tensorflow/blob/master/tensorflow/models/image/alexnet/alexnet_benchmark.py.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import ray
import numpy as np
import tarfile, io
import boto3
import PIL.Image as Image
import tensorflow as tf
import ray.array.remote as ra
STDDEV = 0.001 # The standard deviation of the network weight initialization.
def load_chunk(tarfile, size=None):
"""Load a number of images from a single imagenet .tar file.
This function also converts the image from grayscale to RGB if necessary.
Args:
tarfile (tarfile.TarFile): The archive from which the files get loaded.
size (Optional[Tuple[int, int]]): Resize the image to this size if provided.
Returns:
numpy.ndarray: Contains the image data in format [batch, w, h, c]
"""
result = []
filenames = []
for member in tarfile.getmembers():
filename = member.path
content = tarfile.extractfile(member)
img = Image.open(content)
rgbimg = Image.new("RGB", img.size)
rgbimg.paste(img)
if size != None:
rgbimg = rgbimg.resize(size, Image.ANTIALIAS)
result.append(np.array(rgbimg).reshape(1, rgbimg.size[0], rgbimg.size[1], 3))
filenames.append(filename)
return np.concatenate(result), filenames
@ray.remote(num_return_vals=2)
def load_tarfile_from_s3(bucket, s3_key, size=[]):
"""Load an imagenet .tar file.
Args:
bucket (str): Bucket holding the imagenet .tar.
s3_key (str): s3 key from which the .tar file is loaded.
size (List[int]): Resize the image to this size if size != []; len(size) == 2 required.
Returns:
np.ndarray: The image data (see load_chunk).
"""
s3 = boto3.client("s3")
response = s3.get_object(Bucket=bucket, Key=s3_key)
output = io.BytesIO()
chunk = response["Body"].read(1024 * 8)
while chunk:
output.write(chunk)
chunk = response["Body"].read(1024 * 8)
output.seek(0) # go to the beginning of the .tar file
tar = tarfile.open(mode="r", fileobj=output)
return load_chunk(tar, size=size if size != [] else None)
def load_tarfiles_from_s3(bucket, s3_keys, size=[]):
"""Load a number of imagenet .tar files.
Args:
bucket (str): Bucket holding the imagenet .tars.
s3_keys (List[str]): List of s3 keys from which the .tar files are being
loaded.
size (List[int]): Resize the image to this size if size does not equal [].
The length of size must be 2.
Returns:
np.ndarray: Contains object IDs to the chunks of the images (see load_chunk).
"""
return [load_tarfile_from_s3.remote(bucket, s3_key, size) for s3_key in s3_keys]
def setup_variables(params, placeholders, kernelshape, biasshape):
"""Create the variables for each layer.
Args:
params (List): Network parameters used for creating feed_dicts
placeholders (List): Placeholders used for feeding weights into
kernelshape (List): Shape of the kernel used for the conv layer
biasshape (List): Shape of the bias used
Returns:
None
"""
kernel = tf.Variable(tf.truncated_normal(kernelshape, stddev=STDDEV))
biases = tf.Variable(tf.constant(0.0, shape=biasshape, dtype=tf.float32),
trainable=True, name='biases')
kernel_new = tf.placeholder(tf.float32, shape=kernel.get_shape())
biases_new = tf.placeholder(tf.float32, shape=biases.get_shape())
update_kernel = kernel.assign(kernel_new)
update_biases = biases.assign(biases_new)
params += [kernel, biases]
placeholders += [kernel_new, biases_new]
def conv_layer(parameters, prev_layer, shape, scope):
"""Constructs a convolutional layer for the network.
Args:
parameters (List): Parameters used in constructing layer.
prevlayer (Tensor): The previous layer to connect the network together.
shape (List): The strides used for convolution
scope (Scope): Current scope of tensorflow
Returns:
Tensor: Activation of layer
"""
kernel = parameters[-2]
bias = parameters[-1]
conv = tf.nn.conv2d(prev_layer, kernel, shape, padding='SAME')
add_bias = tf.nn.bias_add(conv, bias)
return tf.nn.relu(add_bias, name=scope)
def net_initialization():
images = tf.placeholder(tf.float32, shape=[None, 224, 224, 3])
y_true = tf.placeholder(tf.float32, shape=[None, 1000])
parameters = []
placeholders = []
# conv1
with tf.name_scope('conv1') as scope:
setup_variables(parameters, placeholders, [11, 11, 3, 96], [96])
conv1 = conv_layer(parameters, images, [1, 4, 4, 1], scope)
# pool1
pool1 = tf.nn.max_pool(conv1,
ksize=[1, 3, 3, 1],
strides=[1, 2, 2, 1],
padding='VALID',
name='pool1')
# lrn1
pool1_lrn = tf.nn.lrn(pool1, depth_radius=5, bias=1.0,
alpha=0.0001, beta=0.75,
name="LocalResponseNormalization")
# conv2
with tf.name_scope('conv2') as scope:
setup_variables(parameters, placeholders, [5, 5, 96, 256], [256])
conv2 = conv_layer(parameters, pool1_lrn, [1, 1, 1, 1], scope)
pool2 = tf.nn.max_pool(conv2,
ksize=[1, 3, 3, 1],
strides=[1, 2, 2, 1],
padding='VALID',
name='pool2')
# lrn2
pool2_lrn = tf.nn.lrn(pool2, depth_radius=5, bias=1.0,
alpha=0.0001, beta=0.75,
name="LocalResponseNormalization")
# conv3
with tf.name_scope('conv3') as scope:
setup_variables(parameters, placeholders, [3, 3, 256, 384], [384])
conv3 = conv_layer(parameters, pool2_lrn, [1, 1, 1, 1], scope)
# conv4
with tf.name_scope('conv4') as scope:
setup_variables(parameters, placeholders, [3, 3, 384, 384], [384])
conv4 = conv_layer(parameters, conv3, [1, 1, 1, 1], scope)
# conv5
with tf.name_scope('conv5') as scope:
setup_variables(parameters, placeholders, [3, 3, 384, 256], [256])
conv5 = conv_layer(parameters, conv4, [1, 1, 1, 1], scope)
# pool5
pool5 = tf.nn.max_pool(conv5,
ksize=[1, 3, 3, 1],
strides=[1, 2, 2, 1],
padding='VALID',
name='pool5')
# lrn5
pool5_lrn = tf.nn.lrn(pool5, depth_radius=5, bias=1.0,
alpha=0.0001, beta=0.75,
name="LocalResponseNormalization")
dropout = tf.placeholder(tf.float32)
with tf.name_scope('fc1') as scope:
n_input = int(np.prod(pool5_lrn.get_shape().as_list()[1:]))
setup_variables(parameters, placeholders, [n_input, 4096], [4096])
fc_in = tf.reshape(pool5_lrn, [-1, n_input])
fc_layer1 = tf.nn.tanh(tf.nn.bias_add(tf.matmul(fc_in, parameters[-2]), parameters[-1]))
fc_out1 = tf.nn.dropout(fc_layer1, dropout)
with tf.name_scope('fc2') as scope:
n_input = int(np.prod(fc_out1.get_shape().as_list()[1:]))
setup_variables(parameters, placeholders, [n_input, 4096], [4096])
fc_in = tf.reshape(fc_out1, [-1, n_input])
fc_layer2 = tf.nn.tanh(tf.nn.bias_add(tf.matmul(fc_in, parameters[-2]), parameters[-1]))
fc_out2 = tf.nn.dropout(fc_layer2, dropout)
with tf.name_scope('fc3') as scope:
n_input = int(np.prod(fc_out2.get_shape().as_list()[1:]))
setup_variables(parameters, placeholders, [n_input, 1000], [1000])
fc_in = tf.reshape(fc_out2, [-1, n_input])
fc_layer3 = tf.nn.softmax(tf.nn.bias_add(tf.matmul(fc_in, parameters[-2]), parameters[-1]))
y_pred = fc_layer3 / tf.reduce_sum(fc_layer3,
reduction_indices=len(fc_layer3.get_shape()) - 1,
keep_dims=True)
# manual computation of crossentropy
y_pred = tf.clip_by_value(y_pred, tf.cast(1e-10, dtype=tf.float32),
tf.cast(1. - 1e-10, dtype=tf.float32))
cross_entropy = tf.reduce_mean(-tf.reduce_sum(y_true * tf.log(y_pred),
reduction_indices=len(y_pred.get_shape()) - 1))
opt = tf.train.MomentumOptimizer(learning_rate=0.01, momentum=0.9) # Any other optimizier can be placed here
correct_pred = tf.equal(tf.argmax(y_pred, 1), tf.argmax(y_true, 1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
comp_grads = opt.compute_gradients(cross_entropy, parameters)
application = opt.apply_gradients(zip(placeholders, parameters))
sess = tf.Session()
init_all_variables = tf.initialize_all_variables()
# In order to set the weights of the TensorFlow graph on a worker, we add
# assignment nodes. To get the network weights (as a list of numpy arrays)
# and to set the network weights (from a list of numpy arrays), use the
# methods get_weights and set_weights. This can be done from within a remote
# function or on the driver.
def get_and_set_weights_methods():
assignment_placeholders = []
assignment_nodes = []
for var in tf.trainable_variables():
assignment_placeholders.append(tf.placeholder(var.value().dtype, var.get_shape().as_list()))
assignment_nodes.append(var.assign(assignment_placeholders[-1]))
def get_weights():
return [v.eval(session=sess) for v in tf.trainable_variables()]
def set_weights(new_weights):
sess.run(assignment_nodes, feed_dict={p: w for p, w in zip(assignment_placeholders, new_weights)})
return get_weights, set_weights
get_weights, set_weights = get_and_set_weights_methods()
return comp_grads, sess, application, accuracy, images, y_true, dropout, placeholders, init_all_variables, get_weights, set_weights
def net_reinitialization(net_vars):
return net_vars
@ray.remote
def num_images(batches):
"""Counts number of images in batches.
Args:
batches (List): Collection of batches of images and labels.
Returns:
int: The number of images
"""
shape_ids = [ra.shape.remote(batch) for batch in batches]
return sum([shape[0] for shape in ray.get(shape_ids)])
@ray.remote
def compute_mean_image(batches):
"""Computes the mean image given a list of batches of images.
Args:
batches (List[ObjectID]): A list of batches of images.
Returns:
ndarray: The mean image
"""
if len(batches) == 0:
raise Exception("No images were passed into `compute_mean_image`.")
sum_image_ids = [ra.sum.remote(batch, axis=0) for batch in batches]
n_images = num_images.remote(batches)
return np.sum(ray.get(sum_image_ids), axis=0).astype("float64") / ray.get(n_images)
@ray.remote(num_return_vals=4)
def shuffle_arrays(first_images, first_labels, second_images, second_labels):
"""Shuffles the images and labels from two batches.
Args:
first_images (ndarray): First batch of images.
first_labels (ndarray): First batch of labels.
second_images (ndarray): Second batch of images.
second_labels (ndarray): Second batch of labels.
Returns:
ndarray: First batch of shuffled images.
ndarray: First batch of shuffled labels.
ndarray: Second bach of shuffled images.
ndarray: Second batch of shuffled labels.
"""
images = np.concatenate((first_images, second_images))
labels = np.concatenate((first_labels, second_labels))
total_length = len(images)
first_len = len(first_images)
random_indices = np.random.permutation(total_length)
new_first_images = images[random_indices[0:first_len]]
new_first_labels = labels[random_indices[0:first_len]]
new_second_images = images[random_indices[first_len:total_length]]
new_second_labels = labels[random_indices[first_len:total_length]]
return new_first_images, new_first_labels, new_second_images, new_second_labels
def shuffle_pair(first_batch, second_batch):
"""Shuffle two batches of data.
Args:
first_batch (Tuple[ObjectID. ObjectID]): The first batch to be shuffled. The
first component is the object ID of a batch of images, and the second
component is the object ID of the corresponding batch of labels.
second_batch (Tuple[ObjectID, ObjectID]): The second batch to be shuffled.
The first component is the object ID of a batch of images, and the second
component is the object ID of the corresponding batch of labels.
Returns:
Tuple[ObjectID, ObjectID]: The first batch of shuffled data.
Tuple[ObjectID, ObjectID]: Two second bach of shuffled data.
"""
images1, labels1, images2, labels2 = shuffle_arrays.remote(first_batch[0], first_batch[1], second_batch[0], second_batch[1])
return (images1, labels1), (images2, labels2)
@ray.remote
def filenames_to_labels(filenames, filename_label_dict):
"""Converts filename strings to integer labels.
Args:
filenames (List[str]): The filenames of the images.
filename_label_dict (Dict[str, int]): A dictionary mapping filenames to
integer labels.
Returns:
ndarray: Integer labels
"""
return np.asarray([int(filename_label_dict[filename]) for filename in filenames])
def one_hot(x):
"""Converts integer labels to one hot vectors.
Args:
x (int): Index to be set to one
Returns:
ndarray: One hot vector.
"""
zero = np.zeros([1000])
zero[x] = 1.0
return zero
def crop_images(images):
"""Randomly crop a batch of images.
This is used to generate many slightly different images from each training
example.
Args:
images (ndarray): A batch of images to crop. The shape of images should be
batch_size x height x width x channels.
Returns:
ndarray: A batch of cropped images.
"""
original_height = 256
original_width = 256
cropped_height = 224
cropped_width = 224
height_offset = np.random.randint(original_height - cropped_height + 1)
width_offset = np.random.randint(original_width - cropped_width + 1)
return images[:, height_offset:(height_offset + cropped_height), width_offset:(width_offset + cropped_width), :]
def shuffle(batches):
"""Shuffle the data.
This method groups the batches together in pairs and within each pair shuffles
the data between the two members.
Args:
batches (List[Tuple[ObjectID, ObjectID]]): This is a list of tuples, where
each tuple consists of two object IDs. The first component is an object ID
for a batch of images, and the second component is an object ID for the
corresponding batch of labels.
Returns:
List[Tuple[ObjectID, ObjectID]]: The shuffled data.
"""
# Randomly permute the order of the batches.
permuted_batches = np.random.permutation(batches)
new_batches = []
for i in range(len(batches) // 2):
# Swap data between consecutive batches.
shuffled_batch1, shuffled_batch2 = shuffle_pair(permuted_batches[2 * i], permuted_batches[2 * i + 1])
new_batches += [shuffled_batch1, shuffled_batch2]
if len(batches) % 2 == 1:
# If there is an odd number of batches, don't forget the last one.
new_batches.append(permuted_batches[-1])
return new_batches
@ray.remote
def compute_grad(X, Y, mean, weights):
"""Computes the gradient of the network.
Args:
X (ndarray): Numpy array of images in the form of [224, 224,3]
Y (ndarray): Labels corresponding to each image
mean (ndarray): Mean image to subtract from images
weights (List[ndarray]): The network weights.
Returns:
List of gradients for each variable
"""
comp_grads, sess, _, _, images, y_true, dropout, placeholders, _, get_weights, set_weights = ray.reusables.net_vars
# Set the network weights.
set_weights(weights)
# Choose a subset of the batch to compute on and crop the images.
random_indices = np.random.randint(0, len(X), size=128)
subset_X = crop_images(X[random_indices] - mean)
subset_Y = np.asarray([one_hot(label) for label in Y[random_indices]])
# Compute the gradients.
return sess.run([g for (g, v) in comp_grads], feed_dict={images: subset_X, y_true: subset_Y, dropout: 0.5})
@ray.remote
def compute_accuracy(X, Y, weights):
"""Returns the accuracy of the network
Args:
X (ndarray): A batch of images.
Y (ndarray): A batch of labels.
weights (List[ndarray]): The network weights.
Returns:
The accuracy of the network on the given batch.
"""
_, sess, _, accuracy, images, y_true, dropout, placeholders, _, get_weights, set_weights = ray.reusables.net_vars
# Set the network weights.
set_weights(weights)
one_hot_Y = np.asarray([one_hot(label) for label in Y])
cropped_X = crop_images(X)
return sess.run(accuracy, feed_dict={images: cropped_X, y_true: one_hot_Y, dropout: 1.0})
-98
View File
@@ -1,98 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import ray
import os
import argparse
import boto3
import alexnet
# Arguments to specify where the imagenet data is stored.
parser = argparse.ArgumentParser(description="Run the AlexNet example.")
parser.add_argument("--s3-bucket", required=True, type=str, help="Name of the bucket that contains the image data.")
parser.add_argument("--key-prefix", default="ILSVRC2012_img_train/n015", type=str, help="Prefix for files to fetch.")
parser.add_argument("--label-file", default="train.txt", type=str, help="File containing labels.")
if __name__ == "__main__":
args = parser.parse_args()
ray.init(start_ray_local=True, num_workers=10)
# Note we do not do sess.run(tf.initialize_all_variables()) because that would
# result in a different initialization on each worker. Instead, we initialize
# the weights on the driver and load the weights on the workers every time we
# compute a gradient.
ray.reusables.net_vars = ray.Reusable(alexnet.net_initialization, alexnet.net_reinitialization)
# Prepare keys for downloading the data.
s3_resource = boto3.resource("s3")
imagenet_bucket = s3_resource.Bucket(args.s3_bucket)
objects = imagenet_bucket.objects.filter(Prefix=args.key_prefix)
image_tar_files = [str(obj.key) for obj in objects.all()]
print("Images will be downloaded from {} files.".format(len(image_tar_files)))
# Downloading the label file, and create a dictionary mapping the filenames of
# the images to their labels.
s3_client = boto3.client("s3")
label_file = s3_client.get_object(Bucket=args.s3_bucket, Key=args.label_file)
filename_label_str = label_file["Body"].read().strip().split("\n")
filename_label_pairs = [line.split(" ") for line in filename_label_str]
filename_label_dict = dict([(os.path.basename(name), label) for name, label in filename_label_pairs])
filename_label_dict_id = ray.put(filename_label_dict)
print("Labels extracted.")
# Download the imagenet dataset.
imagenet_data = alexnet.load_tarfiles_from_s3(args.s3_bucket, image_tar_files, [256, 256])
# Convert the parsed filenames to integer labels and create batches.
batches = [(images, alexnet.filenames_to_labels.remote(filenames, filename_label_dict_id)) for images, filenames in imagenet_data]
# Compute the mean image.
mean_id = alexnet.compute_mean_image.remote([images for images, labels in batches])
# The data does not start out shuffled. Images of the same class all appear
# together, so we shuffle it ourselves here. Each shuffle pairs up the batches
# and swaps data within a pair.
num_shuffles = 5
for i in range(num_shuffles):
batches = alexnet.shuffle(batches)
_, sess, application, _, _, _, _, placeholders, init_all_variables, get_weights, set_weights = ray.reusables.net_vars
# Initialize the network and optimizer weights. This is only run once on the
# driver. We initialize the weights manually on the workers.
sess.run(init_all_variables)
print("Initialized network weights.")
iteration = 0
while True:
# Extract weights from the local copy of the network.
weights = get_weights()
# Put weights in the object store.
weights_id = ray.put(weights)
# Compute the accuracy on a random training batch.
x_id, y_id = batches[np.random.randint(len(batches))]
accuracy = alexnet.compute_accuracy.remote(x_id, y_id, weights_id)
# Launch tasks in parallel to compute the gradients for some batches.
gradient_ids = []
num_batches = 4
for i in range(num_batches):
# Choose a random batch and use it to compute the gradient of the loss.
x_id, y_id = batches[np.random.randint(len(batches))]
gradient_ids.append(alexnet.compute_grad.remote(x_id, y_id, mean_id, weights_id))
# Print the accuracy on a random training batch.
print("Iteration {}: accuracy = {:.3}%".format(iteration, 100 * ray.get(accuracy)))
# Fetch the gradients. This blocks until the gradients have been computed.
gradient_sets = ray.get(gradient_ids)
# Average the gradients over all of the tasks.
mean_gradients = [np.mean([gradient_set[i] for gradient_set in gradient_sets], axis=0) for i in range(len(weights))]
# Use the gradients to update the network.
sess.run(application, feed_dict=dict(zip(placeholders, mean_gradients)))
iteration += 1
+1 -1
View File
@@ -21,7 +21,7 @@ parser.add_argument("--steps", default=10, type=int, help="The number of steps o
if __name__ == "__main__":
args = parser.parse_args()
ray.init(start_ray_local=True, num_workers=10)
ray.init(num_workers=10)
# The number of sets of random hyperparameters to try.
trials = args.trials
+1 -1
View File
@@ -11,7 +11,7 @@ import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
if __name__ == "__main__":
ray.init(start_ray_local=True, num_workers=10)
ray.init(num_workers=10)
# Define the dimensions of the data and of the model.
image_dimension = 784
+1 -1
View File
@@ -110,7 +110,7 @@ def compute_gradient(model):
return policy_backward(eph, epx, epdlogp, model), reward_sum
if __name__ == "__main__":
ray.init(start_ray_local=True, num_workers=10)
ray.init(num_workers=10)
# Run the reinforcement learning
running_reward = None
-118
View File
@@ -1,118 +0,0 @@
# Parallelizing TRPO
In this example, we show how TRPO can be parallelized using Ray. We will be
working with John Schulman's
[modular_rl code](https://github.com/joschu/modular_rl).
For this tutorial I'll assume that you have Anaconda with Python 2.7 installed.
## Setting up the single core implementation of TRPO
First, we will run the original TRPO code.
Install these dependencies:
- [Gym](https://gym.openai.com/)
- The following Python packages:
```
pip install theano
pip install keras
pip install tabulate
```
Run this code to start an experiment.
```
git clone https://github.com/joschu/modular_rl
cd modular_rl
export KERAS_BACKEND=theano && ./run_pg.py --env Pong-ram-v0 --agent modular_rl.agentzoo.TrpoAgent --video 0 --n_iter 500 --filter 1
```
**Note: On some versions of Mac OS X, this produces NaNs.**
On a m4.4xlarge EC2 instance, the first 10 iterations take 106s.
Each iteration consists of two phases. In the first phase, the rollouts are
computed (on one core). In the second phase, the objective is optimized, which
makes use of the parallel BLAS library. The code for all of this is in
`modular_rl/modular_rl/core.py`.
```python
for _ in xrange(cfg["n_iter"]):
# Rollouts ========
paths = get_paths(env, agent, cfg, seed_iter)
compute_advantage(agent.baseline, paths, gamma=cfg["gamma"], lam=cfg["lam"])
# VF Update ========
vf_stats = agent.baseline.fit(paths)
# Pol Update ========
pol_stats = agent.updater(paths)
```
We will now see how this code can be parallelized.
## Parallelizing TRPO rollouts using Ray
As a first step, we will parallelize the rollouts. This is done by implementing
a function `do_rollouts_remote` similar to
[do_rollouts_serial](https://github.com/joschu/modular_rl/blob/46a6f9a0d363a7bc1c7325ff17e2eb684612a388/modular_rl/core.py#L137),
which will be called by
[get_paths](https://github.com/joschu/modular_rl/blob/46a6f9a0d363a7bc1c7325ff17e2eb684612a388/modular_rl/core.py#L102)
(called in the above code snippet).
Check out the parallel version of the TRPO code.
```
git clone https://github.com/pcmoritz/modular_rl modular_rl_ray
cd modular_rl_ray
git checkout remote
```
You can run the code using
```
export KERAS_BACKEND=theano && ./run_pg.py --env Pong-ram-v0 --agent modular_rl.agentzoo.TrpoAgent --video 0 --n_iter 500 --filter 1 --remote 1 --n_rollouts 8
```
There are few [changes](https://github.com/joschu/modular_rl/compare/master...pcmoritz:23d3ebc).
As in the [learning to play Pong example](https://github.com/ray-project/ray/tree/master/examples/rl_pong),
we use reusable variables to store the gym environment and the neural network policy. These are
then used in the remote `do_rollout` function to do a remote rollout:
```python
@ray.remote
def do_rollout(policy, timestep_limit, seed):
# Retrieve the game environment.
env = ray.reusables.env
# Set the environment seed.
env.seed(seed)
# Set the numpy seed.
np.random.seed(seed)
# Retrieve the neural network agent.
agent = ray.reusables.agent
# Set the network weights.
agent.set_from_flat(policy)
return rollout(env, agent, timestep_limit)
```
All that is left is to invoke the remote function and collect the paths.
```python
def do_rollouts_remote(agent, timestep_limit, n_timesteps, n_parallel, seed_iter):
# Put the neural network weights into the object store.
policy = ray.put(agent.get_flat())
paths = []
timesteps_sofar = 0
# Run parallel rollouts until we have enough.
while timesteps_sofar < n_timesteps:
# Launch rollout tasks in parallel.
rollout_ids = [do_rollout.remote(policy, timestep_limit, seed_iter.next()) for i in range(n_parallel)]
for rollout_id in rollout_ids:
# Retrieve the task output from the object store.
path = ray.get(rollout_id)
paths.append(path)
timesteps_sofar += pathlength(path)
return paths
```
On the same m4.4xlarge EC2 instance, the first 10 iterations now take 42s instead of
106s.
@@ -3,7 +3,7 @@ from __future__ import division
from __future__ import print_function
import numpy as np
import ray.array.remote as ra
import ray.experimental.array.remote as ra
import ray
__all__ = ["BLOCK_SIZE", "DistArray", "assemble", "zeros", "ones", "copy",
@@ -3,7 +3,7 @@ from __future__ import division
from __future__ import print_function
import numpy as np
import ray.array.remote as ra
import ray.experimental.array.remote as ra
import ray
from .core import *
@@ -3,7 +3,7 @@ from __future__ import division
from __future__ import print_function
import numpy as np
import ray.array.remote as ra
import ray.experimental.array.remote as ra
import ray
from .core import *
+10 -13
View File
@@ -548,7 +548,7 @@ def check_connected(worker=global_worker):
Exception: An exception is raised if the worker is not connected.
"""
if not worker.connected:
raise RayConnectionError("This command cannot be called before Ray has been started. You can start Ray with 'ray.init(start_ray_local=True, num_workers=1)'.")
raise RayConnectionError("This command cannot be called before Ray has been started. You can start Ray with 'ray.init(num_workers=10)'.")
def print_failed_task(task_status):
"""Print information about failed tasks.
@@ -724,7 +724,7 @@ def _init(address_info=None, start_ray_local=False, object_id_seed=None,
# Use the address 127.0.0.1 in local mode.
node_ip_address = "127.0.0.1" if node_ip_address is None else node_ip_address
# Use 1 worker if num_workers is not provided.
num_workers = 1 if num_workers is None else num_workers
num_workers = 10 if num_workers is None else num_workers
# Use 1 local scheduler if num_local_schedulers is not provided. If
# existing local schedulers are provided, use that count as
# num_local_schedulers.
@@ -770,8 +770,8 @@ def _init(address_info=None, start_ray_local=False, object_id_seed=None,
connect(driver_address_info, object_id_seed=object_id_seed, mode=driver_mode, worker=global_worker)
return address_info
def init(node_ip_address=None, redis_address=None, start_ray_local=False,
object_id_seed=None, num_workers=None, driver_mode=SCRIPT_MODE):
def init(redis_address=None, node_ip_address=None, object_id_seed=None,
num_workers=None, driver_mode=SCRIPT_MODE):
"""Either connect to an existing Ray cluster or start one and connect to it.
This method handles two cases. Either a Ray cluster already exists and we
@@ -780,18 +780,16 @@ def init(node_ip_address=None, redis_address=None, start_ray_local=False,
Args:
node_ip_address (str): The IP address of the node that we are on.
redis_address (str): The address of the Redis server to connect to. This
should only be provided if start_ray_local is False.
start_ray_local (bool): If True then this will start Redis, a global
redis_address (str): The address of the Redis server to connect to. If this
address is not provided, then this command will start Redis, a global
scheduler, a local scheduler, a plasma store, a plasma manager, and some
workers. It will also kill these processes when Python exits. If False,
this will attach to an existing Ray cluster.
workers. It will also kill these processes when Python exits.
object_id_seed (int): Used to seed the deterministic generation of object
IDs. The same value can be used across multiple runs of the same job in
order to generate the object IDs in a consistent manner. However, the same
ID should not be used for different jobs.
num_workers (int): The number of workers to start. This is only provided if
start_ray_local is True.
redis_address is not provided.
driver_mode (bool): The mode in which to start the driver. This should be
one of ray.SCRIPT_MODE, ray.PYTHON_MODE, and ray.SILENT_MODE.
@@ -806,9 +804,8 @@ def init(node_ip_address=None, redis_address=None, start_ray_local=False,
"node_ip_address": node_ip_address,
"redis_address": redis_address,
}
return _init(address_info=info,
start_ray_local=start_ray_local, num_workers=num_workers,
driver_mode=driver_mode)
return _init(address_info=info, start_ray_local=(redis_address is None),
num_workers=num_workers, driver_mode=driver_mode)
def cleanup(worker=global_worker):
"""Disconnect the driver, and terminate any processes started in init.
+8
View File
@@ -16,6 +16,14 @@ else
exit 1
fi
# Build plasma. We currently have to do this because we compile numbuf with the
# HAS_PLASMA flag.
PLASMA_DIR="$ROOT_DIR/../src/plasma"
pushd "$PLASMA_DIR"
make clean
make
popd
mkdir -p "$ROOT_DIR/build"
pushd "$ROOT_DIR/build"
cmake -DHAS_PLASMA=ON -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS="-g" -DCMAKE_CXX_FLAGS="-g" ..
+4 -4
View File
@@ -12,15 +12,15 @@ import sys
if sys.version_info >= (3, 0):
from importlib import reload
import ray.array.remote as ra
import ray.array.distributed as da
import ray.experimental.array.remote as ra
import ray.experimental.array.distributed as da
class RemoteArrayTest(unittest.TestCase):
def testMethods(self):
for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]:
reload(module)
ray.init(start_ray_local=True)
ray.init(num_workers=1)
# test eye
object_id = ra.eye.remote(3)
@@ -54,7 +54,7 @@ class DistributedArrayTest(unittest.TestCase):
def testAssemble(self):
for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]:
reload(module)
ray.init(start_ray_local=True, num_workers=1)
ray.init(num_workers=1)
a = ra.ones.remote([da.BLOCK_SIZE, da.BLOCK_SIZE])
b = ra.zeros.remote([da.BLOCK_SIZE, da.BLOCK_SIZE])
+7 -7
View File
@@ -24,7 +24,7 @@ def wait_for_errors(error_type, num_errors, timeout=10):
class FailureTest(unittest.TestCase):
def testUnknownSerialization(self):
reload(test_functions)
ray.init(start_ray_local=True, num_workers=1, driver_mode=ray.SILENT_MODE)
ray.init(num_workers=1, driver_mode=ray.SILENT_MODE)
test_functions.test_unknown_type.remote()
wait_for_errors(b"TaskError", 1)
@@ -35,7 +35,7 @@ class FailureTest(unittest.TestCase):
class TaskSerializationTest(unittest.TestCase):
def testReturnAndPassUnknownType(self):
ray.init(start_ray_local=True, num_workers=1, driver_mode=ray.SILENT_MODE)
ray.init(num_workers=1, driver_mode=ray.SILENT_MODE)
class Foo(object):
pass
@@ -57,7 +57,7 @@ class TaskSerializationTest(unittest.TestCase):
class TaskStatusTest(unittest.TestCase):
def testFailedTask(self):
reload(test_functions)
ray.init(start_ray_local=True, num_workers=3, driver_mode=ray.SILENT_MODE)
ray.init(num_workers=3, driver_mode=ray.SILENT_MODE)
test_functions.throw_exception_fct1.remote()
test_functions.throw_exception_fct1.remote()
@@ -87,7 +87,7 @@ class TaskStatusTest(unittest.TestCase):
ray.worker.cleanup()
def testFailImportingRemoteFunction(self):
ray.init(start_ray_local=True, num_workers=2, driver_mode=ray.SILENT_MODE)
ray.init(num_workers=2, driver_mode=ray.SILENT_MODE)
# This example is somewhat contrived. It should be successfully pickled, and
# then it should throw an exception when it is unpickled. This may depend a
@@ -115,7 +115,7 @@ class TaskStatusTest(unittest.TestCase):
ray.worker.cleanup()
def testFailImportingReusableVariable(self):
ray.init(start_ray_local=True, num_workers=2, driver_mode=ray.SILENT_MODE)
ray.init(num_workers=2, driver_mode=ray.SILENT_MODE)
# This will throw an exception when the reusable variable is imported on the
# workers.
@@ -131,7 +131,7 @@ class TaskStatusTest(unittest.TestCase):
ray.worker.cleanup()
def testFailReinitializingVariable(self):
ray.init(start_ray_local=True, num_workers=2, driver_mode=ray.SILENT_MODE)
ray.init(num_workers=2, driver_mode=ray.SILENT_MODE)
def initializer():
return 0
@@ -149,7 +149,7 @@ class TaskStatusTest(unittest.TestCase):
ray.worker.cleanup()
def testFailedFunctionToRun(self):
ray.init(start_ray_local=True, num_workers=2, driver_mode=ray.SILENT_MODE)
ray.init(num_workers=2, driver_mode=ray.SILENT_MODE)
def f(worker):
if ray.worker.global_worker.mode == ray.WORKER_MODE:
+2 -2
View File
@@ -18,7 +18,7 @@ class MicroBenchmarkTest(unittest.TestCase):
def testTiming(self):
reload(test_functions)
ray.init(start_ray_local=True, num_workers=3)
ray.init(num_workers=3)
# measure the time required to submit a remote task to the scheduler
elapsed_times = []
@@ -88,7 +88,7 @@ class MicroBenchmarkTest(unittest.TestCase):
ray.worker.cleanup()
def testCache(self):
ray.init(start_ray_local=True, num_workers=1)
ray.init(num_workers=1)
A = np.random.rand(1, 1000000)
v = np.random.rand(1000000)
+22 -22
View File
@@ -16,8 +16,8 @@ if sys.version_info >= (3, 0):
from importlib import reload
import ray.test.test_functions as test_functions
import ray.array.remote as ra
import ray.array.distributed as da
import ray.experimental.array.remote as ra
import ray.experimental.array.distributed as da
def assert_equal(obj1, obj2):
if type(obj1).__module__ == np.__name__ or type(obj2).__module__ == np.__name__:
@@ -129,7 +129,7 @@ except AttributeError:
class SerializationTest(unittest.TestCase):
def testRecursiveObjects(self):
ray.init(start_ray_local=True, num_workers=0)
ray.init(num_workers=0)
class ClassA(object):
pass
@@ -160,7 +160,7 @@ class SerializationTest(unittest.TestCase):
ray.worker.cleanup()
def testPassingArgumentsByValue(self):
ray.init(start_ray_local=True, num_workers=1)
ray.init(num_workers=1)
@ray.remote
def f(x):
@@ -184,7 +184,7 @@ class SerializationTest(unittest.TestCase):
class WorkerTest(unittest.TestCase):
def testPutGet(self):
ray.init(start_ray_local=True, num_workers=0)
ray.init(num_workers=0)
for i in range(100):
value_before = i * 10 ** 6
@@ -215,7 +215,7 @@ class WorkerTest(unittest.TestCase):
class APITest(unittest.TestCase):
def testRegisterClass(self):
ray.init(start_ray_local=True, num_workers=0)
ray.init(num_workers=0)
# Check that putting an object of a class that has not been registered
# throws an exception.
@@ -233,7 +233,7 @@ class APITest(unittest.TestCase):
def testKeywordArgs(self):
reload(test_functions)
ray.init(start_ray_local=True, num_workers=1)
ray.init(num_workers=1)
x = test_functions.keyword_fct1.remote(1)
self.assertEqual(ray.get(x), "1 hello")
@@ -270,7 +270,7 @@ class APITest(unittest.TestCase):
def testVariableNumberOfArgs(self):
reload(test_functions)
ray.init(start_ray_local=True, num_workers=1)
ray.init(num_workers=1)
x = test_functions.varargs_fct1.remote(0, 1, 2)
self.assertEqual(ray.get(x), "0 1 2")
@@ -284,14 +284,14 @@ class APITest(unittest.TestCase):
def testNoArgs(self):
reload(test_functions)
ray.init(start_ray_local=True, num_workers=1)
ray.init(num_workers=1)
ray.get(test_functions.no_op.remote())
ray.worker.cleanup()
def testDefiningRemoteFunctions(self):
ray.init(start_ray_local=True, num_workers=3)
ray.init(num_workers=3)
# Test that we can define a remote function in the shell.
@ray.remote
@@ -345,13 +345,13 @@ class APITest(unittest.TestCase):
ray.worker.cleanup()
def testGetMultiple(self):
ray.init(start_ray_local=True, num_workers=0)
ray.init(num_workers=0)
object_ids = [ray.put(i) for i in range(10)]
self.assertEqual(ray.get(object_ids), list(range(10)))
ray.worker.cleanup()
def testWait(self):
ray.init(start_ray_local=True, num_workers=1)
ray.init(num_workers=1)
@ray.remote
def f(delay):
@@ -401,7 +401,7 @@ class APITest(unittest.TestCase):
ray.reusables.bar.append(1)
return ray.reusables.bar
ray.init(start_ray_local=True, num_workers=2)
ray.init(num_workers=2)
self.assertEqual(ray.get(use_foo.remote()), 1)
self.assertEqual(ray.get(use_foo.remote()), 1)
@@ -425,7 +425,7 @@ class APITest(unittest.TestCase):
sys.path.append(4)
ray.worker.global_worker.run_function_on_all_workers(f)
ray.init(start_ray_local=True, num_workers=2)
ray.init(num_workers=2)
@ray.remote
def get_state():
@@ -448,7 +448,7 @@ class APITest(unittest.TestCase):
ray.worker.cleanup()
def testRunningFunctionOnAllWorkers(self):
ray.init(start_ray_local=True, num_workers=1)
ray.init(num_workers=1)
def f(worker_info):
sys.path.append("fake_directory")
@@ -471,7 +471,7 @@ class APITest(unittest.TestCase):
ray.worker.cleanup()
def testPassingInfoToAllWorkers(self):
ray.init(start_ray_local=True, num_workers=10)
ray.init(num_workers=10)
def f(worker_info):
sys.path.append(worker_info)
@@ -498,7 +498,7 @@ class APITest(unittest.TestCase):
ray.worker.cleanup()
def testLoggingAPI(self):
ray.init(start_ray_local=True, num_workers=1)
ray.init(num_workers=1)
def events():
# This is a hack for getting the event log. It is not part of the API.
@@ -551,7 +551,7 @@ class PythonModeTest(unittest.TestCase):
def testPythonMode(self):
reload(test_functions)
ray.init(start_ray_local=True, driver_mode=ray.PYTHON_MODE)
ray.init(driver_mode=ray.PYTHON_MODE)
@ray.remote
def f():
@@ -574,7 +574,7 @@ class PythonModeTest(unittest.TestCase):
def testReusableVariablesInPythonMode(self):
reload(test_functions)
ray.init(start_ray_local=True, driver_mode=ray.PYTHON_MODE)
ray.init(driver_mode=ray.PYTHON_MODE)
def l_init():
return []
@@ -612,7 +612,7 @@ class PythonModeTest(unittest.TestCase):
class ReusablesTest(unittest.TestCase):
def testReusables(self):
ray.init(start_ray_local=True, num_workers=1)
ray.init(num_workers=1)
# Test that we can add a variable to the key-value store.
@@ -688,7 +688,7 @@ class ReusablesTest(unittest.TestCase):
ray.worker.cleanup()
def testUsingReusablesOnDriver(self):
ray.init(start_ray_local=True, num_workers=1)
ray.init(num_workers=1)
# Test that we can add a variable to the key-value store.
@@ -731,7 +731,7 @@ class UtilsTest(unittest.TestCase):
# The functionality being tested here is really multi-node functionality,
# but this test just uses a single node.
ray.init(start_ray_local=True, num_workers=1)
ray.init(num_workers=1)
source_text = "hello world"
+1 -1
View File
@@ -65,7 +65,7 @@ class TaskTests(unittest.TestCase):
ray.worker.cleanup()
def testGettingAndPutting(self):
ray.init(start_ray_local=True, num_workers=1)
ray.init(num_workers=1)
for n in range(8):
x = np.zeros(10 ** n)
+1 -1
View File
@@ -9,7 +9,7 @@ import ray
class TensorFlowTest(unittest.TestCase):
def testTensorFlowVariables(self):
ray.init(start_ray_local=True, num_workers=2)
ray.init(num_workers=2)
x_data = tf.placeholder(tf.float32, shape=[100])
y_data = tf.placeholder(tf.float32, shape=[100])
-3
View File
@@ -1,3 +0,0 @@
{
"presets": ["es2015", "react"]
}