unify starting local cluster with attaching to existing cluster (#327)

This commit is contained in:
Robert Nishihara
2016-07-31 19:26:35 -07:00
committed by Philipp Moritz
parent 0e5b858324
commit 2040372084
17 changed files with 104 additions and 90 deletions
+1 -1
View File
@@ -18,7 +18,7 @@ import ray
import numpy as np
# Start a scheduler, an object store, and some workers.
ray.services.start_ray_local(num_workers=10)
ray.init(start_ray_local=True, num_workers=10)
# Define a remote function for estimating pi.
@ray.remote([int], [float])
+1
View File
@@ -5,6 +5,7 @@ The Ray API
.. autofunction:: ray.put
.. autofunction:: ray.get
.. autofunction:: ray.remote
.. autofunction:: ray.init
.. autofunction:: ray.kill_workers
.. autofunction:: ray.restart_workers_local
.. autofunction:: ray.visualize_computation_graph
+1 -1
View File
@@ -28,7 +28,7 @@ To start Ray, start Python, and run the following commands.
```python
import ray
ray.services.start_ray_local(num_workers=10)
ray.init(start_ray_local=True, num_workers=10)
```
That command starts a scheduler, one object store, and ten workers. Each of
+7 -5
View File
@@ -102,14 +102,16 @@ command `cluster.start_ray("~/example_ray_code")`, where the argument is
the local path to the directory that contains your Python code. This command will
copy this source code to each node and will start the cluster. Each worker that
is started will have a local copy of the ~/example_ray_code directory in their
PYTHONPATH. After completing successfully, this command will print out a command
that can be run on the head node to attach a shell (the driver) to the cluster.
For example,
PYTHONPATH. After completing successfully, you can connect to the ssh to the
head node and attach a shell to the cluster by first running the following code.
```
cd "$RAY_HOME/../user_source_files/example_ray_code";
source "$RAY_HOME/setup-env.sh";
python "$RAY_HOME/scripts/shell.py" --scheduler-address=12.34.56.789:10001 --objstore-address=12.34.56.789:20001 --worker-address=12.34.56.789:30001 --attach
```
Then within Python, run the following.
```python
import ray
ray.init(scheduler_address="12.34.56.789:10001", objstore_address="12.34.56.789:20001", driver_address="12.34.56.789:30001")
```
7. Note that there are several more commands that can be run from within
+1 -1
View File
@@ -15,7 +15,7 @@ parser.add_argument("--label-file", default="train.txt", type=str, help="File co
if __name__ == "__main__":
args = parser.parse_args()
num_workers = 4
ray.services.start_ray_local(num_workers=num_workers)
ray.init(start_ray_local=True, num_workers=num_workers)
# Note we do not do sess.run(tf.initialize_all_variables()) because that would
# result in a different initialization on each worker. Instead, we initialize
+1 -1
View File
@@ -10,7 +10,7 @@ from tensorflow.examples.tutorials.mnist import input_data
import hyperopt
if __name__ == "__main__":
ray.services.start_ray_local(num_workers=3)
ray.init(start_ray_local=True, num_workers=3)
# The number of sets of random hyperparameters to try.
trials = 2
+1 -1
View File
@@ -8,7 +8,7 @@ import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
if __name__ == "__main__":
ray.services.start_ray_local(num_workers=16)
ray.init(start_ray_local=True, num_workers=16)
# Define the dimensions of the data and of the model.
image_dimension = 784
+1 -1
View File
@@ -108,7 +108,7 @@ def compute_gradient(model):
return policy_backward(eph, epx, epdlogp, model), reward_sum
if __name__ == "__main__":
ray.services.start_ray_local(num_workers=10)
ray.init(start_ray_local=True, num_workers=10)
# Run the reinforcement learning
running_reward = None
+1 -1
View File
@@ -21,7 +21,7 @@ if hasattr(ctypes, "windll"):
import config
import libraylib as lib
import serialization
from worker import scheduler_info, visualize_computation_graph, task_info, register_module, connect, disconnect, get, put, remote, kill_workers, restart_workers_local
from worker import scheduler_info, visualize_computation_graph, task_info, register_module, init, connect, disconnect, get, put, remote, kill_workers, restart_workers_local
from worker import Reusable, reusables
from libraylib import ObjRef
import internal
+7 -11
View File
@@ -15,7 +15,7 @@ _services_env["PATH"] = os.pathsep.join([os.path.dirname(os.path.abspath(__file_
# mode.
all_processes = []
# drivers is a list of the worker objects corresponding to drivers if
# start_services_local is run with return_drivers=True.
# start_ray_local is run with return_drivers=True.
drivers = []
IP_ADDRESS = "127.0.0.1"
@@ -189,14 +189,16 @@ def start_workers(scheduler_address, objstore_address, num_workers, worker_path)
for _ in range(num_workers):
start_worker(worker_path, scheduler_address, objstore_address, address(node_ip_address, new_worker_port()), local=False)
def start_ray_local(num_workers=0, worker_path=None, driver_mode=ray.SCRIPT_MODE):
def start_ray_local(num_objstores=1, num_workers_per_objstore=0, worker_path=None, driver_mode=ray.SCRIPT_MODE, return_drivers=False):
"""Start Ray in local mode.
This method starts Ray in local mode (as opposed to cluster mode, which is
handled by cluster.py).
Args:
num_workers (int): The number of workers to start.
num_objstores (int): The number of object stores to start.
num_workers_per_objstore (int): The number of workers to start per object
store.
worker_path (str): The path of the source code that will be run by the
worker
driver_mode: The mode for the driver, this only affects the printing of
@@ -205,17 +207,11 @@ def start_ray_local(num_workers=0, worker_path=None, driver_mode=ray.SCRIPT_MODE
in the shell. It should be ray.PYTHON_MODE to run things in a manner
equivalent to serial Python code. It should be ray.WORKER_MODE to surpress
the printing of error messages.
return_drivers (bool): This should only be True in special cases for tests.
"""
global drivers
if worker_path is None:
worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../../scripts/default_worker.py")
start_services_local(num_objstores=1, num_workers_per_objstore=num_workers, worker_path=worker_path, driver_mode=driver_mode)
# This is a helper method which is only used in the tests and should not be
# called by users
def start_services_local(num_objstores=1, num_workers_per_objstore=0, worker_path=None, driver_mode=ray.SCRIPT_MODE, return_drivers=False):
global drivers
if num_workers_per_objstore > 0 and worker_path is None:
raise Exception("Attempting to start a cluster with {} workers per object store, but `worker_path` is None.".format(num_workers_per_objstore))
if num_workers_per_objstore > 0 and num_objstores < 1:
raise Exception("Attempting to start a cluster with {} workers per object store, but `num_objstores` is {}.".format(num_objstores))
scheduler_address = address(IP_ADDRESS, new_scheduler_port())
+45 -2
View File
@@ -447,7 +447,7 @@ def check_connected(worker=global_worker):
Exception: An exception is raised if the worker is not connected.
"""
if worker.handle is None:
raise Exception("This command cannot be called before a Ray cluster has been started. You can start one with 'ray.services.start_ray_local(num_workers=1)'.")
raise Exception("This command cannot be called before a Ray cluster has been started. You can start one with 'ray.init(start_ray_local=True, num_workers=1)'.")
def print_failed_task(task_status):
"""Print information about failed tasks.
@@ -505,8 +505,9 @@ def visualize_computation_graph(file_path=None, view=False, worker=global_worker
open the result in a viewer.
Examples:
In ray/scripts, call "python shell.py" and try the following code.
Try the following code.
>>> import ray.array.distributed as da
>>> x = da.zeros([20, 20])
>>> y = da.zeros([20, 20])
>>> z = da.dot(x, y)
@@ -552,6 +553,48 @@ def register_module(module, worker=global_worker):
_logger().info("registering {}.".format(val.func_name))
worker.register_function(val)
def init(start_ray_local=False, num_workers=None, scheduler_address=None, objstore_address=None, driver_address=None, driver_mode=ray.SCRIPT_MODE):
"""Either connect to an existing Ray cluster or start one and connect to it.
This method handles two cases. Either a Ray cluster already exists and we
just attach this driver to it, or we start all of the processes associated
with a Ray cluster and attach to the newly started cluster.
Args:
start_ray_local (Optional[bool]): If True then this will start a scheduler
an object store, and some workers. If False, this will attach to an
existing Ray cluster.
num_workers (Optional[int]): The number of workers to start if
start_ray_local is True.
scheduler_address (Optional[str]): The address of the scheduler to connect
to if start_ray_local is False.
objstore_address (Optional[str]): The address of the object store to connect
to if start_ray_local is False.
driver_address (Optional[str]): The address of this driver if
start_ray_local is False.
driver_mode (Optional[bool]): The mode in which to start the driver. This
should be one of ray.SCRIPT_MODE, ray.SHELL_MODE, ray.PYTHON_MODE, and
ray.SILENT_MODE.
raises:
Exception: An exception is raised if an inappropriate combination of
arguments is passed in.
"""
if start_ray_local:
# In this case, we launch a scheduler, a new object store, and some workers,
# and we connect to them.
if (scheduler_address is not None) or (objstore_address is not None) or (driver_address is not None):
raise Exception("If start_ray_local=True, then you cannot pass in a scheduler_address, objstore_address, or worker_address.")
if driver_mode not in [ray.SCRIPT_MODE, ray.SHELL_MODE, ray.PYTHON_MODE, ray.SILENT_MODE]:
raise Exception("If start_ray_local=True, then driver_mode must be in [ray.SCRIPT_MODE, ray.SHELL_MODE, ray.PYTHON_MODE, ray.SILENT_MODE].")
num_workers = 1 if num_workers is None else num_workers
ray.services.start_ray_local(num_objstores=1, num_workers_per_objstore=num_workers, worker_path=None, driver_mode=driver_mode)
else:
# In this case, connect to an existing scheduler and object store.
if num_workers is not None:
raise Exception("The argument num_workers must not be provided unless start_ray_local=True.")
connect(scheduler_address, objstore_address, driver_address, is_driver=True, worker=global_worker, mode=driver_mode)
def connect(scheduler_address, objstore_address, worker_address, is_driver=False, worker=global_worker, mode=ray.WORKER_MODE):
"""Connect this worker to the scheduler and an object store.
+14 -6
View File
@@ -169,14 +169,22 @@ class RayCluster(object):
start_workers_commands.append(start_workers_command)
self._run_command_over_ssh_on_all_nodes_in_parallel(start_workers_commands)
print "cluster started; you can start the shell on the head node with:"
setup_env_path = os.path.join(self.installation_directory, "ray/setup-env.sh")
shell_script_path = os.path.join(self.installation_directory, "ray/scripts/shell.py")
print """
cd "{}";
source "{}";
python "{}" --scheduler-address={}:10001 --objstore-address={}:20001 --worker-address={}:30001 --attach
""".format(remote_user_source_directory, setup_env_path, shell_script_path, self.node_private_ip_addresses[0], self.node_private_ip_addresses[0], self.node_private_ip_addresses[0])
The cluster has been started. You can attach to the cluster by sshing to the head node with the following command.
ssh -i {} {}@{}
Then run the following commands.
cd {}
source {}
Then within a Python interpreter, run the following commands.
import ray
ray.init(scheduler_address="{}:10001", objstore_address="{}:20001", driver_address="{}:30001")
""".format(self.key_file, self.username, self.node_ip_addresses[0], remote_user_source_directory, setup_env_path, self.node_private_ip_addresses[0], self.node_private_ip_addresses[0], self.node_private_ip_addresses[0])
def stop_ray(self):
"""Kill all of the processes in the Ray cluster.
-35
View File
@@ -1,35 +0,0 @@
#!/usr/bin/env python
import os
import sys
import numpy as np
import ray
def main(argv):
DEFAULT_NUM_WORKERS = 1
DEFAULT_WORKER_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "default_worker.py")
import argparse # No need for this to be global
parser = argparse.ArgumentParser(description="Parse shell options")
parser.add_argument("--scheduler-address", default="127.0.0.1:10001", type=str, help="the scheduler's address")
parser.add_argument("--objstore-address", default="127.0.0.1:20001", type=str, help="the objstore's address")
parser.add_argument("--worker-address", default="127.0.0.1:30001", type=str, help="the worker's address")
parser.add_argument("--attach", action="store_true", help="If true, attach the shell to an already running cluster. If false, start a new cluster.")
parser.add_argument("--worker-path", type=str, help="Path to the worker script")
parser.add_argument("--num-workers", type=int, help="Number of workers to start")
args, unknown_args = parser.parse_known_args(argv)
if args.attach:
assert args.worker_path is None, "when attaching, no new worker can be started"
assert args.num_workers is None, "when attaching, no new worker can be started"
ray.worker.connect(args.scheduler_address, args.objstore_address, args.worker_address, is_driver=True, mode=ray.SHELL_MODE)
else:
ray.services.start_ray_local(num_workers=args.num_workers if not args.num_workers is None else DEFAULT_NUM_WORKERS,
worker_path=args.worker_path if not args.worker_path is None else DEFAULT_WORKER_PATH,
driver_mode=ray.SHELL_MODE)
return unknown_args
if __name__ == "__main__":
import IPython
IPython.terminal.ipapp.launch_new_instance(argv=main(sys.argv[1:]), user_ns=globals())
+4 -4
View File
@@ -13,7 +13,7 @@ class RemoteArrayTest(unittest.TestCase):
def testMethods(self):
for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]:
reload(module)
ray.services.start_ray_local(num_workers=1)
ray.init(start_ray_local=True)
# test eye
ref = ra.eye.remote(3)
@@ -47,7 +47,7 @@ class DistributedArrayTest(unittest.TestCase):
def testSerialization(self):
for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]:
reload(module)
ray.services.start_ray_local()
ray.init(start_ray_local=True, num_workers=0)
x = da.DistArray()
x.construct([2, 3, 4], np.array([[[ray.put(0)]]]))
@@ -61,7 +61,7 @@ class DistributedArrayTest(unittest.TestCase):
def testAssemble(self):
for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]:
reload(module)
ray.services.start_ray_local(num_workers=1)
ray.init(start_ray_local=True, num_workers=1)
a = ra.ones.remote([da.BLOCK_SIZE, da.BLOCK_SIZE])
b = ra.zeros.remote([da.BLOCK_SIZE, da.BLOCK_SIZE])
@@ -75,7 +75,7 @@ class DistributedArrayTest(unittest.TestCase):
for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]:
reload(module)
worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../scripts/default_worker.py")
ray.services.start_services_local(num_objstores=2, num_workers_per_objstore=5, worker_path=worker_path)
ray.services.start_ray_local(num_objstores=2, num_workers_per_objstore=5, worker_path=worker_path)
x = da.zeros.remote([9, 25, 51], "float")
self.assertTrue(np.alltrue(ray.get(da.assemble.remote(x)) == np.zeros([9, 25, 51])))
+1 -1
View File
@@ -4,7 +4,7 @@ import os
import numpy as np
import ray
ray.services.start_ray_local(num_workers=1)
ray.init(start_ray_local=True, num_workers=1)
d = {"w": np.zeros(1000000)}
+1 -1
View File
@@ -10,7 +10,7 @@ class MicroBenchmarkTest(unittest.TestCase):
def testTiming(self):
reload(test_functions)
ray.services.start_ray_local(num_workers=3)
ray.init(start_ray_local=True, num_workers=3)
# measure the time required to submit a remote task to the scheduler
elapsed_times = []
+17 -18
View File
@@ -53,7 +53,7 @@ class SerializationTest(unittest.TestCase):
self.assertEqual(a.dtype, c.dtype)
def testSerialize(self):
ray.services.start_ray_local()
ray.init(start_ray_local=True, num_workers=0)
for val in RAY_TEST_OBJECTS:
self.roundTripTest(val)
@@ -93,7 +93,7 @@ class ObjStoreTest(unittest.TestCase):
# Test setting up object stores, transfering data between them and retrieving data to a client
def testObjStore(self):
[w1, w2] = ray.services.start_services_local(return_drivers=True, num_objstores=2, num_workers_per_objstore=0)
[w1, w2] = ray.services.start_ray_local(return_drivers=True, num_objstores=2, num_workers_per_objstore=0)
# putting and getting an object shouldn't change it
for data in ["h", "h" * 10000, 0, 0.0]:
@@ -148,7 +148,7 @@ class ObjStoreTest(unittest.TestCase):
class WorkerTest(unittest.TestCase):
def testPutGet(self):
ray.services.start_ray_local()
ray.init(start_ray_local=True, num_workers=0)
for i in range(100):
value_before = i * 10 ** 6
@@ -180,7 +180,7 @@ class APITest(unittest.TestCase):
def testObjRefAliasing(self):
reload(test_functions)
ray.services.start_ray_local(num_workers=3, driver_mode=ray.SILENT_MODE)
ray.init(start_ray_local=True, num_workers=3, driver_mode=ray.SILENT_MODE)
ref = test_functions.test_alias_f.remote()
self.assertTrue(np.alltrue(ray.get(ref) == np.ones([3, 4, 5])))
@@ -193,7 +193,7 @@ class APITest(unittest.TestCase):
def testKeywordArgs(self):
reload(test_functions)
ray.services.start_ray_local(num_workers=1)
ray.init(start_ray_local=True, num_workers=1)
x = test_functions.keyword_fct1.remote(1)
self.assertEqual(ray.get(x), "1 hello")
@@ -230,7 +230,7 @@ class APITest(unittest.TestCase):
def testVariableNumberOfArgs(self):
reload(test_functions)
ray.services.start_ray_local(num_workers=1)
ray.init(start_ray_local=True, num_workers=1)
x = test_functions.varargs_fct1.remote(0, 1, 2)
self.assertEqual(ray.get(x), "0 1 2")
@@ -244,7 +244,7 @@ class APITest(unittest.TestCase):
def testNoArgs(self):
reload(test_functions)
ray.services.start_ray_local(num_workers=1, driver_mode=ray.SILENT_MODE)
ray.init(start_ray_local=True, num_workers=1, driver_mode=ray.SILENT_MODE)
test_functions.no_op.remote()
time.sleep(0.2)
@@ -265,7 +265,7 @@ class APITest(unittest.TestCase):
def testTypeChecking(self):
reload(test_functions)
ray.services.start_ray_local(num_workers=1, driver_mode=ray.SILENT_MODE)
ray.init(start_ray_local=True, num_workers=1, driver_mode=ray.SILENT_MODE)
# Make sure that these functions throw exceptions because there return
# values do not type check.
@@ -280,7 +280,7 @@ class APITest(unittest.TestCase):
ray.services.cleanup()
def testDefiningRemoteFunctions(self):
ray.services.start_ray_local(num_workers=2)
ray.init(start_ray_local=True, num_workers=2)
# Test that we can define a remote function in the shell.
@ray.remote([int], [int])
@@ -346,7 +346,7 @@ class APITest(unittest.TestCase):
ray.reusables.bar.append(1)
return ray.reusables.bar
ray.services.start_ray_local(num_workers=2)
ray.init(start_ray_local=True, num_workers=2)
self.assertEqual(ray.get(use_foo.remote()), 1)
self.assertEqual(ray.get(use_foo.remote()), 1)
@@ -358,7 +358,7 @@ class APITest(unittest.TestCase):
class TaskStatusTest(unittest.TestCase):
def testFailedTask(self):
reload(test_functions)
ray.services.start_ray_local(num_workers=3, driver_mode=ray.SILENT_MODE)
ray.init(start_ray_local=True, num_workers=3, driver_mode=ray.SILENT_MODE)
test_functions.test_alias_f.remote()
test_functions.throw_exception_fct1.remote()
@@ -409,7 +409,7 @@ class ReferenceCountingTest(unittest.TestCase):
reload(test_functions)
for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]:
reload(module)
ray.services.start_ray_local(num_workers=1)
ray.init(start_ray_local=True, num_workers=1)
x = test_functions.test_alias_f.remote()
ray.get(x)
@@ -458,7 +458,7 @@ class ReferenceCountingTest(unittest.TestCase):
ray.services.cleanup()
def testGet(self):
ray.services.start_ray_local(num_workers=3)
ray.init(start_ray_local=True, num_workers=3)
for val in RAY_TEST_OBJECTS + [np.zeros((2, 2)), UserDefinedType()]:
objref_val = check_get_deallocated(val)
@@ -481,8 +481,7 @@ class ReferenceCountingTest(unittest.TestCase):
# @unittest.expectedFailure
# def testGetFailing(self):
# worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py")
# ray.services.start_ray_local(num_workers=3, worker_path=worker_path)
# ray.init(start_ray_local=True, num_workers=3)
# # This is failing, because for bool and None, we cannot track python
# # refcounts and therefore cannot keep the refcount up
@@ -500,7 +499,7 @@ class PythonModeTest(unittest.TestCase):
def testPythonMode(self):
reload(test_functions)
ray.services.start_ray_local(driver_mode=ray.PYTHON_MODE)
ray.init(start_ray_local=True, driver_mode=ray.PYTHON_MODE)
xref = test_functions.test_alias_h.remote()
self.assertTrue(np.alltrue(xref == np.ones([3, 4, 5]))) # remote functions should return by value
@@ -521,7 +520,7 @@ class PythonModeTest(unittest.TestCase):
class PythonCExtensionTest(unittest.TestCase):
def testReferenceCountNone(self):
ray.services.start_ray_local(num_workers=1)
ray.init(start_ray_local=True, num_workers=1)
# Make sure that we aren't accidentally messing up Python's reference counts.
for obj in [None, True, False]:
@@ -537,7 +536,7 @@ class PythonCExtensionTest(unittest.TestCase):
class ReusablesTest(unittest.TestCase):
def testReusables(self):
ray.services.start_ray_local(num_workers=1)
ray.init(start_ray_local=True, num_workers=1)
# Test that we can add a variable to the key-value store.