diff --git a/README.md b/README.md index 08059a00e..ef71f071e 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ import ray import numpy as np # Start a scheduler, an object store, and some workers. -ray.services.start_ray_local(num_workers=10) +ray.init(start_ray_local=True, num_workers=10) # Define a remote function for estimating pi. @ray.remote([int], [float]) diff --git a/doc/api.rst b/doc/api.rst index b80b6f40b..2d4d74a29 100644 --- a/doc/api.rst +++ b/doc/api.rst @@ -5,6 +5,7 @@ The Ray API .. autofunction:: ray.put .. autofunction:: ray.get .. autofunction:: ray.remote +.. autofunction:: ray.init .. autofunction:: ray.kill_workers .. autofunction:: ray.restart_workers_local .. autofunction:: ray.visualize_computation_graph diff --git a/doc/tutorial.md b/doc/tutorial.md index c6f2429cc..2ac5544fd 100644 --- a/doc/tutorial.md +++ b/doc/tutorial.md @@ -28,7 +28,7 @@ To start Ray, start Python, and run the following commands. ```python import ray -ray.services.start_ray_local(num_workers=10) +ray.init(start_ray_local=True, num_workers=10) ``` That command starts a scheduler, one object store, and ten workers. Each of diff --git a/doc/using-ray-on-a-cluster.md b/doc/using-ray-on-a-cluster.md index cc2b970c3..c15f2ab7b 100644 --- a/doc/using-ray-on-a-cluster.md +++ b/doc/using-ray-on-a-cluster.md @@ -102,14 +102,16 @@ command `cluster.start_ray("~/example_ray_code")`, where the argument is the local path to the directory that contains your Python code. This command will copy this source code to each node and will start the cluster. Each worker that is started will have a local copy of the ~/example_ray_code directory in their -PYTHONPATH. After completing successfully, this command will print out a command -that can be run on the head node to attach a shell (the driver) to the cluster. -For example, - +PYTHONPATH. 
After completing successfully, you can connect via ssh to the +head node and attach a shell to the cluster by first running the following commands. ``` cd "$RAY_HOME/../user_source_files/example_ray_code"; source "$RAY_HOME/setup-env.sh"; - python "$RAY_HOME/scripts/shell.py" --scheduler-address=12.34.56.789:10001 --objstore-address=12.34.56.789:20001 --worker-address=12.34.56.789:30001 --attach + ``` +Then within Python, run the following. + ```python + import ray + ray.init(scheduler_address="12.34.56.789:10001", objstore_address="12.34.56.789:20001", driver_address="12.34.56.789:30001") ``` 7. Note that there are several more commands that can be run from within diff --git a/examples/alexnet/driver.py b/examples/alexnet/driver.py index a24ff5e21..92eb1f196 100644 --- a/examples/alexnet/driver.py +++ b/examples/alexnet/driver.py @@ -15,7 +15,7 @@ parser.add_argument("--label-file", default="train.txt", type=str, help="File co if __name__ == "__main__": args = parser.parse_args() num_workers = 4 - ray.services.start_ray_local(num_workers=num_workers) + ray.init(start_ray_local=True, num_workers=num_workers) # Note we do not do sess.run(tf.initialize_all_variables()) because that would # result in a different initialization on each worker. Instead, we initialize diff --git a/examples/hyperopt/driver.py b/examples/hyperopt/driver.py index 7c422f948..1e5c9da88 100644 --- a/examples/hyperopt/driver.py +++ b/examples/hyperopt/driver.py @@ -10,7 +10,7 @@ from tensorflow.examples.tutorials.mnist import input_data import hyperopt if __name__ == "__main__": - ray.services.start_ray_local(num_workers=3) + ray.init(start_ray_local=True, num_workers=3) # The number of sets of random hyperparameters to try. 
trials = 2 diff --git a/examples/lbfgs/driver.py b/examples/lbfgs/driver.py index 9e3dc2398..cdb578596 100644 --- a/examples/lbfgs/driver.py +++ b/examples/lbfgs/driver.py @@ -8,7 +8,7 @@ import tensorflow as tf from tensorflow.examples.tutorials.mnist import input_data if __name__ == "__main__": - ray.services.start_ray_local(num_workers=16) + ray.init(start_ray_local=True, num_workers=16) # Define the dimensions of the data and of the model. image_dimension = 784 diff --git a/examples/rl_pong/driver.py b/examples/rl_pong/driver.py index c51f91e5f..331a4ca34 100644 --- a/examples/rl_pong/driver.py +++ b/examples/rl_pong/driver.py @@ -108,7 +108,7 @@ def compute_gradient(model): return policy_backward(eph, epx, epdlogp, model), reward_sum if __name__ == "__main__": - ray.services.start_ray_local(num_workers=10) + ray.init(start_ray_local=True, num_workers=10) # Run the reinforcement learning running_reward = None diff --git a/lib/python/ray/__init__.py b/lib/python/ray/__init__.py index 762b69570..a824efeae 100644 --- a/lib/python/ray/__init__.py +++ b/lib/python/ray/__init__.py @@ -21,7 +21,7 @@ if hasattr(ctypes, "windll"): import config import libraylib as lib import serialization -from worker import scheduler_info, visualize_computation_graph, task_info, register_module, connect, disconnect, get, put, remote, kill_workers, restart_workers_local +from worker import scheduler_info, visualize_computation_graph, task_info, register_module, init, connect, disconnect, get, put, remote, kill_workers, restart_workers_local from worker import Reusable, reusables from libraylib import ObjRef import internal diff --git a/lib/python/ray/services.py b/lib/python/ray/services.py index 0c9c772bc..bede3f694 100644 --- a/lib/python/ray/services.py +++ b/lib/python/ray/services.py @@ -15,7 +15,7 @@ _services_env["PATH"] = os.pathsep.join([os.path.dirname(os.path.abspath(__file_ # mode. 
all_processes = [] # drivers is a list of the worker objects corresponding to drivers if -# start_services_local is run with return_drivers=True. +# start_ray_local is run with return_drivers=True. drivers = [] IP_ADDRESS = "127.0.0.1" @@ -189,14 +189,16 @@ def start_workers(scheduler_address, objstore_address, num_workers, worker_path) for _ in range(num_workers): start_worker(worker_path, scheduler_address, objstore_address, address(node_ip_address, new_worker_port()), local=False) -def start_ray_local(num_workers=0, worker_path=None, driver_mode=ray.SCRIPT_MODE): +def start_ray_local(num_objstores=1, num_workers_per_objstore=0, worker_path=None, driver_mode=ray.SCRIPT_MODE, return_drivers=False): """Start Ray in local mode. This method starts Ray in local mode (as opposed to cluster mode, which is handled by cluster.py). Args: - num_workers (int): The number of workers to start. + num_objstores (int): The number of object stores to start. + num_workers_per_objstore (int): The number of workers to start per object + store. worker_path (str): The path of the source code that will be run by the worker driver_mode: The mode for the driver, this only affects the printing of @@ -205,17 +207,11 @@ def start_ray_local(num_workers=0, worker_path=None, driver_mode=ray.SCRIPT_MODE in the shell. It should be ray.PYTHON_MODE to run things in a manner equivalent to serial Python code. It should be ray.WORKER_MODE to surpress the printing of error messages. + return_drivers (bool): This should only be True in special cases for tests. 
""" + global drivers if worker_path is None: worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../../scripts/default_worker.py") - start_services_local(num_objstores=1, num_workers_per_objstore=num_workers, worker_path=worker_path, driver_mode=driver_mode) - -# This is a helper method which is only used in the tests and should not be -# called by users -def start_services_local(num_objstores=1, num_workers_per_objstore=0, worker_path=None, driver_mode=ray.SCRIPT_MODE, return_drivers=False): - global drivers - if num_workers_per_objstore > 0 and worker_path is None: - raise Exception("Attempting to start a cluster with {} workers per object store, but `worker_path` is None.".format(num_workers_per_objstore)) if num_workers_per_objstore > 0 and num_objstores < 1: raise Exception("Attempting to start a cluster with {} workers per object store, but `num_objstores` is {}.".format(num_objstores)) scheduler_address = address(IP_ADDRESS, new_scheduler_port()) diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index f19603a21..8949f77b9 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -447,7 +447,7 @@ def check_connected(worker=global_worker): Exception: An exception is raised if the worker is not connected. """ if worker.handle is None: - raise Exception("This command cannot be called before a Ray cluster has been started. You can start one with 'ray.services.start_ray_local(num_workers=1)'.") + raise Exception("This command cannot be called before a Ray cluster has been started. You can start one with 'ray.init(start_ray_local=True, num_workers=1)'.") def print_failed_task(task_status): """Print information about failed tasks. @@ -505,8 +505,9 @@ def visualize_computation_graph(file_path=None, view=False, worker=global_worker open the result in a viewer. Examples: - In ray/scripts, call "python shell.py" and try the following code. + Try the following code. 
+ >>> import ray.array.distributed as da >>> x = da.zeros([20, 20]) >>> y = da.zeros([20, 20]) >>> z = da.dot(x, y) @@ -552,6 +553,48 @@ def register_module(module, worker=global_worker): _logger().info("registering {}.".format(val.func_name)) worker.register_function(val) +def init(start_ray_local=False, num_workers=None, scheduler_address=None, objstore_address=None, driver_address=None, driver_mode=ray.SCRIPT_MODE): + """Either connect to an existing Ray cluster or start one and connect to it. + + This method handles two cases. Either a Ray cluster already exists and we + just attach this driver to it, or we start all of the processes associated + with a Ray cluster and attach to the newly started cluster. + + Args: + start_ray_local (Optional[bool]): If True then this will start a scheduler, + an object store, and some workers. If False, this will attach to an + existing Ray cluster. + num_workers (Optional[int]): The number of workers to start if + start_ray_local is True. + scheduler_address (Optional[str]): The address of the scheduler to connect + to if start_ray_local is False. + objstore_address (Optional[str]): The address of the object store to connect + to if start_ray_local is False. + driver_address (Optional[str]): The address of this driver if + start_ray_local is False. + driver_mode: The mode in which to start the driver. This + should be one of ray.SCRIPT_MODE, ray.SHELL_MODE, ray.PYTHON_MODE, or + ray.SILENT_MODE. + + Raises: + Exception: An exception is raised if an inappropriate combination of + arguments is passed in. + """ + if start_ray_local: + # In this case, we launch a scheduler, a new object store, and some workers, + # and we connect to them. 
+ if (scheduler_address is not None) or (objstore_address is not None) or (driver_address is not None): + raise Exception("If start_ray_local=True, then you cannot pass in a scheduler_address, objstore_address, or driver_address.") + if driver_mode not in [ray.SCRIPT_MODE, ray.SHELL_MODE, ray.PYTHON_MODE, ray.SILENT_MODE]: + raise Exception("If start_ray_local=True, then driver_mode must be in [ray.SCRIPT_MODE, ray.SHELL_MODE, ray.PYTHON_MODE, ray.SILENT_MODE].") + num_workers = 1 if num_workers is None else num_workers + ray.services.start_ray_local(num_objstores=1, num_workers_per_objstore=num_workers, worker_path=None, driver_mode=driver_mode) + else: + # In this case, connect to an existing scheduler and object store. + if num_workers is not None: + raise Exception("The argument num_workers must not be provided unless start_ray_local=True.") + connect(scheduler_address, objstore_address, driver_address, is_driver=True, worker=global_worker, mode=driver_mode) + def connect(scheduler_address, objstore_address, worker_address, is_driver=False, worker=global_worker, mode=ray.WORKER_MODE): """Connect this worker to the scheduler and an object store. 
diff --git a/scripts/cluster.py b/scripts/cluster.py index 1e160da7a..ddf29639e 100644 --- a/scripts/cluster.py +++ b/scripts/cluster.py @@ -169,14 +169,22 @@ class RayCluster(object): start_workers_commands.append(start_workers_command) self._run_command_over_ssh_on_all_nodes_in_parallel(start_workers_commands) - print "cluster started; you can start the shell on the head node with:" setup_env_path = os.path.join(self.installation_directory, "ray/setup-env.sh") - shell_script_path = os.path.join(self.installation_directory, "ray/scripts/shell.py") print """ - cd "{}"; - source "{}"; - python "{}" --scheduler-address={}:10001 --objstore-address={}:20001 --worker-address={}:30001 --attach - """.format(remote_user_source_directory, setup_env_path, shell_script_path, self.node_private_ip_addresses[0], self.node_private_ip_addresses[0], self.node_private_ip_addresses[0]) + The cluster has been started. You can attach to the cluster by sshing to the head node with the following command. + + ssh -i {} {}@{} + + Then run the following commands. + + cd {} + source {} + + Then within a Python interpreter, run the following commands. + + import ray + ray.init(scheduler_address="{}:10001", objstore_address="{}:20001", driver_address="{}:30001") + """.format(self.key_file, self.username, self.node_ip_addresses[0], remote_user_source_directory, setup_env_path, self.node_private_ip_addresses[0], self.node_private_ip_addresses[0], self.node_private_ip_addresses[0]) def stop_ray(self): """Kill all of the processes in the Ray cluster. 
diff --git a/scripts/shell.py b/scripts/shell.py deleted file mode 100755 index 8b4cc4acc..000000000 --- a/scripts/shell.py +++ /dev/null @@ -1,35 +0,0 @@ -#!/usr/bin/env python - -import os -import sys -import numpy as np - -import ray - -def main(argv): - DEFAULT_NUM_WORKERS = 1 - DEFAULT_WORKER_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "default_worker.py") - - import argparse # No need for this to be global - parser = argparse.ArgumentParser(description="Parse shell options") - parser.add_argument("--scheduler-address", default="127.0.0.1:10001", type=str, help="the scheduler's address") - parser.add_argument("--objstore-address", default="127.0.0.1:20001", type=str, help="the objstore's address") - parser.add_argument("--worker-address", default="127.0.0.1:30001", type=str, help="the worker's address") - parser.add_argument("--attach", action="store_true", help="If true, attach the shell to an already running cluster. If false, start a new cluster.") - parser.add_argument("--worker-path", type=str, help="Path to the worker script") - parser.add_argument("--num-workers", type=int, help="Number of workers to start") - - args, unknown_args = parser.parse_known_args(argv) - if args.attach: - assert args.worker_path is None, "when attaching, no new worker can be started" - assert args.num_workers is None, "when attaching, no new worker can be started" - ray.worker.connect(args.scheduler_address, args.objstore_address, args.worker_address, is_driver=True, mode=ray.SHELL_MODE) - else: - ray.services.start_ray_local(num_workers=args.num_workers if not args.num_workers is None else DEFAULT_NUM_WORKERS, - worker_path=args.worker_path if not args.worker_path is None else DEFAULT_WORKER_PATH, - driver_mode=ray.SHELL_MODE) - return unknown_args - -if __name__ == "__main__": - import IPython - IPython.terminal.ipapp.launch_new_instance(argv=main(sys.argv[1:]), user_ns=globals()) diff --git a/test/array_test.py b/test/array_test.py index 
a7f6c6465..411b0efda 100644 --- a/test/array_test.py +++ b/test/array_test.py @@ -13,7 +13,7 @@ class RemoteArrayTest(unittest.TestCase): def testMethods(self): for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]: reload(module) - ray.services.start_ray_local(num_workers=1) + ray.init(start_ray_local=True) # test eye ref = ra.eye.remote(3) @@ -47,7 +47,7 @@ class DistributedArrayTest(unittest.TestCase): def testSerialization(self): for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]: reload(module) - ray.services.start_ray_local() + ray.init(start_ray_local=True, num_workers=0) x = da.DistArray() x.construct([2, 3, 4], np.array([[[ray.put(0)]]])) @@ -61,7 +61,7 @@ class DistributedArrayTest(unittest.TestCase): def testAssemble(self): for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]: reload(module) - ray.services.start_ray_local(num_workers=1) + ray.init(start_ray_local=True, num_workers=1) a = ra.ones.remote([da.BLOCK_SIZE, da.BLOCK_SIZE]) b = ra.zeros.remote([da.BLOCK_SIZE, da.BLOCK_SIZE]) @@ -75,7 +75,7 @@ class DistributedArrayTest(unittest.TestCase): for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]: reload(module) worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../scripts/default_worker.py") - ray.services.start_services_local(num_objstores=2, num_workers_per_objstore=5, worker_path=worker_path) + ray.services.start_ray_local(num_objstores=2, num_workers_per_objstore=5, worker_path=worker_path) x = da.zeros.remote([9, 25, 51], "float") self.assertTrue(np.alltrue(ray.get(da.assemble.remote(x)) == np.zeros([9, 25, 51]))) diff --git a/test/memory_leak_deserialize.py b/test/memory_leak_deserialize.py index 9fc7ed937..e6549297d 100644 --- a/test/memory_leak_deserialize.py +++ b/test/memory_leak_deserialize.py @@ -4,7 +4,7 @@ import os import numpy as np import ray -ray.services.start_ray_local(num_workers=1) 
+ray.init(start_ray_local=True, num_workers=1) d = {"w": np.zeros(1000000)} diff --git a/test/microbenchmarks.py b/test/microbenchmarks.py index c3665f5ea..fce32acb0 100644 --- a/test/microbenchmarks.py +++ b/test/microbenchmarks.py @@ -10,7 +10,7 @@ class MicroBenchmarkTest(unittest.TestCase): def testTiming(self): reload(test_functions) - ray.services.start_ray_local(num_workers=3) + ray.init(start_ray_local=True, num_workers=3) # measure the time required to submit a remote task to the scheduler elapsed_times = [] diff --git a/test/runtest.py b/test/runtest.py index 6ec1d4b69..27fd75038 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -53,7 +53,7 @@ class SerializationTest(unittest.TestCase): self.assertEqual(a.dtype, c.dtype) def testSerialize(self): - ray.services.start_ray_local() + ray.init(start_ray_local=True, num_workers=0) for val in RAY_TEST_OBJECTS: self.roundTripTest(val) @@ -93,7 +93,7 @@ class ObjStoreTest(unittest.TestCase): # Test setting up object stores, transfering data between them and retrieving data to a client def testObjStore(self): - [w1, w2] = ray.services.start_services_local(return_drivers=True, num_objstores=2, num_workers_per_objstore=0) + [w1, w2] = ray.services.start_ray_local(return_drivers=True, num_objstores=2, num_workers_per_objstore=0) # putting and getting an object shouldn't change it for data in ["h", "h" * 10000, 0, 0.0]: @@ -148,7 +148,7 @@ class ObjStoreTest(unittest.TestCase): class WorkerTest(unittest.TestCase): def testPutGet(self): - ray.services.start_ray_local() + ray.init(start_ray_local=True, num_workers=0) for i in range(100): value_before = i * 10 ** 6 @@ -180,7 +180,7 @@ class APITest(unittest.TestCase): def testObjRefAliasing(self): reload(test_functions) - ray.services.start_ray_local(num_workers=3, driver_mode=ray.SILENT_MODE) + ray.init(start_ray_local=True, num_workers=3, driver_mode=ray.SILENT_MODE) ref = test_functions.test_alias_f.remote() self.assertTrue(np.alltrue(ray.get(ref) == np.ones([3, 4, 
5]))) @@ -193,7 +193,7 @@ class APITest(unittest.TestCase): def testKeywordArgs(self): reload(test_functions) - ray.services.start_ray_local(num_workers=1) + ray.init(start_ray_local=True, num_workers=1) x = test_functions.keyword_fct1.remote(1) self.assertEqual(ray.get(x), "1 hello") @@ -230,7 +230,7 @@ class APITest(unittest.TestCase): def testVariableNumberOfArgs(self): reload(test_functions) - ray.services.start_ray_local(num_workers=1) + ray.init(start_ray_local=True, num_workers=1) x = test_functions.varargs_fct1.remote(0, 1, 2) self.assertEqual(ray.get(x), "0 1 2") @@ -244,7 +244,7 @@ class APITest(unittest.TestCase): def testNoArgs(self): reload(test_functions) - ray.services.start_ray_local(num_workers=1, driver_mode=ray.SILENT_MODE) + ray.init(start_ray_local=True, num_workers=1, driver_mode=ray.SILENT_MODE) test_functions.no_op.remote() time.sleep(0.2) @@ -265,7 +265,7 @@ class APITest(unittest.TestCase): def testTypeChecking(self): reload(test_functions) - ray.services.start_ray_local(num_workers=1, driver_mode=ray.SILENT_MODE) + ray.init(start_ray_local=True, num_workers=1, driver_mode=ray.SILENT_MODE) # Make sure that these functions throw exceptions because there return # values do not type check. @@ -280,7 +280,7 @@ class APITest(unittest.TestCase): ray.services.cleanup() def testDefiningRemoteFunctions(self): - ray.services.start_ray_local(num_workers=2) + ray.init(start_ray_local=True, num_workers=2) # Test that we can define a remote function in the shell. 
@ray.remote([int], [int]) @@ -346,7 +346,7 @@ class APITest(unittest.TestCase): ray.reusables.bar.append(1) return ray.reusables.bar - ray.services.start_ray_local(num_workers=2) + ray.init(start_ray_local=True, num_workers=2) self.assertEqual(ray.get(use_foo.remote()), 1) self.assertEqual(ray.get(use_foo.remote()), 1) @@ -358,7 +358,7 @@ class APITest(unittest.TestCase): class TaskStatusTest(unittest.TestCase): def testFailedTask(self): reload(test_functions) - ray.services.start_ray_local(num_workers=3, driver_mode=ray.SILENT_MODE) + ray.init(start_ray_local=True, num_workers=3, driver_mode=ray.SILENT_MODE) test_functions.test_alias_f.remote() test_functions.throw_exception_fct1.remote() @@ -409,7 +409,7 @@ class ReferenceCountingTest(unittest.TestCase): reload(test_functions) for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]: reload(module) - ray.services.start_ray_local(num_workers=1) + ray.init(start_ray_local=True, num_workers=1) x = test_functions.test_alias_f.remote() ray.get(x) @@ -458,7 +458,7 @@ class ReferenceCountingTest(unittest.TestCase): ray.services.cleanup() def testGet(self): - ray.services.start_ray_local(num_workers=3) + ray.init(start_ray_local=True, num_workers=3) for val in RAY_TEST_OBJECTS + [np.zeros((2, 2)), UserDefinedType()]: objref_val = check_get_deallocated(val) @@ -481,8 +481,7 @@ class ReferenceCountingTest(unittest.TestCase): # @unittest.expectedFailure # def testGetFailing(self): - # worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") - # ray.services.start_ray_local(num_workers=3, worker_path=worker_path) + # ray.init(start_ray_local=True, num_workers=3) # # This is failing, because for bool and None, we cannot track python # # refcounts and therefore cannot keep the refcount up @@ -500,7 +499,7 @@ class PythonModeTest(unittest.TestCase): def testPythonMode(self): reload(test_functions) - ray.services.start_ray_local(driver_mode=ray.PYTHON_MODE) + 
ray.init(start_ray_local=True, driver_mode=ray.PYTHON_MODE) xref = test_functions.test_alias_h.remote() self.assertTrue(np.alltrue(xref == np.ones([3, 4, 5]))) # remote functions should return by value @@ -521,7 +520,7 @@ class PythonModeTest(unittest.TestCase): class PythonCExtensionTest(unittest.TestCase): def testReferenceCountNone(self): - ray.services.start_ray_local(num_workers=1) + ray.init(start_ray_local=True, num_workers=1) # Make sure that we aren't accidentally messing up Python's reference counts. for obj in [None, True, False]: @@ -537,7 +536,7 @@ class PythonCExtensionTest(unittest.TestCase): class ReusablesTest(unittest.TestCase): def testReusables(self): - ray.services.start_ray_local(num_workers=1) + ray.init(start_ray_local=True, num_workers=1) # Test that we can add a variable to the key-value store.