From 13df8302e6d7e8d823509d728ca05b0164aeb216 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Mon, 8 Aug 2016 16:01:13 -0700 Subject: [PATCH] enable running example apps in cluster mode (#357) --- examples/alexnet/driver.py | 21 ++++++++---- examples/hyperopt/README.md | 4 +-- examples/hyperopt/driver.py | 24 ++++++++++--- examples/hyperopt/hyperopt.py | 6 ++-- examples/lbfgs/driver.py | 16 +++++++-- examples/rl_pong/driver.py | 15 ++++++++- lib/python/ray/services.py | 63 ++++++++++++++++++----------------- lib/python/ray/worker.py | 4 +-- scripts/cluster.py | 2 +- test/runtest.py | 36 ++++++++++++++++++++ 10 files changed, 139 insertions(+), 52 deletions(-) diff --git a/examples/alexnet/driver.py b/examples/alexnet/driver.py index 457bf56f7..9fc80f513 100644 --- a/examples/alexnet/driver.py +++ b/examples/alexnet/driver.py @@ -7,15 +7,23 @@ import boto3 import alexnet # Arguments to specify where the imagenet data is stored. -parser = argparse.ArgumentParser(description="Parse information for data loading.") +parser = argparse.ArgumentParser(description="Run the AlexNet example.") +parser.add_argument("--node-ip-address", default=None, type=str, help="The IP address of this node.") +parser.add_argument("--scheduler-address", default=None, type=str, help="The address of the scheduler.") parser.add_argument("--s3-bucket", required=True, type=str, help="Name of the bucket that contains the image data.") parser.add_argument("--key-prefix", default="ILSVRC2012_img_train/n015", type=str, help="Prefix for files to fetch.") -parser.add_argument("--label-file", default="train.txt", type=str, help="File containing labels") +parser.add_argument("--label-file", default="train.txt", type=str, help="File containing labels.") if __name__ == "__main__": args = parser.parse_args() - num_workers = 4 - ray.init(start_ray_local=True, num_workers=num_workers) + + # If node_ip_address and scheduler_address are provided, then this command + # will connect the driver to the existing scheduler. If not, it will start + # a local scheduler and connect to it. + ray.init(start_ray_local=(args.node_ip_address is None), + node_ip_address=args.node_ip_address, + scheduler_address=args.scheduler_address, + num_workers=(10 if args.node_ip_address is None else None)) # Note we do not do sess.run(tf.initialize_all_variables()) because that would # result in a different initialization on each worker. Instead, we initialize @@ -38,7 +46,7 @@ if __name__ == "__main__": filename_label_pairs = [line.split(" ") for line in filename_label_str] filename_label_dict = dict([(os.path.basename(name), label) for name, label in filename_label_pairs]) filename_label_dict_id = ray.put(filename_label_dict) - print "Labels extracted" + print "Labels extracted." # Download the imagenet dataset. imagenet_data = alexnet.load_tarfiles_from_s3(args.s3_bucket, image_tar_files, [256, 256]) @@ -75,7 +83,8 @@ if __name__ == "__main__": # Launch tasks in parallel to compute the gradients for some batches. gradient_ids = [] - for i in range(num_workers - 1): + num_batches = 4 + for i in range(num_batches): # Choose a random batch and use it to compute the gradient of the loss. x_id, y_id = batches[np.random.randint(len(batches))] gradient_ids.append(alexnet.compute_grad.remote(x_id, y_id, mean_id, weights_id)) diff --git a/examples/hyperopt/README.md b/examples/hyperopt/README.md index e8cb4e8d4..5ee4eaaa1 100644 --- a/examples/hyperopt/README.md +++ b/examples/hyperopt/README.md @@ -64,7 +64,7 @@ def generate_random_params(): results = [] for _ in range(100): randparams = generate_random_params() - results.append((randparams, train_cnn_and_compute_accuracy(randparams, epochs))) + results.append((randparams, train_cnn_and_compute_accuracy(randparams, train_images, train_labels, validation_images, validation_labels))) ``` Then we can inspect the contents of `results` and see which set of @@ -105,7 +105,7 @@ computation. Instead, it simply submits a number of tasks to the scheduler. result_ids = [] for _ in range(100): params = generate_random_params() - results.append((params, train_cnn_and_compute_accuracy.remote(params, epochs))) + results.append((params, train_cnn_and_compute_accuracy.remote(params, train_images, train_labels, validation_images, validation_labels))) ``` If we wish to wait until the results have all been retrieved, we can retrieve diff --git a/examples/hyperopt/driver.py b/examples/hyperopt/driver.py index c988b29a6..34b9ee749 100644 --- a/examples/hyperopt/driver.py +++ b/examples/hyperopt/driver.py @@ -2,20 +2,34 @@ # https://www.tensorflow.org/versions/r0.9/tutorials/mnist/pros/index.html#build-a-multilayer-convolutional-network import numpy as np import ray -import os +import argparse import tensorflow as tf from tensorflow.examples.tutorials.mnist import input_data import hyperopt +parser = argparse.ArgumentParser(description="Run the hyperparameter optimization example.") +parser.add_argument("--node-ip-address", default=None, type=str, help="The IP address of this node.") +parser.add_argument("--scheduler-address", default=None, type=str, help="The address of the scheduler.") +parser.add_argument("--trials", default=2, type=int, help="The number of random trials to do.") +parser.add_argument("--steps", default=10, type=int, help="The number of steps of training to do per network.") + if __name__ == "__main__": - ray.init(start_ray_local=True, num_workers=3) + args = parser.parse_args() + + # If node_ip_address and scheduler_address are provided, then this command + # will connect the driver to the existing scheduler. If not, it will start + # a local scheduler and connect to it. + ray.init(start_ray_local=(args.node_ip_address is None), + node_ip_address=args.node_ip_address, + scheduler_address=args.scheduler_address, + num_workers=(10 if args.node_ip_address is None else None)) # The number of sets of random hyperparameters to try. - trials = 2 + trials = args.trials # The number of training passes over the dataset to use for network. - epochs = 10 + steps = args.steps # Load the mnist data and turn the data into remote objects. print "Downloading the MNIST dataset. This may take a minute." @@ -37,7 +51,7 @@ if __name__ == "__main__": dropout = np.random.uniform(0, 1) stddev = 10 ** np.random.uniform(-5, 5) params = {"learning_rate": learning_rate, "batch_size": batch_size, "dropout": dropout, "stddev": stddev} - results.append((params, hyperopt.train_cnn_and_compute_accuracy.remote(params, epochs, train_images, train_labels, validation_images, validation_labels))) + results.append((params, hyperopt.train_cnn_and_compute_accuracy.remote(params, steps, train_images, train_labels, validation_images, validation_labels))) # Fetch the results of the tasks and print the results. for i in range(trials): diff --git a/examples/hyperopt/hyperopt.py b/examples/hyperopt/hyperopt.py index 5ff1b8ae2..9d7b12a14 100644 --- a/examples/hyperopt/hyperopt.py +++ b/examples/hyperopt/hyperopt.py @@ -52,7 +52,7 @@ def cnn_setup(x, y, keep_prob, lr, stddev): # Define a remote function that takes a set of hyperparameters as well as the # data, consructs and trains a network, and returns the validation accuracy. @ray.remote([dict, int, np.ndarray, np.ndarray, np.ndarray, np.ndarray], [float]) -def train_cnn_and_compute_accuracy(params, epochs, train_images, train_labels, validation_images, validation_labels): +def train_cnn_and_compute_accuracy(params, steps, train_images, train_labels, validation_images, validation_labels): # Extract the hyperparameters from the params dictionary. learning_rate = params["learning_rate"] batch_size = params["batch_size"] @@ -68,7 +68,7 @@ def train_cnn_and_compute_accuracy(params, epochs, train_images, train_labels, v with tf.Session() as sess: # Initialize the network weights. sess.run(tf.initialize_all_variables()) - for i in range(1, epochs): + for i in range(1, steps + 1): # Fetch the next batch of data. image_batch = get_batch(train_images, i, batch_size) label_batch = get_batch(train_labels, i, batch_size) @@ -82,7 +82,7 @@ def train_cnn_and_compute_accuracy(params, epochs, train_images, train_labels, v if train_ac < 0.25: # Compute the validation accuracy and return. totalacc = accuracy.eval(feed_dict={x: validation_images, y: validation_labels, keep_prob: 1.0}) - return totalacc + return float(totalacc) # Training is done, compute the validation accuracy and return. totalacc = accuracy.eval(feed_dict={x: validation_images, y: validation_labels, keep_prob: 1.0}) return float(totalacc) diff --git a/examples/lbfgs/driver.py b/examples/lbfgs/driver.py index b18afd655..d3556456f 100644 --- a/examples/lbfgs/driver.py +++ b/examples/lbfgs/driver.py @@ -1,5 +1,5 @@ -import os import ray +import argparse import numpy as np import scipy.optimize @@ -7,8 +7,20 @@ import tensorflow as tf from tensorflow.examples.tutorials.mnist import input_data +parser = argparse.ArgumentParser(description="Run the L-BFGS example.") +parser.add_argument("--node-ip-address", default=None, type=str, help="The IP address of this node.") +parser.add_argument("--scheduler-address", default=None, type=str, help="The address of the scheduler.") + if __name__ == "__main__": - ray.init(start_ray_local=True, num_workers=16) + args = parser.parse_args() + + # If node_ip_address and scheduler_address are provided, then this command + # will connect the driver to the existing scheduler. If not, it will start + # a local scheduler and connect to it. + ray.init(start_ray_local=(args.node_ip_address is None), + node_ip_address=args.node_ip_address, + scheduler_address=args.scheduler_address, + num_workers=(10 if args.node_ip_address is None else None)) # Define the dimensions of the data and of the model. image_dimension = 784 diff --git a/examples/rl_pong/driver.py b/examples/rl_pong/driver.py index a83f8c6af..cbec69df3 100644 --- a/examples/rl_pong/driver.py +++ b/examples/rl_pong/driver.py @@ -4,9 +4,14 @@ import numpy as np import cPickle as pickle import ray +import argparse import gym +parser = argparse.ArgumentParser(description="Run the Pong example.") +parser.add_argument("--node-ip-address", default=None, type=str, help="The IP address of this node.") +parser.add_argument("--scheduler-address", default=None, type=str, help="The address of the scheduler.") + # hyperparameters H = 200 # number of hidden layer neurons batch_size = 10 # every how many episodes to do a param update? @@ -108,7 +113,15 @@ def compute_gradient(model): return policy_backward(eph, epx, epdlogp, model), reward_sum if __name__ == "__main__": - ray.init(start_ray_local=True, num_workers=10) + args = parser.parse_args() + + # If node_ip_address and scheduler_address are provided, then this command + # will connect the driver to the existing scheduler. If not, it will start + # a local scheduler and connect to it. + ray.init(start_ray_local=(args.node_ip_address is None), + node_ip_address=args.node_ip_address, + scheduler_address=args.scheduler_address, + num_workers=(10 if args.node_ip_address is None else None)) # Run the reinforcement learning running_reward = None diff --git a/lib/python/ray/services.py b/lib/python/ray/services.py index 95275cea1..9ab3a9605 100644 --- a/lib/python/ray/services.py +++ b/lib/python/ray/services.py @@ -59,35 +59,35 @@ def cleanup(): print "Ray did not shut down properly." all_processes = [] -def start_scheduler(scheduler_address, local): +def start_scheduler(scheduler_address, cleanup): """This method starts a scheduler process. Args: scheduler_address (str): The ip address and port to use for the scheduler. - local (bool): True if using Ray in local mode. If local is true, then this - process will be killed by serices.cleanup() when the Python process that - imported services exits. + cleanup (bool): True if using Ray in local mode. If cleanup is true, then + this process will be killed by serices.cleanup() when the Python process + that imported services exits. """ p = subprocess.Popen(["scheduler", scheduler_address, "--log-file-name", config.get_log_file_path("scheduler.log")], env=_services_env) - if local: + if cleanup: all_processes.append(p) -def start_objstore(scheduler_address, objstore_address, local): +def start_objstore(scheduler_address, objstore_address, cleanup): """This method starts an object store process. Args: scheduler_address (str): The ip address and port of the scheduler to connect to. objstore_address (str): The ip address and port to use for the object store. - local (bool): True if using Ray in local mode. If local is true, then this - process will be killed by serices.cleanup() when the Python process that - imported services exits. + cleanup (bool): True if using Ray in local mode. If cleanup is true, then + this process will be killed by serices.cleanup() when the Python process + that imported services exits. """ p = subprocess.Popen(["objstore", scheduler_address, objstore_address, "--log-file-name", config.get_log_file_path("-".join(["objstore", objstore_address]) + ".log")], env=_services_env) - if local: + if cleanup: all_processes.append(p) -def start_worker(node_ip_address, worker_path, scheduler_address, objstore_address=None, local=True, user_source_directory=None): +def start_worker(node_ip_address, worker_path, scheduler_address, objstore_address=None, cleanup=True, user_source_directory=None): """This method starts a worker process. Args: @@ -98,9 +98,9 @@ def start_worker(node_ip_address, worker_path, scheduler_address, objstore_addre to. objstore_address (Optional[str]): The ip address and port of the object store to connect to. - local (Optional[bool]): True if using Ray in local mode. If local is true, - then this process will be killed by serices.cleanup() when the Python - process that imported services exits. This is True by default. + cleanup (Optional[bool]): True if using Ray in local mode. If cleanup is + true, then this process will be killed by serices.cleanup() when the + Python process that imported services exits. This is True by default. user_source_directory (Optional[str]): The directory containing the application code. This directory will be added to the path of each worker. If not provided, the directory of the script currently being run is used. @@ -117,32 +117,35 @@ def start_worker(node_ip_address, worker_path, scheduler_address, objstore_addre if objstore_address is not None: command.append("--objstore-address=" + objstore_address) p = subprocess.Popen(command) - if local: + if cleanup: all_processes.append(p) -def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None, user_source_directory=None): +def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None, user_source_directory=None, cleanup=False): """Start an object store and associated workers in the cluster setting. This starts an object store and the associated workers when Ray is being used in the cluster setting. This assumes the scheduler has already been started. Args: - scheduler_address (str): ip address and port of the scheduler (which may run - on a different node) - node_ip_address (str): ip address (without port) of the node this function - is run on - num_workers (int): the number of workers to be started on this node - worker_path (str): path of the Python worker script that will be run on the worker - user_source_directory (str): path to the user's code the workers will import - modules from + scheduler_address (str): IP address and port of the scheduler (which may run + on a different node). + node_ip_address (str): IP address (without port) of the node this function + is run on. + num_workers (int): The number of workers to be started on this node. + worker_path (str): Path of the Python worker script that will be run on the + worker. + user_source_directory (str): Path to the user's code the workers will import + modules from. + cleanup (bool): If cleanup is True, then the processes started by this + command will be killed when the process that imported services exits. """ objstore_address = address(node_ip_address, new_objstore_port()) - start_objstore(scheduler_address, objstore_address, local=False) + start_objstore(scheduler_address, objstore_address, cleanup=cleanup) time.sleep(0.2) if worker_path is None: worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../../scripts/default_worker.py") for _ in range(num_workers): - start_worker(node_ip_address, worker_path, scheduler_address, objstore_address=objstore_address, user_source_directory=user_source_directory, local=False) + start_worker(node_ip_address, worker_path, scheduler_address, objstore_address=objstore_address, user_source_directory=user_source_directory, cleanup=cleanup) time.sleep(0.5) def start_workers(scheduler_address, objstore_address, num_workers, worker_path): @@ -163,7 +166,7 @@ def start_workers(scheduler_address, objstore_address, num_workers, worker_path) """ node_ip_address = objstore_address.split(":")[0] for _ in range(num_workers): - start_worker(node_ip_address, worker_path, scheduler_address, objstore_address=objstore_address, local=False) + start_worker(node_ip_address, worker_path, scheduler_address, objstore_address=objstore_address, cleanup=False) def start_ray_local(node_ip_address="127.0.0.1", num_objstores=1, num_workers=0, worker_path=None): """Start Ray in local mode. @@ -186,14 +189,14 @@ def start_ray_local(node_ip_address="127.0.0.1", num_objstores=1, num_workers=0, if num_workers > 0 and num_objstores < 1: raise Exception("Attempting to start a cluster with {} workers per object store, but `num_objstores` is {}.".format(num_objstores)) scheduler_address = address(node_ip_address, new_scheduler_port()) - start_scheduler(scheduler_address, local=True) + start_scheduler(scheduler_address, cleanup=True) time.sleep(0.1) objstore_addresses = [] # create objstores for i in range(num_objstores): objstore_address = address(node_ip_address, new_objstore_port()) objstore_addresses.append(objstore_address) - start_objstore(scheduler_address, objstore_address, local=True) + start_objstore(scheduler_address, objstore_address, cleanup=True) time.sleep(0.2) if i < num_objstores - 1: num_workers_to_start = num_workers / num_objstores @@ -202,7 +205,7 @@ def start_ray_local(node_ip_address="127.0.0.1", num_objstores=1, num_workers=0, # remaining number of workers. num_workers_to_start = num_workers - (num_objstores - 1) * (num_workers / num_objstores) for _ in range(num_workers_to_start): - start_worker(node_ip_address, worker_path, scheduler_address, objstore_address=objstore_address, local=True) + start_worker(node_ip_address, worker_path, scheduler_address, objstore_address=objstore_address, cleanup=True) time.sleep(0.3) return scheduler_address, objstore_addresses diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index fae66f251..9beed015e 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -654,8 +654,8 @@ def init(start_ray_local=False, num_workers=None, num_objstores=None, scheduler_ # not need to start any processes. if (num_workers is not None) or (num_objstores is not None): raise Exception("The arguments num_workers and num_objstores must not be provided unless start_ray_local=True.") - if node_ip_address is None: - raise Exception("When start_ray_local=False, the node_ip_address of the current node must be provided.") + if (node_ip_address is None) or (scheduler_address is None): + raise Exception("When start_ray_local=False, node_ip_address and scheduler_address must be provided.") # Connect this driver to the scheduler and object store. The corresponing call # to disconnect will happen in the call to cleanup() when the Python script # exits. diff --git a/scripts/cluster.py b/scripts/cluster.py index d3b44c120..f7a8483e0 100644 --- a/scripts/cluster.py +++ b/scripts/cluster.py @@ -162,7 +162,7 @@ class RayCluster(object): start_scheduler_command = """ cd "{}"; source ../setup-env.sh; - python -c "import ray; ray.services.start_scheduler(\\\"{}:10001\\\", local=False)" > start_scheduler.out 2> start_scheduler.err < /dev/null & + python -c "import ray; ray.services.start_scheduler(\\\"{}:10001\\\", cleanup=False)" > start_scheduler.out 2> start_scheduler.err < /dev/null & """.format(scripts_directory, self.node_private_ip_addresses[0]) self._run_command_over_ssh(self.node_ip_addresses[0], start_scheduler_command) diff --git a/test/runtest.py b/test/runtest.py index e60b5c66b..c3cb985c3 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -663,5 +663,41 @@ class ReusablesTest(unittest.TestCase): ray.worker.cleanup() +class ClusterAttachingTest(unittest.TestCase): + + def testAttachingToCluster(self): + node_ip_address = "127.0.0.1" + scheduler_port = np.random.randint(40000, 50000) + scheduler_address = "{}:{}".format(node_ip_address, scheduler_port) + ray.services.start_scheduler(scheduler_address, cleanup=True) + ray.services.start_node(scheduler_address, node_ip_address, num_workers=1, cleanup=True) + + ray.init(node_ip_address=node_ip_address, scheduler_address=scheduler_address) + + @ray.remote([int], [int]) + def f(x): + return x + 1 + self.assertEqual(ray.get(f.remote(0)), 1) + + ray.worker.cleanup() + + def testAttachingToClusterWithMultipleObjectStores(self): + node_ip_address = "127.0.0.1" + scheduler_port = np.random.randint(40000, 50000) + scheduler_address = "{}:{}".format(node_ip_address, scheduler_port) + ray.services.start_scheduler(scheduler_address, cleanup=True) + ray.services.start_node(scheduler_address, node_ip_address, num_workers=5, cleanup=True) + ray.services.start_node(scheduler_address, node_ip_address, num_workers=5, cleanup=True) + ray.services.start_node(scheduler_address, node_ip_address, num_workers=5, cleanup=True) + + ray.init(node_ip_address=node_ip_address, scheduler_address=scheduler_address) + + @ray.remote([int], [int]) + def f(x): + return x + 1 + self.assertEqual(ray.get(f.remote(0)), 1) + + ray.worker.cleanup() + if __name__ == "__main__": unittest.main()