diff --git a/README.md b/README.md index 7b6aa3f82..def9113e2 100644 --- a/README.md +++ b/README.md @@ -54,7 +54,7 @@ estimate of pi (waiting until the computation has finished if necessary). - [Tutorial](doc/tutorial.md) - Documentation - [Serialization in the Object Store](doc/serialization.md) - - [Reusable Variables](doc/reusable-variables.md) + - [Environment Variables](doc/environment-variables.md) - [Using Ray with TensorFlow](doc/using-ray-with-tensorflow.md) - [Using Ray on a Cluster](doc/using-ray-on-a-cluster.md) diff --git a/doc/reusable-variables.md b/doc/environment-variables.md similarity index 55% rename from doc/reusable-variables.md rename to doc/environment-variables.md index 8f2dad0d8..d695dba19 100644 --- a/doc/reusable-variables.md +++ b/doc/environment-variables.md @@ -1,17 +1,21 @@ -# Reusable Variables +# Environment Variables -This document explains how to create and use reusable variables in Ray. +This document explains how to create and use **environment variables** in Ray. -A reusable variable is a per-worker variable which is (1) created when the worker starts, and (2) is reinitialized before a task reuses it. Thus, while a task can modify a reusable variable, the variable is reinitialized before the next task uses it. Reusable variables obviates the need for serialization/deserialization and, like Ray objects, avoid side effects. +An environment variable is a per-worker variable which is (1) created when the +worker starts, and (2) is reinitialized before a task reuses it. Thus, while a +task can modify an environment variable, the variable is reinitialized before +the next task uses it. Environment variables obviate the need for +serialization/deserialization and help avoid side effects. -Reusable variables are Python objects that are created once on each worker and -can be used by all subsequent tasks that run on that worker. Reusable variables -will be reinitialized between tasks. 
There are several primary reasons for doing -this. +Environment variables are Python objects that are created once on each worker +and can be used by all subsequent tasks that run on that worker. Environment +variables will be reinitialized between tasks. There are several primary reasons +for using environment variables. -1. Reusable variables are created once on each worker and are not shipped +1. Environment variables are created once on each worker and are not shipped between machines, so they do not need to be serialized or deserialized (however, -the code that creates the reusable variable does need to be pickled). +the code that creates the environment variable does need to be pickled). 2. Objects that are slow to construct (like a TensorFlow graph) only need to be constructed once on each worker. 3. By reinitializing between tasks that use them, they help avoid side effects. @@ -22,7 +26,7 @@ ship the code that creates the object to each worker and to run the code on each worker than it would be to create the object on the driver and ship the object to each worker. -## Creating a Reusable Variable +## Creating an Environment Variable To give an example, consider a gym environment, which essentially provides a Python wrapper for an Atari simulator. @@ -37,14 +41,14 @@ ray.init(num_workers=10) def env_initializer(): return gym.make("Pong-v0") -# Create the reusable variable. This line will cause env_initializer to run on -# each worker and on the driver. -ray.reusables.env = ray.Reusable(env_initializer) +# Create the environment variable. This line will cause env_initializer to run +# on each worker and on the driver. +ray.env.env = ray.EnvironmentVariable(env_initializer) # Define a remote function that uses the gym environment. @ray.remote def step(): - env = ray.reusables.env + env = ray.env.env # Choose a random action. action = env.action_space.sample() # Take the action and return the result. 
@@ -58,15 +62,15 @@ When the gym is created, it prints something like `Making new env: Pong-v0`. You may notice that this is printed once for each worker. Calling `step.remote()` will run a remote function that uses the `env` variable. You may notice that calling `step.remote()` causes the line `Making new env: Pong-v0` to be printed -again. That occurs because, by default, every time a remote function uses a -reusable variable, the worker will rerun the code that initializes the reusable -variable to prevent side effects from leaking between tasks and introducing -non-determinism into the program. +again. That occurs because, by default, every time a remote function uses an +environment variable, the worker will rerun the code that initializes the +environment variable to prevent side effects from leaking between tasks and +introducing non-determinism into the program. Of course, rerunning the initialization code can be expensive, so a custom -reinitializer can be passed into the creation of a reusable variable. If the -state of the reusable variable is not mutated by any remote function, then the -reinitialization code can just be the identity function. +reinitializer can be passed into the creation of an environment variable. If the +state of the environment variable is not mutated by any remote function, then +the reinitialization code can just be the identity function. ```python # Define a function to create the gym environment. @@ -78,15 +82,16 @@ def env_reinitializer(env): env.reset() return env -# Create the reusable variable. This line will cause env_initializer to run on -# each worker and on the driver. Every time a remote function uses the reusable -# variable, env_reinitializer will run to reset the state of the variable. -ray.reusables.env = ray.Reusable(env_initializer, env_reinitializer) +# Create the environment variable. This line will cause env_initializer to run +# on each worker and on the driver. 
Every time a remote function uses the +# environment variable, env_reinitializer will run to reset the state of the +# variable. +ray.env.env = ray.EnvironmentVariable(env_initializer, env_reinitializer) # Define a remote function that uses the gym environment. @ray.remote def step(): - env = ray.reusables.env + env = ray.env.env # Choose a random action. action = env.action_space.sample() # Take the action and return the result. diff --git a/doc/using-ray-with-tensorflow.md b/doc/using-ray-with-tensorflow.md index 41fcaf6e8..94f52e2ee 100644 --- a/doc/using-ray-with-tensorflow.md +++ b/doc/using-ray-with-tensorflow.md @@ -69,10 +69,10 @@ b.assign(np.zeros(1)) # This adds a node to the graph every time you call it. ## Complete Example Putting this all together, we would first create the graph on each worker using -reusable variables. Within the reusable variables, we would define `get_weights` -and `set_weights` methods. We would then use those methods to ship the weights -(as lists of numpy arrays) between the processes without shipping the actual -TensorFlow graphs, which are much more complex Python objects. +environment variables. Within the environment variables, we would define +`get_weights` and `set_weights` methods. We would then use those methods to ship +the weights (as lists of numpy arrays) between the processes without shipping +the actual TensorFlow graphs, which are much more complex Python objects. ```python import tensorflow as tf @@ -110,14 +110,14 @@ def net_vars_initializer(): def net_vars_reinitializer(net_vars): return net_vars -# Define a reusable variable for the network variables. -ray.reusables.net_vars = ray.Reusable(net_vars_initializer, net_vars_reinitializer) +# Define an environment variable for the network variables. +ray.env.net_vars = ray.EnvironmentVariable(net_vars_initializer, net_vars_reinitializer) # Define a remote function that trains the network for one step and returns the # new weights. 
@ray.remote def step(weights, x, y): - variables, sess, train, _, x_data, y_data, _ = ray.reusables.net_vars + variables, sess, train, _, x_data, y_data, _ = ray.env.net_vars # Set the weights in the network. variables.set_weights(weights) # Do one step of training. @@ -125,7 +125,7 @@ def step(weights, x, y): # Return the new weights. return variables.get_weights() -variables, sess, _, loss, x_data, y_data, init = ray.reusables.net_vars +variables, sess, _, loss, x_data, y_data, init = ray.env.net_vars # Initialize the network weights. sess.run(init) # Get the weights as a list of numpy arrays. diff --git a/examples/lbfgs/driver.py b/examples/lbfgs/driver.py index f81f19233..b7610865d 100644 --- a/examples/lbfgs/driver.py +++ b/examples/lbfgs/driver.py @@ -13,7 +13,7 @@ from tensorflow.examples.tutorials.mnist import input_data class LinearModel(object): """Simple class for a one layer neural network. - Note that this code does not initialize the network weights. Instead weights + Note that this code does not initialize the network weights. Instead weights are set via self.variables.set_weights. Example: @@ -30,7 +30,7 @@ class LinearModel(object): cross_entropy (tf.Operation): Final layer of network. cross_entropy_grads (tf.Operation): Gradient computation. sess (tf.Session): Session used for training. - variables (TensorFlowVariables): Extracted variables and methods to + variables (TensorFlowVariables): Extracted variables and methods to manipulate them. """ def __init__(self, shape): @@ -48,7 +48,7 @@ class LinearModel(object): self.cross_entropy = cross_entropy self.cross_entropy_grads = tf.gradients(cross_entropy, [w, b]) self.sess = tf.Session() - # In order to get and set the weights, we pass in the loss function to Ray's + # In order to get and set the weights, we pass in the loss function to Ray's # TensorFlowVariables to automatically create methods to modify the weights. 
self.variables = ray.experimental.TensorFlowVariables(cross_entropy, self.sess) @@ -63,7 +63,7 @@ class LinearModel(object): def net_initialization(): return LinearModel([784,10]) -# By default, when a reusable variable is used by a remote function, the +# By default, when an environment variable is used by a remote function, the # initialization code will be rerun at the end of the remote task to ensure # that the state of the variable is not changed by the remote task. However, # the initialization code may be expensive. This case is one example, because @@ -74,20 +74,20 @@ def net_initialization(): def net_reinitialization(net): return net -# Register the network with Ray and create a reusable variable for it. -ray.reusables.net = ray.Reusable(net_initialization, net_reinitialization) +# Register the network with Ray and create an environment variable for it. +ray.env.net = ray.EnvironmentVariable(net_initialization, net_reinitialization) # Compute the loss on a batch of data. @ray.remote def loss(theta, xs, ys): - net = ray.reusables.net + net = ray.env.net net.variables.set_flat(theta) return net.loss(xs,ys) # Compute the gradient of the loss on a batch of data. @ray.remote def grad(theta, xs, ys): - net = ray.reusables.net + net = ray.env.net net.variables.set_flat(theta) gradients = net.grad(xs, ys) return np.concatenate([g.flatten() for g in gradients]) @@ -127,7 +127,7 @@ if __name__ == "__main__": batch_ids = [(ray.put(xs), ray.put(ys)) for (xs, ys) in batches] # Initialize the weights for the network to the vector of all zeros. - dim = ray.reusables.net.variables.get_flat_size() + dim = ray.env.net.variables.get_flat_size() theta_init = 1e-2 * np.random.normal(size=dim) # Use L-BFGS to minimize the loss function. diff --git a/examples/rl_pong/README.md b/examples/rl_pong/README.md index daafaecd0..f6019c7bc 100644 --- a/examples/rl_pong/README.md +++ b/examples/rl_pong/README.md @@ -35,7 +35,7 @@ function. 
@ray.remote(num_return_vals=2) def compute_gradient(model): # Retrieve the game environment. - env = ray.reusables.env + env = ray.env.env # Reset the game. observation = env.reset() while not done: @@ -73,22 +73,22 @@ the output of the overall program will depend on which tasks are scheduled on which workers. This can be avoided if the state of the Pong environment is reset between tasks. -To accomplish this, the user must mark the Pong environment as a reusable +To accomplish this, the user must mark the Pong environment as an environment variable. This is done by providing a method for initializing the gym, and -storing it in `ray.reusables`. +storing it in `ray.env`. ```python # Function for initializing the gym environment. def env_initializer(): return gym.make("Pong-v0") -# Create a reusable variable for the gym environment. -ray.reusables.env = ray.Reusable(env_initializer) +# Create an environment variable for the gym environment. +ray.env.env = ray.EnvironmentVariable(env_initializer) ``` -A remote task can then call `ray.reusables.env` to retrieve the variable. +A remote task can then call `ray.env.env` to retrieve the variable. -By default, whenever a task uses the `ray.reusables.env` variable, the worker +By default, whenever a task uses the `ray.env.env` variable, the worker that the task was scheduled on will rerun the initialization code `env_initializer` after the task has finished so that state will not leak between the tasks. @@ -109,6 +109,6 @@ def env_reinitializer(env): env.reset() return env -# Create a reusable variable for the gym environment. -ray.reusables.env = ray.Reusable(env_initializer, env_reinitializer) +# Create an environment variable for the gym environment. 
+ray.env.env = ray.EnvironmentVariable(env_initializer, env_reinitializer) ``` diff --git a/examples/rl_pong/driver.py b/examples/rl_pong/driver.py index ad7116f2d..37c30c5ac 100644 --- a/examples/rl_pong/driver.py +++ b/examples/rl_pong/driver.py @@ -29,8 +29,8 @@ def env_reinitializer(env): env.reset() return env -# Create a reusable variable for the gym environment. -ray.reusables.env = ray.Reusable(env_initializer, env_reinitializer) +# Create an environment variable for the gym environment. +ray.env.env = ray.EnvironmentVariable(env_initializer, env_reinitializer) def sigmoid(x): return 1.0 / (1.0 + np.exp(-x)) # sigmoid "squashing" function to interval [0,1] @@ -71,7 +71,7 @@ def policy_backward(eph, epx, epdlogp, model): @ray.remote(num_return_vals=2) def compute_gradient(model): - env = ray.reusables.env + env = ray.env.env observation = env.reset() prev_x = None # used in computing the difference frame xs, hs, dlogps, drs = [], [], [], [] diff --git a/lib/python/ray/__init__.py b/lib/python/ray/__init__.py index 50b7ba421..9f297e5c1 100644 --- a/lib/python/ray/__init__.py +++ b/lib/python/ray/__init__.py @@ -16,5 +16,5 @@ if hasattr(ctypes, "windll"): import ray.experimental import ray.serialization from ray.worker import register_class, error_info, init, connect, disconnect, get, put, wait, remote, log_event, log_span, flush_log -from ray.worker import Reusable, reusables +from ray.worker import EnvironmentVariable, env from ray.worker import SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, SILENT_MODE diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index fad7172b1..fb7869831 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -145,123 +145,126 @@ class RayGetArgumentError(Exception): return "Failed to get objectid {} as argument {} for remote function {}{}{}. 
It was created by remote function {}{}{} which failed with:\n{}".format(self.objectid, self.argument_index, colorama.Fore.RED, self.function_name, colorama.Fore.RESET, colorama.Fore.RED, self.task_error.function_name, colorama.Fore.RESET, self.task_error) -class Reusable(object): +class EnvironmentVariable(object): """An Python object that can be shared between tasks. Attributes: initializer (Callable[[], object]): A function used to create and initialize - the reusable variable. + the environment variable. reinitializer (Optional[Callable[[object], object]]): An optional function - used to reinitialize the reusable variable after it has been used. This + used to reinitialize the environment variable after it has been used. This argument can be used as an optimization if there is a fast way to reinitialize the state of the variable other than rerunning the initializer. """ def __init__(self, initializer, reinitializer=None): - """Initialize a Reusable object.""" + """Initialize an EnvironmentVariable object.""" if not callable(initializer): - raise Exception("When creating a RayReusable, initializer must be a function.") + raise Exception("When creating an EnvironmentVariable, initializer must be a function.") self.initializer = initializer if reinitializer is None: # If no reinitializer is passed in, use a wrapped version of the initializer. reinitializer = lambda value: initializer() if not callable(reinitializer): - raise Exception("When creating a RayReusable, reinitializer must be a function.") + raise Exception("When creating an EnvironmentVariable, reinitializer must be a function.") self.reinitializer = reinitializer -class RayReusables(object): +class RayEnvironmentVariables(object): """An object used to store Python variables that are shared between tasks. - Each worker process will have a single RayReusables object. This class serves - two purposes. 
First, some objects are not serializable, and so the code that - creates those objects must be run on the worker that uses them. This class is - responsible for running the code that creates those objects. Second, some of - these objects are expensive to create, and so they should be shared between - tasks. However, if a task mutates a variable that is shared between tasks, - then the behavior of the overall program may be nondeterministic (it could - depend on scheduling decisions). To fix this, if a task uses a one of these - shared objects, then that shared object will be reinitialized after the task - finishes. Since the initialization may be expensive, the user can pass in - custom reinitialization code that resets the state of the shared variable to - the way it was after initialization. If the reinitialization code does not do - this, then the behavior of the overall program is undefined. + Each worker process will have a single RayEnvironmentVariables object. This + class serves two purposes. First, some objects are not serializable, and so + the code that creates those objects must be run on the worker that uses them. + This class is responsible for running the code that creates those objects. + Second, some of these objects are expensive to create, and so they should be + shared between tasks. However, if a task mutates a variable that is shared + between tasks, then the behavior of the overall program may be + nondeterministic (it could depend on scheduling decisions). To fix this, if a + task uses one of these shared objects, then that shared object will be + reinitialized after the task finishes. Since the initialization may be + expensive, the user can pass in custom reinitialization code that resets the + state of the shared variable to the way it was after initialization. If the + reinitialization code does not do this, then the behavior of the overall + program is undefined. 
Attributes: - _names (List[str]): A list of the names of all the reusable variables. + _names (List[str]): A list of the names of all the environment variables. _reinitializers (Dict[str, Callable]): A dictionary mapping the name of the - reusable variables to the corresponding reinitializer. + environment variables to the corresponding reinitializer. _running_remote_function_locally (bool): A flag used to indicate if a remote function is running locally on the driver so that we can simulate the same behavior as running a remote function remotely. - _reusables: A dictionary mapping the name of a reusable variable to the - value of the reusable variable. - _local_mode_reusables: A copy of _reusables used on the driver when running - remote functions locally on the driver. This is needed because there are - two ways in which reusable variables can be used on the driver. The first - is that the driver's copy can be manipulated. This copy is never reset - (think of the driver as a single long-running task). The second way is - that a remote function can be run locally on the driver, and this remote - function needs access to a copy of the reusable variable, and that copy - must be reinitialized after use. - _cached_reusables (List[Tuple[str, Reusable]]): A list of pairs. The first - element of each pair is the name of a reusable variable, and the second - element is the Reusable object. This list is used to store reusable - variables that are defined before the driver is connected. Once the driver - is connected, these variables will be exported. - _used (List[str]): A list of the names of all the reusable variables that + _environment_variables: A dictionary mapping the name of an environment + variable to the value of the environment variable. + _local_mode_environment_variables: A copy of _environment_variables used on + the driver when running remote functions locally on the driver. 
This is + needed because there are two ways in which environment variables can be + used on the driver. The first is that the driver's copy can be + manipulated. This copy is never reset (think of the driver as a single + long-running task). The second way is that a remote function can be run + locally on the driver, and this remote function needs access to a copy of + the environment variable, and that copy must be reinitialized after use. + _cached_environment_variables (List[Tuple[str, EnvironmentVariable]]): A + list of pairs. The first element of each pair is the name of an + environment variable, and the second element is the EnvironmentVariable + object. This list is used to store environment variables that are defined + before the driver is connected. Once the driver is connected, these + variables will be exported. + _used (List[str]): A list of the names of all the environment variables that have been accessed within the scope of the current task. This is reset to the empty list after each task. 
""" def __init__(self): - """Initialize a RayReusables object.""" + """Initialize a RayEnvironmentVariables object.""" self._names = set() self._reinitializers = {} self._running_remote_function_locally = False - self._reusables = {} - self._local_mode_reusables = {} - self._cached_reusables = [] + self._environment_variables = {} + self._local_mode_environment_variables = {} + self._cached_environment_variables = [] self._used = set() - self._slots = ("_names", "_reinitializers", "_running_remote_function_locally", "_reusables", "_local_mode_reusables", "_cached_reusables", "_used", "_slots", "_create_reusable_variable", "_reinitialize", "__getattribute__", "__setattr__", "__delattr__") + self._slots = ("_names", "_reinitializers", "_running_remote_function_locally", "_environment_variables", "_local_mode_environment_variables", "_cached_environment_variables", "_used", "_slots", "_create_environment_variable", "_reinitialize", "__getattribute__", "__setattr__", "__delattr__") # CHECKPOINT: Attributes must not be added after _slots. The above attributes are protected from deletion. - def _create_reusable_variable(self, name, reusable): - """Create a reusable variable locally. + def _create_environment_variable(self, name, environment_variable): + """Create an environment variable locally. Args: - name (str): The name of the reusable variable. - reusable (Reusable): The reusable object to use to create the reusable - variable. + name (str): The name of the environment variable. + environment_variable (EnvironmentVariable): The environment variable + object to use to create the environment variable. 
""" self._names.add(name) - self._reinitializers[name] = reusable.reinitializer - self._reusables[name] = reusable.initializer() - # We create a second copy of the reusable variable on the driver to use + self._reinitializers[name] = environment_variable.reinitializer + self._environment_variables[name] = environment_variable.initializer() + # We create a second copy of the environment variable on the driver to use # inside of remote functions that run locally. This occurs when we start Ray # in PYTHON_MODE and when we call a remote function locally. if _mode() in [SCRIPT_MODE, SILENT_MODE, PYTHON_MODE]: - self._local_mode_reusables[name] = reusable.initializer() + self._local_mode_environment_variables[name] = environment_variable.initializer() def _reinitialize(self): - """Reinitialize the reusable variables that the current task used.""" + """Reinitialize the environment variables that the current task used.""" for name in self._used: - current_value = self._reusables[name] + current_value = self._environment_variables[name] new_value = self._reinitializers[name](current_value) - # If we are on the driver, reset the copy of the reusable variable in the - # _local_mode_reusables dictionary. + # If we are on the driver, reset the copy of the environment variable in + # the _local_mode_environment_variables dictionary. if _mode() in [SCRIPT_MODE, SILENT_MODE, PYTHON_MODE]: assert self._running_remote_function_locally - self._local_mode_reusables[name] = new_value + self._local_mode_environment_variables[name] = new_value else: - self._reusables[name] = new_value + self._environment_variables[name] = new_value self._used.clear() # Reset the _used list. def __getattribute__(self, name): - """Get an attribute. This handles reusable variables as a special case. + """Get an attribute. This handles environment variables as a special case. 
- When __getattribute__ is called with the name of a reusable variable, that - name is added to the list of variables that were used in the current task. + When __getattribute__ is called with the name of an environment variable, + that name is added to the list of variables that were used in the current + task. Args: name (str): The name of the attribute to get. @@ -270,33 +273,35 @@ class RayReusables(object): return object.__getattribute__(self, name) if name in self._slots: return object.__getattribute__(self, name) - # Handle various fields that are not reusable variables. + # Handle various fields that are not environment variables. if name not in self._names: return object.__getattribute__(self, name) - # Make a note of the fact that the reusable variable has been used. + # Make a note of the fact that the environment variable has been used. if name in self._names and name not in self._used: self._used.add(name) if self._running_remote_function_locally: - return self._local_mode_reusables[name] + return self._local_mode_environment_variables[name] else: - return self._reusables[name] + return self._environment_variables[name] def __setattr__(self, name, value): - """Set an attribute. This handles reusable variables as a special case. + """Set an attribute. This handles environment variables as a special case. - This is used to create reusable variables. When it is called, it runs the + This is used to create environment variables. When it is called, it runs the function for initializing the variable to create the variable. If this is called on the driver, then the functions for initializing and reinitializing the variable are shipped to the workers. - If this is called before ray.init has been run, then the reusable variable - will be cached and it will be created and exported when connect is called. + If this is called before ray.init has been run, then the environment + variable will be cached and it will be created and exported when connect is + called. 
Args: name (str): The name of the attribute to set. This is either a whitelisted - name or it is treated as the name of a reusable variable. + name or it is treated as the name of an environment variable. value: If name is a whitelisted name, then value can be any value. If name - is the name of a reusable variable, then this is a Reusable object. + is the name of an environment variable, then this is an + EnvironmentVariable object. """ try: slots = self._slots @@ -306,32 +311,32 @@ class RayReusables(object): return object.__setattr__(self, name, value) if name in slots: return object.__setattr__(self, name, value) - reusable = value - if not issubclass(type(reusable), Reusable): - raise Exception("To set a reusable variable, you must pass in a Reusable object") - # If ray.init has not been called, cache the reusable variable to export - # later. Otherwise, export the reusable variable to the workers and define - # it locally. + environment_variable = value + if not issubclass(type(environment_variable), EnvironmentVariable): + raise Exception("To set an environment variable, you must pass in an EnvironmentVariable object") + # If ray.init has not been called, cache the environment variable to export + # later. Otherwise, export the environment variable to the workers and + # define it locally. if _mode() is None: - self._cached_reusables.append((name, reusable)) + self._cached_environment_variables.append((name, environment_variable)) else: - # If we are on the driver, export the reusable variable to all the + # If we are on the driver, export the environment variable to all the # workers. if _mode() in [SCRIPT_MODE, SILENT_MODE]: - _export_reusable_variable(name, reusable) - # Define the reusable variable locally. - self._create_reusable_variable(name, reusable) - # Create an empty attribute with the name of the reusable variable. This - # allows the Python interpreter to do tab complete properly. 
+ _export_environment_variable(name, environment_variable) + # Define the environment variable locally. + self._create_environment_variable(name, environment_variable) + # Create an empty attribute with the name of the environment variable. + # This allows the Python interpreter to do tab complete properly. object.__setattr__(self, name, None) def __delattr__(self, name): - """We do not allow attributes of RayReusables to be deleted. + """We do not allow attributes of RayEnvironmentVariables to be deleted. Args: name (str): The name of the attribute to delete. """ - raise Exception("Attempted deletion of attribute {}. Attributes of a RayReusable object may not be deleted.".format(name)) + raise Exception("Attempted deletion of attribute {}. Attributes of a RayEnvironmentVariables object may not be deleted.".format(name)) class Worker(object): """A class used to define the control flow of a worker process. @@ -518,14 +523,14 @@ We use a global Worker object to ensure that there is a single worker object per worker process. """ -reusables = RayReusables() -"""RayReusables: The reusable variables that are shared between tasks. +env = RayEnvironmentVariables() +"""RayEnvironmentVariables: The environment variables that are shared by tasks. -Each worker process has its own RayReusables object, and these objects should be -the same in all workers. This is used for storing variables that are not -serializable but must be used by remote tasks. In addition, it is used to -reinitialize these variables after they are used so that changes to their state -made by one task do not affect other tasks. +Each worker process has its own RayEnvironmentVariables object, and these +objects should be the same in all workers. This is used for storing variables +that are not serializable but must be used by remote tasks. In addition, it is +used to reinitialize these variables after they are used so that changes to +their state made by one task do not affect other tasks. 
""" class RayConnectionError(Exception): @@ -548,7 +553,7 @@ def check_connected(worker=global_worker): Exception: An exception is raised if the worker is not connected. """ if not worker.connected: - raise RayConnectionError("This command cannot be called before Ray has been started. You can start Ray with 'ray.init(num_workers=10)'.") + raise RayConnectionError("This command cannot be called before Ray has been started. You can start Ray with 'ray.init()'.") def print_failed_task(task_status): """Print information about failed tasks. @@ -570,8 +575,8 @@ def error_info(worker=global_worker): check_connected(worker) result = {b"TaskError": [], b"RemoteFunctionImportError": [], - b"ReusableVariableImportError": [], - b"ReusableVariableReinitializeError": [], + b"EnvironmentVariableImportError": [], + b"EnvironmentVariableReinitializeError": [], b"FunctionToRunError": [], b"GenericWarning": [], } @@ -911,21 +916,21 @@ def fetch_and_register_remote_function(key, worker=global_worker): # Add the function to the function table. 
worker.redis_client.rpush("FunctionTable:{}".format(function_id.id()), worker.worker_id) -def fetch_and_register_reusable_variable(key, worker=global_worker): - """Import a reusable variable.""" - reusable_variable_name, serialized_initializer, serialized_reinitializer = worker.redis_client.hmget(key, ["name", "initializer", "reinitializer"]) - reusable_variable_name = reusable_variable_name.decode("ascii") +def fetch_and_register_environment_variable(key, worker=global_worker): + """Import an environment variable.""" + environment_variable_name, serialized_initializer, serialized_reinitializer = worker.redis_client.hmget(key, ["name", "initializer", "reinitializer"]) + environment_variable_name = environment_variable_name.decode("ascii") try: initializer = pickling.loads(serialized_initializer) reinitializer = pickling.loads(serialized_reinitializer) - reusables.__setattr__(reusable_variable_name, Reusable(initializer, reinitializer)) + env.__setattr__(environment_variable_name, EnvironmentVariable(initializer, reinitializer)) except: - # If an exception was thrown when the reusable variable was imported, we + # If an exception was thrown when the environment variable was imported, we # record the traceback and notify the scheduler of the failure. traceback_str = format_error_message(traceback.format_exc()) # Log the error message. 
- error_key = "ReusableVariableImportError:{}".format(random_string()) - worker.redis_client.hmset(error_key, {"name": reusable_variable_name, + error_key = "EnvironmentVariableImportError:{}".format(random_string()) + worker.redis_client.hmset(error_key, {"name": environment_variable_name, "message": traceback_str}) worker.redis_client.rpush("ErrorKeys", error_key) @@ -968,8 +973,8 @@ def import_thread(worker): for key in export_keys: if key.startswith(b"RemoteFunction"): fetch_and_register_remote_function(key, worker=worker) - elif key.startswith(b"ReusableVariables"): - fetch_and_register_reusable_variable(key, worker=worker) + elif key.startswith(b"EnvironmentVariables"): + fetch_and_register_environment_variable(key, worker=worker) elif key.startswith(b"FunctionsToRun"): fetch_and_execute_function_to_run(key, worker=worker) else: @@ -989,9 +994,9 @@ def import_thread(worker): if key.startswith(b"RemoteFunction"): with log_span("ray:import_remote_function", worker=worker): fetch_and_register_remote_function(key, worker=worker) - elif key.startswith(b"ReusableVariables"): - with log_span("ray:import_reusable_variable", worker=worker): - fetch_and_register_reusable_variable(key, worker=worker) + elif key.startswith(b"EnvironmentVariables"): + with log_span("ray:import_environment_variable", worker=worker): + fetch_and_register_environment_variable(key, worker=worker) elif key.startswith(b"FunctionsToRun"): with log_span("ray:import_function_to_run", worker=worker): fetch_and_execute_function_to_run(key, worker=worker) @@ -1015,7 +1020,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker): assert not worker.connected, error_message assert worker.cached_functions_to_run is not None, error_message assert worker.cached_remote_functions is not None, error_message - assert reusables._cached_reusables is not None, error_message + assert env._cached_environment_variables is not None, error_message # Initialize some fields. 
worker.worker_id = random_string() worker.connected = True @@ -1093,28 +1098,28 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker): current_directory = os.path.abspath(os.path.curdir) worker.run_function_on_all_workers(lambda worker_info: sys.path.insert(1, script_directory)) worker.run_function_on_all_workers(lambda worker_info: sys.path.insert(1, current_directory)) - # TODO(rkn): Here we first export functions to run, then reusable variables, - # then remote functions. The order matters. For example, one of the - # functions to run may set the Python path, which is needed to import a - # module used to define a reusable variable, which in turn is used inside a - # remote function. We may want to change the order to simply be the order in - # which the exports were defined on the driver. In addition, we will need to - # retain the ability to decide what the first few exports are (mostly to set - # the Python path). Additionally, note that the first exports to be defined - # on the driver will be the ones defined in separate modules that are - # imported by the driver. + # TODO(rkn): Here we first export functions to run, then environment + # variables, then remote functions. The order matters. For example, one of + # the functions to run may set the Python path, which is needed to import a + # module used to define an environment variable, which in turn is used + # inside a remote function. We may want to change the order to simply be the + # order in which the exports were defined on the driver. In addition, we + # will need to retain the ability to decide what the first few exports are + # (mostly to set the Python path). Additionally, note that the first exports + # to be defined on the driver will be the ones defined in separate modules + # that are imported by the driver. # Export cached functions_to_run. 
for function in worker.cached_functions_to_run: worker.run_function_on_all_workers(function) - # Export cached reusable variables to the workers. - for name, reusable_variable in reusables._cached_reusables: - reusables.__setattr__(name, reusable_variable) + # Export cached environment variables to the workers. + for name, environment_variable in env._cached_environment_variables: + env.__setattr__(name, environment_variable) # Export cached remote functions to the workers. for function_id, func_name, func, num_return_vals in worker.cached_remote_functions: export_remote_function(function_id, func_name, func, num_return_vals, worker) worker.cached_functions_to_run = None worker.cached_remote_functions = None - reusables._cached_reusables = None + env._cached_environment_variables = None def disconnect(worker=global_worker): """Disconnect this worker from the scheduler and object store.""" @@ -1124,7 +1129,7 @@ def disconnect(worker=global_worker): worker.connected = False worker.cached_functions_to_run = [] worker.cached_remote_functions = [] - reusables._cached_reusables = [] + env._cached_environment_variables = [] def register_class(cls, pickle=False, worker=global_worker): """Enable workers to serialize or deserialize objects of a particular class. @@ -1367,7 +1372,7 @@ def main_loop(worker=global_worker): This method is an infinite loop. It waits to receive commands from the scheduler. A command may consist of a task to execute, a remote function to - import, a reusable variable to import, or an order to terminate the worker + import, an environment variable to import, or an order to terminate the worker process. The worker executes the command, notifies the scheduler of any errors that occurred while executing the command, and waits for the next command. """ @@ -1380,8 +1385,8 @@ def main_loop(worker=global_worker): store. 
If the task throws an exception, RayTaskError objects are stored in the object store to represent the failed task (these will be retrieved by calls to get or by subsequent tasks that use the outputs of this task). - After the task executes, the worker resets any reusable variables that were - accessed by the task. + After the task executes, the worker resets any environment variables that + were accessed by the task. """ try: worker.current_task_id = task.task_id() @@ -1430,15 +1435,15 @@ def main_loop(worker=global_worker): "message": str(failure_object)}) worker.redis_client.rpush("ErrorKeys", error_key) try: - # Reinitialize the values of reusable variables that were used in the task - # above so that changes made to their state do not affect other tasks. - with log_span("ray:task:reinitialize_reusables", worker=worker): - reusables._reinitialize() + # Reinitialize the values of environment variables that were used in the + # task above so that changes made to their state do not affect other tasks. + with log_span("ray:task:reinitialize_environment_variables", worker=worker): + env._reinitialize() except Exception as e: - # The attempt to reinitialize the reusable variables threw an exception. - # We record the traceback and notify the scheduler. + # The attempt to reinitialize the environment variables threw an + # exception. We record the traceback and notify the scheduler. traceback_str = format_error_message(traceback.format_exc()) - error_key = "ReusableVariableReinitializeError:{}".format(random_string()) + error_key = "EnvironmentVariableReinitializeError:{}".format(random_string()) worker.redis_client.hmset(error_key, {"task_id": "NOTIMPLEMENTED", "function_id": function_id.id(), "function_name": function_name, @@ -1499,30 +1504,32 @@ def _mode(worker=global_worker): """ return worker.mode -def _reusables(): - """Return the reusables object. +def _env(): + """Return the env object. 
- We use this wrapper because so that functions which use the reusables variable - can be pickled. + We use this wrapper so that functions which use the env object can be + pickled. """ - return reusables + return env -def _export_reusable_variable(name, reusable, worker=global_worker): - """Export a reusable variable to the workers. This is only called by a driver. +def _export_environment_variable(name, environment_variable, worker=global_worker): + """Export an environment variable to the workers. + + This is only called by a driver. Args: name (str): The name of the variable to export. - reusable (Reusable): The reusable object containing code for initializing - and reinitializing the variable. + environment_variable (EnvironmentVariable): The environment variable object + containing code for initializing and reinitializing the variable. """ check_main_thread() if _mode(worker) not in [SCRIPT_MODE, SILENT_MODE]: - raise Exception("_export_reusable_variable can only be called on a driver.") - reusable_variable_id = name - key = "ReusableVariables:{}".format(reusable_variable_id) + raise Exception("_export_environment_variable can only be called on a driver.") + environment_variable_id = name + key = "EnvironmentVariables:{}".format(environment_variable_id) worker.redis_client.hmset(key, {"name": name, - "initializer": pickling.dumps(reusable.initializer), - "reinitializer": pickling.dumps(reusable.reinitializer)}) + "initializer": pickling.dumps(environment_variable.initializer), + "reinitializer": pickling.dumps(environment_variable.reinitializer)}) worker.redis_client.rpush("Exports", key) worker.driver_export_counter += 1 @@ -1571,11 +1578,11 @@ def remote(*args, **kwargs): # arguments to prevent the function call from mutating them and to match # the usual behavior of immutable remote objects.
try: - _reusables()._running_remote_function_locally = True + _env()._running_remote_function_locally = True result = func(*copy.deepcopy(args)) finally: - _reusables()._reinitialize() - _reusables()._running_remote_function_locally = False + _env()._reinitialize() + _env()._running_remote_function_locally = False return result objectids = _submit_task(function_id, func_name, args) if len(objectids) == 1: diff --git a/test/failure_test.py b/test/failure_test.py index 8cf7c2aa1..425bb5577 100644 --- a/test/failure_test.py +++ b/test/failure_test.py @@ -114,19 +114,19 @@ class TaskStatusTest(unittest.TestCase): ray.worker.cleanup() - def testFailImportingReusableVariable(self): + def testFailImportingEnvironmentVariable(self): ray.init(num_workers=2, driver_mode=ray.SILENT_MODE) - # This will throw an exception when the reusable variable is imported on the - # workers. + # This will throw an exception when the environment variable is imported on + # the workers. def initializer(): if ray.worker.global_worker.mode == ray.WORKER_MODE: raise Exception("The initializer failed.") return 0 - ray.reusables.foo = ray.Reusable(initializer) - wait_for_errors(b"ReusableVariableImportError", 2) + ray.env.foo = ray.EnvironmentVariable(initializer) + wait_for_errors(b"EnvironmentVariableImportError", 2) # Check that the error message is in the task info. - self.assertTrue(b"The initializer failed." in ray.error_info()[b"ReusableVariableImportError"][0][b"message"]) + self.assertTrue(b"The initializer failed." 
in ray.error_info()[b"EnvironmentVariableImportError"][0][b"message"]) ray.worker.cleanup() @@ -137,14 +137,14 @@ class TaskStatusTest(unittest.TestCase): return 0 def reinitializer(foo): raise Exception("The reinitializer failed.") - ray.reusables.foo = ray.Reusable(initializer, reinitializer) + ray.env.foo = ray.EnvironmentVariable(initializer, reinitializer) @ray.remote def use_foo(): - ray.reusables.foo + ray.env.foo use_foo.remote() - wait_for_errors(b"ReusableVariableReinitializeError", 1) + wait_for_errors(b"EnvironmentVariableReinitializeError", 1) # Check that the error message is in the task info. - self.assertTrue(b"The reinitializer failed." in ray.error_info()[b"ReusableVariableReinitializeError"][0][b"message"]) + self.assertTrue(b"The reinitializer failed." in ray.error_info()[b"EnvironmentVariableReinitializeError"][0][b"message"]) ray.worker.cleanup() diff --git a/test/runtest.py b/test/runtest.py index cdcf2e5b9..c6b3374fc 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -382,24 +382,24 @@ class APITest(unittest.TestCase): ray.worker.cleanup() - def testCachingReusables(self): - # Test that we can define reusable variables before the driver is connected. + def testCachingEnvironmentVariables(self): + # Test that we can define environment variables before the driver is connected. 
def foo_initializer(): return 1 def bar_initializer(): return [] def bar_reinitializer(bar): return [] - ray.reusables.foo = ray.Reusable(foo_initializer) - ray.reusables.bar = ray.Reusable(bar_initializer, bar_reinitializer) + ray.env.foo = ray.EnvironmentVariable(foo_initializer) + ray.env.bar = ray.EnvironmentVariable(bar_initializer, bar_reinitializer) @ray.remote def use_foo(): - return ray.reusables.foo + return ray.env.foo @ray.remote def use_bar(): - ray.reusables.bar.append(1) - return ray.reusables.bar + ray.env.bar.append(1) + return ray.env.bar ray.init(num_workers=2) @@ -572,7 +572,7 @@ class PythonModeTest(unittest.TestCase): ray.worker.cleanup() - def testReusableVariablesInPythonMode(self): + def testEnvironmentVariablesInPythonMode(self): reload(test_functions) ray.init(driver_mode=ray.PYTHON_MODE) @@ -580,38 +580,38 @@ class PythonModeTest(unittest.TestCase): return [] def l_reinit(l): return [] - ray.reusables.l = ray.Reusable(l_init, l_reinit) + ray.env.l = ray.EnvironmentVariable(l_init, l_reinit) @ray.remote def use_l(): - l = ray.reusables.l + l = ray.env.l l.append(1) return l - # Get the local copy of the reusable variable. This should be stateful. - l = ray.reusables.l + # Get the local copy of the environment variable. This should be stateful. + l = ray.env.l assert_equal(l, []) # Make sure the remote function does what we expect. assert_equal(ray.get(use_l.remote()), [1]) assert_equal(ray.get(use_l.remote()), [1]) - # Make sure the local copy of the reusable variable has not been mutated. + # Make sure the local copy of the environment variable has not been mutated. assert_equal(l, []) - l = ray.reusables.l + l = ray.env.l assert_equal(l, []) # Make sure that running a remote function does not reset the state of the - # local copy of the reusable variable. + # local copy of the environment variable. 
l.append(2) assert_equal(ray.get(use_l.remote()), [1]) assert_equal(l, [2]) ray.worker.cleanup() -class ReusablesTest(unittest.TestCase): +class EnvironmentVariablesTest(unittest.TestCase): - def testReusables(self): + def testEnvironmentVariables(self): ray.init(num_workers=1) # Test that we can add a variable to the key-value store. @@ -621,12 +621,12 @@ class ReusablesTest(unittest.TestCase): def foo_reinitializer(foo): return foo - ray.reusables.foo = ray.Reusable(foo_initializer, foo_reinitializer) - self.assertEqual(ray.reusables.foo, 1) + ray.env.foo = ray.EnvironmentVariable(foo_initializer, foo_reinitializer) + self.assertEqual(ray.env.foo, 1) @ray.remote def use_foo(): - return ray.reusables.foo + return ray.env.foo self.assertEqual(ray.get(use_foo.remote()), 1) self.assertEqual(ray.get(use_foo.remote()), 1) self.assertEqual(ray.get(use_foo.remote()), 1) @@ -636,12 +636,12 @@ class ReusablesTest(unittest.TestCase): def bar_initializer(): return [1, 2, 3] - ray.reusables.bar = ray.Reusable(bar_initializer) + ray.env.bar = ray.EnvironmentVariable(bar_initializer) @ray.remote def use_bar(): - ray.reusables.bar.append(4) - return ray.reusables.bar + ray.env.bar.append(4) + return ray.env.bar self.assertEqual(ray.get(use_bar.remote()), [1, 2, 3, 4]) self.assertEqual(ray.get(use_bar.remote()), [1, 2, 3, 4]) self.assertEqual(ray.get(use_bar.remote()), [1, 2, 3, 4]) @@ -655,11 +655,11 @@ class ReusablesTest(unittest.TestCase): baz[i] = 0 return baz - ray.reusables.baz = ray.Reusable(baz_initializer, baz_reinitializer) + ray.env.baz = ray.EnvironmentVariable(baz_initializer, baz_reinitializer) @ray.remote def use_baz(i): - baz = ray.reusables.baz + baz = ray.env.baz baz[i] = 1 return baz assert_equal(ray.get(use_baz.remote(0)), np.array([1, 0, 0, 0])) @@ -676,18 +676,18 @@ class ReusablesTest(unittest.TestCase): def qux_reinitializer(x): return x + 1 - ray.reusables.qux = ray.Reusable(qux_initializer, qux_reinitializer) + ray.env.qux = 
ray.EnvironmentVariable(qux_initializer, qux_reinitializer) @ray.remote def use_qux(): - return ray.reusables.qux + return ray.env.qux self.assertEqual(ray.get(use_qux.remote()), 0) self.assertEqual(ray.get(use_qux.remote()), 1) self.assertEqual(ray.get(use_qux.remote()), 2) ray.worker.cleanup() - def testUsingReusablesOnDriver(self): + def testUsingEnvironmentVariablesOnDriver(self): ray.init(num_workers=1) # Test that we can add a variable to the key-value store. @@ -697,17 +697,17 @@ class ReusablesTest(unittest.TestCase): def foo_reinitializer(foo): return [] - ray.reusables.foo = ray.Reusable(foo_initializer, foo_reinitializer) + ray.env.foo = ray.EnvironmentVariable(foo_initializer, foo_reinitializer) @ray.remote def use_foo(): - foo = ray.reusables.foo + foo = ray.env.foo foo.append(1) return foo - # Check that running a remote function does not reset the reusable variable - # on the driver. - foo = ray.reusables.foo + # Check that running a remote function does not reset the environment + # variable on the driver. + foo = ray.env.foo self.assertEqual(foo, []) foo.append(2) self.assertEqual(foo, [2]) @@ -720,7 +720,7 @@ class ReusablesTest(unittest.TestCase): # Check that the copy of foo on the driver has not changed. self.assertEqual(foo, [2, 3]) - foo = ray.reusables.foo + foo = ray.env.foo self.assertEqual(foo, [2, 3]) ray.worker.cleanup()