diff --git a/python/ray/__init__.py b/python/ray/__init__.py index 3bcb3ada3..240683227 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -6,7 +6,6 @@ from ray.worker import (register_class, error_info, init, connect, disconnect, get, put, wait, remote, log_event, log_span, flush_log, get_gpu_ids) from ray.actor import actor -from ray.worker import EnvironmentVariable, env from ray.worker import SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, SILENT_MODE from ray.worker import global_state @@ -16,9 +15,8 @@ __version__ = "0.1.0" __all__ = ["register_class", "error_info", "init", "connect", "disconnect", "get", "put", "wait", "remote", "log_event", "log_span", - "flush_log", "actor", "get_gpu_ids", "EnvironmentVariable", "env", - "SCRIPT_MODE", "WORKER_MODE", "PYTHON_MODE", "SILENT_MODE", - "global_state", "__version__"] + "flush_log", "actor", "get_gpu_ids", "SCRIPT_MODE", "WORKER_MODE", + "PYTHON_MODE", "SILENT_MODE", "global_state", "__version__"] import ctypes # Windows only diff --git a/python/ray/worker.py b/python/ray/worker.py index c6d9d009b..dbedd4e3d 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -186,223 +186,6 @@ class RayGetArgumentError(Exception): colorama.Fore.RESET, self.task_error)) -class EnvironmentVariable(object): - """An Python object that can be shared between tasks. - - Attributes: - initializer (Callable[[], object]): A function used to create and - initialize the environment variable. - reinitializer (Optional[Callable[[object], object]]): An optional function - used to reinitialize the environment variable after it has been used. - This argument can be used as an optimization if there is a fast way to - reinitialize the state of the variable other than rerunning the - initializer. - """ - - def __init__(self, initializer, reinitializer=None): - """Initialize an EnvironmentVariable object.""" - if not callable(initializer): - raise Exception("When creating an EnvironmentVariable, initializer must " - "be a function.") - self.initializer = initializer - if reinitializer is None: - # If no reinitializer is passed in, use a wrapped version of the - # initializer. - def reinitializer(value): - return initializer() - if not callable(reinitializer): - raise Exception("When creating an EnvironmentVariable, reinitializer " - "must be a function.") - self.reinitializer = reinitializer - - -class RayEnvironmentVariables(object): - """An object used to store Python variables that are shared between tasks. - - Each worker process will have a single RayEnvironmentVariables object. This - class serves two purposes. First, some objects are not serializable, and so - the code that creates those objects must be run on the worker that uses them. - This class is responsible for running the code that creates those objects. - Second, some of these objects are expensive to create, and so they should be - shared between tasks. However, if a task mutates a variable that is shared - between tasks, then the behavior of the overall program may be - nondeterministic (it could depend on scheduling decisions). To fix this, if a - task uses a one of these shared objects, then that shared object will be - reinitialized after the task finishes. Since the initialization may be - expensive, the user can pass in custom reinitialization code that resets the - state of the shared variable to the way it was after initialization. If the - reinitialization code does not do this, then the behavior of the overall - program is undefined. - - Attributes: - _names (List[str]): A list of the names of all the environment variables. - _reinitializers (Dict[str, Callable]): A dictionary mapping the name of the - environment variables to the corresponding reinitializer. - _running_remote_function_locally (bool): A flag used to indicate if a - remote function is running locally on the driver so that we can simulate - the same behavior as running a remote function remotely. - _environment_variables: A dictionary mapping the name of an environment - variable to the value of the environment variable. - _local_mode_environment_variables: A copy of _environment_variables used on - the driver when running remote functions locally on the driver. This is - needed because there are two ways in which environment variables can be - used on the driver. The first is that the driver's copy can be - manipulated. This copy is never reset (think of the driver as a single - long-running task). The second way is that a remote function can be run - locally on the driver, and this remote function needs access to a copy of - the environment variable, and that copy must be reinitialized after use. - _cached_environment_variables (List[Tuple[str, EnvironmentVariable]]): A - list of pairs. The first element of each pair is the name of an - environment variable, and the second element is the EnvironmentVariable - object. This list is used to store environment variables that are defined - before the driver is connected. Once the driver is connected, these - variables will be exported. - _used (List[str]): A list of the names of all the environment variables - that have been accessed within the scope of the current task. This is - reset to the empty list after each task. - """ - - def __init__(self): - """Initialize an RayEnvironmentVariables object.""" - self._names = set() - self._reinitializers = {} - self._running_remote_function_locally = False - self._environment_variables = {} - self._local_mode_environment_variables = {} - self._cached_environment_variables = [] - self._used = set() - self._slots = ("_names", - "_reinitializers", - "_running_remote_function_locally", - "_environment_variables", - "_local_mode_environment_variables", - "_cached_environment_variables", - "_used", - "_slots", - "_create_environment_variable", - "_reinitialize", - "__getattribute__", - "__setattr__", - "__delattr__") - # CHECKPOINT: Attributes must not be added after _slots. The above - # attributes are protected from deletion. - - def _create_environment_variable(self, name, environment_variable): - """Create an environment variable locally. - - Args: - name (str): The name of the environment variable. - environment_variable (EnvironmentVariable): The environment variable - object to use to create the environment variable variable. - """ - self._names.add(name) - self._reinitializers[name] = environment_variable.reinitializer - self._environment_variables[name] = environment_variable.initializer() - # We create a second copy of the environment variable on the driver to use - # inside of remote functions that run locally. This occurs when we start - # Ray in PYTHON_MODE and when we call a remote function locally. - if _mode() in [SCRIPT_MODE, SILENT_MODE, PYTHON_MODE]: - self._local_mode_environment_variables[name] = (environment_variable - .initializer()) - - def _reinitialize(self): - """Reinitialize the environment variables that the current task used.""" - for name in self._used: - current_value = self._environment_variables[name] - new_value = self._reinitializers[name](current_value) - # If we are on the driver, reset the copy of the environment variable in - # the _local_mode_environment_variables dictionary. - if _mode() in [SCRIPT_MODE, SILENT_MODE, PYTHON_MODE]: - assert self._running_remote_function_locally - self._local_mode_environment_variables[name] = new_value - else: - self._environment_variables[name] = new_value - self._used.clear() # Reset the _used list. - - def __getattribute__(self, name): - """Get an attribute. This handles environment variables as a special case. - - When __getattribute__ is called with the name of an environment variable, - that name is added to the list of variables that were used in the current - task. - - Args: - name (str): The name of the attribute to get. - """ - if name == "_slots": - return object.__getattribute__(self, name) - if name in self._slots: - return object.__getattribute__(self, name) - # Handle various fields that are not environment variables. - if name not in self._names: - return object.__getattribute__(self, name) - # Make a note of the fact that the environment variable has been used. - if name in self._names and name not in self._used: - self._used.add(name) - if self._running_remote_function_locally: - return self._local_mode_environment_variables[name] - else: - return self._environment_variables[name] - - def __setattr__(self, name, value): - """Set an attribute. This handles environment variables as a special case. - - This is used to create environment variables. When it is called, it runs - the function for initializing the variable to create the variable. If this - is called on the driver, then the functions for initializing and - reinitializing the variable are shipped to the workers. - - If this is called before ray.init has been run, then the environment - variable will be cached and it will be created and exported when connect is - called. - - Args: - name (str): The name of the attribute to set. This is either a - whitelisted name or it is treated as the name of an environment - variable. - value: If name is a whitelisted name, then value can be any value. If - name is the name of an environment variable, then this is an - EnvironmentVariable object. - """ - try: - slots = self._slots - except AttributeError: - slots = () - if slots == (): - return object.__setattr__(self, name, value) - if name in slots: - return object.__setattr__(self, name, value) - environment_variable = value - if not issubclass(type(environment_variable), EnvironmentVariable): - raise Exception("To set an environment variable, you must pass in an " - "EnvironmentVariable object") - # If ray.init has not been called, cache the environment variable to export - # later. Otherwise, export the environment variable to the workers and - # define it locally. - if _mode() is None: - self._cached_environment_variables.append((name, environment_variable)) - else: - # If we are on the driver, export the environment variable to all the - # workers. - if _mode() in [SCRIPT_MODE, SILENT_MODE]: - _export_environment_variable(name, environment_variable) - # Define the environment variable locally. - self._create_environment_variable(name, environment_variable) - # Create an empty attribute with the name of the environment variable. - # This allows the Python interpreter to do tab complete properly. - object.__setattr__(self, name, None) - - def __delattr__(self, name): - """We do not allow attributes of RayEnvironmentVariables to be deleted. - - Args: - name (str): The name of the attribute to delete. - """ - raise Exception("Attempted deletion of attribute {}. Attributes of a " - "RayEnvironmentVariables object may not be deleted." - .format(name)) - - class Worker(object): """A class used to define the control flow of a worker process. @@ -766,16 +549,6 @@ per worker process. global_state = state.GlobalState() -env = RayEnvironmentVariables() -"""RayEnvironmentVariables: The environment variables that are shared by tasks. - -Each worker process has its own RayEnvironmentVariables object, and these -objects should be the same in all workers. This is used for storing variables -that are not serializable but must be used by remote tasks. In addition, it is -used to reinitialize these variables after they are used so that changes to -their state made by one task do not affect other tasks. -""" - class RayConnectionError(Exception): pass @@ -1334,27 +1107,6 @@ def fetch_and_register_remote_function(key, worker=global_worker): worker.worker_id) -def fetch_and_register_environment_variable(key, worker=global_worker): - """Import an environment variable.""" - (driver_id, environment_variable_name, serialized_initializer, - serialized_reinitializer) = worker.redis_client.hmget( - key, ["driver_id", "name", "initializer", "reinitializer"]) - environment_variable_name = environment_variable_name.decode("ascii") - try: - initializer = pickling.loads(serialized_initializer) - reinitializer = pickling.loads(serialized_reinitializer) - env.__setattr__(environment_variable_name, - EnvironmentVariable(initializer, reinitializer)) - except: - # If an exception was thrown when the environment variable was imported, we - # record the traceback and notify the scheduler of the failure. - traceback_str = format_error_message(traceback.format_exc()) - # Log the error message. - worker.push_error_to_driver(driver_id, "register_environment_variable", - traceback_str, - data={"name": environment_variable_name}) - - def fetch_and_execute_function_to_run(key, worker=global_worker): """Run on arbitrary function on the worker.""" driver_id, serialized_function = worker.redis_client.hmget( @@ -1404,8 +1156,6 @@ def import_thread(worker, mode): if key.startswith(b"RemoteFunction"): fetch_and_register_remote_function(key, worker=worker) - elif key.startswith(b"EnvironmentVariables"): - fetch_and_register_environment_variable(key, worker=worker) elif key.startswith(b"FunctionsToRun"): fetch_and_execute_function_to_run(key, worker=worker) elif key.startswith(b"ActorClass"): @@ -1441,9 +1191,6 @@ def import_thread(worker, mode): if key.startswith(b"RemoteFunction"): with log_span("ray:import_remote_function", worker=worker): fetch_and_register_remote_function(key, worker=worker) - elif key.startswith(b"EnvironmentVariables"): - with log_span("ray:import_environment_variable", worker=worker): - fetch_and_register_environment_variable(key, worker=worker) elif key.startswith(b"FunctionsToRun"): with log_span("ray:import_function_to_run", worker=worker): fetch_and_execute_function_to_run(key, worker=worker) @@ -1481,7 +1228,6 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, assert not worker.connected, error_message assert worker.cached_functions_to_run is not None, error_message assert worker.cached_remote_functions is not None, error_message - assert env._cached_environment_variables is not None, error_message # Initialize some fields. worker.worker_id = random_string() worker.actor_id = actor_id @@ -1640,22 +1386,18 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, lambda worker_info: sys.path.insert(1, script_directory)) worker.run_function_on_all_workers( lambda worker_info: sys.path.insert(1, current_directory)) - # TODO(rkn): Here we first export functions to run, then environment - # variables, then remote functions. The order matters. For example, one of - # the functions to run may set the Python path, which is needed to import a - # module used to define an environment variable, which in turn is used - # inside a remote function. We may want to change the order to simply be - # the order in which the exports were defined on the driver. In addition, - # we will need to retain the ability to decide what the first few exports - # are (mostly to set the Python path). Additionally, note that the first - # exports to be defined on the driver will be the ones defined in separate - # modules that are imported by the driver. + # TODO(rkn): Here we first export functions to run, then remote functions. + # The order matters. For example, one of the functions to run may set the + # Python path, which is needed to import a module used to define a remote + # function. We may want to change the order to simply be the order in which + # the exports were defined on the driver. In addition, we will need to + # retain the ability to decide what the first few exports are (mostly to + # set the Python path). Additionally, note that the first exports to be + # defined on the driver will be the ones defined in separate modules that + # are imported by the driver. # Export cached functions_to_run. for function in worker.cached_functions_to_run: worker.run_function_on_all_workers(function) - # Export cached environment variables to the workers. - for name, environment_variable in env._cached_environment_variables: - env.__setattr__(name, environment_variable) # Export cached remote functions to the workers. for info in worker.cached_remote_functions: (function_id, func_name, func, @@ -1664,7 +1406,6 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, num_return_vals, num_cpus, num_gpus, worker) worker.cached_functions_to_run = None worker.cached_remote_functions = None - env._cached_environment_variables = None def disconnect(worker=global_worker): @@ -1675,7 +1416,6 @@ def disconnect(worker=global_worker): worker.connected = False worker.cached_functions_to_run = [] worker.cached_remote_functions = [] - env._cached_environment_variables = [] serialization.clear_state() @@ -1949,15 +1689,7 @@ def format_error_message(exception_message, task_exception=False): def main_loop(worker=global_worker): - """The main loop a worker runs to receive and execute tasks. - - This method is an infinite loop. It waits to receive commands from the - scheduler. A command may consist of a task to execute, a remote function to - import, an environment variable to import, or an order to terminate the - worker process. The worker executes the command, notifies the scheduler of - any errors that occurred while executing the command, and waits for the next - command. - """ + """The main loop a worker runs to receive and execute tasks.""" def exit(signum, frame): cleanup(worker=worker) @@ -1973,8 +1705,6 @@ def main_loop(worker=global_worker): store. If the task throws an exception, RayTaskError objects are stored in the object store to represent the failed task (these will be retrieved by calls to get or by subsequent tasks that use the outputs of this task). - After the task executes, the worker resets any environment variables that - were accessed by the task. """ try: # The ID of the driver that this task belongs to. This is needed so that @@ -2041,22 +1771,6 @@ def main_loop(worker=global_worker): str(failure_object), data={"function_id": function_id.id(), "function_name": function_name}) - try: - # Reinitialize the values of environment variables that were used in the - # task above so that changes made to their state do not affect other - # tasks. - with log_span("ray:task:reinitialize_environment_variables", - worker=worker): - env._reinitialize() - except Exception as e: - # The attempt to reinitialize the environment variables threw an - # exception. We record the traceback and notify the scheduler. - traceback_str = format_error_message(traceback.format_exc()) - worker.push_error_to_driver(worker.task_driver_id.id(), - "reinitialize_environment_variable", - traceback_str, - data={"function_id": function_id.id(), - "function_name": function_name}) check_main_thread() while True: @@ -2112,41 +1826,6 @@ def _mode(worker=global_worker): return worker.mode -def _env(): - """Return the env object. - - We use this wrapper because so that functions which use the env object can be - pickled. - """ - return env - - -def _export_environment_variable(name, environment_variable, - worker=global_worker): - """Export an environment variable to the workers. - - This is only called by a driver. - - Args: - name (str): The name of the variable to export. - environment_variable (EnvironmentVariable): The environment variable object - containing code for initializing and reinitializing the variable. - """ - check_main_thread() - if _mode(worker) not in [SCRIPT_MODE, SILENT_MODE]: - raise Exception("_export_environment_variable can only be called on a " - "driver.") - environment_variable_id = name - key = "EnvironmentVariables:{}:{}".format(random_string(), - environment_variable_id) - worker.redis_client.hmset(key, { - "driver_id": worker.task_driver_id.id(), - "name": name, - "initializer": pickling.dumps(environment_variable.initializer), - "reinitializer": pickling.dumps(environment_variable.reinitializer)}) - worker.redis_client.rpush("Exports", key) - - def export_remote_function(function_id, func_name, func, func_invoker, num_return_vals, num_cpus, num_gpus, worker=global_worker): @@ -2259,12 +1938,7 @@ def remote(*args, **kwargs): # In PYTHON_MODE, remote calls simply execute the function. We copy # the arguments to prevent the function call from mutating them and # to match the usual behavior of immutable remote objects. - try: - _env()._running_remote_function_locally = True - result = func(*copy.deepcopy(args)) - finally: - _env()._reinitialize() - _env()._running_remote_function_locally = False + result = func(*copy.deepcopy(args)) return result objectids = _submit_task(function_id, func_name, args) if len(objectids) == 1: diff --git a/test/failure_test.py b/test/failure_test.py index 6778ce25e..4976a113d 100644 --- a/test/failure_test.py +++ b/test/failure_test.py @@ -101,43 +101,6 @@ def temporary_helper_function(): sys.path.pop(-1) ray.worker.cleanup() - def testFailImportingEnvironmentVariable(self): - ray.init(num_workers=2, driver_mode=ray.SILENT_MODE) - - # This will throw an exception when the environment variable is imported on - # the workers. - def initializer(): - if ray.worker.global_worker.mode == ray.WORKER_MODE: - raise Exception("The initializer failed.") - return 0 - ray.env.foo = ray.EnvironmentVariable(initializer) - wait_for_errors(b"register_environment_variable", 2) - # Check that the error message is in the task info. - self.assertIn(b"The initializer failed.", ray.error_info()[0][b"message"]) - - ray.worker.cleanup() - - def testFailReinitializingVariable(self): - ray.init(num_workers=2, driver_mode=ray.SILENT_MODE) - - def initializer(): - return 0 - - def reinitializer(foo): - raise Exception("The reinitializer failed.") - ray.env.foo = ray.EnvironmentVariable(initializer, reinitializer) - - @ray.remote - def use_foo(): - ray.env.foo - use_foo.remote() - wait_for_errors(b"reinitialize_environment_variable", 1) - # Check that the error message is in the task info. - self.assertIn(b"The reinitializer failed.", - ray.error_info()[0][b"message"]) - - ray.worker.cleanup() - def testFailedFunctionToRun(self): ray.init(num_workers=2, driver_mode=ray.SILENT_MODE) diff --git a/test/runtest.py b/test/runtest.py index 6bc31b5da..55883e378 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -659,36 +659,6 @@ class APITest(unittest.TestCase): x = f.remote(1) ray.get([h.remote([x]), h.remote([x])]) - def testCachingEnvironmentVariables(self): - # Test that we can define environment variables before the driver is - # connected. - def foo_initializer(): - return 1 - - def bar_initializer(): - return [] - - def bar_reinitializer(bar): - return [] - ray.env.foo = ray.EnvironmentVariable(foo_initializer) - ray.env.bar = ray.EnvironmentVariable(bar_initializer, bar_reinitializer) - - @ray.remote - def use_foo(): - return ray.env.foo - - @ray.remote - def use_bar(): - ray.env.bar.append(1) - return ray.env.bar - - self.init_ray() - - self.assertEqual(ray.get(use_foo.remote()), 1) - self.assertEqual(ray.get(use_foo.remote()), 1) - self.assertEqual(ray.get(use_bar.remote()), [1]) - self.assertEqual(ray.get(use_bar.remote()), [1]) - def testCachingFunctionsToRun(self): # Test that we export functions to run on all workers before the driver is # connected. @@ -914,167 +884,6 @@ class PythonModeTest(unittest.TestCase): ray.worker.cleanup() - def testEnvironmentVariablesInPythonMode(self): - reload(test_functions) - ray.init(driver_mode=ray.PYTHON_MODE) - - def l_init(): - return [] - - def l_reinit(l): - return [] - ray.env.l = ray.EnvironmentVariable(l_init, l_reinit) - - @ray.remote - def use_l(): - l = ray.env.l - l.append(1) - return l - - # Get the local copy of the environment variable. This should be stateful. - l = ray.env.l - assert_equal(l, []) - - # Make sure the remote function does what we expect. - assert_equal(ray.get(use_l.remote()), [1]) - assert_equal(ray.get(use_l.remote()), [1]) - - # Make sure the local copy of the environment variable has not been - # mutated. - assert_equal(l, []) - l = ray.env.l - assert_equal(l, []) - - # Make sure that running a remote function does not reset the state of the - # local copy of the environment variable. - l.append(2) - assert_equal(ray.get(use_l.remote()), [1]) - assert_equal(l, [2]) - - ray.worker.cleanup() - - -class EnvironmentVariablesTest(unittest.TestCase): - - def testEnvironmentVariables(self): - ray.init(num_workers=1) - - # Test that we can add a variable to the key-value store. - - def foo_initializer(): - return 1 - - def foo_reinitializer(foo): - return foo - - ray.env.foo = ray.EnvironmentVariable(foo_initializer, foo_reinitializer) - self.assertEqual(ray.env.foo, 1) - - @ray.remote - def use_foo(): - return ray.env.foo - self.assertEqual(ray.get(use_foo.remote()), 1) - self.assertEqual(ray.get(use_foo.remote()), 1) - self.assertEqual(ray.get(use_foo.remote()), 1) - - # Test that we can add a variable to the key-value store, mutate it, and - # reset it. - - def bar_initializer(): - return [1, 2, 3] - - ray.env.bar = ray.EnvironmentVariable(bar_initializer) - - @ray.remote - def use_bar(): - ray.env.bar.append(4) - return ray.env.bar - self.assertEqual(ray.get(use_bar.remote()), [1, 2, 3, 4]) - self.assertEqual(ray.get(use_bar.remote()), [1, 2, 3, 4]) - self.assertEqual(ray.get(use_bar.remote()), [1, 2, 3, 4]) - - # Test that we can use the reinitializer. - - def baz_initializer(): - return np.zeros([4]) - - def baz_reinitializer(baz): - for i in range(len(baz)): - baz[i] = 0 - return baz - - ray.env.baz = ray.EnvironmentVariable(baz_initializer, baz_reinitializer) - - @ray.remote - def use_baz(i): - baz = ray.env.baz - baz[i] = 1 - return baz - assert_equal(ray.get(use_baz.remote(0)), np.array([1, 0, 0, 0])) - assert_equal(ray.get(use_baz.remote(1)), np.array([0, 1, 0, 0])) - assert_equal(ray.get(use_baz.remote(2)), np.array([0, 0, 1, 0])) - assert_equal(ray.get(use_baz.remote(3)), np.array([0, 0, 0, 1])) - - # Make sure the reinitializer is actually getting called. Note that this is - # not the correct usage of a reinitializer because it does not reset qux to - # its original state. This is just for testing. - - def qux_initializer(): - return 0 - - def qux_reinitializer(x): - return x + 1 - - ray.env.qux = ray.EnvironmentVariable(qux_initializer, qux_reinitializer) - - @ray.remote - def use_qux(): - return ray.env.qux - self.assertEqual(ray.get(use_qux.remote()), 0) - self.assertEqual(ray.get(use_qux.remote()), 1) - self.assertEqual(ray.get(use_qux.remote()), 2) - - ray.worker.cleanup() - - def testUsingEnvironmentVariablesOnDriver(self): - ray.init(num_workers=1) - - # Test that we can add a variable to the key-value store. - - def foo_initializer(): - return [] - - def foo_reinitializer(foo): - return [] - - ray.env.foo = ray.EnvironmentVariable(foo_initializer, foo_reinitializer) - - @ray.remote - def use_foo(): - foo = ray.env.foo - foo.append(1) - return foo - - # Check that running a remote function does not reset the enviroment - # variable on the driver. - foo = ray.env.foo - self.assertEqual(foo, []) - foo.append(2) - self.assertEqual(foo, [2]) - foo.append(3) - self.assertEqual(foo, [2, 3]) - - self.assertEqual(ray.get(use_foo.remote()), [1]) - self.assertEqual(ray.get(use_foo.remote()), [1]) - self.assertEqual(ray.get(use_foo.remote()), [1]) - - # Check that the copy of foo on the driver has not changed. - self.assertEqual(foo, [2, 3]) - foo = ray.env.foo - self.assertEqual(foo, [2, 3]) - - ray.worker.cleanup() - class UtilsTest(unittest.TestCase):