diff --git a/examples/hyperopt/driver.py b/examples/hyperopt/driver.py index 0345b99dc..ebae1ba8c 100644 --- a/examples/hyperopt/driver.py +++ b/examples/hyperopt/driver.py @@ -11,7 +11,7 @@ epochs = 100 worker_dir = os.path.dirname(os.path.abspath(__file__)) worker_path = os.path.join(worker_dir, "worker.py") -services.start_singlenode_cluster(return_drivers=False, num_objstores=1, num_workers_per_objstore=num_workers, worker_path=worker_path) +services.start_ray_local(num_workers=num_workers, worker_path=worker_path) best_params = None best_accuracy = 0 diff --git a/examples/imagenet/driver.py b/examples/imagenet/driver.py index 44e6ab846..54c2fab4c 100644 --- a/examples/imagenet/driver.py +++ b/examples/imagenet/driver.py @@ -16,7 +16,7 @@ parser.add_argument("--drop-ipython", default=False, type=bool, help="Drop into if __name__ == "__main__": args = parser.parse_args() worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "worker.py") - services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=5, worker_path=worker_path) + services.start_ray_local(num_workers=5, worker_path=worker_path) s3 = boto3.resource("s3") imagenet_bucket = s3.Bucket(args.s3_bucket) diff --git a/examples/lbfgs/driver.py b/examples/lbfgs/driver.py index a8bed0c5c..ce3a50c44 100644 --- a/examples/lbfgs/driver.py +++ b/examples/lbfgs/driver.py @@ -20,7 +20,7 @@ batches = [mnist.train.next_batch(batch_size) for _ in range(num_batches)] if __name__ == "__main__": worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "worker.py") - services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=16, worker_path=worker_path) + services.start_ray_local(num_workers=16, worker_path=worker_path) x_batches = [ray.put(batches[i][0]) for i in range(num_batches)] y_batches = [ray.put(batches[i][1]) for i in range(num_batches)] diff --git a/examples/rl_pong/driver.py b/examples/rl_pong/driver.py index 015fc0fd7..8dcb74859 100644 --- a/examples/rl_pong/driver.py +++ b/examples/rl_pong/driver.py @@ -12,7 +12,7 @@ import functions worker_dir = os.path.dirname(os.path.abspath(__file__)) worker_path = os.path.join(worker_dir, "worker.py") -services.start_singlenode_cluster(return_drivers=False, num_objstores=1, num_workers_per_objstore=10, worker_path=worker_path) +services.start_ray_local(num_workers=10, worker_path=worker_path) # hyperparameters H = 200 # number of hidden layer neurons diff --git a/lib/python/ray/services.py b/lib/python/ray/services.py index 9b621fc91..cfb71ed62 100644 --- a/lib/python/ray/services.py +++ b/lib/python/ray/services.py @@ -107,7 +107,12 @@ def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None # driver_mode should equal ray.SCRIPT_MODE if this is being run in a script and # ray.SHELL_MODE if it is being used interactively in a shell. It can also equal # ray.PYTHON_MODE to run things in a manner equivalent to serial Python code. -def start_singlenode_cluster(return_drivers=False, num_objstores=1, num_workers_per_objstore=0, worker_path=None, driver_mode=ray.SCRIPT_MODE): +def start_ray_local(num_workers=0, worker_path=None, driver_mode=ray.SCRIPT_MODE): + start_services_local(num_objstores=1, num_workers_per_objstore=num_workers, worker_path=worker_path, driver_mode=driver_mode) + +# This is a helper method which is only used in the tests and should not be +# called by users +def start_services_local(num_objstores=1, num_workers_per_objstore=0, worker_path=None, driver_mode=ray.SCRIPT_MODE, return_drivers=False): global drivers if num_workers_per_objstore > 0 and worker_path is None: raise Exception("Attempting to start a cluster with {} workers per object store, but `worker_path` is None.".format(num_workers_per_objstore)) diff --git a/test/array_test.py b/test/array_test.py index bc7423c8e..ff93494c1 100644 --- a/test/array_test.py +++ b/test/array_test.py @@ -15,7 +15,7 @@ class RemoteArrayTest(unittest.TestCase): def testMethods(self): worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") - services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=1, worker_path=worker_path) + services.start_ray_local(num_workers=1, worker_path=worker_path) # test eye ref = ra.eye(3) @@ -47,12 +47,12 @@ class RemoteArrayTest(unittest.TestCase): class DistributedArrayTest(unittest.TestCase): def testSerialization(self): - [w] = services.start_singlenode_cluster(return_drivers=True) + services.start_ray_local() x = da.DistArray() - x.construct([2, 3, 4], np.array([[[ray.put(0, w)]]])) - capsule, _ = serialization.serialize(w.handle, x) # TODO(rkn): THIS REQUIRES A WORKER_HANDLE - y = serialization.deserialize(w.handle, capsule) # TODO(rkn): THIS REQUIRES A WORKER_HANDLE + x.construct([2, 3, 4], np.array([[[ray.put(0)]]])) + capsule, _ = serialization.serialize(ray.worker.global_worker.handle, x) + y = serialization.deserialize(ray.worker.global_worker.handle, capsule) self.assertEqual(x.shape, y.shape) self.assertEqual(x.objrefs[0, 0, 0].val, y.objrefs[0, 0, 0].val) @@ -60,7 +60,7 @@ class DistributedArrayTest(unittest.TestCase): def testAssemble(self): worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") - services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=1, worker_path=worker_path) + services.start_ray_local(num_workers=1, worker_path=worker_path) a = ra.ones([da.BLOCK_SIZE, da.BLOCK_SIZE]) b = ra.zeros([da.BLOCK_SIZE, da.BLOCK_SIZE]) @@ -72,7 +72,7 @@ class DistributedArrayTest(unittest.TestCase): def testMethods(self): worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") - services.start_singlenode_cluster(return_drivers=False, num_objstores=2, num_workers_per_objstore=5, worker_path=worker_path) + services.start_services_local(num_objstores=2, num_workers_per_objstore=5, worker_path=worker_path) x = da.zeros([9, 25, 51], "float") self.assertTrue(np.alltrue(ray.get(da.assemble(x)) == np.zeros([9, 25, 51]))) diff --git a/test/datasets_test.py b/test/datasets_test.py index 8871a65dc..13d6f42f5 100644 --- a/test/datasets_test.py +++ b/test/datasets_test.py @@ -9,7 +9,7 @@ class ImageNetTest(unittest.TestCase): def testImageNetLoading(self): worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") - services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=5, worker_path=worker_path) + services.start_ray_local(num_workers=5, worker_path=worker_path) chunk_name = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../data/mini.tar") tar = tarfile.open(chunk_name, mode= "r") diff --git a/test/memory_leak_deserialize.py b/test/memory_leak_deserialize.py index 5ff87095d..a5a46161f 100644 --- a/test/memory_leak_deserialize.py +++ b/test/memory_leak_deserialize.py @@ -7,7 +7,7 @@ import ray.worker import ray.services as services worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") -services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=1, worker_path=worker_path) +services.start_ray_local(num_workers=1, worker_path=worker_path) d = {"w": np.zeros(1000000)} diff --git a/test/microbenchmarks.py b/test/microbenchmarks.py index c41f40591..7ea69d8c8 100644 --- a/test/microbenchmarks.py +++ b/test/microbenchmarks.py @@ -11,7 +11,7 @@ class MicroBenchmarkTest(unittest.TestCase): def testTiming(self): worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") - services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=3, worker_path=worker_path) + services.start_ray_local(num_workers=3, worker_path=worker_path) # measure the time required to submit a remote task to the scheduler elapsed_times = [] diff --git a/test/runtest.py b/test/runtest.py index b1597c075..1df64568e 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -33,61 +33,61 @@ class UserDefinedType(object): class SerializationTest(unittest.TestCase): - def roundTripTest(self, worker, data): - serialized, _ = serialization.serialize(worker.handle, data) - result = serialization.deserialize(worker.handle, serialized) + def roundTripTest(self, data): + serialized, _ = serialization.serialize(ray.worker.global_worker.handle, data) + result = serialization.deserialize(ray.worker.global_worker.handle, serialized) self.assertEqual(data, result) - def numpyTypeTest(self, worker, typ): + def numpyTypeTest(self, typ): a = np.random.randint(0, 10, size=(100, 100)).astype(typ) - b, _ = serialization.serialize(worker.handle, a) - c = serialization.deserialize(worker.handle, b) + b, _ = serialization.serialize(ray.worker.global_worker.handle, a) + c = serialization.deserialize(ray.worker.global_worker.handle, b) self.assertTrue((a == c).all()) a = np.array(0).astype(typ) - b, _ = serialization.serialize(worker.handle, a) - c = serialization.deserialize(worker.handle, b) + b, _ = serialization.serialize(ray.worker.global_worker.handle, a) + c = serialization.deserialize(ray.worker.global_worker.handle, b) self.assertTrue((a == c).all()) a = np.empty((0,)).astype(typ) - b, _ = serialization.serialize(worker.handle, a) - c = serialization.deserialize(worker.handle, b) + b, _ = serialization.serialize(ray.worker.global_worker.handle, a) + c = serialization.deserialize(ray.worker.global_worker.handle, b) self.assertTrue(a.dtype == c.dtype) def testSerialize(self): - [w] = services.start_singlenode_cluster(return_drivers=True) + services.start_ray_local() for val in RAY_TEST_OBJECTS: - self.roundTripTest(w, val) + self.roundTripTest(val) a = np.zeros((100, 100)) - res, _ = serialization.serialize(w.handle, a) - b = serialization.deserialize(w.handle, res) + res, _ = serialization.serialize(ray.worker.global_worker.handle, a) + b = serialization.deserialize(ray.worker.global_worker.handle, res) self.assertTrue((a == b).all()) - self.numpyTypeTest(w, "int8") - self.numpyTypeTest(w, "uint8") - self.numpyTypeTest(w, "int16") - self.numpyTypeTest(w, "uint16") - self.numpyTypeTest(w, "int32") - self.numpyTypeTest(w, "uint32") - self.numpyTypeTest(w, "float32") - self.numpyTypeTest(w, "float64") + self.numpyTypeTest("int8") + self.numpyTypeTest("uint8") + self.numpyTypeTest("int16") + self.numpyTypeTest("uint16") + self.numpyTypeTest("int32") + self.numpyTypeTest("uint32") + self.numpyTypeTest("float32") + self.numpyTypeTest("float64") - ref0 = ray.put(0, w) - ref1 = ray.put(0, w) - ref2 = ray.put(0, w) - ref3 = ray.put(0, w) + ref0 = ray.put(0) + ref1 = ray.put(0) + ref2 = ray.put(0) + ref3 = ray.put(0) a = np.array([[ref0, ref1], [ref2, ref3]]) - capsule, _ = serialization.serialize(w.handle, a) - result = serialization.deserialize(w.handle, capsule) + capsule, _ = serialization.serialize(ray.worker.global_worker.handle, a) + result = serialization.deserialize(ray.worker.global_worker.handle, capsule) self.assertTrue((a == result).all()) - self.roundTripTest(w, ref0) - self.roundTripTest(w, [ref0, ref1, ref2, ref3]) - self.roundTripTest(w, {"0": ref0, "1": ref1, "2": ref2, "3": ref3}) - self.roundTripTest(w, (ref0, 1)) + self.roundTripTest(ref0) + self.roundTripTest([ref0, ref1, ref2, ref3]) + self.roundTripTest({"0": ref0, "1": ref1, "2": ref2, "3": ref3}) + self.roundTripTest((ref0, 1)) services.cleanup() @@ -95,7 +95,7 @@ class ObjStoreTest(unittest.TestCase): # Test setting up object stores, transfering data between them and retrieving data to a client def testObjStore(self): - [w1, w2] = services.start_singlenode_cluster(return_drivers=True, num_objstores=2, num_workers_per_objstore=0) + [w1, w2] = services.start_services_local(return_drivers=True, num_objstores=2, num_workers_per_objstore=0) # putting and getting an object shouldn't change it for data in ["h", "h" * 10000, 0, 0.0]: @@ -144,30 +144,30 @@ class ObjStoreTest(unittest.TestCase): class WorkerTest(unittest.TestCase): def testPutGet(self): - [w] = services.start_singlenode_cluster(return_drivers=True) + services.start_ray_local() for i in range(100): value_before = i * 10 ** 6 - objref = ray.put(value_before, w) - value_after = ray.get(objref, w) + objref = ray.put(value_before) + value_after = ray.get(objref) self.assertEqual(value_before, value_after) for i in range(100): value_before = i * 10 ** 6 * 1.0 - objref = ray.put(value_before, w) - value_after = ray.get(objref, w) + objref = ray.put(value_before) + value_after = ray.get(objref) self.assertEqual(value_before, value_after) for i in range(100): value_before = "h" * i - objref = ray.put(value_before, w) - value_after = ray.get(objref, w) + objref = ray.put(value_before) + value_after = ray.get(objref) self.assertEqual(value_before, value_after) for i in range(100): value_before = [1] * i - objref = ray.put(value_before, w) - value_after = ray.get(objref, w) + objref = ray.put(value_before) + value_after = ray.get(objref) self.assertEqual(value_before, value_after) services.cleanup() @@ -176,20 +176,20 @@ class APITest(unittest.TestCase): def testObjRefAliasing(self): worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") - [w] = services.start_singlenode_cluster(return_drivers=True, num_workers_per_objstore=3, worker_path=worker_path) + services.start_ray_local(num_workers=3, worker_path=worker_path) - objref = w.submit_task("test_functions.test_alias_f", []) - self.assertTrue(np.alltrue(ray.get(objref[0], w) == np.ones([3, 4, 5]))) - objref = w.submit_task("test_functions.test_alias_g", []) - self.assertTrue(np.alltrue(ray.get(objref[0], w) == np.ones([3, 4, 5]))) - objref = w.submit_task("test_functions.test_alias_h", []) - self.assertTrue(np.alltrue(ray.get(objref[0], w) == np.ones([3, 4, 5]))) + ref = test_functions.test_alias_f() + self.assertTrue(np.alltrue(ray.get(ref) == np.ones([3, 4, 5]))) + ref = test_functions.test_alias_g() + self.assertTrue(np.alltrue(ray.get(ref) == np.ones([3, 4, 5]))) + ref = test_functions.test_alias_h() + self.assertTrue(np.alltrue(ray.get(ref) == np.ones([3, 4, 5]))) services.cleanup() def testKeywordArgs(self): worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") - services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=1, worker_path=worker_path) + services.start_ray_local(num_workers=1, worker_path=worker_path) x = test_functions.keyword_fct1(1) self.assertEqual(ray.get(x), "1 hello") @@ -226,7 +226,7 @@ class APITest(unittest.TestCase): def testVariableNumberOfArgs(self): worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") - services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=1, worker_path=worker_path) + services.start_ray_local(num_workers=1, worker_path=worker_path) x = test_functions.varargs_fct1(0, 1, 2) self.assertEqual(ray.get(x), "0 1 2") @@ -241,7 +241,7 @@ class APITest(unittest.TestCase): class TaskStatusTest(unittest.TestCase): def testFailedTask(self): worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") - services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=3, worker_path=worker_path, driver_mode=ray.WORKER_MODE) + services.start_ray_local(num_workers=3, worker_path=worker_path, driver_mode=ray.WORKER_MODE) test_functions.test_alias_f() test_functions.throw_exception_fct1() test_functions.throw_exception_fct1() @@ -287,7 +287,7 @@ class ReferenceCountingTest(unittest.TestCase): def testDeallocation(self): worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") - services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=3, worker_path=worker_path) + services.start_ray_local(num_workers=3, worker_path=worker_path) x = test_functions.test_alias_f() ray.get(x) @@ -337,7 +337,7 @@ class ReferenceCountingTest(unittest.TestCase): def testGet(self): worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") - services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=3, worker_path=worker_path) + services.start_ray_local(num_workers=3, worker_path=worker_path) for val in RAY_TEST_OBJECTS + [np.zeros((2, 2)), UserDefinedType()]: objref_val = check_get_deallocated(val) @@ -363,7 +363,7 @@ class ReferenceCountingTest(unittest.TestCase): @unittest.expectedFailure def testGetFailing(self): worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") - services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=3, worker_path=worker_path) + services.start_ray_local(num_workers=3, worker_path=worker_path) # This is failing, because for bool and None, we cannot track python # refcounts and therefore cannot keep the refcount up @@ -380,7 +380,7 @@ class ReferenceCountingTest(unittest.TestCase): class PythonModeTest(unittest.TestCase): def testObjRefAliasing(self): - services.start_singlenode_cluster(driver_mode=ray.PYTHON_MODE) + services.start_ray_local(driver_mode=ray.PYTHON_MODE) xref = test_functions.test_alias_h() self.assertTrue(np.alltrue(xref == np.ones([3, 4, 5]))) # remote functions should return by value