diff --git a/.travis.yml b/.travis.yml index dd912e73a..59b284b81 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,3 +15,4 @@ script: - cd test - python runtest.py - python arrays_test.py + - python datasets_test.py diff --git a/README.md b/README.md index d340b0d39..bb631349b 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,11 @@ Ray is an experimental distributed execution framework with a Python-like programming model. It is under development and not ready for general use. +## Example Code + +### Loading ImageNet +TODO: fill this out. + ## Design Decisions For a description of our design decisions, see diff --git a/data/README.md b/data/README.md new file mode 100644 index 000000000..9f4765b5a --- /dev/null +++ b/data/README.md @@ -0,0 +1,5 @@ +# Data for Ray + +This folder contains data neccessary to run tests, etc. Only very small amounts +of data should be stored here and if a loader for a large dataset is tested, a +miniature version of this dataset should be created. diff --git a/data/mini.tar b/data/mini.tar new file mode 100644 index 000000000..73098dfe5 Binary files /dev/null and b/data/mini.tar differ diff --git a/examples/imagenet/driver.py b/examples/imagenet/driver.py new file mode 100644 index 000000000..cc25bfe58 --- /dev/null +++ b/examples/imagenet/driver.py @@ -0,0 +1,38 @@ +import argparse +import boto3 +import os +import numpy as np +import ray +import ray.services as services +import ray.datasets.imagenet as imagenet + +import functions + +parser = argparse.ArgumentParser(description="Parse information for data loading.") +parser.add_argument("--s3-bucket", type=str, help="Name of the bucket that contains the image data.") +parser.add_argument("--key-prefix", default="ILSVRC2012_img_train/n015", type=str, help="Prefix for files to fetch.") +parser.add_argument("--drop-ipython", default=False, type=bool, help="Drop into IPython at the end?") + +if __name__ == "__main__": + args = parser.parse_args() + test_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "worker.py") + services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=5, worker_path=test_path) + + s3 = boto3.resource("s3") + imagenet_bucket = s3.Bucket(args.s3_bucket) + objects = imagenet_bucket.objects.filter(Prefix=args.key_prefix) + images = [obj.key for obj in objects.all()] + + x = imagenet.load_tarfiles_from_s3(args.s3_bucket, map(str, images), [256, 256]) # TODO(pcm): implement unicode serialization + + mean_image = functions.compute_mean_image(x) + mean_image = ray.pull(mean_image) + + print "The mean image is:" + print mean_image + + if args.drop_ipython: + import IPython + IPython.embed() + + services.cleanup() diff --git a/examples/imagenet/functions.py b/examples/imagenet/functions.py new file mode 100644 index 000000000..5efe3da7e --- /dev/null +++ b/examples/imagenet/functions.py @@ -0,0 +1,18 @@ +import numpy as np +from typing import List +import ray +import ray.arrays.remote as ra + +@ray.remote([List[ray.ObjRef]], [int]) +def num_images(batches): + shape_refs = [ra.shape(batch) for batch in batches] + return sum([ray.pull(shape_ref)[0] for shape_ref in shape_refs]) + +@ray.remote([List[ray.ObjRef]], [np.ndarray]) +def compute_mean_image(batches): + if len(batches) == 0: + raise Exception("No images were passed into `compute_mean_image`.") + sum_image_refs = [ra.sum(batch, axis=0) for batch in batches] + sum_images = [ray.pull(ref) for ref in sum_image_refs] + n_images = num_images(batches) + return np.sum(sum_images, axis=0).astype("float64") / ray.pull(n_images) diff --git a/examples/imagenet/worker.py b/examples/imagenet/worker.py new file mode 100644 index 000000000..b34c36af1 --- /dev/null +++ b/examples/imagenet/worker.py @@ -0,0 +1,33 @@ +import sys +import argparse +import numpy as np + +import ray.datasets.imagenet + +import ray +import ray.services as services +import ray.worker as worker +import ray.arrays.remote as ra +import ray.arrays.distributed as da + +import functions + +parser = argparse.ArgumentParser(description="Parse addresses for the worker to connect to.") +parser.add_argument("--scheduler-address", default="127.0.0.1:10001", type=str, help="the scheduler's address") +parser.add_argument("--objstore-address", default="127.0.0.1:20001", type=str, help="the objstore's address") +parser.add_argument("--worker-address", default="127.0.0.1:40001", type=str, help="the worker's address") + +if __name__ == "__main__": + args = parser.parse_args() + worker.connect(args.scheduler_address, args.objstore_address, args.worker_address) + + ray.register_module(ray.datasets.imagenet) + ray.register_module(functions) + ray.register_module(ra) + ray.register_module(ra.random) + ray.register_module(ra.linalg) + ray.register_module(da) + ray.register_module(da.random) + ray.register_module(da.linalg) + + worker.main_loop() diff --git a/lib/python/ray/__init__.py b/lib/python/ray/__init__.py index 9f8f3132d..182735e35 100644 --- a/lib/python/ray/__init__.py +++ b/lib/python/ray/__init__.py @@ -1,3 +1,4 @@ import libraylib as lib import serialization from worker import scheduler_info, register_module, connect, disconnect, pull, push, remote +from libraylib import ObjRef diff --git a/lib/python/ray/arrays/distributed/linalg.py b/lib/python/ray/arrays/distributed/linalg.py index 909a1b4d6..257a68da7 100644 --- a/lib/python/ray/arrays/distributed/linalg.py +++ b/lib/python/ray/arrays/distributed/linalg.py @@ -176,7 +176,7 @@ def qr(a): for r in range(i, a.num_blocks[0]): y_ri = y_val.objrefs[r - i, 0] W_rcs.append(qr_helper2(y_ri, a_work.objrefs[r, c])) - W_c = ra.sum(0, *W_rcs) + W_c = ra.linalg.sum_list(*W_rcs) for r in range(i, a.num_blocks[0]): y_ri = y_val.objrefs[r - i, 0] A_rc = qr_helper1(a_work.objrefs[r, c], y_ri, t, W_c) diff --git a/lib/python/ray/arrays/remote/core.py b/lib/python/ray/arrays/remote/core.py index c169e7431..65f529c2e 100644 --- a/lib/python/ray/arrays/remote/core.py +++ b/lib/python/ray/arrays/remote/core.py @@ -68,9 +68,9 @@ def add(x1, x2): def subtract(x1, x2): return np.subtract(x1, x2) -@ray.remote([int, np.ndarray], [np.ndarray]) -def sum(axis, *xs): - return np.sum(xs, axis=axis) +@ray.remote([np.ndarray, int], [np.ndarray]) +def sum(x, axis=-1): + return np.sum(x, axis=axis if axis != -1 else None) @ray.remote([np.ndarray], [tuple]) def shape(a): diff --git a/lib/python/ray/arrays/remote/linalg.py b/lib/python/ray/arrays/remote/linalg.py index f79e94684..332394b63 100644 --- a/lib/python/ray/arrays/remote/linalg.py +++ b/lib/python/ray/arrays/remote/linalg.py @@ -86,3 +86,7 @@ def matrix_rank(M): @ray.remote([np.ndarray], [np.ndarray]) def multi_dot(*a): raise NotImplementedError + +@ray.remote([np.ndarray], [np.ndarray]) +def sum_list(*xs): + return np.sum(xs, axis=0) diff --git a/lib/python/ray/datasets/imagenet.py b/lib/python/ray/datasets/imagenet.py new file mode 100644 index 000000000..a8cd70dbe --- /dev/null +++ b/lib/python/ray/datasets/imagenet.py @@ -0,0 +1,71 @@ +import tarfile, io +from typing import List +import PIL.Image +import numpy as np +import boto3 +import ray + +s3 = boto3.client("s3") + +def load_chunk(tarfile, size=None): + """Load a number of images from a single imagenet .tar file. + + This function also converts the image from grayscale to RGB if neccessary. + + Args: + tarfile (tarfile.TarFile): The archive from which the files get loaded. + size (Optional[Tuple[int, int]]): Resize the image to this size if provided. + + Returns: + numpy.ndarray: Contains the image data in format [batch, w, h, c] + """ + + result = [] + for member in tarfile.getmembers(): + filename = member.path + content = tarfile.extractfile(member) + img = PIL.Image.open(content) + rgbimg = PIL.Image.new("RGB", img.size) + rgbimg.paste(img) + if size != None: + rgbimg = rgbimg.resize(size, PIL.Image.ANTIALIAS) + result.append(np.array(rgbimg).reshape(1, rgbimg.size[0], rgbimg.size[1], 3)) + return np.concatenate(result) + +@ray.remote([str, str, List[int]], [np.ndarray]) +def load_tarfile_from_s3(bucket, s3_key, size=[]): + """Load an imagenet .tar file. + + Args: + bucket (str): Bucket holding the imagenet .tar. + s3_key (str): s3 key from which the .tar file is loaded. + size (List[int]): Resize the image to this size if size != []; len(size) == 2 required. + + Returns: + np.ndarray: The image data (see load_chunk). + """ + + response = s3.get_object(Bucket=bucket, Key=s3_key) + output = io.BytesIO() + chunk = response["Body"].read(1024 * 8) + while chunk: + output.write(chunk) + chunk = response["Body"].read(1024 * 8) + output.seek(0) # go to the beginning of the .tar file + tar = tarfile.open(mode= "r", fileobj=output) + return load_chunk(tar, size=size if size != [] else None) + +@ray.remote([str, List[str], List[int]], [List[ray.ObjRef]]) +def load_tarfiles_from_s3(bucket, s3_keys, size=[]): + """Load a number of imagenet .tar files. + + Args: + bucket (str): Bucket holding the imagenet .tars. + s3_keys (List[str]): List of s3 keys from which the .tar files are being loaded. + size (List[int]): Resize the image to this size if size != []; len(size) == 2 required. + + Returns: + np.ndarray: Contains object references to the chunks of the images (see load_chunk). + """ + + return [load_tarfile_from_s3(bucket, s3_key, size) for s3_key in s3_keys] diff --git a/requirements.txt b/requirements.txt index 6105fd65f..a2bc4a333 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,6 @@ typing funcsigs subprocess32 +boto3 +botocore +Pillow diff --git a/setup.sh b/setup.sh index 61bee8917..7bea8cceb 100755 --- a/setup.sh +++ b/setup.sh @@ -28,9 +28,9 @@ fi if [[ $platform == "linux" ]]; then sudo apt-get update - sudo apt-get install -y git cmake build-essential autoconf libtool python-dev python-numpy python-pip libboost-all-dev unzip + sudo apt-get install -y git cmake build-essential autoconf libtool python-dev python-numpy python-pip libboost-all-dev unzip libjpeg8-dev elif [[ $platform == "macosx" ]]; then - brew install git cmake autoconf libtool boost + brew install git cmake autoconf libtool boost libjpeg sudo easy_install pip sudo pip install numpy fi diff --git a/test/datasets_test.py b/test/datasets_test.py new file mode 100644 index 000000000..fb0391bbd --- /dev/null +++ b/test/datasets_test.py @@ -0,0 +1,23 @@ +import os +import tarfile +import unittest +import ray +import ray.services as services +import ray.datasets.imagenet as imagenet + +class ImageNetTest(unittest.TestCase): + + def testImageNetLoading(self): + test_dir = os.path.dirname(os.path.abspath(__file__)) + test_path = os.path.join(test_dir, "test_worker.py") + services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=5, worker_path=test_path) + + chunk_name = os.path.join(test_dir, "..", "data", "mini.tar") + tar = tarfile.open(chunk_name, mode= "r") + chunk = imagenet.load_chunk(tar, size=(256, 256)) + self.assertEqual(chunk.shape, (2, 256, 256, 3)) + + services.cleanup() + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_worker.py b/test/test_worker.py index 51d5b4ef6..2b02803d0 100644 --- a/test/test_worker.py +++ b/test/test_worker.py @@ -5,6 +5,7 @@ import numpy as np import test_functions import ray.arrays.remote as ra import ray.arrays.distributed as da +import ray.datasets.imagenet import ray import ray.services as services