From be4a37bf3760262c4dc8524dd485d07ecd09b6b0 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Tue, 10 Jan 2017 17:35:27 -0800 Subject: [PATCH] Various cleanups: remove start_ray_local from ray.init, remove unused code, fix "pip install numbuf". (#193) * Remove start_ray_local from ray.init and change default number of workers to 10. * Remove alexnet example. * Move array methods to experimental. * Remove TRPO example. * Remove old files. * Compile plasma when we build numbuf. * Address comments. --- README.md | 5 +- build-webui.sh | 35 -- doc/reusable-variables.md | 2 +- doc/serialization.md | 2 +- doc/services-api.rst | 5 - doc/tutorial.md | 37 +- doc/using-ray-with-tensorflow.md | 2 +- examples/alexnet/README.md | 108 ----- examples/alexnet/alexnet.py | 448 ------------------ examples/alexnet/driver.py | 98 ---- examples/hyperopt/driver.py | 2 +- examples/lbfgs/driver.py | 2 +- examples/rl_pong/driver.py | 2 +- examples/trpo/README.md | 118 ----- .../ray/{ => experimental}/array/__init__.py | 0 .../array/distributed/__init__.py | 0 .../array/distributed/core.py | 2 +- .../array/distributed/linalg.py | 2 +- .../array/distributed/random.py | 2 +- .../array/remote/__init__.py | 0 .../{ => experimental}/array/remote/core.py | 0 .../{ => experimental}/array/remote/linalg.py | 0 .../{ => experimental}/array/remote/random.py | 0 lib/python/ray/worker.py | 23 +- numbuf/build.sh | 8 + test/array_test.py | 8 +- test/failure_test.py | 14 +- test/microbenchmarks.py | 4 +- test/runtest.py | 44 +- test/stress_tests.py | 2 +- test/tensorflow_test.py | 2 +- webui/.babelrc | 3 - 32 files changed, 89 insertions(+), 891 deletions(-) delete mode 100755 build-webui.sh delete mode 100644 doc/services-api.rst delete mode 100644 examples/alexnet/README.md delete mode 100644 examples/alexnet/alexnet.py delete mode 100644 examples/alexnet/driver.py delete mode 100644 examples/trpo/README.md rename lib/python/ray/{ => experimental}/array/__init__.py (100%) rename lib/python/ray/{ => experimental}/array/distributed/__init__.py (100%) rename lib/python/ray/{ => experimental}/array/distributed/core.py (99%) rename lib/python/ray/{ => experimental}/array/distributed/linalg.py (99%) rename lib/python/ray/{ => experimental}/array/distributed/random.py (91%) rename lib/python/ray/{ => experimental}/array/remote/__init__.py (100%) rename lib/python/ray/{ => experimental}/array/remote/core.py (100%) rename lib/python/ray/{ => experimental}/array/remote/linalg.py (100%) rename lib/python/ray/{ => experimental}/array/remote/random.py (100%) delete mode 100644 webui/.babelrc diff --git a/README.md b/README.md index 0afcf9819..7b6aa3f82 100644 --- a/README.md +++ b/README.md @@ -17,8 +17,8 @@ machines). import ray import numpy as np -# Start a scheduler, an object store, and some workers. -ray.init(start_ray_local=True, num_workers=10) +# Start Ray with some workers. +ray.init(num_workers=10) # Define a remote function for estimating pi. @ray.remote @@ -63,4 +63,3 @@ estimate of pi (waiting until the computation has finished if necessary). - [Hyperparameter Optimization](examples/hyperopt/README.md) - [Batch L-BFGS](examples/lbfgs/README.md) - [Learning to Play Pong](examples/rl_pong/README.md) -- [Training AlexNet](examples/alexnet/README.md) diff --git a/build-webui.sh b/build-webui.sh deleted file mode 100755 index 07ef7eae6..000000000 --- a/build-webui.sh +++ /dev/null @@ -1,35 +0,0 @@ -#!/usr/bin/env bash - -# Cause the script to exit if a single command fails. -set -e - -ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd) - -unamestr="$(uname)" -if [[ "$unamestr" == "Linux" ]]; then - platform="linux" -elif [[ "$unamestr" == "Darwin" ]]; then - platform="macosx" -else - echo "Unrecognized platform." - exit 1 -fi - -WEBUI_DIR="$ROOT_DIR/webui" - -PYTHON_DIR="$ROOT_DIR/lib/python" -PYTHON_WEBUI_DIR="$PYTHON_DIR/webui" - -pushd "$WEBUI_DIR" - npm install - if [[ $platform == "linux" ]]; then - nodejs ./node_modules/.bin/webpack -g - elif [[ $platform == "macosx" ]]; then - node ./node_modules/.bin/webpack -g - fi - pushd node_modules - rm -rf react* babel* classnames dom-helpers jsesc webpack .bin - popd -popd - -cp -r $WEBUI_DIR/* $PYTHON_WEBUI_DIR/ diff --git a/doc/reusable-variables.md b/doc/reusable-variables.md index 38a5b698c..8f2dad0d8 100644 --- a/doc/reusable-variables.md +++ b/doc/reusable-variables.md @@ -31,7 +31,7 @@ Python wrapper for an Atari simulator. import gym import ray -ray.init(start_ray_local=True, num_workers=5) +ray.init(num_workers=10) # Define a function to create the gym environment. def env_initializer(): diff --git a/doc/serialization.md b/doc/serialization.md index 16c76fe3c..a00b4b38b 100644 --- a/doc/serialization.md +++ b/doc/serialization.md @@ -76,7 +76,7 @@ This can be addressed by calling `ray.register_class(Foo)`. ```python import ray -ray.init(start_ray_local=True, num_workers=1) +ray.init(num_workers=10) # Define a custom class. class Foo(object): diff --git a/doc/services-api.rst b/doc/services-api.rst deleted file mode 100644 index f5b8be965..000000000 --- a/doc/services-api.rst +++ /dev/null @@ -1,5 +0,0 @@ -================ -The Services API -================ - -.. autofunction:: ray.services.start_ray_local diff --git a/doc/tutorial.md b/doc/tutorial.md index cd4087df5..22a91cab3 100644 --- a/doc/tutorial.md +++ b/doc/tutorial.md @@ -7,20 +7,29 @@ To use Ray, you need to understand the following: ## Overview -Ray is a distributed extension of Python. When using Ray, several processes are -involved. +Ray is a Python-based distributed execution engine. It can be used on a single +machine to achieve effective multiprocessing, and it can be used on a cluster +for large computations. -- A **scheduler**: The scheduler assigns tasks to workers. It is its own -process. -- Multiple **workers**: Workers execute tasks and store the results in object -stores. Each worker is a separate process. -- One **object store** per node: The object store enables the sharing of Python -objects between worker processes so each worker does not have to have a separate -copy. -- A **driver**: The driver is the Python process that the user controls and -which submits tasks to the scheduler. For example, if the user is running a -script or using a Python shell, then the driver is the process that runs the -script or the shell. +When using Ray, several processes are involved. + +- Multiple **worker** processes execute tasks and store results in object stores. +Each worker is a separate process. +- One **object store** per node stores immutable objects in shared memory and +allows workers to efficiently share objects on the same node with minimal +copying and deserialization. +- One **local scheduler** per node assigns tasks to workers on the same node. +- A **global scheduler** receives tasks from local schedulers and assigns them +to other local schedulers. +- A **driver** is the Python process that the user controls. For example, if the +user is running a script or using a Python shell, then the driver is the Python +process that runs the script or the shell. A driver is similar to a worker in +that it can submit tasks to its local scheduler and get objects from the object +store, but it is different in that the local scheduler will not assign tasks to +the driver to be executed. +- A **Redis server** maintains much of the system's state. For example, it keeps +track of which objects live on which machines and of the task specifications. It +can also be queried directly for debugging purposes. ## Starting Ray @@ -28,7 +37,7 @@ To start Ray, start Python, and run the following commands. ```python import ray -ray.init(start_ray_local=True, num_workers=10) +ray.init(num_workers=10) ``` That command starts a scheduler, one object store, and ten workers. Each of diff --git a/doc/using-ray-with-tensorflow.md b/doc/using-ray-with-tensorflow.md index 49f05b5d3..41fcaf6e8 100644 --- a/doc/using-ray-with-tensorflow.md +++ b/doc/using-ray-with-tensorflow.md @@ -79,7 +79,7 @@ import tensorflow as tf import numpy as np import ray -ray.init(start_ray_local=True, num_workers=5) +ray.init(num_workers=5) BATCH_SIZE = 100 NUM_BATCHES = 1 diff --git a/examples/alexnet/README.md b/examples/alexnet/README.md deleted file mode 100644 index 23e7da2b5..000000000 --- a/examples/alexnet/README.md +++ /dev/null @@ -1,108 +0,0 @@ -# Training AlexNet - -WARNING: Running this application is fairly involved. In particular, it requires -you to download the ImageNet dataset and put it on S3. - -This document walks through how to load the ImageNet dataset from S3 and train -AlexNet using data parallel stochastic gradient descent. - -## Running the Application - -The instructions in this section must be done before you can run the -application. - -### Install the Dependencies - -Install the following dependencies. - -- [TensorFlow](https://www.tensorflow.org/) - -In addition, install the following dependencies. - -**On Ubuntu** - -``` -sudo apt-get install libjpeg8-dev awscli -sudo pip install boto3 botocore pillow -``` - -**On Mac OSX** - -``` -brew install libjpeg awscli -sudo pip install boto3 botocore pillow -``` - -### Put ImageNet on S3 - -1. To run this application, first put the ImageNet tar files on S3 (e.g., in the -directory `ILSVRC2012_img_train`). Also, put the file `train.txt` on S3. We will -use `$S3_BUCKET` to refer to the name of your S3 bucket. - -2. Use `aws configure` to enable Boto to connect to S3. If you are using -multiple machines, this must be done on all machines. - -### Run the Application - -From the directory `ray/examples/alexnet/` run the following - -``` -source ../../setup-env.sh -python driver.py --s3-bucket $S3_BUCKET -``` - -## Parallel Data Loading - -To speed up data loading, we will pull data from S3 in parallel with a number of -workers. At the core of our loading code is the remote function -`load_tarfile_from_s3`. When executed, this function connects to S3 and -retrieves the appropriate object. - -```python -@ray.remote(num_return_vals=2) -def load_tarfile_from_s3(bucket, s3_key, size=[]): - # Pull the object with the given key and bucket from S3, untar the contents, - # and return it. - return images, labels -``` - -To load data in parallel, we simply call this function multiple times with the -keys of all the objects that we want to retrieve. This returns a list of pairs -of object IDs, where the first object ID in each pair refers to a -batch of images and the second refers to the corresponding batch of labels. - -```python -batches = [load_tarfile_from_s3.remote(bucket, s3_key, size) for s3_key in s3_keys] -``` - -By default, this will only fetch objects whose keys have prefix -`ILSVRC2012_img_train/n015` (this is 13 tar files). To fetch all of the data, -pass in `--key-prefix ILSVRC2012_img_train/n`. - -## Data Parallel Training - -The other parallel component of this application is the training procedure. This -is built on top of the remote function `compute_grad`. - -```python -@ray.remote -def compute_grad(X, Y, mean, weights): - # Load the weights into the network. - # Subtract the mean and crop the images. - # Compute the gradients. - return gradients -``` - -This function takes training inputs and outputs, the mean image (to subtract off -of the input images), and the current network weights. - -We can parallelize the computation of the gradient over multiple batches by -calling `compute_grad` multiple times in parallel. - -```python -gradient_ids = [] -for i in range(num_workers): - # Choose a random batch and use it to compute the gradient of the loss. - x_id, y_id = batches[np.random.randint(len(batches))] - gradient_ids.append(compute_grad.remote(x_id, y_id, mean_id, weights_id)) -``` diff --git a/examples/alexnet/alexnet.py b/examples/alexnet/alexnet.py deleted file mode 100644 index 3c23ea328..000000000 --- a/examples/alexnet/alexnet.py +++ /dev/null @@ -1,448 +0,0 @@ -# The code for AlexNet is copied and adapted from the TensorFlow repository -# https://github.com/tensorflow/tensorflow/blob/master/tensorflow/models/image/alexnet/alexnet_benchmark.py. - -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import ray -import numpy as np -import tarfile, io -import boto3 -import PIL.Image as Image -import tensorflow as tf - -import ray.array.remote as ra - -STDDEV = 0.001 # The standard deviation of the network weight initialization. - -def load_chunk(tarfile, size=None): - """Load a number of images from a single imagenet .tar file. - - This function also converts the image from grayscale to RGB if necessary. - - Args: - tarfile (tarfile.TarFile): The archive from which the files get loaded. - size (Optional[Tuple[int, int]]): Resize the image to this size if provided. - - Returns: - numpy.ndarray: Contains the image data in format [batch, w, h, c] - """ - result = [] - filenames = [] - for member in tarfile.getmembers(): - filename = member.path - content = tarfile.extractfile(member) - img = Image.open(content) - rgbimg = Image.new("RGB", img.size) - rgbimg.paste(img) - if size != None: - rgbimg = rgbimg.resize(size, Image.ANTIALIAS) - result.append(np.array(rgbimg).reshape(1, rgbimg.size[0], rgbimg.size[1], 3)) - filenames.append(filename) - return np.concatenate(result), filenames - -@ray.remote(num_return_vals=2) -def load_tarfile_from_s3(bucket, s3_key, size=[]): - """Load an imagenet .tar file. - - Args: - bucket (str): Bucket holding the imagenet .tar. - s3_key (str): s3 key from which the .tar file is loaded. - size (List[int]): Resize the image to this size if size != []; len(size) == 2 required. - - Returns: - np.ndarray: The image data (see load_chunk). - """ - s3 = boto3.client("s3") - response = s3.get_object(Bucket=bucket, Key=s3_key) - output = io.BytesIO() - chunk = response["Body"].read(1024 * 8) - while chunk: - output.write(chunk) - chunk = response["Body"].read(1024 * 8) - output.seek(0) # go to the beginning of the .tar file - tar = tarfile.open(mode="r", fileobj=output) - return load_chunk(tar, size=size if size != [] else None) - -def load_tarfiles_from_s3(bucket, s3_keys, size=[]): - """Load a number of imagenet .tar files. - - Args: - bucket (str): Bucket holding the imagenet .tars. - s3_keys (List[str]): List of s3 keys from which the .tar files are being - loaded. - size (List[int]): Resize the image to this size if size does not equal []. - The length of size must be 2. - - Returns: - np.ndarray: Contains object IDs to the chunks of the images (see load_chunk). - """ - - return [load_tarfile_from_s3.remote(bucket, s3_key, size) for s3_key in s3_keys] - -def setup_variables(params, placeholders, kernelshape, biasshape): - """Create the variables for each layer. - - Args: - params (List): Network parameters used for creating feed_dicts - placeholders (List): Placeholders used for feeding weights into - kernelshape (List): Shape of the kernel used for the conv layer - biasshape (List): Shape of the bias used - - Returns: - None - """ - kernel = tf.Variable(tf.truncated_normal(kernelshape, stddev=STDDEV)) - biases = tf.Variable(tf.constant(0.0, shape=biasshape, dtype=tf.float32), - trainable=True, name='biases') - kernel_new = tf.placeholder(tf.float32, shape=kernel.get_shape()) - biases_new = tf.placeholder(tf.float32, shape=biases.get_shape()) - update_kernel = kernel.assign(kernel_new) - update_biases = biases.assign(biases_new) - params += [kernel, biases] - placeholders += [kernel_new, biases_new] - -def conv_layer(parameters, prev_layer, shape, scope): - """Constructs a convolutional layer for the network. - - Args: - parameters (List): Parameters used in constructing layer. - prevlayer (Tensor): The previous layer to connect the network together. - shape (List): The strides used for convolution - scope (Scope): Current scope of tensorflow - - Returns: - Tensor: Activation of layer - """ - kernel = parameters[-2] - bias = parameters[-1] - conv = tf.nn.conv2d(prev_layer, kernel, shape, padding='SAME') - add_bias = tf.nn.bias_add(conv, bias) - return tf.nn.relu(add_bias, name=scope) - -def net_initialization(): - images = tf.placeholder(tf.float32, shape=[None, 224, 224, 3]) - y_true = tf.placeholder(tf.float32, shape=[None, 1000]) - parameters = [] - placeholders = [] - # conv1 - with tf.name_scope('conv1') as scope: - setup_variables(parameters, placeholders, [11, 11, 3, 96], [96]) - conv1 = conv_layer(parameters, images, [1, 4, 4, 1], scope) - - # pool1 - pool1 = tf.nn.max_pool(conv1, - ksize=[1, 3, 3, 1], - strides=[1, 2, 2, 1], - padding='VALID', - name='pool1') - - # lrn1 - pool1_lrn = tf.nn.lrn(pool1, depth_radius=5, bias=1.0, - alpha=0.0001, beta=0.75, - name="LocalResponseNormalization") - - # conv2 - with tf.name_scope('conv2') as scope: - setup_variables(parameters, placeholders, [5, 5, 96, 256], [256]) - conv2 = conv_layer(parameters, pool1_lrn, [1, 1, 1, 1], scope) - - pool2 = tf.nn.max_pool(conv2, - ksize=[1, 3, 3, 1], - strides=[1, 2, 2, 1], - padding='VALID', - name='pool2') - - # lrn2 - pool2_lrn = tf.nn.lrn(pool2, depth_radius=5, bias=1.0, - alpha=0.0001, beta=0.75, - name="LocalResponseNormalization") - - # conv3 - with tf.name_scope('conv3') as scope: - setup_variables(parameters, placeholders, [3, 3, 256, 384], [384]) - conv3 = conv_layer(parameters, pool2_lrn, [1, 1, 1, 1], scope) - - # conv4 - with tf.name_scope('conv4') as scope: - setup_variables(parameters, placeholders, [3, 3, 384, 384], [384]) - conv4 = conv_layer(parameters, conv3, [1, 1, 1, 1], scope) - - # conv5 - with tf.name_scope('conv5') as scope: - setup_variables(parameters, placeholders, [3, 3, 384, 256], [256]) - conv5 = conv_layer(parameters, conv4, [1, 1, 1, 1], scope) - - # pool5 - pool5 = tf.nn.max_pool(conv5, - ksize=[1, 3, 3, 1], - strides=[1, 2, 2, 1], - padding='VALID', - name='pool5') - - # lrn5 - pool5_lrn = tf.nn.lrn(pool5, depth_radius=5, bias=1.0, - alpha=0.0001, beta=0.75, - name="LocalResponseNormalization") - - dropout = tf.placeholder(tf.float32) - - with tf.name_scope('fc1') as scope: - n_input = int(np.prod(pool5_lrn.get_shape().as_list()[1:])) - setup_variables(parameters, placeholders, [n_input, 4096], [4096]) - fc_in = tf.reshape(pool5_lrn, [-1, n_input]) - fc_layer1 = tf.nn.tanh(tf.nn.bias_add(tf.matmul(fc_in, parameters[-2]), parameters[-1])) - fc_out1 = tf.nn.dropout(fc_layer1, dropout) - - with tf.name_scope('fc2') as scope: - n_input = int(np.prod(fc_out1.get_shape().as_list()[1:])) - setup_variables(parameters, placeholders, [n_input, 4096], [4096]) - fc_in = tf.reshape(fc_out1, [-1, n_input]) - fc_layer2 = tf.nn.tanh(tf.nn.bias_add(tf.matmul(fc_in, parameters[-2]), parameters[-1])) - fc_out2 = tf.nn.dropout(fc_layer2, dropout) - - with tf.name_scope('fc3') as scope: - n_input = int(np.prod(fc_out2.get_shape().as_list()[1:])) - setup_variables(parameters, placeholders, [n_input, 1000], [1000]) - fc_in = tf.reshape(fc_out2, [-1, n_input]) - fc_layer3 = tf.nn.softmax(tf.nn.bias_add(tf.matmul(fc_in, parameters[-2]), parameters[-1])) - - y_pred = fc_layer3 / tf.reduce_sum(fc_layer3, - reduction_indices=len(fc_layer3.get_shape()) - 1, - keep_dims=True) - # manual computation of crossentropy - y_pred = tf.clip_by_value(y_pred, tf.cast(1e-10, dtype=tf.float32), - tf.cast(1. - 1e-10, dtype=tf.float32)) - cross_entropy = tf.reduce_mean(-tf.reduce_sum(y_true * tf.log(y_pred), - reduction_indices=len(y_pred.get_shape()) - 1)) - opt = tf.train.MomentumOptimizer(learning_rate=0.01, momentum=0.9) # Any other optimizier can be placed here - correct_pred = tf.equal(tf.argmax(y_pred, 1), tf.argmax(y_true, 1)) - accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32)) - - comp_grads = opt.compute_gradients(cross_entropy, parameters) - - application = opt.apply_gradients(zip(placeholders, parameters)) - sess = tf.Session() - init_all_variables = tf.initialize_all_variables() - - # In order to set the weights of the TensorFlow graph on a worker, we add - # assignment nodes. To get the network weights (as a list of numpy arrays) - # and to set the network weights (from a list of numpy arrays), use the - # methods get_weights and set_weights. This can be done from within a remote - # function or on the driver. - def get_and_set_weights_methods(): - assignment_placeholders = [] - assignment_nodes = [] - for var in tf.trainable_variables(): - assignment_placeholders.append(tf.placeholder(var.value().dtype, var.get_shape().as_list())) - assignment_nodes.append(var.assign(assignment_placeholders[-1])) - - def get_weights(): - return [v.eval(session=sess) for v in tf.trainable_variables()] - - def set_weights(new_weights): - sess.run(assignment_nodes, feed_dict={p: w for p, w in zip(assignment_placeholders, new_weights)}) - - return get_weights, set_weights - - get_weights, set_weights = get_and_set_weights_methods() - - return comp_grads, sess, application, accuracy, images, y_true, dropout, placeholders, init_all_variables, get_weights, set_weights - - -def net_reinitialization(net_vars): - return net_vars - -@ray.remote -def num_images(batches): - """Counts number of images in batches. - - Args: - batches (List): Collection of batches of images and labels. - - Returns: - int: The number of images - """ - shape_ids = [ra.shape.remote(batch) for batch in batches] - return sum([shape[0] for shape in ray.get(shape_ids)]) - -@ray.remote -def compute_mean_image(batches): - """Computes the mean image given a list of batches of images. - - Args: - batches (List[ObjectID]): A list of batches of images. - - Returns: - ndarray: The mean image - """ - if len(batches) == 0: - raise Exception("No images were passed into `compute_mean_image`.") - sum_image_ids = [ra.sum.remote(batch, axis=0) for batch in batches] - n_images = num_images.remote(batches) - return np.sum(ray.get(sum_image_ids), axis=0).astype("float64") / ray.get(n_images) - -@ray.remote(num_return_vals=4) -def shuffle_arrays(first_images, first_labels, second_images, second_labels): - """Shuffles the images and labels from two batches. - - Args: - first_images (ndarray): First batch of images. - first_labels (ndarray): First batch of labels. - second_images (ndarray): Second batch of images. - second_labels (ndarray): Second batch of labels. - - Returns: - ndarray: First batch of shuffled images. - ndarray: First batch of shuffled labels. - ndarray: Second bach of shuffled images. - ndarray: Second batch of shuffled labels. - """ - images = np.concatenate((first_images, second_images)) - labels = np.concatenate((first_labels, second_labels)) - total_length = len(images) - first_len = len(first_images) - random_indices = np.random.permutation(total_length) - new_first_images = images[random_indices[0:first_len]] - new_first_labels = labels[random_indices[0:first_len]] - new_second_images = images[random_indices[first_len:total_length]] - new_second_labels = labels[random_indices[first_len:total_length]] - return new_first_images, new_first_labels, new_second_images, new_second_labels - -def shuffle_pair(first_batch, second_batch): - """Shuffle two batches of data. - - Args: - first_batch (Tuple[ObjectID. ObjectID]): The first batch to be shuffled. The - first component is the object ID of a batch of images, and the second - component is the object ID of the corresponding batch of labels. - second_batch (Tuple[ObjectID, ObjectID]): The second batch to be shuffled. - The first component is the object ID of a batch of images, and the second - component is the object ID of the corresponding batch of labels. - - Returns: - Tuple[ObjectID, ObjectID]: The first batch of shuffled data. - Tuple[ObjectID, ObjectID]: Two second bach of shuffled data. - """ - images1, labels1, images2, labels2 = shuffle_arrays.remote(first_batch[0], first_batch[1], second_batch[0], second_batch[1]) - return (images1, labels1), (images2, labels2) - -@ray.remote -def filenames_to_labels(filenames, filename_label_dict): - """Converts filename strings to integer labels. - - Args: - filenames (List[str]): The filenames of the images. - filename_label_dict (Dict[str, int]): A dictionary mapping filenames to - integer labels. - - Returns: - ndarray: Integer labels - """ - return np.asarray([int(filename_label_dict[filename]) for filename in filenames]) - -def one_hot(x): - """Converts integer labels to one hot vectors. - - Args: - x (int): Index to be set to one - - Returns: - ndarray: One hot vector. - """ - zero = np.zeros([1000]) - zero[x] = 1.0 - return zero - -def crop_images(images): - """Randomly crop a batch of images. - - This is used to generate many slightly different images from each training - example. - - Args: - images (ndarray): A batch of images to crop. The shape of images should be - batch_size x height x width x channels. - - Returns: - ndarray: A batch of cropped images. - """ - original_height = 256 - original_width = 256 - cropped_height = 224 - cropped_width = 224 - height_offset = np.random.randint(original_height - cropped_height + 1) - width_offset = np.random.randint(original_width - cropped_width + 1) - return images[:, height_offset:(height_offset + cropped_height), width_offset:(width_offset + cropped_width), :] - -def shuffle(batches): - """Shuffle the data. - - This method groups the batches together in pairs and within each pair shuffles - the data between the two members. - - Args: - batches (List[Tuple[ObjectID, ObjectID]]): This is a list of tuples, where - each tuple consists of two object IDs. The first component is an object ID - for a batch of images, and the second component is an object ID for the - corresponding batch of labels. - - Returns: - List[Tuple[ObjectID, ObjectID]]: The shuffled data. - """ - # Randomly permute the order of the batches. - permuted_batches = np.random.permutation(batches) - new_batches = [] - for i in range(len(batches) // 2): - # Swap data between consecutive batches. - shuffled_batch1, shuffled_batch2 = shuffle_pair(permuted_batches[2 * i], permuted_batches[2 * i + 1]) - new_batches += [shuffled_batch1, shuffled_batch2] - if len(batches) % 2 == 1: - # If there is an odd number of batches, don't forget the last one. - new_batches.append(permuted_batches[-1]) - return new_batches - -@ray.remote -def compute_grad(X, Y, mean, weights): - """Computes the gradient of the network. - - Args: - X (ndarray): Numpy array of images in the form of [224, 224,3] - Y (ndarray): Labels corresponding to each image - mean (ndarray): Mean image to subtract from images - weights (List[ndarray]): The network weights. - - Returns: - List of gradients for each variable - """ - comp_grads, sess, _, _, images, y_true, dropout, placeholders, _, get_weights, set_weights = ray.reusables.net_vars - # Set the network weights. - set_weights(weights) - # Choose a subset of the batch to compute on and crop the images. - random_indices = np.random.randint(0, len(X), size=128) - subset_X = crop_images(X[random_indices] - mean) - subset_Y = np.asarray([one_hot(label) for label in Y[random_indices]]) - - # Compute the gradients. - return sess.run([g for (g, v) in comp_grads], feed_dict={images: subset_X, y_true: subset_Y, dropout: 0.5}) - -@ray.remote -def compute_accuracy(X, Y, weights): - """Returns the accuracy of the network - - Args: - X (ndarray): A batch of images. - Y (ndarray): A batch of labels. - weights (List[ndarray]): The network weights. - - Returns: - The accuracy of the network on the given batch. - """ - _, sess, _, accuracy, images, y_true, dropout, placeholders, _, get_weights, set_weights = ray.reusables.net_vars - # Set the network weights. - set_weights(weights) - - one_hot_Y = np.asarray([one_hot(label) for label in Y]) - cropped_X = crop_images(X) - return sess.run(accuracy, feed_dict={images: cropped_X, y_true: one_hot_Y, dropout: 1.0}) diff --git a/examples/alexnet/driver.py b/examples/alexnet/driver.py deleted file mode 100644 index 8182264c8..000000000 --- a/examples/alexnet/driver.py +++ /dev/null @@ -1,98 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import numpy as np -import ray -import os -import argparse -import boto3 - -import alexnet - -# Arguments to specify where the imagenet data is stored. -parser = argparse.ArgumentParser(description="Run the AlexNet example.") -parser.add_argument("--s3-bucket", required=True, type=str, help="Name of the bucket that contains the image data.") -parser.add_argument("--key-prefix", default="ILSVRC2012_img_train/n015", type=str, help="Prefix for files to fetch.") -parser.add_argument("--label-file", default="train.txt", type=str, help="File containing labels.") - -if __name__ == "__main__": - args = parser.parse_args() - - ray.init(start_ray_local=True, num_workers=10) - - # Note we do not do sess.run(tf.initialize_all_variables()) because that would - # result in a different initialization on each worker. Instead, we initialize - # the weights on the driver and load the weights on the workers every time we - # compute a gradient. - ray.reusables.net_vars = ray.Reusable(alexnet.net_initialization, alexnet.net_reinitialization) - - # Prepare keys for downloading the data. - s3_resource = boto3.resource("s3") - imagenet_bucket = s3_resource.Bucket(args.s3_bucket) - objects = imagenet_bucket.objects.filter(Prefix=args.key_prefix) - image_tar_files = [str(obj.key) for obj in objects.all()] - print("Images will be downloaded from {} files.".format(len(image_tar_files))) - - # Downloading the label file, and create a dictionary mapping the filenames of - # the images to their labels. - s3_client = boto3.client("s3") - label_file = s3_client.get_object(Bucket=args.s3_bucket, Key=args.label_file) - filename_label_str = label_file["Body"].read().strip().split("\n") - filename_label_pairs = [line.split(" ") for line in filename_label_str] - filename_label_dict = dict([(os.path.basename(name), label) for name, label in filename_label_pairs]) - filename_label_dict_id = ray.put(filename_label_dict) - print("Labels extracted.") - - # Download the imagenet dataset. - imagenet_data = alexnet.load_tarfiles_from_s3(args.s3_bucket, image_tar_files, [256, 256]) - - # Convert the parsed filenames to integer labels and create batches. - batches = [(images, alexnet.filenames_to_labels.remote(filenames, filename_label_dict_id)) for images, filenames in imagenet_data] - - # Compute the mean image. - mean_id = alexnet.compute_mean_image.remote([images for images, labels in batches]) - - # The data does not start out shuffled. Images of the same class all appear - # together, so we shuffle it ourselves here. Each shuffle pairs up the batches - # and swaps data within a pair. - num_shuffles = 5 - for i in range(num_shuffles): - batches = alexnet.shuffle(batches) - - _, sess, application, _, _, _, _, placeholders, init_all_variables, get_weights, set_weights = ray.reusables.net_vars - # Initialize the network and optimizer weights. This is only run once on the - # driver. We initialize the weights manually on the workers. - sess.run(init_all_variables) - print("Initialized network weights.") - - iteration = 0 - while True: - # Extract weights from the local copy of the network. - weights = get_weights() - # Put weights in the object store. - weights_id = ray.put(weights) - - # Compute the accuracy on a random training batch. - x_id, y_id = batches[np.random.randint(len(batches))] - accuracy = alexnet.compute_accuracy.remote(x_id, y_id, weights_id) - - # Launch tasks in parallel to compute the gradients for some batches. - gradient_ids = [] - num_batches = 4 - for i in range(num_batches): - # Choose a random batch and use it to compute the gradient of the loss. - x_id, y_id = batches[np.random.randint(len(batches))] - gradient_ids.append(alexnet.compute_grad.remote(x_id, y_id, mean_id, weights_id)) - - # Print the accuracy on a random training batch. - print("Iteration {}: accuracy = {:.3}%".format(iteration, 100 * ray.get(accuracy))) - - # Fetch the gradients. This blocks until the gradients have been computed. - gradient_sets = ray.get(gradient_ids) - # Average the gradients over all of the tasks. - mean_gradients = [np.mean([gradient_set[i] for gradient_set in gradient_sets], axis=0) for i in range(len(weights))] - # Use the gradients to update the network. - sess.run(application, feed_dict=dict(zip(placeholders, mean_gradients))) - - iteration += 1 diff --git a/examples/hyperopt/driver.py b/examples/hyperopt/driver.py index 2d56369b8..4b20b1eea 100644 --- a/examples/hyperopt/driver.py +++ b/examples/hyperopt/driver.py @@ -21,7 +21,7 @@ parser.add_argument("--steps", default=10, type=int, help="The number of steps o if __name__ == "__main__": args = parser.parse_args() - ray.init(start_ray_local=True, num_workers=10) + ray.init(num_workers=10) # The number of sets of random hyperparameters to try. trials = args.trials diff --git a/examples/lbfgs/driver.py b/examples/lbfgs/driver.py index 115918f06..f74af6ddd 100644 --- a/examples/lbfgs/driver.py +++ b/examples/lbfgs/driver.py @@ -11,7 +11,7 @@ import tensorflow as tf from tensorflow.examples.tutorials.mnist import input_data if __name__ == "__main__": - ray.init(start_ray_local=True, num_workers=10) + ray.init(num_workers=10) # Define the dimensions of the data and of the model. image_dimension = 784 diff --git a/examples/rl_pong/driver.py b/examples/rl_pong/driver.py index 693097df9..ad7116f2d 100644 --- a/examples/rl_pong/driver.py +++ b/examples/rl_pong/driver.py @@ -110,7 +110,7 @@ def compute_gradient(model): return policy_backward(eph, epx, epdlogp, model), reward_sum if __name__ == "__main__": - ray.init(start_ray_local=True, num_workers=10) + ray.init(num_workers=10) # Run the reinforcement learning running_reward = None diff --git a/examples/trpo/README.md b/examples/trpo/README.md deleted file mode 100644 index 019a4e236..000000000 --- a/examples/trpo/README.md +++ /dev/null @@ -1,118 +0,0 @@ -# Parallelizing TRPO - -In this example, we show how TRPO can be parallelized using Ray. We will be -working with John Schulman's -[modular_rl code](https://github.com/joschu/modular_rl). - -For this tutorial I'll assume that you have Anaconda with Python 2.7 installed. - -## Setting up the single core implementation of TRPO - -First, we will run the original TRPO code. - -Install these dependencies: - -- [Gym](https://gym.openai.com/) -- The following Python packages: - - ``` - pip install theano - pip install keras - pip install tabulate - ``` - -Run this code to start an experiment. -``` -git clone https://github.com/joschu/modular_rl -cd modular_rl -export KERAS_BACKEND=theano && ./run_pg.py --env Pong-ram-v0 --agent modular_rl.agentzoo.TrpoAgent --video 0 --n_iter 500 --filter 1 -``` - -**Note: On some versions of Mac OS X, this produces NaNs.** - -On a m4.4xlarge EC2 instance, the first 10 iterations take 106s. - - -Each iteration consists of two phases. In the first phase, the rollouts are -computed (on one core). In the second phase, the objective is optimized, which -makes use of the parallel BLAS library. The code for all of this is in -`modular_rl/modular_rl/core.py`. - -```python -for _ in xrange(cfg["n_iter"]): - # Rollouts ======== - paths = get_paths(env, agent, cfg, seed_iter) - compute_advantage(agent.baseline, paths, gamma=cfg["gamma"], lam=cfg["lam"]) - # VF Update ======== - vf_stats = agent.baseline.fit(paths) - # Pol Update ======== - pol_stats = agent.updater(paths) -``` - -We will now see how this code can be parallelized. - -## Parallelizing TRPO rollouts using Ray - -As a first step, we will parallelize the rollouts. This is done by implementing -a function `do_rollouts_remote` similar to -[do_rollouts_serial](https://github.com/joschu/modular_rl/blob/46a6f9a0d363a7bc1c7325ff17e2eb684612a388/modular_rl/core.py#L137), -which will be called by -[get_paths](https://github.com/joschu/modular_rl/blob/46a6f9a0d363a7bc1c7325ff17e2eb684612a388/modular_rl/core.py#L102) -(called in the above code snippet). - -Check out the parallel version of the TRPO code. - -``` -git clone https://github.com/pcmoritz/modular_rl modular_rl_ray -cd modular_rl_ray -git checkout remote -``` - -You can run the code using -``` -export KERAS_BACKEND=theano && ./run_pg.py --env Pong-ram-v0 --agent modular_rl.agentzoo.TrpoAgent --video 0 --n_iter 500 --filter 1 --remote 1 --n_rollouts 8 -``` - -There are few [changes](https://github.com/joschu/modular_rl/compare/master...pcmoritz:23d3ebc). -As in the [learning to play Pong example](https://github.com/ray-project/ray/tree/master/examples/rl_pong), -we use reusable variables to store the gym environment and the neural network policy. These are -then used in the remote `do_rollout` function to do a remote rollout: - -```python -@ray.remote -def do_rollout(policy, timestep_limit, seed): - # Retrieve the game environment. - env = ray.reusables.env - # Set the environment seed. - env.seed(seed) - # Set the numpy seed. - np.random.seed(seed) - # Retrieve the neural network agent. - agent = ray.reusables.agent - # Set the network weights. - agent.set_from_flat(policy) - return rollout(env, agent, timestep_limit) -``` - -All that is left is to invoke the remote function and collect the paths. - -```python -def do_rollouts_remote(agent, timestep_limit, n_timesteps, n_parallel, seed_iter): - # Put the neural network weights into the object store. - policy = ray.put(agent.get_flat()) - paths = [] - timesteps_sofar = 0 - # Run parallel rollouts until we have enough. - while timesteps_sofar < n_timesteps: - # Launch rollout tasks in parallel. - rollout_ids = [do_rollout.remote(policy, timestep_limit, seed_iter.next()) for i in range(n_parallel)] - for rollout_id in rollout_ids: - # Retrieve the task output from the object store. - path = ray.get(rollout_id) - paths.append(path) - timesteps_sofar += pathlength(path) - return paths -``` - -On the same m4.4xlarge EC2 instance, the first 10 iterations now take 42s instead of -106s. diff --git a/lib/python/ray/array/__init__.py b/lib/python/ray/experimental/array/__init__.py similarity index 100% rename from lib/python/ray/array/__init__.py rename to lib/python/ray/experimental/array/__init__.py diff --git a/lib/python/ray/array/distributed/__init__.py b/lib/python/ray/experimental/array/distributed/__init__.py similarity index 100% rename from lib/python/ray/array/distributed/__init__.py rename to lib/python/ray/experimental/array/distributed/__init__.py diff --git a/lib/python/ray/array/distributed/core.py b/lib/python/ray/experimental/array/distributed/core.py similarity index 99% rename from lib/python/ray/array/distributed/core.py rename to lib/python/ray/experimental/array/distributed/core.py index a307c3953..d885218c4 100644 --- a/lib/python/ray/array/distributed/core.py +++ b/lib/python/ray/experimental/array/distributed/core.py @@ -3,7 +3,7 @@ from __future__ import division from __future__ import print_function import numpy as np -import ray.array.remote as ra +import ray.experimental.array.remote as ra import ray __all__ = ["BLOCK_SIZE", "DistArray", "assemble", "zeros", "ones", "copy", diff --git a/lib/python/ray/array/distributed/linalg.py b/lib/python/ray/experimental/array/distributed/linalg.py similarity index 99% rename from lib/python/ray/array/distributed/linalg.py rename to lib/python/ray/experimental/array/distributed/linalg.py index 87ab4c6a1..45341dddd 100644 --- a/lib/python/ray/array/distributed/linalg.py +++ b/lib/python/ray/experimental/array/distributed/linalg.py @@ -3,7 +3,7 @@ from __future__ import division from __future__ import print_function import numpy as np -import ray.array.remote as ra +import ray.experimental.array.remote as ra import ray from .core import * diff --git a/lib/python/ray/array/distributed/random.py b/lib/python/ray/experimental/array/distributed/random.py similarity index 91% rename from lib/python/ray/array/distributed/random.py rename to lib/python/ray/experimental/array/distributed/random.py index ff3343b18..61aabe21d 100644 --- a/lib/python/ray/array/distributed/random.py +++ b/lib/python/ray/experimental/array/distributed/random.py @@ -3,7 +3,7 @@ from __future__ import division from __future__ import print_function import numpy as np -import ray.array.remote as ra +import ray.experimental.array.remote as ra import ray from .core import * diff --git a/lib/python/ray/array/remote/__init__.py b/lib/python/ray/experimental/array/remote/__init__.py similarity index 100% rename from lib/python/ray/array/remote/__init__.py rename to lib/python/ray/experimental/array/remote/__init__.py diff --git a/lib/python/ray/array/remote/core.py b/lib/python/ray/experimental/array/remote/core.py similarity index 100% rename from lib/python/ray/array/remote/core.py rename to lib/python/ray/experimental/array/remote/core.py diff --git a/lib/python/ray/array/remote/linalg.py b/lib/python/ray/experimental/array/remote/linalg.py similarity index 100% rename from lib/python/ray/array/remote/linalg.py rename to lib/python/ray/experimental/array/remote/linalg.py diff --git a/lib/python/ray/array/remote/random.py b/lib/python/ray/experimental/array/remote/random.py similarity index 100% rename from lib/python/ray/array/remote/random.py rename to lib/python/ray/experimental/array/remote/random.py diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index bb8a1bf95..fad7172b1 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -548,7 +548,7 @@ def check_connected(worker=global_worker): Exception: An exception is raised if the worker is not connected. """ if not worker.connected: - raise RayConnectionError("This command cannot be called before Ray has been started. You can start Ray with 'ray.init(start_ray_local=True, num_workers=1)'.") + raise RayConnectionError("This command cannot be called before Ray has been started. You can start Ray with 'ray.init(num_workers=10)'.") def print_failed_task(task_status): """Print information about failed tasks. @@ -724,7 +724,7 @@ def _init(address_info=None, start_ray_local=False, object_id_seed=None, # Use the address 127.0.0.1 in local mode. node_ip_address = "127.0.0.1" if node_ip_address is None else node_ip_address # Use 1 worker if num_workers is not provided. - num_workers = 1 if num_workers is None else num_workers + num_workers = 10 if num_workers is None else num_workers # Use 1 local scheduler if num_local_schedulers is not provided. If # existing local schedulers are provided, use that count as # num_local_schedulers. @@ -770,8 +770,8 @@ def _init(address_info=None, start_ray_local=False, object_id_seed=None, connect(driver_address_info, object_id_seed=object_id_seed, mode=driver_mode, worker=global_worker) return address_info -def init(node_ip_address=None, redis_address=None, start_ray_local=False, - object_id_seed=None, num_workers=None, driver_mode=SCRIPT_MODE): +def init(redis_address=None, node_ip_address=None, object_id_seed=None, + num_workers=None, driver_mode=SCRIPT_MODE): """Either connect to an existing Ray cluster or start one and connect to it. This method handles two cases. Either a Ray cluster already exists and we @@ -780,18 +780,16 @@ def init(node_ip_address=None, redis_address=None, start_ray_local=False, Args: node_ip_address (str): The IP address of the node that we are on. - redis_address (str): The address of the Redis server to connect to. This - should only be provided if start_ray_local is False. - start_ray_local (bool): If True then this will start Redis, a global + redis_address (str): The address of the Redis server to connect to. If this + address is not provided, then this command will start Redis, a global scheduler, a local scheduler, a plasma store, a plasma manager, and some - workers. It will also kill these processes when Python exits. If False, - this will attach to an existing Ray cluster. + workers. It will also kill these processes when Python exits. object_id_seed (int): Used to seed the deterministic generation of object IDs. The same value can be used across multiple runs of the same job in order to generate the object IDs in a consistent manner. However, the same ID should not be used for different jobs. num_workers (int): The number of workers to start. This is only provided if - start_ray_local is True. + redis_address is not provided. driver_mode (bool): The mode in which to start the driver. This should be one of ray.SCRIPT_MODE, ray.PYTHON_MODE, and ray.SILENT_MODE. @@ -806,9 +804,8 @@ def init(node_ip_address=None, redis_address=None, start_ray_local=False, "node_ip_address": node_ip_address, "redis_address": redis_address, } - return _init(address_info=info, - start_ray_local=start_ray_local, num_workers=num_workers, - driver_mode=driver_mode) + return _init(address_info=info, start_ray_local=(redis_address is None), + num_workers=num_workers, driver_mode=driver_mode) def cleanup(worker=global_worker): """Disconnect the driver, and terminate any processes started in init. diff --git a/numbuf/build.sh b/numbuf/build.sh index 0a47172d2..0d3618d61 100755 --- a/numbuf/build.sh +++ b/numbuf/build.sh @@ -16,6 +16,14 @@ else exit 1 fi +# Build plasma. We currently have to do this because we compile numbuf with the +# HAS_PLASMA flag. +PLASMA_DIR="$ROOT_DIR/../src/plasma" +pushd "$PLASMA_DIR" + make clean + make +popd + mkdir -p "$ROOT_DIR/build" pushd "$ROOT_DIR/build" cmake -DHAS_PLASMA=ON -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS="-g" -DCMAKE_CXX_FLAGS="-g" .. diff --git a/test/array_test.py b/test/array_test.py index 30605d213..bb472bc6b 100644 --- a/test/array_test.py +++ b/test/array_test.py @@ -12,15 +12,15 @@ import sys if sys.version_info >= (3, 0): from importlib import reload -import ray.array.remote as ra -import ray.array.distributed as da +import ray.experimental.array.remote as ra +import ray.experimental.array.distributed as da class RemoteArrayTest(unittest.TestCase): def testMethods(self): for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]: reload(module) - ray.init(start_ray_local=True) + ray.init(num_workers=1) # test eye object_id = ra.eye.remote(3) @@ -54,7 +54,7 @@ class DistributedArrayTest(unittest.TestCase): def testAssemble(self): for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]: reload(module) - ray.init(start_ray_local=True, num_workers=1) + ray.init(num_workers=1) a = ra.ones.remote([da.BLOCK_SIZE, da.BLOCK_SIZE]) b = ra.zeros.remote([da.BLOCK_SIZE, da.BLOCK_SIZE]) diff --git a/test/failure_test.py b/test/failure_test.py index 69e36185e..8cf7c2aa1 100644 --- a/test/failure_test.py +++ b/test/failure_test.py @@ -24,7 +24,7 @@ def wait_for_errors(error_type, num_errors, timeout=10): class FailureTest(unittest.TestCase): def testUnknownSerialization(self): reload(test_functions) - ray.init(start_ray_local=True, num_workers=1, driver_mode=ray.SILENT_MODE) + ray.init(num_workers=1, driver_mode=ray.SILENT_MODE) test_functions.test_unknown_type.remote() wait_for_errors(b"TaskError", 1) @@ -35,7 +35,7 @@ class FailureTest(unittest.TestCase): class TaskSerializationTest(unittest.TestCase): def testReturnAndPassUnknownType(self): - ray.init(start_ray_local=True, num_workers=1, driver_mode=ray.SILENT_MODE) + ray.init(num_workers=1, driver_mode=ray.SILENT_MODE) class Foo(object): pass @@ -57,7 +57,7 @@ class TaskSerializationTest(unittest.TestCase): class TaskStatusTest(unittest.TestCase): def testFailedTask(self): reload(test_functions) - ray.init(start_ray_local=True, num_workers=3, driver_mode=ray.SILENT_MODE) + ray.init(num_workers=3, driver_mode=ray.SILENT_MODE) test_functions.throw_exception_fct1.remote() test_functions.throw_exception_fct1.remote() @@ -87,7 +87,7 @@ class TaskStatusTest(unittest.TestCase): ray.worker.cleanup() def testFailImportingRemoteFunction(self): - ray.init(start_ray_local=True, num_workers=2, driver_mode=ray.SILENT_MODE) + ray.init(num_workers=2, driver_mode=ray.SILENT_MODE) # This example is somewhat contrived. It should be successfully pickled, and # then it should throw an exception when it is unpickled. This may depend a @@ -115,7 +115,7 @@ class TaskStatusTest(unittest.TestCase): ray.worker.cleanup() def testFailImportingReusableVariable(self): - ray.init(start_ray_local=True, num_workers=2, driver_mode=ray.SILENT_MODE) + ray.init(num_workers=2, driver_mode=ray.SILENT_MODE) # This will throw an exception when the reusable variable is imported on the # workers. @@ -131,7 +131,7 @@ class TaskStatusTest(unittest.TestCase): ray.worker.cleanup() def testFailReinitializingVariable(self): - ray.init(start_ray_local=True, num_workers=2, driver_mode=ray.SILENT_MODE) + ray.init(num_workers=2, driver_mode=ray.SILENT_MODE) def initializer(): return 0 @@ -149,7 +149,7 @@ class TaskStatusTest(unittest.TestCase): ray.worker.cleanup() def testFailedFunctionToRun(self): - ray.init(start_ray_local=True, num_workers=2, driver_mode=ray.SILENT_MODE) + ray.init(num_workers=2, driver_mode=ray.SILENT_MODE) def f(worker): if ray.worker.global_worker.mode == ray.WORKER_MODE: diff --git a/test/microbenchmarks.py b/test/microbenchmarks.py index 974803591..85ec735e4 100644 --- a/test/microbenchmarks.py +++ b/test/microbenchmarks.py @@ -18,7 +18,7 @@ class MicroBenchmarkTest(unittest.TestCase): def testTiming(self): reload(test_functions) - ray.init(start_ray_local=True, num_workers=3) + ray.init(num_workers=3) # measure the time required to submit a remote task to the scheduler elapsed_times = [] @@ -88,7 +88,7 @@ class MicroBenchmarkTest(unittest.TestCase): ray.worker.cleanup() def testCache(self): - ray.init(start_ray_local=True, num_workers=1) + ray.init(num_workers=1) A = np.random.rand(1, 1000000) v = np.random.rand(1000000) diff --git a/test/runtest.py b/test/runtest.py index 5faa44ac1..cdcf2e5b9 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -16,8 +16,8 @@ if sys.version_info >= (3, 0): from importlib import reload import ray.test.test_functions as test_functions -import ray.array.remote as ra -import ray.array.distributed as da +import ray.experimental.array.remote as ra +import ray.experimental.array.distributed as da def assert_equal(obj1, obj2): if type(obj1).__module__ == np.__name__ or type(obj2).__module__ == np.__name__: @@ -129,7 +129,7 @@ except AttributeError: class SerializationTest(unittest.TestCase): def testRecursiveObjects(self): - ray.init(start_ray_local=True, num_workers=0) + ray.init(num_workers=0) class ClassA(object): pass @@ -160,7 +160,7 @@ class SerializationTest(unittest.TestCase): ray.worker.cleanup() def testPassingArgumentsByValue(self): - ray.init(start_ray_local=True, num_workers=1) + ray.init(num_workers=1) @ray.remote def f(x): @@ -184,7 +184,7 @@ class SerializationTest(unittest.TestCase): class WorkerTest(unittest.TestCase): def testPutGet(self): - ray.init(start_ray_local=True, num_workers=0) + ray.init(num_workers=0) for i in range(100): value_before = i * 10 ** 6 @@ -215,7 +215,7 @@ class WorkerTest(unittest.TestCase): class APITest(unittest.TestCase): def testRegisterClass(self): - ray.init(start_ray_local=True, num_workers=0) + ray.init(num_workers=0) # Check that putting an object of a class that has not been registered # throws an exception. @@ -233,7 +233,7 @@ class APITest(unittest.TestCase): def testKeywordArgs(self): reload(test_functions) - ray.init(start_ray_local=True, num_workers=1) + ray.init(num_workers=1) x = test_functions.keyword_fct1.remote(1) self.assertEqual(ray.get(x), "1 hello") @@ -270,7 +270,7 @@ class APITest(unittest.TestCase): def testVariableNumberOfArgs(self): reload(test_functions) - ray.init(start_ray_local=True, num_workers=1) + ray.init(num_workers=1) x = test_functions.varargs_fct1.remote(0, 1, 2) self.assertEqual(ray.get(x), "0 1 2") @@ -284,14 +284,14 @@ class APITest(unittest.TestCase): def testNoArgs(self): reload(test_functions) - ray.init(start_ray_local=True, num_workers=1) + ray.init(num_workers=1) ray.get(test_functions.no_op.remote()) ray.worker.cleanup() def testDefiningRemoteFunctions(self): - ray.init(start_ray_local=True, num_workers=3) + ray.init(num_workers=3) # Test that we can define a remote function in the shell. @ray.remote @@ -345,13 +345,13 @@ class APITest(unittest.TestCase): ray.worker.cleanup() def testGetMultiple(self): - ray.init(start_ray_local=True, num_workers=0) + ray.init(num_workers=0) object_ids = [ray.put(i) for i in range(10)] self.assertEqual(ray.get(object_ids), list(range(10))) ray.worker.cleanup() def testWait(self): - ray.init(start_ray_local=True, num_workers=1) + ray.init(num_workers=1) @ray.remote def f(delay): @@ -401,7 +401,7 @@ class APITest(unittest.TestCase): ray.reusables.bar.append(1) return ray.reusables.bar - ray.init(start_ray_local=True, num_workers=2) + ray.init(num_workers=2) self.assertEqual(ray.get(use_foo.remote()), 1) self.assertEqual(ray.get(use_foo.remote()), 1) @@ -425,7 +425,7 @@ class APITest(unittest.TestCase): sys.path.append(4) ray.worker.global_worker.run_function_on_all_workers(f) - ray.init(start_ray_local=True, num_workers=2) + ray.init(num_workers=2) @ray.remote def get_state(): @@ -448,7 +448,7 @@ class APITest(unittest.TestCase): ray.worker.cleanup() def testRunningFunctionOnAllWorkers(self): - ray.init(start_ray_local=True, num_workers=1) + ray.init(num_workers=1) def f(worker_info): sys.path.append("fake_directory") @@ -471,7 +471,7 @@ class APITest(unittest.TestCase): ray.worker.cleanup() def testPassingInfoToAllWorkers(self): - ray.init(start_ray_local=True, num_workers=10) + ray.init(num_workers=10) def f(worker_info): sys.path.append(worker_info) @@ -498,7 +498,7 @@ class APITest(unittest.TestCase): ray.worker.cleanup() def testLoggingAPI(self): - ray.init(start_ray_local=True, num_workers=1) + ray.init(num_workers=1) def events(): # This is a hack for getting the event log. It is not part of the API. @@ -551,7 +551,7 @@ class PythonModeTest(unittest.TestCase): def testPythonMode(self): reload(test_functions) - ray.init(start_ray_local=True, driver_mode=ray.PYTHON_MODE) + ray.init(driver_mode=ray.PYTHON_MODE) @ray.remote def f(): @@ -574,7 +574,7 @@ class PythonModeTest(unittest.TestCase): def testReusableVariablesInPythonMode(self): reload(test_functions) - ray.init(start_ray_local=True, driver_mode=ray.PYTHON_MODE) + ray.init(driver_mode=ray.PYTHON_MODE) def l_init(): return [] @@ -612,7 +612,7 @@ class PythonModeTest(unittest.TestCase): class ReusablesTest(unittest.TestCase): def testReusables(self): - ray.init(start_ray_local=True, num_workers=1) + ray.init(num_workers=1) # Test that we can add a variable to the key-value store. @@ -688,7 +688,7 @@ class ReusablesTest(unittest.TestCase): ray.worker.cleanup() def testUsingReusablesOnDriver(self): - ray.init(start_ray_local=True, num_workers=1) + ray.init(num_workers=1) # Test that we can add a variable to the key-value store. @@ -731,7 +731,7 @@ class UtilsTest(unittest.TestCase): # The functionality being tested here is really multi-node functionality, # but this test just uses a single node. - ray.init(start_ray_local=True, num_workers=1) + ray.init(num_workers=1) source_text = "hello world" diff --git a/test/stress_tests.py b/test/stress_tests.py index 06e9da6cb..ca242ee99 100644 --- a/test/stress_tests.py +++ b/test/stress_tests.py @@ -65,7 +65,7 @@ class TaskTests(unittest.TestCase): ray.worker.cleanup() def testGettingAndPutting(self): - ray.init(start_ray_local=True, num_workers=1) + ray.init(num_workers=1) for n in range(8): x = np.zeros(10 ** n) diff --git a/test/tensorflow_test.py b/test/tensorflow_test.py index da0c0c687..d9ad347bb 100644 --- a/test/tensorflow_test.py +++ b/test/tensorflow_test.py @@ -9,7 +9,7 @@ import ray class TensorFlowTest(unittest.TestCase): def testTensorFlowVariables(self): - ray.init(start_ray_local=True, num_workers=2) + ray.init(num_workers=2) x_data = tf.placeholder(tf.float32, shape=[100]) y_data = tf.placeholder(tf.float32, shape=[100]) diff --git a/webui/.babelrc b/webui/.babelrc deleted file mode 100644 index 86c445f54..000000000 --- a/webui/.babelrc +++ /dev/null @@ -1,3 +0,0 @@ -{ - "presets": ["es2015", "react"] -}