From b7722897b4c341a28002a917ef1d136458f4beec Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Tue, 28 Aug 2018 16:45:49 -0700 Subject: [PATCH] Deprecate 'driver_mode' argument. (#2758) * Deprecate 'driver_mode' argument. * Fix * Fix --- python/ray/__init__.py | 6 ++-- python/ray/import_thread.py | 2 +- python/ray/remote_function.py | 2 +- python/ray/worker.py | 60 ++++++++++++++++----------------- test/actor_test.py | 4 +-- test/component_failures_test.py | 2 -- test/failure_test.py | 34 ++++++++----------- test/multi_node_test.py | 8 ++--- test/runtest.py | 10 +++--- 9 files changed, 60 insertions(+), 68 deletions(-) diff --git a/python/ray/__init__.py b/python/ray/__init__.py index 2130f9ff9..fe4881890 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -51,7 +51,7 @@ from ray.profiling import profile # noqa: E402 from ray.worker import (error_info, init, connect, disconnect, get, put, wait, remote, get_gpu_ids, get_resource_ids, get_webui_url, register_custom_serializer, shutdown) # noqa: E402 -from ray.worker import (SCRIPT_MODE, WORKER_MODE, LOCAL_MODE, SILENT_MODE, +from ray.worker import (SCRIPT_MODE, WORKER_MODE, LOCAL_MODE, PYTHON_MODE) # noqa: E402 from ray.worker import global_state # noqa: E402 import ray.internal # noqa: E402 @@ -68,8 +68,8 @@ __all__ = [ "error_info", "init", "connect", "disconnect", "get", "put", "wait", "remote", "profile", "actor", "method", "get_gpu_ids", "get_resource_ids", "get_webui_url", "register_custom_serializer", "shutdown", "SCRIPT_MODE", - "WORKER_MODE", "LOCAL_MODE", "SILENT_MODE", "PYTHON_MODE", "global_state", - "ObjectID", "_config", "__version__", "internal" + "WORKER_MODE", "LOCAL_MODE", "PYTHON_MODE", "global_state", "ObjectID", + "_config", "__version__", "internal" ] import ctypes # noqa: E402 diff --git a/python/ray/import_thread.py b/python/ray/import_thread.py index 54c44db8f..669ff38a4 100644 --- a/python/ray/import_thread.py +++ b/python/ray/import_thread.py @@ -162,7 +162,7 @@ class ImportThread(object): key, ["driver_id", "function", "run_on_other_drivers"]) if (run_on_other_drivers == "False" - and self.worker.mode in [ray.SCRIPT_MODE, ray.SILENT_MODE] + and self.worker.mode == ray.SCRIPT_MODE and driver_id != self.worker.task_driver_id.id()): return diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index fb963393d..287d3d045 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -90,7 +90,7 @@ class RemoteFunction(object): # # Export the function. worker = ray.worker.get_global_worker() - if worker.mode in [ray.worker.SCRIPT_MODE, ray.worker.SILENT_MODE]: + if worker.mode == ray.worker.SCRIPT_MODE: self._export() elif worker.mode is None: worker.cached_remote_functions_and_actors.append( diff --git a/python/ray/worker.py b/python/ray/worker.py index 6ecac4903..9b73b4c50 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -43,8 +43,7 @@ from ray.utils import ( SCRIPT_MODE = 0 WORKER_MODE = 1 LOCAL_MODE = 2 -SILENT_MODE = 3 -PYTHON_MODE = 4 +PYTHON_MODE = 3 ERROR_KEY_PREFIX = b"Error:" @@ -194,8 +193,8 @@ class Worker(object): function itself. This is the set of remote functions that can be executed by this worker. connected (bool): True if Ray has been started and False otherwise. - mode: The mode of the worker. One of SCRIPT_MODE, LOCAL_MODE, - SILENT_MODE, and WORKER_MODE. + mode: The mode of the worker. One of SCRIPT_MODE, LOCAL_MODE, and + WORKER_MODE. cached_remote_functions_and_actors: A list of information for exporting remote functions and actor classes definitions that were defined before the worker called connect. When the worker eventually does @@ -294,13 +293,8 @@ class Worker(object): debugging purposes. It will not send remote function calls to the scheduler and will insead execute them in a blocking fashion. - The mode SILENT_MODE should be used only during testing. It does not - print any information about errors because some of the tests - intentionally fail. - Args: - mode: One of SCRIPT_MODE, WORKER_MODE, LOCAL_MODE, and - SILENT_MODE. + mode: One of SCRIPT_MODE, WORKER_MODE, and LOCAL_MODE. """ self.mode = mode @@ -680,7 +674,7 @@ class Worker(object): decorated_function: The decorated function (this is used to enable the remote function to recursively call itself). """ - if self.mode not in [SCRIPT_MODE, SILENT_MODE]: + if self.mode != SCRIPT_MODE: raise Exception("export_remote_function can only be called on a " "driver.") @@ -1505,7 +1499,8 @@ def _init(address_info=None, num_workers=None, num_local_schedulers=None, object_store_memory=None, - driver_mode=SCRIPT_MODE, + local_mode=False, + driver_mode=None, redirect_worker_output=False, redirect_output=True, start_workers_from_local_scheduler=True, @@ -1545,8 +1540,8 @@ def _init(address_info=None, This is only provided if start_ray_local is True. object_store_memory: The maximum amount of memory (in bytes) to allow the object store to use. - driver_mode (bool): The mode in which to start the driver. This should - be one of ray.SCRIPT_MODE, ray.LOCAL_MODE, and ray.SILENT_MODE. + local_mode (bool): True if the code should be executed serially + without Ray. This is useful for debugging. redirect_worker_output: True if the stdout and stderr of worker processes should be redirected to files. redirect_output (bool): True if stdout and stderr for non-worker @@ -1581,12 +1576,13 @@ def _init(address_info=None, Exception: An exception is raised if an inappropriate combination of arguments is passed in. """ - if driver_mode == PYTHON_MODE: - raise Exception("ray.PYTHON_MODE has been renamed to ray.LOCAL_MODE. " - "Please use ray.LOCAL_MODE.") - if driver_mode not in [SCRIPT_MODE, LOCAL_MODE, SILENT_MODE]: - raise Exception("Driver_mode must be in [ray.SCRIPT_MODE, " - "ray.LOCAL_MODE, ray.SILENT_MODE].") + if driver_mode is not None: + raise Exception("The 'driver_mode' argument has been deprecated. " + "To run Ray in local mode, pass in local_mode=True.") + if local_mode: + driver_mode = LOCAL_MODE + else: + driver_mode = SCRIPT_MODE if use_raylet is None and os.environ.get("RAY_USE_XRAY") == "1": # This environment variable is used in our testing setup. @@ -1724,7 +1720,8 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None, num_workers=None, - driver_mode=SCRIPT_MODE, + local_mode=False, + driver_mode=None, redirect_worker_output=False, redirect_output=True, ignore_reinit_error=False, @@ -1779,8 +1776,8 @@ def init(redis_address=None, manner. However, the same ID should not be used for different jobs. num_workers (int): The number of workers to start. This is only provided if redis_address is not provided. - driver_mode (bool): The mode in which to start the driver. This should - be one of ray.SCRIPT_MODE, ray.LOCAL_MODE, and ray.SILENT_MODE. + local_mode (bool): True if the code should be executed serially + without Ray. This is useful for debugging. redirect_worker_output: True if the stdout and stderr of worker processes should be redirected to files. redirect_output (bool): True if stdout and stderr for non-worker @@ -1838,6 +1835,7 @@ def init(redis_address=None, address_info=info, start_ray_local=(redis_address is None), num_workers=num_workers, + local_mode=local_mode, driver_mode=driver_mode, redirect_worker_output=redirect_worker_output, redirect_output=redirect_output, @@ -1885,7 +1883,7 @@ def shutdown(worker=global_worker): if hasattr(worker, "plasma_client"): worker.plasma_client.disconnect() - if worker.mode in [SCRIPT_MODE, SILENT_MODE]: + if worker.mode == SCRIPT_MODE: # If this is a driver, push the finish time to Redis and clean up any # other services that were started with the driver. worker.redis_client.hmset(b"Drivers:" + worker.worker_id, @@ -1912,7 +1910,7 @@ normal_excepthook = sys.excepthook def custom_excepthook(type, value, tb): # If this is a driver, push the exception to redis. - if global_worker.mode in [SCRIPT_MODE, SILENT_MODE]: + if global_worker.mode == SCRIPT_MODE: error_message = "".join(traceback.format_tb(tb)) global_worker.redis_client.hmset(b"Drivers:" + global_worker.worker_id, {"exception": error_message}) @@ -2058,8 +2056,8 @@ def connect(info, sockets of the plasma store, plasma manager, and local scheduler. object_id_seed: A seed to use to make the generation of object IDs deterministic. - mode: The mode of the worker. One of SCRIPT_MODE, WORKER_MODE, - LOCAL_MODE, and SILENT_MODE. + mode: The mode of the worker. One of SCRIPT_MODE, WORKER_MODE, and + LOCAL_MODE. use_raylet: True if the new raylet code path should be used. """ # Do some basic checking to make sure we didn't call ray.init twice. @@ -2102,7 +2100,7 @@ def connect(info, try: ray.services.check_version_info(worker.redis_client) except Exception as e: - if mode in [SCRIPT_MODE, SILENT_MODE]: + if mode == SCRIPT_MODE: raise e elif mode == WORKER_MODE: traceback_str = traceback.format_exc() @@ -2138,7 +2136,7 @@ def connect(info, global_state._initialize_global_state(redis_ip_address, int(redis_port)) # Register the worker with Redis. - if mode in [SCRIPT_MODE, SILENT_MODE]: + if mode == SCRIPT_MODE: # The concept of a driver is the same as the concept of a "job". # Register the driver/job with Redis here. import __main__ as main @@ -2189,7 +2187,7 @@ def connect(info, # If this is a driver, set the current task ID, the task driver ID, and set # the task index to 0. - if mode in [SCRIPT_MODE, SILENT_MODE]: + if mode == SCRIPT_MODE: # If the user provided an object_id_seed, then set the current task ID # deterministically based on that seed (without altering the state of # the user's random number generator). Otherwise, set the current task @@ -2275,7 +2273,7 @@ def connect(info, if mode != LOCAL_MODE and worker.use_raylet: worker.profiler.start_flush_thread() - if mode in [SCRIPT_MODE, SILENT_MODE]: + if mode == SCRIPT_MODE: # Add the directory containing the script that is running to the Python # paths of the workers. Also add the current directory. Note that this # assumes that the directory structures on the machines in the clusters diff --git a/test/actor_test.py b/test/actor_test.py index f844a83de..dbc42933b 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -21,7 +21,7 @@ class ActorAPI(unittest.TestCase): ray.shutdown() def testKeywordArgs(self): - ray.init(num_workers=0, driver_mode=ray.SILENT_MODE) + ray.init(num_workers=0) @ray.remote class Actor(object): @@ -189,7 +189,7 @@ class ActorAPI(unittest.TestCase): assert ray.get(f.get_val.remote()) == 3 def testDecoratorArgs(self): - ray.init(num_workers=0, driver_mode=ray.SILENT_MODE) + ray.init(num_workers=0) # This is an invalid way of using the actor decorator. with pytest.raises(Exception): diff --git a/test/component_failures_test.py b/test/component_failures_test.py index 44f410834..04eb6ae68 100644 --- a/test/component_failures_test.py +++ b/test/component_failures_test.py @@ -31,7 +31,6 @@ class ComponentFailureTest(unittest.TestCase): ray.worker._init( num_workers=1, - driver_mode=ray.SILENT_MODE, start_workers_from_local_scheduler=False, start_ray_local=True, redirect_output=True) @@ -73,7 +72,6 @@ class ComponentFailureTest(unittest.TestCase): ray.worker._init( num_workers=1, - driver_mode=ray.SILENT_MODE, start_workers_from_local_scheduler=False, start_ray_local=True, redirect_output=True) diff --git a/test/failure_test.py b/test/failure_test.py index d15b8526c..f8604016f 100644 --- a/test/failure_test.py +++ b/test/failure_test.py @@ -44,7 +44,7 @@ class TaskStatusTest(unittest.TestCase): def throw_exception_fct3(x): raise Exception("Test function 3 intentionally failed.") - ray.init(num_workers=3, driver_mode=ray.SILENT_MODE) + ray.init(num_workers=3) throw_exception_fct1.remote() throw_exception_fct1.remote() @@ -86,7 +86,7 @@ class TaskStatusTest(unittest.TestCase): assert False def testFailImportingRemoteFunction(self): - ray.init(num_workers=2, driver_mode=ray.SILENT_MODE) + ray.init(num_workers=2) # Create the contents of a temporary Python file. temporary_python_file = """ @@ -125,7 +125,7 @@ def temporary_helper_function(): sys.path.pop(-1) def testFailedFunctionToRun(self): - ray.init(num_workers=2, driver_mode=ray.SILENT_MODE) + ray.init(num_workers=2) def f(worker): if ray.worker.global_worker.mode == ray.WORKER_MODE: @@ -140,7 +140,7 @@ def temporary_helper_function(): assert "Function to run failed." in error_info[1]["message"] def testFailImportingActor(self): - ray.init(num_workers=2, driver_mode=ray.SILENT_MODE) + ray.init(num_workers=2) # Create the contents of a temporary Python file. temporary_python_file = """ @@ -203,7 +203,7 @@ class ActorTest(unittest.TestCase): ray.shutdown() def testFailedActorInit(self): - ray.init(num_workers=0, driver_mode=ray.SILENT_MODE) + ray.init(num_workers=0) error_message1 = "actor constructor failed" error_message2 = "actor method failed" @@ -233,7 +233,7 @@ class ActorTest(unittest.TestCase): assert error_message2 in ray.error_info()[1]["message"] def testIncorrectMethodCalls(self): - ray.init(num_workers=0, driver_mode=ray.SILENT_MODE) + ray.init(num_workers=0) @ray.remote class Actor(object): @@ -275,7 +275,7 @@ class WorkerDeath(unittest.TestCase): ray.shutdown() def testWorkerRaisingException(self): - ray.init(num_workers=1, driver_mode=ray.SILENT_MODE) + ray.init(num_workers=1) @ray.remote def f(): @@ -290,7 +290,7 @@ class WorkerDeath(unittest.TestCase): assert len(ray.error_info()) == 2 def testWorkerDying(self): - ray.init(num_workers=0, driver_mode=ray.SILENT_MODE) + ray.init(num_workers=0) # Define a remote function that will kill the worker that runs it. @ray.remote @@ -306,7 +306,7 @@ class WorkerDeath(unittest.TestCase): assert "died or was killed while executing" in error_info[0]["message"] def testActorWorkerDying(self): - ray.init(num_workers=0, driver_mode=ray.SILENT_MODE) + ray.init(num_workers=0) @ray.remote class Actor(object): @@ -326,7 +326,7 @@ class WorkerDeath(unittest.TestCase): wait_for_errors(ray_constants.WORKER_DIED_PUSH_ERROR, 1) def testActorWorkerDyingFutureTasks(self): - ray.init(num_workers=0, driver_mode=ray.SILENT_MODE) + ray.init(num_workers=0) @ray.remote class Actor(object): @@ -349,7 +349,7 @@ class WorkerDeath(unittest.TestCase): wait_for_errors(ray_constants.WORKER_DIED_PUSH_ERROR, 1) def testActorWorkerDyingNothingInProgress(self): - ray.init(num_workers=0, driver_mode=ray.SILENT_MODE) + ray.init(num_workers=0) @ray.remote class Actor(object): @@ -374,10 +374,7 @@ class PutErrorTest(unittest.TestCase): def testPutError1(self): store_size = 10**6 - ray.worker._init( - start_ray_local=True, - driver_mode=ray.SILENT_MODE, - object_store_memory=store_size) + ray.worker._init(start_ray_local=True, object_store_memory=store_size) num_objects = 3 object_size = 4 * 10**5 @@ -421,10 +418,7 @@ class PutErrorTest(unittest.TestCase): def testPutError2(self): # This is the same as the previous test, but it calls ray.put directly. store_size = 10**6 - ray.worker._init( - start_ray_local=True, - driver_mode=ray.SILENT_MODE, - object_store_memory=store_size) + ray.worker._init(start_ray_local=True, object_store_memory=store_size) num_objects = 3 object_size = 4 * 10**5 @@ -473,7 +467,7 @@ class ConfigurationTest(unittest.TestCase): ray_version = ray.__version__ ray.__version__ = "fake ray version" - ray.init(num_workers=1, driver_mode=ray.SILENT_MODE) + ray.init(num_workers=1) wait_for_errors(ray_constants.VERSION_MISMATCH_PUSH_ERROR, 1) diff --git a/test/multi_node_test.py b/test/multi_node_test.py index f660103f5..c38df6927 100644 --- a/test/multi_node_test.py +++ b/test/multi_node_test.py @@ -47,7 +47,7 @@ class MultiNodeTest(unittest.TestCase): def testErrorIsolation(self): # Connect a driver to the Ray cluster. - ray.init(redis_address=self.redis_address, driver_mode=ray.SILENT_MODE) + ray.init(redis_address=self.redis_address) # There shouldn't be any errors yet. assert len(ray.error_info()) == 0 @@ -115,7 +115,7 @@ print("success") def testRemoteFunctionIsolation(self): # This test will run multiple remote functions with the same names in # two different drivers. Connect a driver to the Ray cluster. - ray.init(redis_address=self.redis_address, driver_mode=ray.SILENT_MODE) + ray.init(redis_address=self.redis_address) # Start another driver and make sure that it can define and call its # own commands with the same names. @@ -158,7 +158,7 @@ print("success") def testDriverExitingQuickly(self): # This test will create some drivers that submit some tasks and then # exit without waiting for the tasks to complete. - ray.init(redis_address=self.redis_address, driver_mode=ray.SILENT_MODE) + ray.init(redis_address=self.redis_address) # Define a driver that creates an actor and exits. driver_script1 = """ @@ -322,7 +322,7 @@ def train_func(config, reporter): # add a reporter arg time.sleep(0.1) reporter(timesteps_total=i, mean_accuracy=i+97) # report metrics -ray.init(redis_address="{}", driver_mode=ray.SILENT_MODE) +ray.init(redis_address="{}") ray.tune.register_trainable("train_func", train_func) tune.run_experiments({{ diff --git a/test/runtest.py b/test/runtest.py index ac96c82c5..9301a80f7 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -1,4 +1,6 @@ -from __future__ import absolute_import, division, print_function +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function import os import pytest @@ -1006,7 +1008,7 @@ class APITest(unittest.TestCase): os.environ.get("RAY_USE_XRAY") == "1", "This test does not work with xray (nor is it intended to).") def testLoggingAPI(self): - self.init_ray(driver_mode=ray.SILENT_MODE) + self.init_ray() def events(): # This is a hack for getting the event log. It is not part of the @@ -1175,7 +1177,7 @@ class APITest(unittest.TestCase): ray.get(3) def testMultithreading(self): - self.init_ray(driver_mode=ray.SILENT_MODE) + self.init_ray() @ray.remote def f(): @@ -1316,7 +1318,7 @@ class LocalModeTest(unittest.TestCase): x[0] = 1 return x - ray.init(driver_mode=ray.LOCAL_MODE) + ray.init(local_mode=True) @ray.remote def f():