From 3bae6f136bef5f02f47c68372b4a937acb2438c1 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Tue, 26 Jul 2016 11:40:09 -0700 Subject: [PATCH] export remote functions and reusable variables that were defined before connect was called (#292) --- lib/python/ray/__init__.py | 1 + lib/python/ray/array/distributed/core.py | 6 +- lib/python/ray/array/distributed/random.py | 2 +- lib/python/ray/array/remote/core.py | 8 +- lib/python/ray/array/remote/random.py | 2 +- lib/python/ray/pickling.py | 8 +- lib/python/ray/services.py | 4 + lib/python/ray/worker.py | 77 ++++++++---- scripts/default_worker.py | 12 -- scripts/example_functions.py | 2 +- test/array_test.py | 16 ++- test/memory_leak_deserialize.py | 3 +- test/microbenchmarks.py | 4 +- test/runtest.py | 134 ++++++++++++--------- test/test_worker.py | 29 ----- 15 files changed, 167 insertions(+), 141 deletions(-) delete mode 100644 test/test_worker.py diff --git a/lib/python/ray/__init__.py b/lib/python/ray/__init__.py index 8a561565b..9255762f3 100644 --- a/lib/python/ray/__init__.py +++ b/lib/python/ray/__init__.py @@ -5,6 +5,7 @@ SCRIPT_MODE = 0 WORKER_MODE = 1 SHELL_MODE = 2 PYTHON_MODE = 3 +SILENT_MODE = 4 # This is only used during testing. import ctypes # Windows only diff --git a/lib/python/ray/array/distributed/core.py b/lib/python/ray/array/distributed/core.py index f50c7422b..fad94914b 100644 --- a/lib/python/ray/array/distributed/core.py +++ b/lib/python/ray/array/distributed/core.py @@ -83,14 +83,14 @@ def numpy_to_dist(a): result.objrefs[index] = ray.put(a[[slice(l, u) for (l, u) in zip(lower, upper)]]) return result -@ray.remote([List[int], str], [DistArray]) +@ray.remote([List, str], [DistArray]) def zeros(shape, dtype_name="float"): result = DistArray(shape) for index in np.ndindex(*result.num_blocks): result.objrefs[index] = ra.zeros(DistArray.compute_block_shape(index, shape), dtype_name=dtype_name) return result -@ray.remote([List[int], str], [DistArray]) +@ray.remote([List, str], [DistArray]) def ones(shape, dtype_name="float"): result = DistArray(shape) for index in np.ndindex(*result.num_blocks): @@ -171,7 +171,7 @@ def dot(a, b): result.objrefs[i, j] = blockwise_dot(*args) return result -@ray.remote([DistArray, List[int]], [DistArray]) +@ray.remote([DistArray, List], [DistArray]) def subblocks(a, *ranges): """ This function produces a distributed array from a subset of the blocks in the `a`. The result and `a` will have the same number of dimensions.For example, diff --git a/lib/python/ray/array/distributed/random.py b/lib/python/ray/array/distributed/random.py index c3ad4a27e..1d6d249f1 100644 --- a/lib/python/ray/array/distributed/random.py +++ b/lib/python/ray/array/distributed/random.py @@ -6,7 +6,7 @@ import ray from core import * -@ray.remote([List[int]], [DistArray]) +@ray.remote([List], [DistArray]) def normal(shape): num_blocks = DistArray.compute_num_blocks(shape) objrefs = np.empty(num_blocks, dtype=object) diff --git a/lib/python/ray/array/remote/core.py b/lib/python/ray/array/remote/core.py index a629a7985..1b9bf5318 100644 --- a/lib/python/ray/array/remote/core.py +++ b/lib/python/ray/array/remote/core.py @@ -4,7 +4,7 @@ import ray __all__ = ["zeros", "zeros_like", "ones", "eye", "dot", "vstack", "hstack", "subarray", "copy", "tril", "triu", "diag", "transpose", "add", "subtract", "sum", "shape", "sum_list"] -@ray.remote([List[int], str, str], [np.ndarray]) +@ray.remote([List, str, str], [np.ndarray]) def zeros(shape, dtype_name="float", order="C"): return np.zeros(shape, dtype=np.dtype(dtype_name), order=order) @@ -13,7 +13,7 @@ def zeros_like(a, dtype_name="None", order="K", subok=True): dtype_val = None if dtype_name == "None" else np.dtype(dtype_name) return np.zeros_like(a, dtype=dtype_val, order=order, subok=subok) -@ray.remote([List[int], str, str], [np.ndarray]) +@ray.remote([List, str, str], [np.ndarray]) def ones(shape, dtype_name="float", order="C"): return np.ones(shape, dtype=np.dtype(dtype_name), order=order) @@ -35,7 +35,7 @@ def hstack(*xs): return np.hstack(xs) # TODO(rkn): instead of this, consider implementing slicing -@ray.remote([np.ndarray, List[int], List[int]], [np.ndarray]) +@ray.remote([np.ndarray, List, List], [np.ndarray]) def subarray(a, lower_indices, upper_indices): # TODO(rkn): be consistent about using "index" versus "indices" return a[[slice(l, u) for (l, u) in zip(lower_indices, upper_indices)]] @@ -55,7 +55,7 @@ def triu(m, k=0): def diag(v, k=0): return np.diag(v, k=k) -@ray.remote([np.ndarray, List[int]], [np.ndarray]) +@ray.remote([np.ndarray, List], [np.ndarray]) def transpose(a, axes=[]): axes = None if axes == [] else axes return np.transpose(a, axes=axes) diff --git a/lib/python/ray/array/remote/random.py b/lib/python/ray/array/remote/random.py index 7c2a6419c..d76bb5dc9 100644 --- a/lib/python/ray/array/remote/random.py +++ b/lib/python/ray/array/remote/random.py @@ -2,6 +2,6 @@ from typing import List import numpy as np import ray -@ray.remote([List[int]], [np.ndarray]) +@ray.remote([List], [np.ndarray]) def normal(shape): return np.random.normal(size=shape) diff --git a/lib/python/ray/pickling.py b/lib/python/ray/pickling.py index 4fdf05a33..1cc5914c8 100644 --- a/lib/python/ray/pickling.py +++ b/lib/python/ray/pickling.py @@ -6,10 +6,10 @@ from ctypes import c_void_p from cloudpickle import pickle, cloudpickle, CloudPickler, load, loads try: - from ctypes import pythonapi - pythonapi.PyCell_Set # Make sure this exists + from ctypes import pythonapi + pythonapi.PyCell_Set # Make sure this exists except: - pythonapi = None + pythonapi = None def dump(obj, file, protocol=2): return BetterPickler(file, protocol).dump(obj) @@ -69,4 +69,4 @@ class BetterPickler(CloudPickler): self.write(pickle.REDUCE) dispatch = CloudPickler.dispatch.copy() dispatch[(lambda _: lambda: _)(0).__closure__[0].__class__] = save_cell - dispatch[typing.GenericMeta] = save_type + # dispatch[typing.GenericMeta] = save_type diff --git a/lib/python/ray/services.py b/lib/python/ray/services.py index d80a84062..3a4cf5362 100644 --- a/lib/python/ray/services.py +++ b/lib/python/ray/services.py @@ -56,8 +56,10 @@ def cleanup(): global drivers for driver in drivers: ray.disconnect(driver) + driver.set_mode(None) if len(drivers) == 0: ray.disconnect() + ray.worker.global_worker.set_mode(None) drivers = [] global all_processes @@ -191,6 +193,8 @@ def start_ray_local(num_workers=0, worker_path=None, driver_mode=ray.SCRIPT_MODE equivalent to serial Python code. It should be ray.WORKER_MODE to surpress the printing of error messages. """ + if worker_path is None: + worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../../scripts/default_worker.py") start_services_local(num_objstores=1, num_workers_per_objstore=num_workers, worker_path=worker_path, driver_mode=driver_mode) # This is a helper method which is only used in the tests and should not be diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index 876dfdb16..5893eb3e3 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -147,12 +147,13 @@ class RayReusables(object): Attributes: _names (List[str]): A list of the names of all the reusable variables. - _initializers (dict[str, [Callable[[], object]])]: A dictionary mapping the - names of the reusable variables to the code for initializing them. - _reinitializers (Dict[str, Callable[[object], object]]): A dictionary - mapping the names of the reusable variables to the code for reinitializing - them. For reusable variables for which reinitializer code is not provided, - the reinitializer here essentially wraps the initializer. + _reusables (Dict[str, Reusable]): A dictionary mapping the name of the + reusable variables to the corresponding Reusable object. + _cached_reusables (List[Tuple[str, Reusable]]): A list of pairs. The first + element of each pair is the name of a reusable variable, and the second + element is the Reusable object. This list is used to store reusable + variables that are defined before the driver is connected. Once the driver + is connected, these variables will be exported. _used (List[str]): A list of the names of all the reusable variables that have been accessed within the scope of the current task. This is reset to the empty list after each task. @@ -162,9 +163,10 @@ class RayReusables(object): """Initialize a RayReusables object.""" self._names = set() self._reusables = {} + self._cached_reusables = [] self._used = set() - self._slots = ("_names", "_reusables", "_used", "_slots", "_reinitialize", "__getattribute__", "__setattr__", "__delattr__") - # CHECKPOINT: Any attributes assigned before _here_ will be protected from rewrite or deletion + self._slots = ("_names", "_reusables", "_cached_reusables", "_used", "_slots", "_reinitialize", "__getattribute__", "__setattr__", "__delattr__") + # CHECKPOINT: Attributes must not be added after _slots. The above attributes are protected from deletion. def _reinitialize(self): """Reinitialize the reusable variables that the current task used.""" @@ -214,14 +216,16 @@ class RayReusables(object): if slots == (): return object.__setattr__(self, name, value) if name in slots: - raise AttributeError("Illegal assignment to {} object attribute {}".format(self.__class__.__name__, name)) + return object.__setattr__(self, name, value) reusable = value if not issubclass(type(reusable), Reusable): raise Exception("To set a reusable variable, you must pass in a Reusable object") self._names.add(name) self._reusables[name] = reusable - if _mode() in [ray.SHELL_MODE, ray.SCRIPT_MODE]: + if _mode() in [ray.SHELL_MODE, ray.SCRIPT_MODE, ray.SILENT_MODE]: _export_reusable_variable(name, reusable) + elif _mode() is None: + self._cached_reusables.append((name, reusable)) object.__setattr__(self, name, reusable.initializer()) def __delattr__(self, name): @@ -244,6 +248,13 @@ class Worker(object): function to the remote function itself. This is the set of remote functions that can be executed by this worker. handle (worker capsule): A Python object wrapping a C++ Worker object. + mode: The mode of the worker. One of ray.SCRIPT_MODE, ray.SHELL_MODE, + ray.PYTHON_MODE, ray.SILENT_MODE, and ray.WORKER_MODE. + cached_remote_functions (List[str]): A list of serialized remote functions + that were defined before the worker called connect. When the worker + eventually does call connect, if it is a driver, it will export these + functions to the scheduler. If cached_remote_functions is None, that means + that connect has been called. """ def __init__(self): @@ -251,6 +262,7 @@ class Worker(object): self.functions = {} self.handle = None self.mode = None + self.cached_remote_functions = [] def set_mode(self, mode): """Set the mode of the worker. @@ -270,9 +282,13 @@ class Worker(object): debugging purposes. It will not send remote function calls to the scheduler and will insead execute them in a blocking fashion. + The mode ray.SILENT_MODE should be used only during testing. It does not + print any information about errors because some of the tests intentionally + fail. + args: - mode: One of ray.SCRIPT_MODE, ray.WORKER_MODE, ray.SHELL_MODE, and - ray.PYTHON_MODE. + mode: One of ray.SCRIPT_MODE, ray.WORKER_MODE, ray.SHELL_MODE, + ray.PYTHON_MODE, and ray.SILENT_MODE. """ self.mode = mode colorama.init() @@ -381,6 +397,7 @@ class Worker(object): Args: function (Callable): The remote function that this worker can execute. """ + logging.info("Registering function {}.".format(function.func_name)) ray.lib.register_function(self.handle, function.func_name, len(function.return_types)) self.functions[function.func_name] = function @@ -399,7 +416,7 @@ class Worker(object): """ task_capsule = serialization.serialize_task(self.handle, func_name, args) objrefs = ray.lib.submit_task(self.handle, task_capsule) - if self.mode == ray.SHELL_MODE or self.mode == ray.SCRIPT_MODE: + if self.mode in [ray.SHELL_MODE, ray.SCRIPT_MODE]: print_task_info(ray.lib.task_info(self.handle), self.mode) return objrefs @@ -530,7 +547,7 @@ def connect(scheduler_address, objstore_address, worker_address, is_driver=False be chosen arbitrarily. is_driver (bool): True if this worker is a driver and false otherwise. mode: The mode of the worker. One of ray.SCRIPT_MODE, ray.WORKER_MODE, - ray.SHELL_MODE, and ray.PYTHON_MODE. + ray.SHELL_MODE, ray.PYTHON_MODE, and ray.SILENT_MODE. """ if hasattr(worker, "handle"): del worker.handle @@ -542,11 +559,23 @@ def connect(scheduler_address, objstore_address, worker_address, is_driver=False FORMAT = "%(asctime)-15s %(message)s" logging.basicConfig(level=logging.DEBUG, format=FORMAT, filename=ray.config.get_log_file_path("-".join(["worker", worker_address]) + ".log")) ray.lib.set_log_config(ray.config.get_log_file_path("-".join(["worker", worker_address, "c++"]) + ".log")) + if mode in [ray.SHELL_MODE, ray.SCRIPT_MODE, ray.SILENT_MODE]: + for function_to_export in worker.cached_remote_functions: + ray.lib.export_function(worker.handle, function_to_export) + for name, reusable_variable in reusables._cached_reusables: + _export_reusable_variable(name, reusable_variable) + worker.cached_remote_functions = None + reusables._cached_reusables = None def disconnect(worker=global_worker): """Disconnect this worker from the scheduler and object store.""" if worker.handle is not None: ray.lib.disconnect(worker.handle) + # Reset the list of cached remote functions so that if more remote functions + # are defined and then connect is called again, the remote functions will be + # exported. This is mostly relevant for the tests. + worker.cached_remote_functions = [] + reusables._cached_reusables = [] def get(objref, worker=global_worker): """Get a remote object from an object store. @@ -565,7 +594,7 @@ def get(objref, worker=global_worker): if worker.mode == ray.PYTHON_MODE: return objref # In ray.PYTHON_MODE, ray.get is the identity operation (the input will actually be a value not an objref) ray.lib.request_object(worker.handle, objref) - if worker.mode == ray.SHELL_MODE or worker.mode == ray.SCRIPT_MODE: + if worker.mode in [ray.SHELL_MODE, ray.SCRIPT_MODE]: print_task_info(ray.lib.task_info(worker.handle), worker.mode) value = worker.get_object(objref) if isinstance(value, RayFailedObject): @@ -585,7 +614,7 @@ def put(value, worker=global_worker): return value # In ray.PYTHON_MODE, ray.put is the identity operation objref = ray.lib.get_objref(worker.handle) worker.put_object(objref, value) - if worker.mode == ray.SHELL_MODE or worker.mode == ray.SCRIPT_MODE: + if worker.mode in [ray.SHELL_MODE, ray.SCRIPT_MODE]: print_task_info(ray.lib.task_info(worker.handle), worker.mode) return objref @@ -692,8 +721,9 @@ def main_loop(worker=global_worker): # We use this as a mechanism to allow the scheduler to kill workers. break elif command == "function": - (function, arg_types, return_types) = pickling.loads(command_args) - if function.__module__ is None: function.__module__ = "__main__" + (function, arg_types, return_types, module) = pickling.loads(command_args) + # TODO(rkn): Why is the below line necessary? + function.__module__ = module worker.register_function(remote(arg_types, return_types, worker)(function)) elif command == "reusable_variable": name, initializer_str, reinitializer_str = command_args @@ -736,7 +766,7 @@ def _export_reusable_variable(name, reusable, worker=global_worker): reusable (Reusable): The reusable object containing code for initializing and reinitializing the variable. """ - if _mode(worker) not in [ray.SHELL_MODE, ray.SCRIPT_MODE]: + if _mode(worker) not in [ray.SHELL_MODE, ray.SCRIPT_MODE, ray.SILENT_MODE]: raise Exception("_export_reusable_variable can only be called on a driver.") ray.lib.export_reusable_variable(worker.handle, name, pickling.dumps(reusable.initializer), pickling.dumps(reusable.reinitializer)) @@ -787,20 +817,21 @@ def remote(arg_types, return_types, worker=global_worker): check_signature_supported(has_kwargs_param, has_vararg_param, keyword_defaults, func_name) # Everything ready - export the function - to_export = None - if worker.mode in [ray.SHELL_MODE, ray.SCRIPT_MODE]: + if worker.mode in [None, ray.SHELL_MODE, ray.SCRIPT_MODE, ray.SILENT_MODE]: func_name_global_valid = func.__name__ in func.__globals__ func_name_global_value = func.__globals__.get(func.__name__) # Set the function globally to make it refer to itself func.__globals__[func.__name__] = func_call # Allow the function to reference itself as a global variable try: - to_export = pickling.dumps((func, arg_types, return_types)) + to_export = pickling.dumps((func, arg_types, return_types, func.__module__)) finally: # Undo our changes if func_name_global_valid: func.__globals__[func.__name__] = func_name_global_value else: del func.__globals__[func.__name__] - if to_export: + if worker.mode in [ray.SHELL_MODE, ray.SCRIPT_MODE, ray.SILENT_MODE]: ray.lib.export_function(worker.handle, to_export) + elif worker.mode is None: + worker.cached_remote_functions.append(to_export) return func_call return remote_decorator diff --git a/scripts/default_worker.py b/scripts/default_worker.py index 4cef71377..5a75ef578 100644 --- a/scripts/default_worker.py +++ b/scripts/default_worker.py @@ -2,10 +2,6 @@ import sys import argparse import numpy as np -import ray.array.remote as ra -import ray.array.distributed as da -import example_functions - import ray parser = argparse.ArgumentParser(description="Parse addresses for the worker to connect to.") @@ -17,12 +13,4 @@ if __name__ == "__main__": args = parser.parse_args() ray.worker.connect(args.scheduler_address, args.objstore_address, args.worker_address) - ray.register_module(ra) - ray.register_module(ra.random) - ray.register_module(ra.linalg) - ray.register_module(da) - ray.register_module(da.random) - ray.register_module(da.linalg) - ray.register_module(example_functions) - ray.worker.main_loop() diff --git a/scripts/example_functions.py b/scripts/example_functions.py index f04e55a60..1a9aa6c9d 100644 --- a/scripts/example_functions.py +++ b/scripts/example_functions.py @@ -16,7 +16,7 @@ def increment(x): def add(a, b): return a + b -@ray.remote([List[int]], [np.ndarray]) +@ray.remote([List], [np.ndarray]) def zeros(shape): return np.zeros(shape) diff --git a/test/array_test.py b/test/array_test.py index de47d298b..4b32cacb6 100644 --- a/test/array_test.py +++ b/test/array_test.py @@ -11,8 +11,9 @@ import ray.array.distributed as da class RemoteArrayTest(unittest.TestCase): def testMethods(self): - worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") - ray.services.start_ray_local(num_workers=1, worker_path=worker_path) + for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]: + reload(module) + ray.services.start_ray_local(num_workers=1) # test eye ref = ra.eye(3) @@ -44,6 +45,8 @@ class RemoteArrayTest(unittest.TestCase): class DistributedArrayTest(unittest.TestCase): def testSerialization(self): + for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]: + reload(module) ray.services.start_ray_local() x = da.DistArray() @@ -56,8 +59,9 @@ class DistributedArrayTest(unittest.TestCase): ray.services.cleanup() def testAssemble(self): - worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") - ray.services.start_ray_local(num_workers=1, worker_path=worker_path) + for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]: + reload(module) + ray.services.start_ray_local(num_workers=1) a = ra.ones([da.BLOCK_SIZE, da.BLOCK_SIZE]) b = ra.zeros([da.BLOCK_SIZE, da.BLOCK_SIZE]) @@ -68,7 +72,9 @@ class DistributedArrayTest(unittest.TestCase): ray.services.cleanup() def testMethods(self): - worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") + for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]: + reload(module) + worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../scripts/default_worker.py") ray.services.start_services_local(num_objstores=2, num_workers_per_objstore=5, worker_path=worker_path) x = da.zeros([9, 25, 51], "float") diff --git a/test/memory_leak_deserialize.py b/test/memory_leak_deserialize.py index bd82baead..9fc7ed937 100644 --- a/test/memory_leak_deserialize.py +++ b/test/memory_leak_deserialize.py @@ -4,8 +4,7 @@ import os import numpy as np import ray -worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") -ray.services.start_ray_local(num_workers=1, worker_path=worker_path) +ray.services.start_ray_local(num_workers=1) d = {"w": np.zeros(1000000)} diff --git a/test/microbenchmarks.py b/test/microbenchmarks.py index 8581fa99e..2d4771087 100644 --- a/test/microbenchmarks.py +++ b/test/microbenchmarks.py @@ -9,8 +9,8 @@ import test_functions class MicroBenchmarkTest(unittest.TestCase): def testTiming(self): - worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") - ray.services.start_ray_local(num_workers=3, worker_path=worker_path) + reload(test_functions) + ray.services.start_ray_local(num_workers=3) # measure the time required to submit a remote task to the scheduler elapsed_times = [] diff --git a/test/runtest.py b/test/runtest.py index 7a78b1419..0f23c5e2f 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -113,15 +113,13 @@ class ObjStoreTest(unittest.TestCase): result = ray.get(objref, w2) self.assertTrue(np.alltrue(result == data)) - """ # getting multiple times shouldn't matter - for data in [np.zeros([10, 20]), np.random.normal(size=[45, 25]), np.zeros([10, 20], dtype=np.dtype("float64")), np.zeros([10, 20], dtype=np.dtype("float32")), np.zeros([10, 20], dtype=np.dtype("int64")), np.zeros([10, 20], dtype=np.dtype("int32"))]: - objref = worker.put(data, w1) - result = worker.get(objref, w2) - result = worker.get(objref, w2) - result = worker.get(objref, w2) - self.assertTrue(np.alltrue(result == data)) - """ + # for data in [np.zeros([10, 20]), np.random.normal(size=[45, 25]), np.zeros([10, 20], dtype=np.dtype("float64")), np.zeros([10, 20], dtype=np.dtype("float32")), np.zeros([10, 20], dtype=np.dtype("int64")), np.zeros([10, 20], dtype=np.dtype("int32"))]: + # objref = worker.put(data, w1) + # result = worker.get(objref, w2) + # result = worker.get(objref, w2) + # result = worker.get(objref, w2) + # self.assertTrue(np.alltrue(result == data)) # shipping a numpy array inside something else should be fine data = ("a", np.random.normal(size=[10, 10])) @@ -181,8 +179,8 @@ class WorkerTest(unittest.TestCase): class APITest(unittest.TestCase): def testObjRefAliasing(self): - worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") - ray.services.start_ray_local(num_workers=3, worker_path=worker_path) + reload(test_functions) + ray.services.start_ray_local(num_workers=3, driver_mode=ray.SILENT_MODE) ref = test_functions.test_alias_f() self.assertTrue(np.alltrue(ray.get(ref) == np.ones([3, 4, 5]))) @@ -194,8 +192,8 @@ class APITest(unittest.TestCase): ray.services.cleanup() def testKeywordArgs(self): - worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") - ray.services.start_ray_local(num_workers=1, worker_path=worker_path) + reload(test_functions) + ray.services.start_ray_local(num_workers=1) x = test_functions.keyword_fct1(1) self.assertEqual(ray.get(x), "1 hello") @@ -231,8 +229,8 @@ class APITest(unittest.TestCase): ray.services.cleanup() def testVariableNumberOfArgs(self): - worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") - ray.services.start_ray_local(num_workers=1, worker_path=worker_path) + reload(test_functions) + ray.services.start_ray_local(num_workers=1) x = test_functions.varargs_fct1(0, 1, 2) self.assertEqual(ray.get(x), "0 1 2") @@ -245,8 +243,8 @@ class APITest(unittest.TestCase): ray.services.cleanup() def testNoArgs(self): - worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") - ray.services.start_ray_local(num_workers=1, worker_path=worker_path, driver_mode=ray.WORKER_MODE) + reload(test_functions) + ray.services.start_ray_local(num_workers=1, driver_mode=ray.SILENT_MODE) test_functions.no_op() time.sleep(0.2) @@ -266,8 +264,8 @@ class APITest(unittest.TestCase): ray.services.cleanup() def testTypeChecking(self): - worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") - ray.services.start_ray_local(num_workers=1, worker_path=worker_path, driver_mode=ray.WORKER_MODE) + reload(test_functions) + ray.services.start_ray_local(num_workers=1, driver_mode=ray.SILENT_MODE) # Make sure that these functions throw exceptions because there return # values do not type check. @@ -282,8 +280,7 @@ class APITest(unittest.TestCase): ray.services.cleanup() def testDefiningRemoteFunctions(self): - worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") - ray.services.start_ray_local(num_workers=2, worker_path=worker_path, driver_mode=ray.SCRIPT_MODE) + ray.services.start_ray_local(num_workers=2) # Test that we can define a remote function in the shell. @ray.remote([int], [int]) @@ -330,10 +327,39 @@ class APITest(unittest.TestCase): ray.services.cleanup() + def testCachingReusables(self): + # Test that we can define reusable variables before the driver is connected. + def foo_initializer(): + return 1 + def bar_initializer(): + return [] + def bar_reinitializer(bar): + return [] + ray.reusables.foo = ray.Reusable(foo_initializer) + ray.reusables.bar = ray.Reusable(bar_initializer, bar_reinitializer) + + @ray.remote([], [int]) + def use_foo(): + return ray.reusables.foo + @ray.remote([], [list]) + def use_bar(): + ray.reusables.bar.append(1) + return ray.reusables.bar + + ray.services.start_ray_local(num_workers=2) + + self.assertEqual(ray.get(use_foo()), 1) + self.assertEqual(ray.get(use_foo()), 1) + self.assertEqual(ray.get(use_bar()), [1]) + self.assertEqual(ray.get(use_bar()), [1]) + + ray.services.cleanup() + class TaskStatusTest(unittest.TestCase): def testFailedTask(self): - worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") - ray.services.start_ray_local(num_workers=3, worker_path=worker_path, driver_mode=ray.WORKER_MODE) + reload(test_functions) + ray.services.start_ray_local(num_workers=3, driver_mode=ray.SILENT_MODE) + test_functions.test_alias_f() test_functions.throw_exception_fct1() test_functions.throw_exception_fct1() @@ -365,6 +391,8 @@ class TaskStatusTest(unittest.TestCase): else: self.assertTrue(False) # ray.get should throw an exception + ray.services.cleanup() + def check_get_deallocated(data): x = ray.put(data) ray.get(x) @@ -378,8 +406,10 @@ def check_get_not_deallocated(data): class ReferenceCountingTest(unittest.TestCase): def testDeallocation(self): - worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") - ray.services.start_ray_local(num_workers=3, worker_path=worker_path) + reload(test_functions) + for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]: + reload(module) + ray.services.start_ray_local(num_workers=1) x = test_functions.test_alias_f() ray.get(x) @@ -399,7 +429,7 @@ class ReferenceCountingTest(unittest.TestCase): del y self.assertEqual(ray.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)], [-1, -1, -1]) - z = da.zeros([da.BLOCK_SIZE, 2 * da.BLOCK_SIZE], "float") + z = da.zeros([da.BLOCK_SIZE, 2 * da.BLOCK_SIZE]) time.sleep(0.1) objref_val = z.val self.assertEqual(ray.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)], [1, 1, 1]) @@ -408,8 +438,8 @@ class ReferenceCountingTest(unittest.TestCase): time.sleep(0.1) self.assertEqual(ray.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)], [-1, -1, -1]) - x = ra.zeros([10, 10], "float") - y = ra.zeros([10, 10], "float") + x = ra.zeros([10, 10]) + y = ra.zeros([10, 10]) z = ra.dot(x, y) objref_val = x.val time.sleep(0.1) @@ -428,8 +458,7 @@ class ReferenceCountingTest(unittest.TestCase): ray.services.cleanup() def testGet(self): - worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") - ray.services.start_ray_local(num_workers=3, worker_path=worker_path) + ray.services.start_ray_local(num_workers=3) for val in RAY_TEST_OBJECTS + [np.zeros((2, 2)), UserDefinedType()]: objref_val = check_get_deallocated(val) @@ -442,36 +471,35 @@ class ReferenceCountingTest(unittest.TestCase): # The following currently segfaults: The second "result = " closes the # memory segment as soon as the assignment is done (and the first result # goes out of scope). - """ - data = np.zeros([10, 20]) - objref = ray.put(data) - result = worker.get(objref) - result = worker.get(objref) - self.assertTrue(np.alltrue(result == data)) - """ + # data = np.zeros([10, 20]) + # objref = ray.put(data) + # result = worker.get(objref) + # result = worker.get(objref) + # self.assertTrue(np.alltrue(result == data)) ray.services.cleanup() - @unittest.expectedFailure - def testGetFailing(self): - worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") - ray.services.start_ray_local(num_workers=3, worker_path=worker_path) + # @unittest.expectedFailure + # def testGetFailing(self): + # worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") + # ray.services.start_ray_local(num_workers=3, worker_path=worker_path) - # This is failing, because for bool and None, we cannot track python - # refcounts and therefore cannot keep the refcount up - # (see 5281bd414f6b404f61e1fe25ec5f6651defee206). - # The resulting behavior is still correct however because True, False and - # None are returned by get "by value" and therefore can be reclaimed from - # the object store safely. - for val in [True, False, None]: - x, objref_val = check_get_not_deallocated(val) - self.assertEqual(ray.scheduler_info()["reference_counts"][objref_val], 1) + # # This is failing, because for bool and None, we cannot track python + # # refcounts and therefore cannot keep the refcount up + # # (see 5281bd414f6b404f61e1fe25ec5f6651defee206). + # # The resulting behavior is still correct however because True, False and + # # None are returned by get "by value" and therefore can be reclaimed from + # # the object store safely. + # for val in [True, False, None]: + # x, objref_val = check_get_not_deallocated(val) + # self.assertEqual(ray.scheduler_info()["reference_counts"][objref_val], 1) - ray.services.cleanup() + # ray.services.cleanup() class PythonModeTest(unittest.TestCase): def testPythonMode(self): + reload(test_functions) ray.services.start_ray_local(driver_mode=ray.PYTHON_MODE) xref = test_functions.test_alias_h() @@ -493,8 +521,7 @@ class PythonModeTest(unittest.TestCase): class PythonCExtensionTest(unittest.TestCase): def testReferenceCountNone(self): - worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") - ray.services.start_ray_local(num_workers=1, worker_path=worker_path) + ray.services.start_ray_local(num_workers=1) # Make sure that we aren't accidentally messing up Python's reference counts. for obj in [None, True, False]: @@ -510,8 +537,7 @@ class PythonCExtensionTest(unittest.TestCase): class ReusablesTest(unittest.TestCase): def testReusables(self): - worker_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_worker.py") - ray.services.start_ray_local(num_workers=1, worker_path=worker_path) + ray.services.start_ray_local(num_workers=1) # Test that we can add a variable to the key-value store. diff --git a/test/test_worker.py b/test/test_worker.py deleted file mode 100644 index 45a80181e..000000000 --- a/test/test_worker.py +++ /dev/null @@ -1,29 +0,0 @@ -import sys -import argparse -import numpy as np - -import test_functions -import ray.array.remote as ra -import ray.array.distributed as da - -import ray - -parser = argparse.ArgumentParser(description='Parse addresses for the worker to connect to.') -parser.add_argument("--scheduler-address", default="127.0.0.1:10001", type=str, help="the scheduler's address") -parser.add_argument("--objstore-address", default="127.0.0.1:20001", type=str, help="the objstore's address") -parser.add_argument("--worker-address", default="127.0.0.1:40001", type=str, help="the worker's address") - -if __name__ == "__main__": - args = parser.parse_args() - ray.worker.connect(args.scheduler_address, args.objstore_address, args.worker_address) - - ray.register_module(test_functions) - ray.register_module(ra) - ray.register_module(ra.random) - ray.register_module(ra.linalg) - ray.register_module(da) - ray.register_module(da.random) - ray.register_module(da.linalg) - ray.register_module(sys.modules[__name__]) - - ray.worker.main_loop()