mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 21:23:10 +08:00
Remove Ray environment variables from codebase. (#590)
This commit is contained in:
committed by
Philipp Moritz
parent
c647dd5f6c
commit
c5bc76193f
@@ -6,7 +6,6 @@ from ray.worker import (register_class, error_info, init, connect, disconnect,
|
||||
get, put, wait, remote, log_event, log_span,
|
||||
flush_log, get_gpu_ids)
|
||||
from ray.actor import actor
|
||||
from ray.worker import EnvironmentVariable, env
|
||||
from ray.worker import SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, SILENT_MODE
|
||||
from ray.worker import global_state
|
||||
|
||||
@@ -16,9 +15,8 @@ __version__ = "0.1.0"
|
||||
|
||||
__all__ = ["register_class", "error_info", "init", "connect", "disconnect",
|
||||
"get", "put", "wait", "remote", "log_event", "log_span",
|
||||
"flush_log", "actor", "get_gpu_ids", "EnvironmentVariable", "env",
|
||||
"SCRIPT_MODE", "WORKER_MODE", "PYTHON_MODE", "SILENT_MODE",
|
||||
"global_state", "__version__"]
|
||||
"flush_log", "actor", "get_gpu_ids", "SCRIPT_MODE", "WORKER_MODE",
|
||||
"PYTHON_MODE", "SILENT_MODE", "global_state", "__version__"]
|
||||
|
||||
import ctypes
|
||||
# Windows only
|
||||
|
||||
+11
-337
@@ -186,223 +186,6 @@ class RayGetArgumentError(Exception):
|
||||
colorama.Fore.RESET, self.task_error))
|
||||
|
||||
|
||||
class EnvironmentVariable(object):
|
||||
"""A Python object that can be shared between tasks.
|
||||
|
||||
Attributes:
|
||||
initializer (Callable[[], object]): A function used to create and
|
||||
initialize the environment variable.
|
||||
reinitializer (Optional[Callable[[object], object]]): An optional function
|
||||
used to reinitialize the environment variable after it has been used.
|
||||
This argument can be used as an optimization if there is a fast way to
|
||||
reinitialize the state of the variable other than rerunning the
|
||||
initializer.
|
||||
"""
|
||||
|
||||
def __init__(self, initializer, reinitializer=None):
|
||||
"""Initialize an EnvironmentVariable object."""
|
||||
if not callable(initializer):
|
||||
raise Exception("When creating an EnvironmentVariable, initializer must "
|
||||
"be a function.")
|
||||
self.initializer = initializer
|
||||
if reinitializer is None:
|
||||
# If no reinitializer is passed in, use a wrapped version of the
|
||||
# initializer.
|
||||
def reinitializer(value):
|
||||
return initializer()
|
||||
if not callable(reinitializer):
|
||||
raise Exception("When creating an EnvironmentVariable, reinitializer "
|
||||
"must be a function.")
|
||||
self.reinitializer = reinitializer
|
||||
|
||||
|
||||
class RayEnvironmentVariables(object):
|
||||
"""An object used to store Python variables that are shared between tasks.
|
||||
|
||||
Each worker process will have a single RayEnvironmentVariables object. This
|
||||
class serves two purposes. First, some objects are not serializable, and so
|
||||
the code that creates those objects must be run on the worker that uses them.
|
||||
This class is responsible for running the code that creates those objects.
|
||||
Second, some of these objects are expensive to create, and so they should be
|
||||
shared between tasks. However, if a task mutates a variable that is shared
|
||||
between tasks, then the behavior of the overall program may be
|
||||
nondeterministic (it could depend on scheduling decisions). To fix this, if a
|
||||
task uses one of these shared objects, then that shared object will be
|
||||
reinitialized after the task finishes. Since the initialization may be
|
||||
expensive, the user can pass in custom reinitialization code that resets the
|
||||
state of the shared variable to the way it was after initialization. If the
|
||||
reinitialization code does not do this, then the behavior of the overall
|
||||
program is undefined.
|
||||
|
||||
Attributes:
|
||||
_names (List[str]): A list of the names of all the environment variables.
|
||||
_reinitializers (Dict[str, Callable]): A dictionary mapping the name of the
|
||||
environment variables to the corresponding reinitializer.
|
||||
_running_remote_function_locally (bool): A flag used to indicate if a
|
||||
remote function is running locally on the driver so that we can simulate
|
||||
the same behavior as running a remote function remotely.
|
||||
_environment_variables: A dictionary mapping the name of an environment
|
||||
variable to the value of the environment variable.
|
||||
_local_mode_environment_variables: A copy of _environment_variables used on
|
||||
the driver when running remote functions locally on the driver. This is
|
||||
needed because there are two ways in which environment variables can be
|
||||
used on the driver. The first is that the driver's copy can be
|
||||
manipulated. This copy is never reset (think of the driver as a single
|
||||
long-running task). The second way is that a remote function can be run
|
||||
locally on the driver, and this remote function needs access to a copy of
|
||||
the environment variable, and that copy must be reinitialized after use.
|
||||
_cached_environment_variables (List[Tuple[str, EnvironmentVariable]]): A
|
||||
list of pairs. The first element of each pair is the name of an
|
||||
environment variable, and the second element is the EnvironmentVariable
|
||||
object. This list is used to store environment variables that are defined
|
||||
before the driver is connected. Once the driver is connected, these
|
||||
variables will be exported.
|
||||
_used (List[str]): A list of the names of all the environment variables
|
||||
that have been accessed within the scope of the current task. This is
|
||||
reset to the empty list after each task.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize a RayEnvironmentVariables object."""
|
||||
self._names = set()
|
||||
self._reinitializers = {}
|
||||
self._running_remote_function_locally = False
|
||||
self._environment_variables = {}
|
||||
self._local_mode_environment_variables = {}
|
||||
self._cached_environment_variables = []
|
||||
self._used = set()
|
||||
self._slots = ("_names",
|
||||
"_reinitializers",
|
||||
"_running_remote_function_locally",
|
||||
"_environment_variables",
|
||||
"_local_mode_environment_variables",
|
||||
"_cached_environment_variables",
|
||||
"_used",
|
||||
"_slots",
|
||||
"_create_environment_variable",
|
||||
"_reinitialize",
|
||||
"__getattribute__",
|
||||
"__setattr__",
|
||||
"__delattr__")
|
||||
# CHECKPOINT: Attributes must not be added after _slots. The above
|
||||
# attributes are protected from deletion.
|
||||
|
||||
def _create_environment_variable(self, name, environment_variable):
|
||||
"""Create an environment variable locally.
|
||||
|
||||
Args:
|
||||
name (str): The name of the environment variable.
|
||||
environment_variable (EnvironmentVariable): The environment variable
|
||||
object to use to create the environment variable.
|
||||
"""
|
||||
self._names.add(name)
|
||||
self._reinitializers[name] = environment_variable.reinitializer
|
||||
self._environment_variables[name] = environment_variable.initializer()
|
||||
# We create a second copy of the environment variable on the driver to use
|
||||
# inside of remote functions that run locally. This occurs when we start
|
||||
# Ray in PYTHON_MODE and when we call a remote function locally.
|
||||
if _mode() in [SCRIPT_MODE, SILENT_MODE, PYTHON_MODE]:
|
||||
self._local_mode_environment_variables[name] = (environment_variable
|
||||
.initializer())
|
||||
|
||||
def _reinitialize(self):
|
||||
"""Reinitialize the environment variables that the current task used."""
|
||||
for name in self._used:
|
||||
current_value = self._environment_variables[name]
|
||||
new_value = self._reinitializers[name](current_value)
|
||||
# If we are on the driver, reset the copy of the environment variable in
|
||||
# the _local_mode_environment_variables dictionary.
|
||||
if _mode() in [SCRIPT_MODE, SILENT_MODE, PYTHON_MODE]:
|
||||
assert self._running_remote_function_locally
|
||||
self._local_mode_environment_variables[name] = new_value
|
||||
else:
|
||||
self._environment_variables[name] = new_value
|
||||
self._used.clear() # Reset the _used list.
|
||||
|
||||
def __getattribute__(self, name):
|
||||
"""Get an attribute. This handles environment variables as a special case.
|
||||
|
||||
When __getattribute__ is called with the name of an environment variable,
|
||||
that name is added to the list of variables that were used in the current
|
||||
task.
|
||||
|
||||
Args:
|
||||
name (str): The name of the attribute to get.
|
||||
"""
|
||||
if name == "_slots":
|
||||
return object.__getattribute__(self, name)
|
||||
if name in self._slots:
|
||||
return object.__getattribute__(self, name)
|
||||
# Handle various fields that are not environment variables.
|
||||
if name not in self._names:
|
||||
return object.__getattribute__(self, name)
|
||||
# Make a note of the fact that the environment variable has been used.
|
||||
if name in self._names and name not in self._used:
|
||||
self._used.add(name)
|
||||
if self._running_remote_function_locally:
|
||||
return self._local_mode_environment_variables[name]
|
||||
else:
|
||||
return self._environment_variables[name]
|
||||
|
||||
def __setattr__(self, name, value):
|
||||
"""Set an attribute. This handles environment variables as a special case.
|
||||
|
||||
This is used to create environment variables. When it is called, it runs
|
||||
the function for initializing the variable to create the variable. If this
|
||||
is called on the driver, then the functions for initializing and
|
||||
reinitializing the variable are shipped to the workers.
|
||||
|
||||
If this is called before ray.init has been run, then the environment
|
||||
variable will be cached and it will be created and exported when connect is
|
||||
called.
|
||||
|
||||
Args:
|
||||
name (str): The name of the attribute to set. This is either a
|
||||
whitelisted name or it is treated as the name of an environment
|
||||
variable.
|
||||
value: If name is a whitelisted name, then value can be any value. If
|
||||
name is the name of an environment variable, then this is an
|
||||
EnvironmentVariable object.
|
||||
"""
|
||||
try:
|
||||
slots = self._slots
|
||||
except AttributeError:
|
||||
slots = ()
|
||||
if slots == ():
|
||||
return object.__setattr__(self, name, value)
|
||||
if name in slots:
|
||||
return object.__setattr__(self, name, value)
|
||||
environment_variable = value
|
||||
if not issubclass(type(environment_variable), EnvironmentVariable):
|
||||
raise Exception("To set an environment variable, you must pass in an "
|
||||
"EnvironmentVariable object")
|
||||
# If ray.init has not been called, cache the environment variable to export
|
||||
# later. Otherwise, export the environment variable to the workers and
|
||||
# define it locally.
|
||||
if _mode() is None:
|
||||
self._cached_environment_variables.append((name, environment_variable))
|
||||
else:
|
||||
# If we are on the driver, export the environment variable to all the
|
||||
# workers.
|
||||
if _mode() in [SCRIPT_MODE, SILENT_MODE]:
|
||||
_export_environment_variable(name, environment_variable)
|
||||
# Define the environment variable locally.
|
||||
self._create_environment_variable(name, environment_variable)
|
||||
# Create an empty attribute with the name of the environment variable.
|
||||
# This allows the Python interpreter to do tab complete properly.
|
||||
object.__setattr__(self, name, None)
|
||||
|
||||
def __delattr__(self, name):
|
||||
"""We do not allow attributes of RayEnvironmentVariables to be deleted.
|
||||
|
||||
Args:
|
||||
name (str): The name of the attribute to delete.
|
||||
"""
|
||||
raise Exception("Attempted deletion of attribute {}. Attributes of a "
|
||||
"RayEnvironmentVariables object may not be deleted."
|
||||
.format(name))
|
||||
|
||||
|
||||
class Worker(object):
|
||||
"""A class used to define the control flow of a worker process.
|
||||
|
||||
@@ -766,16 +549,6 @@ per worker process.
|
||||
|
||||
global_state = state.GlobalState()
|
||||
|
||||
env = RayEnvironmentVariables()
|
||||
"""RayEnvironmentVariables: The environment variables that are shared by tasks.
|
||||
|
||||
Each worker process has its own RayEnvironmentVariables object, and these
|
||||
objects should be the same in all workers. This is used for storing variables
|
||||
that are not serializable but must be used by remote tasks. In addition, it is
|
||||
used to reinitialize these variables after they are used so that changes to
|
||||
their state made by one task do not affect other tasks.
|
||||
"""
|
||||
|
||||
|
||||
class RayConnectionError(Exception):
|
||||
pass
|
||||
@@ -1334,27 +1107,6 @@ def fetch_and_register_remote_function(key, worker=global_worker):
|
||||
worker.worker_id)
|
||||
|
||||
|
||||
def fetch_and_register_environment_variable(key, worker=global_worker):
|
||||
"""Import an environment variable."""
|
||||
(driver_id, environment_variable_name, serialized_initializer,
|
||||
serialized_reinitializer) = worker.redis_client.hmget(
|
||||
key, ["driver_id", "name", "initializer", "reinitializer"])
|
||||
environment_variable_name = environment_variable_name.decode("ascii")
|
||||
try:
|
||||
initializer = pickling.loads(serialized_initializer)
|
||||
reinitializer = pickling.loads(serialized_reinitializer)
|
||||
env.__setattr__(environment_variable_name,
|
||||
EnvironmentVariable(initializer, reinitializer))
|
||||
except:
|
||||
# If an exception was thrown when the environment variable was imported, we
|
||||
# record the traceback and notify the scheduler of the failure.
|
||||
traceback_str = format_error_message(traceback.format_exc())
|
||||
# Log the error message.
|
||||
worker.push_error_to_driver(driver_id, "register_environment_variable",
|
||||
traceback_str,
|
||||
data={"name": environment_variable_name})
|
||||
|
||||
|
||||
def fetch_and_execute_function_to_run(key, worker=global_worker):
|
||||
"""Run an arbitrary function on the worker."""
|
||||
driver_id, serialized_function = worker.redis_client.hmget(
|
||||
@@ -1404,8 +1156,6 @@ def import_thread(worker, mode):
|
||||
|
||||
if key.startswith(b"RemoteFunction"):
|
||||
fetch_and_register_remote_function(key, worker=worker)
|
||||
elif key.startswith(b"EnvironmentVariables"):
|
||||
fetch_and_register_environment_variable(key, worker=worker)
|
||||
elif key.startswith(b"FunctionsToRun"):
|
||||
fetch_and_execute_function_to_run(key, worker=worker)
|
||||
elif key.startswith(b"ActorClass"):
|
||||
@@ -1441,9 +1191,6 @@ def import_thread(worker, mode):
|
||||
if key.startswith(b"RemoteFunction"):
|
||||
with log_span("ray:import_remote_function", worker=worker):
|
||||
fetch_and_register_remote_function(key, worker=worker)
|
||||
elif key.startswith(b"EnvironmentVariables"):
|
||||
with log_span("ray:import_environment_variable", worker=worker):
|
||||
fetch_and_register_environment_variable(key, worker=worker)
|
||||
elif key.startswith(b"FunctionsToRun"):
|
||||
with log_span("ray:import_function_to_run", worker=worker):
|
||||
fetch_and_execute_function_to_run(key, worker=worker)
|
||||
@@ -1481,7 +1228,6 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
|
||||
assert not worker.connected, error_message
|
||||
assert worker.cached_functions_to_run is not None, error_message
|
||||
assert worker.cached_remote_functions is not None, error_message
|
||||
assert env._cached_environment_variables is not None, error_message
|
||||
# Initialize some fields.
|
||||
worker.worker_id = random_string()
|
||||
worker.actor_id = actor_id
|
||||
@@ -1640,22 +1386,18 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
|
||||
lambda worker_info: sys.path.insert(1, script_directory))
|
||||
worker.run_function_on_all_workers(
|
||||
lambda worker_info: sys.path.insert(1, current_directory))
|
||||
# TODO(rkn): Here we first export functions to run, then environment
|
||||
# variables, then remote functions. The order matters. For example, one of
|
||||
# the functions to run may set the Python path, which is needed to import a
|
||||
# module used to define an environment variable, which in turn is used
|
||||
# inside a remote function. We may want to change the order to simply be
|
||||
# the order in which the exports were defined on the driver. In addition,
|
||||
# we will need to retain the ability to decide what the first few exports
|
||||
# are (mostly to set the Python path). Additionally, note that the first
|
||||
# exports to be defined on the driver will be the ones defined in separate
|
||||
# modules that are imported by the driver.
|
||||
# TODO(rkn): Here we first export functions to run, then remote functions.
|
||||
# The order matters. For example, one of the functions to run may set the
|
||||
# Python path, which is needed to import a module used to define a remote
|
||||
# function. We may want to change the order to simply be the order in which
|
||||
# the exports were defined on the driver. In addition, we will need to
|
||||
# retain the ability to decide what the first few exports are (mostly to
|
||||
# set the Python path). Additionally, note that the first exports to be
|
||||
# defined on the driver will be the ones defined in separate modules that
|
||||
# are imported by the driver.
|
||||
# Export cached functions_to_run.
|
||||
for function in worker.cached_functions_to_run:
|
||||
worker.run_function_on_all_workers(function)
|
||||
# Export cached environment variables to the workers.
|
||||
for name, environment_variable in env._cached_environment_variables:
|
||||
env.__setattr__(name, environment_variable)
|
||||
# Export cached remote functions to the workers.
|
||||
for info in worker.cached_remote_functions:
|
||||
(function_id, func_name, func,
|
||||
@@ -1664,7 +1406,6 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
|
||||
num_return_vals, num_cpus, num_gpus, worker)
|
||||
worker.cached_functions_to_run = None
|
||||
worker.cached_remote_functions = None
|
||||
env._cached_environment_variables = None
|
||||
|
||||
|
||||
def disconnect(worker=global_worker):
|
||||
@@ -1675,7 +1416,6 @@ def disconnect(worker=global_worker):
|
||||
worker.connected = False
|
||||
worker.cached_functions_to_run = []
|
||||
worker.cached_remote_functions = []
|
||||
env._cached_environment_variables = []
|
||||
serialization.clear_state()
|
||||
|
||||
|
||||
@@ -1949,15 +1689,7 @@ def format_error_message(exception_message, task_exception=False):
|
||||
|
||||
|
||||
def main_loop(worker=global_worker):
|
||||
"""The main loop a worker runs to receive and execute tasks.
|
||||
|
||||
This method is an infinite loop. It waits to receive commands from the
|
||||
scheduler. A command may consist of a task to execute, a remote function to
|
||||
import, an environment variable to import, or an order to terminate the
|
||||
worker process. The worker executes the command, notifies the scheduler of
|
||||
any errors that occurred while executing the command, and waits for the next
|
||||
command.
|
||||
"""
|
||||
"""The main loop a worker runs to receive and execute tasks."""
|
||||
|
||||
def exit(signum, frame):
|
||||
cleanup(worker=worker)
|
||||
@@ -1973,8 +1705,6 @@ def main_loop(worker=global_worker):
|
||||
store. If the task throws an exception, RayTaskError objects are stored in
|
||||
the object store to represent the failed task (these will be retrieved by
|
||||
calls to get or by subsequent tasks that use the outputs of this task).
|
||||
After the task executes, the worker resets any environment variables that
|
||||
were accessed by the task.
|
||||
"""
|
||||
try:
|
||||
# The ID of the driver that this task belongs to. This is needed so that
|
||||
@@ -2041,22 +1771,6 @@ def main_loop(worker=global_worker):
|
||||
str(failure_object),
|
||||
data={"function_id": function_id.id(),
|
||||
"function_name": function_name})
|
||||
try:
|
||||
# Reinitialize the values of environment variables that were used in the
|
||||
# task above so that changes made to their state do not affect other
|
||||
# tasks.
|
||||
with log_span("ray:task:reinitialize_environment_variables",
|
||||
worker=worker):
|
||||
env._reinitialize()
|
||||
except Exception as e:
|
||||
# The attempt to reinitialize the environment variables threw an
|
||||
# exception. We record the traceback and notify the scheduler.
|
||||
traceback_str = format_error_message(traceback.format_exc())
|
||||
worker.push_error_to_driver(worker.task_driver_id.id(),
|
||||
"reinitialize_environment_variable",
|
||||
traceback_str,
|
||||
data={"function_id": function_id.id(),
|
||||
"function_name": function_name})
|
||||
|
||||
check_main_thread()
|
||||
while True:
|
||||
@@ -2112,41 +1826,6 @@ def _mode(worker=global_worker):
|
||||
return worker.mode
|
||||
|
||||
|
||||
def _env():
|
||||
"""Return the env object.
|
||||
|
||||
We use this wrapper so that functions which use the env object can be
|
||||
pickled.
|
||||
"""
|
||||
return env
|
||||
|
||||
|
||||
def _export_environment_variable(name, environment_variable,
|
||||
worker=global_worker):
|
||||
"""Export an environment variable to the workers.
|
||||
|
||||
This is only called by a driver.
|
||||
|
||||
Args:
|
||||
name (str): The name of the variable to export.
|
||||
environment_variable (EnvironmentVariable): The environment variable object
|
||||
containing code for initializing and reinitializing the variable.
|
||||
"""
|
||||
check_main_thread()
|
||||
if _mode(worker) not in [SCRIPT_MODE, SILENT_MODE]:
|
||||
raise Exception("_export_environment_variable can only be called on a "
|
||||
"driver.")
|
||||
environment_variable_id = name
|
||||
key = "EnvironmentVariables:{}:{}".format(random_string(),
|
||||
environment_variable_id)
|
||||
worker.redis_client.hmset(key, {
|
||||
"driver_id": worker.task_driver_id.id(),
|
||||
"name": name,
|
||||
"initializer": pickling.dumps(environment_variable.initializer),
|
||||
"reinitializer": pickling.dumps(environment_variable.reinitializer)})
|
||||
worker.redis_client.rpush("Exports", key)
|
||||
|
||||
|
||||
def export_remote_function(function_id, func_name, func, func_invoker,
|
||||
num_return_vals, num_cpus, num_gpus,
|
||||
worker=global_worker):
|
||||
@@ -2259,12 +1938,7 @@ def remote(*args, **kwargs):
|
||||
# In PYTHON_MODE, remote calls simply execute the function. We copy
|
||||
# the arguments to prevent the function call from mutating them and
|
||||
# to match the usual behavior of immutable remote objects.
|
||||
try:
|
||||
_env()._running_remote_function_locally = True
|
||||
result = func(*copy.deepcopy(args))
|
||||
finally:
|
||||
_env()._reinitialize()
|
||||
_env()._running_remote_function_locally = False
|
||||
result = func(*copy.deepcopy(args))
|
||||
return result
|
||||
objectids = _submit_task(function_id, func_name, args)
|
||||
if len(objectids) == 1:
|
||||
|
||||
Reference in New Issue
Block a user