Rename reusable variables -> environment variables. (#195)

This commit is contained in:
Robert Nishihara
2017-01-10 20:14:33 -08:00
committed by Philipp Moritz
parent aaf3be3c53
commit 87d8d05792
10 changed files with 265 additions and 253 deletions
+1 -1
View File
@@ -16,5 +16,5 @@ if hasattr(ctypes, "windll"):
import ray.experimental
import ray.serialization
from ray.worker import register_class, error_info, init, connect, disconnect, get, put, wait, remote, log_event, log_span, flush_log
from ray.worker import Reusable, reusables
from ray.worker import EnvironmentVariable, env
from ray.worker import SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, SILENT_MODE
+159 -152
View File
@@ -145,123 +145,126 @@ class RayGetArgumentError(Exception):
return "Failed to get objectid {} as argument {} for remote function {}{}{}. It was created by remote function {}{}{} which failed with:\n{}".format(self.objectid, self.argument_index, colorama.Fore.RED, self.function_name, colorama.Fore.RESET, colorama.Fore.RED, self.task_error.function_name, colorama.Fore.RESET, self.task_error)
class Reusable(object):
class EnvironmentVariable(object):
"""An Python object that can be shared between tasks.
Attributes:
initializer (Callable[[], object]): A function used to create and initialize
the reusable variable.
the environment variable.
reinitializer (Optional[Callable[[object], object]]): An optional function
used to reinitialize the reusable variable after it has been used. This
used to reinitialize the environment variable after it has been used. This
argument can be used as an optimization if there is a fast way to
reinitialize the state of the variable other than rerunning the
initializer.
"""
def __init__(self, initializer, reinitializer=None):
"""Initialize a Reusable object."""
"""Initialize an EnvironmentVariable object."""
if not callable(initializer):
raise Exception("When creating a RayReusable, initializer must be a function.")
raise Exception("When creating an EnvironmentVariable, initializer must be a function.")
self.initializer = initializer
if reinitializer is None:
# If no reinitializer is passed in, use a wrapped version of the initializer.
reinitializer = lambda value: initializer()
if not callable(reinitializer):
raise Exception("When creating a RayReusable, reinitializer must be a function.")
raise Exception("When creating an EnvironmentVariable, reinitializer must be a function.")
self.reinitializer = reinitializer
class RayReusables(object):
class RayEnvironmentVariables(object):
"""An object used to store Python variables that are shared between tasks.
Each worker process will have a single RayReusables object. This class serves
two purposes. First, some objects are not serializable, and so the code that
creates those objects must be run on the worker that uses them. This class is
responsible for running the code that creates those objects. Second, some of
these objects are expensive to create, and so they should be shared between
tasks. However, if a task mutates a variable that is shared between tasks,
then the behavior of the overall program may be nondeterministic (it could
depend on scheduling decisions). To fix this, if a task uses a one of these
shared objects, then that shared object will be reinitialized after the task
finishes. Since the initialization may be expensive, the user can pass in
custom reinitialization code that resets the state of the shared variable to
the way it was after initialization. If the reinitialization code does not do
this, then the behavior of the overall program is undefined.
Each worker process will have a single RayEnvironmentVariables object. This
class serves two purposes. First, some objects are not serializable, and so
the code that creates those objects must be run on the worker that uses them.
This class is responsible for running the code that creates those objects.
Second, some of these objects are expensive to create, and so they should be
shared between tasks. However, if a task mutates a variable that is shared
between tasks, then the behavior of the overall program may be
nondeterministic (it could depend on scheduling decisions). To fix this, if a
task uses a one of these shared objects, then that shared object will be
reinitialized after the task finishes. Since the initialization may be
expensive, the user can pass in custom reinitialization code that resets the
state of the shared variable to the way it was after initialization. If the
reinitialization code does not do this, then the behavior of the overall
program is undefined.
Attributes:
_names (List[str]): A list of the names of all the reusable variables.
_names (List[str]): A list of the names of all the environment variables.
_reinitializers (Dict[str, Callable]): A dictionary mapping the name of the
reusable variables to the corresponding reinitializer.
environment variables to the corresponding reinitializer.
_running_remote_function_locally (bool): A flag used to indicate if a remote
function is running locally on the driver so that we can simulate the same
behavior as running a remote function remotely.
_reusables: A dictionary mapping the name of a reusable variable to the
value of the reusable variable.
_local_mode_reusables: A copy of _reusables used on the driver when running
remote functions locally on the driver. This is needed because there are
two ways in which reusable variables can be used on the driver. The first
is that the driver's copy can be manipulated. This copy is never reset
(think of the driver as a single long-running task). The second way is
that a remote function can be run locally on the driver, and this remote
function needs access to a copy of the reusable variable, and that copy
must be reinitialized after use.
_cached_reusables (List[Tuple[str, Reusable]]): A list of pairs. The first
element of each pair is the name of a reusable variable, and the second
element is the Reusable object. This list is used to store reusable
variables that are defined before the driver is connected. Once the driver
is connected, these variables will be exported.
_used (List[str]): A list of the names of all the reusable variables that
_environment_variables: A dictionary mapping the name of an environment
variable to the value of the environment variable.
_local_mode_environment_variables: A copy of _environment_variables used on
the driver when running remote functions locally on the driver. This is
needed because there are two ways in which environment variables can be
used on the driver. The first is that the driver's copy can be
manipulated. This copy is never reset (think of the driver as a single
long-running task). The second way is that a remote function can be run
locally on the driver, and this remote function needs access to a copy of
the environment variable, and that copy must be reinitialized after use.
_cached_environment_variables (List[Tuple[str, EnvironmentVariable]]): A
list of pairs. The first element of each pair is the name of an
environment variable, and the second element is the EnvironmentVariable
object. This list is used to store environment variables that are defined
before the driver is connected. Once the driver is connected, these
variables will be exported.
_used (List[str]): A list of the names of all the environment variables that
have been accessed within the scope of the current task. This is reset to
the empty list after each task.
"""
def __init__(self):
"""Initialize a RayReusables object."""
"""Initialize an RayEnvironmentVariables object."""
self._names = set()
self._reinitializers = {}
self._running_remote_function_locally = False
self._reusables = {}
self._local_mode_reusables = {}
self._cached_reusables = []
self._environment_variables = {}
self._local_mode_environment_variables = {}
self._cached_environment_variables = []
self._used = set()
self._slots = ("_names", "_reinitializers", "_running_remote_function_locally", "_reusables", "_local_mode_reusables", "_cached_reusables", "_used", "_slots", "_create_reusable_variable", "_reinitialize", "__getattribute__", "__setattr__", "__delattr__")
self._slots = ("_names", "_reinitializers", "_running_remote_function_locally", "_environment_variables", "_local_mode_environment_variables", "_cached_environment_variables", "_used", "_slots", "_create_environment_variable", "_reinitialize", "__getattribute__", "__setattr__", "__delattr__")
# CHECKPOINT: Attributes must not be added after _slots. The above attributes are protected from deletion.
def _create_reusable_variable(self, name, reusable):
"""Create a reusable variable locally.
def _create_environment_variable(self, name, environment_variable):
"""Create an environment variable locally.
Args:
name (str): The name of the reusable variable.
reusable (Reusable): The reusable object to use to create the reusable
variable.
name (str): The name of the environment variable.
environment_variable (EnvironmentVariable): The environment variable
object to use to create the environment variable variable.
"""
self._names.add(name)
self._reinitializers[name] = reusable.reinitializer
self._reusables[name] = reusable.initializer()
# We create a second copy of the reusable variable on the driver to use
self._reinitializers[name] = environment_variable.reinitializer
self._environment_variables[name] = environment_variable.initializer()
# We create a second copy of the environment variable on the driver to use
# inside of remote functions that run locally. This occurs when we start Ray
# in PYTHON_MODE and when we call a remote function locally.
if _mode() in [SCRIPT_MODE, SILENT_MODE, PYTHON_MODE]:
self._local_mode_reusables[name] = reusable.initializer()
self._local_mode_environment_variables[name] = environment_variable.initializer()
def _reinitialize(self):
"""Reinitialize the reusable variables that the current task used."""
"""Reinitialize the environment variables that the current task used."""
for name in self._used:
current_value = self._reusables[name]
current_value = self._environment_variables[name]
new_value = self._reinitializers[name](current_value)
# If we are on the driver, reset the copy of the reusable variable in the
# _local_mode_reusables dictionary.
# If we are on the driver, reset the copy of the environment variable in
# the _local_mode_environment_variables dictionary.
if _mode() in [SCRIPT_MODE, SILENT_MODE, PYTHON_MODE]:
assert self._running_remote_function_locally
self._local_mode_reusables[name] = new_value
self._local_mode_environment_variables[name] = new_value
else:
self._reusables[name] = new_value
self._environment_variables[name] = new_value
self._used.clear() # Reset the _used list.
def __getattribute__(self, name):
"""Get an attribute. This handles reusable variables as a special case.
"""Get an attribute. This handles environment variables as a special case.
When __getattribute__ is called with the name of a reusable variable, that
name is added to the list of variables that were used in the current task.
When __getattribute__ is called with the name of an environment variable,
that name is added to the list of variables that were used in the current
task.
Args:
name (str): The name of the attribute to get.
@@ -270,33 +273,35 @@ class RayReusables(object):
return object.__getattribute__(self, name)
if name in self._slots:
return object.__getattribute__(self, name)
# Handle various fields that are not reusable variables.
# Handle various fields that are not environment variables.
if name not in self._names:
return object.__getattribute__(self, name)
# Make a note of the fact that the reusable variable has been used.
# Make a note of the fact that the environment variable has been used.
if name in self._names and name not in self._used:
self._used.add(name)
if self._running_remote_function_locally:
return self._local_mode_reusables[name]
return self._local_mode_environment_variables[name]
else:
return self._reusables[name]
return self._environment_variables[name]
def __setattr__(self, name, value):
"""Set an attribute. This handles reusable variables as a special case.
"""Set an attribute. This handles environment variables as a special case.
This is used to create reusable variables. When it is called, it runs the
This is used to create environment variables. When it is called, it runs the
function for initializing the variable to create the variable. If this is
called on the driver, then the functions for initializing and reinitializing
the variable are shipped to the workers.
If this is called before ray.init has been run, then the reusable variable
will be cached and it will be created and exported when connect is called.
If this is called before ray.init has been run, then the environment
variable will be cached and it will be created and exported when connect is
called.
Args:
name (str): The name of the attribute to set. This is either a whitelisted
name or it is treated as the name of a reusable variable.
name or it is treated as the name of an environment variable.
value: If name is a whitelisted name, then value can be any value. If name
is the name of a reusable variable, then this is a Reusable object.
is the name of an environment variable, then this is an
EnvironmentVariable object.
"""
try:
slots = self._slots
@@ -306,32 +311,32 @@ class RayReusables(object):
return object.__setattr__(self, name, value)
if name in slots:
return object.__setattr__(self, name, value)
reusable = value
if not issubclass(type(reusable), Reusable):
raise Exception("To set a reusable variable, you must pass in a Reusable object")
# If ray.init has not been called, cache the reusable variable to export
# later. Otherwise, export the reusable variable to the workers and define
# it locally.
environment_variable = value
if not issubclass(type(environment_variable), EnvironmentVariable):
raise Exception("To set an environment variable, you must pass in an EnvironmentVariable object")
# If ray.init has not been called, cache the environment variable to export
# later. Otherwise, export the environment variable to the workers and
# define it locally.
if _mode() is None:
self._cached_reusables.append((name, reusable))
self._cached_environment_variables.append((name, environment_variable))
else:
# If we are on the driver, export the reusable variable to all the
# If we are on the driver, export the environment variable to all the
# workers.
if _mode() in [SCRIPT_MODE, SILENT_MODE]:
_export_reusable_variable(name, reusable)
# Define the reusable variable locally.
self._create_reusable_variable(name, reusable)
# Create an empty attribute with the name of the reusable variable. This
# allows the Python interpreter to do tab complete properly.
_export_environment_variable(name, environment_variable)
# Define the environment variable locally.
self._create_environment_variable(name, environment_variable)
# Create an empty attribute with the name of the environment variable.
# This allows the Python interpreter to do tab complete properly.
object.__setattr__(self, name, None)
def __delattr__(self, name):
"""We do not allow attributes of RayReusables to be deleted.
"""We do not allow attributes of RayEnvironmentVariables to be deleted.
Args:
name (str): The name of the attribute to delete.
"""
raise Exception("Attempted deletion of attribute {}. Attributes of a RayReusable object may not be deleted.".format(name))
raise Exception("Attempted deletion of attribute {}. Attributes of a RayEnvironmentVariables object may not be deleted.".format(name))
class Worker(object):
"""A class used to define the control flow of a worker process.
@@ -518,14 +523,14 @@ We use a global Worker object to ensure that there is a single worker object
per worker process.
"""
reusables = RayReusables()
"""RayReusables: The reusable variables that are shared between tasks.
env = RayEnvironmentVariables()
"""RayEnvironmentVariables: The environment variables that are shared by tasks.
Each worker process has its own RayReusables object, and these objects should be
the same in all workers. This is used for storing variables that are not
serializable but must be used by remote tasks. In addition, it is used to
reinitialize these variables after they are used so that changes to their state
made by one task do not affect other tasks.
Each worker process has its own RayEnvironmentVariables object, and these
objects should be the same in all workers. This is used for storing variables
that are not serializable but must be used by remote tasks. In addition, it is
used to reinitialize these variables after they are used so that changes to
their state made by one task do not affect other tasks.
"""
class RayConnectionError(Exception):
@@ -548,7 +553,7 @@ def check_connected(worker=global_worker):
Exception: An exception is raised if the worker is not connected.
"""
if not worker.connected:
raise RayConnectionError("This command cannot be called before Ray has been started. You can start Ray with 'ray.init(num_workers=10)'.")
raise RayConnectionError("This command cannot be called before Ray has been started. You can start Ray with 'ray.init()'.")
def print_failed_task(task_status):
"""Print information about failed tasks.
@@ -570,8 +575,8 @@ def error_info(worker=global_worker):
check_connected(worker)
result = {b"TaskError": [],
b"RemoteFunctionImportError": [],
b"ReusableVariableImportError": [],
b"ReusableVariableReinitializeError": [],
b"EnvironmentVariableImportError": [],
b"EnvironmentVariableReinitializeError": [],
b"FunctionToRunError": [],
b"GenericWarning": [],
}
@@ -911,21 +916,21 @@ def fetch_and_register_remote_function(key, worker=global_worker):
# Add the function to the function table.
worker.redis_client.rpush("FunctionTable:{}".format(function_id.id()), worker.worker_id)
def fetch_and_register_reusable_variable(key, worker=global_worker):
"""Import a reusable variable."""
reusable_variable_name, serialized_initializer, serialized_reinitializer = worker.redis_client.hmget(key, ["name", "initializer", "reinitializer"])
reusable_variable_name = reusable_variable_name.decode("ascii")
def fetch_and_register_environment_variable(key, worker=global_worker):
"""Import an environment variable."""
environment_variable_name, serialized_initializer, serialized_reinitializer = worker.redis_client.hmget(key, ["name", "initializer", "reinitializer"])
environment_variable_name = environment_variable_name.decode("ascii")
try:
initializer = pickling.loads(serialized_initializer)
reinitializer = pickling.loads(serialized_reinitializer)
reusables.__setattr__(reusable_variable_name, Reusable(initializer, reinitializer))
env.__setattr__(environment_variable_name, EnvironmentVariable(initializer, reinitializer))
except:
# If an exception was thrown when the reusable variable was imported, we
# If an exception was thrown when the environment variable was imported, we
# record the traceback and notify the scheduler of the failure.
traceback_str = format_error_message(traceback.format_exc())
# Log the error message.
error_key = "ReusableVariableImportError:{}".format(random_string())
worker.redis_client.hmset(error_key, {"name": reusable_variable_name,
error_key = "EnvironmentVariableImportError:{}".format(random_string())
worker.redis_client.hmset(error_key, {"name": environment_variable_name,
"message": traceback_str})
worker.redis_client.rpush("ErrorKeys", error_key)
@@ -968,8 +973,8 @@ def import_thread(worker):
for key in export_keys:
if key.startswith(b"RemoteFunction"):
fetch_and_register_remote_function(key, worker=worker)
elif key.startswith(b"ReusableVariables"):
fetch_and_register_reusable_variable(key, worker=worker)
elif key.startswith(b"EnvironmentVariables"):
fetch_and_register_environment_variable(key, worker=worker)
elif key.startswith(b"FunctionsToRun"):
fetch_and_execute_function_to_run(key, worker=worker)
else:
@@ -989,9 +994,9 @@ def import_thread(worker):
if key.startswith(b"RemoteFunction"):
with log_span("ray:import_remote_function", worker=worker):
fetch_and_register_remote_function(key, worker=worker)
elif key.startswith(b"ReusableVariables"):
with log_span("ray:import_reusable_variable", worker=worker):
fetch_and_register_reusable_variable(key, worker=worker)
elif key.startswith(b"EnvironmentVariables"):
with log_span("ray:import_environment_variable", worker=worker):
fetch_and_register_environment_variable(key, worker=worker)
elif key.startswith(b"FunctionsToRun"):
with log_span("ray:import_function_to_run", worker=worker):
fetch_and_execute_function_to_run(key, worker=worker)
@@ -1015,7 +1020,7 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker):
assert not worker.connected, error_message
assert worker.cached_functions_to_run is not None, error_message
assert worker.cached_remote_functions is not None, error_message
assert reusables._cached_reusables is not None, error_message
assert env._cached_environment_variables is not None, error_message
# Initialize some fields.
worker.worker_id = random_string()
worker.connected = True
@@ -1093,28 +1098,28 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker):
current_directory = os.path.abspath(os.path.curdir)
worker.run_function_on_all_workers(lambda worker_info: sys.path.insert(1, script_directory))
worker.run_function_on_all_workers(lambda worker_info: sys.path.insert(1, current_directory))
# TODO(rkn): Here we first export functions to run, then reusable variables,
# then remote functions. The order matters. For example, one of the
# functions to run may set the Python path, which is needed to import a
# module used to define a reusable variable, which in turn is used inside a
# remote function. We may want to change the order to simply be the order in
# which the exports were defined on the driver. In addition, we will need to
# retain the ability to decide what the first few exports are (mostly to set
# the Python path). Additionally, note that the first exports to be defined
# on the driver will be the ones defined in separate modules that are
# imported by the driver.
# TODO(rkn): Here we first export functions to run, then environment
# variables, then remote functions. The order matters. For example, one of
# the functions to run may set the Python path, which is needed to import a
# module used to define an environment variable, which in turn is used
# inside a remote function. We may want to change the order to simply be the
# order in which the exports were defined on the driver. In addition, we
# will need to retain the ability to decide what the first few exports are
# (mostly to set the Python path). Additionally, note that the first exports
# to be defined on the driver will be the ones defined in separate modules
# that are imported by the driver.
# Export cached functions_to_run.
for function in worker.cached_functions_to_run:
worker.run_function_on_all_workers(function)
# Export cached reusable variables to the workers.
for name, reusable_variable in reusables._cached_reusables:
reusables.__setattr__(name, reusable_variable)
# Export cached environment variables to the workers.
for name, environment_variable in env._cached_environment_variables:
env.__setattr__(name, environment_variable)
# Export cached remote functions to the workers.
for function_id, func_name, func, num_return_vals in worker.cached_remote_functions:
export_remote_function(function_id, func_name, func, num_return_vals, worker)
worker.cached_functions_to_run = None
worker.cached_remote_functions = None
reusables._cached_reusables = None
env._cached_environment_variables = None
def disconnect(worker=global_worker):
"""Disconnect this worker from the scheduler and object store."""
@@ -1124,7 +1129,7 @@ def disconnect(worker=global_worker):
worker.connected = False
worker.cached_functions_to_run = []
worker.cached_remote_functions = []
reusables._cached_reusables = []
env._cached_environment_variables = []
def register_class(cls, pickle=False, worker=global_worker):
"""Enable workers to serialize or deserialize objects of a particular class.
@@ -1367,7 +1372,7 @@ def main_loop(worker=global_worker):
This method is an infinite loop. It waits to receive commands from the
scheduler. A command may consist of a task to execute, a remote function to
import, a reusable variable to import, or an order to terminate the worker
import, an environment variable to import, or an order to terminate the worker
process. The worker executes the command, notifies the scheduler of any errors
that occurred while executing the command, and waits for the next command.
"""
@@ -1380,8 +1385,8 @@ def main_loop(worker=global_worker):
store. If the task throws an exception, RayTaskError objects are stored in
the object store to represent the failed task (these will be retrieved by
calls to get or by subsequent tasks that use the outputs of this task).
After the task executes, the worker resets any reusable variables that were
accessed by the task.
After the task executes, the worker resets any environment variables that
were accessed by the task.
"""
try:
worker.current_task_id = task.task_id()
@@ -1430,15 +1435,15 @@ def main_loop(worker=global_worker):
"message": str(failure_object)})
worker.redis_client.rpush("ErrorKeys", error_key)
try:
# Reinitialize the values of reusable variables that were used in the task
# above so that changes made to their state do not affect other tasks.
with log_span("ray:task:reinitialize_reusables", worker=worker):
reusables._reinitialize()
# Reinitialize the values of environment variables that were used in the
# task above so that changes made to their state do not affect other tasks.
with log_span("ray:task:reinitialize_environment_variables", worker=worker):
env._reinitialize()
except Exception as e:
# The attempt to reinitialize the reusable variables threw an exception.
# We record the traceback and notify the scheduler.
# The attempt to reinitialize the environment variables threw an
# exception. We record the traceback and notify the scheduler.
traceback_str = format_error_message(traceback.format_exc())
error_key = "ReusableVariableReinitializeError:{}".format(random_string())
error_key = "EnvironmentVariableReinitializeError:{}".format(random_string())
worker.redis_client.hmset(error_key, {"task_id": "NOTIMPLEMENTED",
"function_id": function_id.id(),
"function_name": function_name,
@@ -1499,30 +1504,32 @@ def _mode(worker=global_worker):
"""
return worker.mode
def _reusables():
"""Return the reusables object.
def _env():
"""Return the env object.
We use this wrapper because so that functions which use the reusables variable
can be pickled.
We use this wrapper because so that functions which use the env object can be
pickled.
"""
return reusables
return env
def _export_reusable_variable(name, reusable, worker=global_worker):
"""Export a reusable variable to the workers. This is only called by a driver.
def _export_environment_variable(name, environment_variable, worker=global_worker):
"""Export an environment variable to the workers.
This is only called by a driver.
Args:
name (str): The name of the variable to export.
reusable (Reusable): The reusable object containing code for initializing
and reinitializing the variable.
environment_variable (EnvironmentVariable): The environment variable object
containing code for initializing and reinitializing the variable.
"""
check_main_thread()
if _mode(worker) not in [SCRIPT_MODE, SILENT_MODE]:
raise Exception("_export_reusable_variable can only be called on a driver.")
reusable_variable_id = name
key = "ReusableVariables:{}".format(reusable_variable_id)
raise Exception("_export_environment_variable can only be called on a driver.")
environment_variable_id = name
key = "EnvironmentVariables:{}".format(environment_variable_id)
worker.redis_client.hmset(key, {"name": name,
"initializer": pickling.dumps(reusable.initializer),
"reinitializer": pickling.dumps(reusable.reinitializer)})
"initializer": pickling.dumps(environment_variable.initializer),
"reinitializer": pickling.dumps(environment_variable.reinitializer)})
worker.redis_client.rpush("Exports", key)
worker.driver_export_counter += 1
@@ -1571,11 +1578,11 @@ def remote(*args, **kwargs):
# arguments to prevent the function call from mutating them and to match
# the usual behavior of immutable remote objects.
try:
_reusables()._running_remote_function_locally = True
_env()._running_remote_function_locally = True
result = func(*copy.deepcopy(args))
finally:
_reusables()._reinitialize()
_reusables()._running_remote_function_locally = False
_env()._reinitialize()
_env()._running_remote_function_locally = False
return result
objectids = _submit_task(function_id, func_name, args)
if len(objectids) == 1: