diff --git a/doc/requirements-doc.txt b/doc/requirements-doc.txt index edf90aa48..b92994d26 100644 --- a/doc/requirements-doc.txt +++ b/doc/requirements-doc.txt @@ -11,5 +11,6 @@ psutil recommonmark redis sphinx +sphinx-click sphinx_rtd_theme pandas diff --git a/doc/source/api.rst b/doc/source/api.rst index b61f86060..e114d3b3c 100644 --- a/doc/source/api.rst +++ b/doc/source/api.rst @@ -1,284 +1,49 @@ The Ray API =========== -Starting Ray ------------- - -There are two main ways in which Ray can be used. First, you can start all of -the relevant Ray processes and shut them all down within the scope of a single -script. Second, you can connect to and use an existing Ray cluster. - -Starting and stopping a cluster within a script -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -One use case is to start all of the relevant Ray processes when you call -``ray.init`` and shut them down when the script exits. These processes include -local and global schedulers, an object store and an object manager, a redis -server, and more. - -**Note:** this approach is limited to a single machine. - -This can be done as follows. - -.. code-block:: python - - ray.init() - -If there are GPUs available on the machine, you should specify this with the -``num_gpus`` argument. Similarly, you can also specify the number of CPUs with -``num_cpus``. - -.. code-block:: python - - ray.init(num_cpus=20, num_gpus=2) - -By default, Ray will use ``psutil.cpu_count()`` to determine the number of CPUs. -Ray will also attempt to automatically determine the number of GPUs. - -Instead of thinking about the number of "worker" processes on each node, we -prefer to think in terms of the quantities of CPU and GPU resources on each -node and to provide the illusion of an infinite pool of workers. Tasks will be -assigned to workers based on the availability of resources so as to avoid -contention and not based on the number of available worker processes. - -Connecting to an existing cluster -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Once a Ray cluster has been started, the only thing you need in order to connect -to it is the address of the Redis server in the cluster. In this case, your -script will not start up or shut down any processes. The cluster and all of its -processes may be shared between multiple scripts and multiple users. To do this, -you simply need to know the address of the cluster's Redis server. This can be -done with a command like the following. - -.. code-block:: python - - ray.init(redis_address="12.345.67.89:6379") - -In this case, you cannot specify ``num_cpus`` or ``num_gpus`` in ``ray.init`` -because that information is passed into the cluster when the cluster is started, -not when your script is started. - -View the instructions for how to `start a Ray cluster`_ on multiple nodes. - -.. _`start a Ray cluster`: http://ray.readthedocs.io/en/latest/using-ray-on-a-cluster.html - .. autofunction:: ray.init -Defining remote functions -------------------------- - -Remote functions are used to create tasks. To define a remote function, the -``@ray.remote`` decorator is placed over the function definition. - -The function can then be invoked with ``f.remote``. Invoking the function -creates a **task** which will be scheduled on and executed by some worker -process in the Ray cluster. The call will return an **object ID** (essentially a -future) representing the eventual return value of the task. Anyone with the -object ID can retrieve its value, regardless of where the task was executed (see -`Getting values from object IDs`_). - -When a task executes, its outputs will be serialized into a string of bytes and -stored in the object store. - -Note that arguments to remote functions can be values or object IDs. - -.. code-block:: python - - @ray.remote - def f(x): - return x + 1 - - x_id = f.remote(0) - ray.get(x_id) # 1 - - y_id = f.remote(x_id) - ray.get(y_id) # 2 - -If you want a remote function to return multiple object IDs, you can do that by -passing the ``num_return_vals`` argument into the remote decorator. - -.. code-block:: python - - @ray.remote(num_return_vals=2) - def f(): - return 1, 2 - - x_id, y_id = f.remote() - ray.get(x_id) # 1 - ray.get(y_id) # 2 - .. autofunction:: ray.remote -Getting values from object IDs ------------------------------- - -Object IDs can be converted into objects by calling ``ray.get`` on the object -ID. Note that ``ray.get`` accepts either a single object ID or a list of object -IDs. - -.. code-block:: python - - @ray.remote - def f(): - return {'key1': ['value']} - - # Get one object ID. - ray.get(f.remote()) # {'key1': ['value']} - - # Get a list of object IDs. - ray.get([f.remote() for _ in range(2)]) # [{'key1': ['value']}, {'key1': ['value']}] - -Numpy arrays -~~~~~~~~~~~~ - -Numpy arrays are handled more efficiently than other data types, so **use numpy -arrays whenever possible**. - -Any numpy arrays that are part of the serialized object will not be copied out -of the object store. They will remain in the object store and the resulting -deserialized object will simply have a pointer to the relevant place in the -object store's memory. - -Since objects in the object store are immutable, this means that if you want to -mutate a numpy array that was returned by a remote function, you will have to -first copy it. - .. autofunction:: ray.get -Putting objects in the object store ------------------------------------ - -The primary way that objects are placed in the object store is by being returned -by a task. However, it is also possible to directly place objects in the object -store using ``ray.put``. - -.. code-block:: python - - x_id = ray.put(1) - ray.get(x_id) # 1 - -The main reason to use ``ray.put`` is that you want to pass the same large -object into a number of tasks. By first doing ``ray.put`` and then passing the -resulting object ID into each of the tasks, the large object is copied into the -object store only once, whereas when we directly pass the object in, it is -copied multiple times. - -.. code-block:: python - - import numpy as np - - @ray.remote - def f(x): - pass - - x = np.zeros(10 ** 6) - - # Alternative 1: Here, x is copied into the object store 10 times. - [f.remote(x) for _ in range(10)] - - # Alternative 2: Here, x is copied into the object store once. - x_id = ray.put(x) - [f.remote(x_id) for _ in range(10)] - -Note that ``ray.put`` is called under the hood in a couple situations. - -- It is called on the values returned by a task. -- It is called on the arguments to a task, unless the arguments are Python - primitives like integers or short strings, lists, tuples, or dictionaries. - -.. autofunction:: ray.put - -Waiting for a subset of tasks to finish ---------------------------------------- - -It is often desirable to adapt the computation being done based on when -different tasks finish. For example, if a bunch of tasks each take a variable -length of time, and their results can be processed in any order, then it makes -sense to simply process the results in the order that they finish. In other -settings, it makes sense to discard straggler tasks whose results may not be -needed. - -To do this, we introduce the ``ray.wait`` primitive, which takes a list of -object IDs and returns when a subset of them are available. By default it blocks -until a single object is available, but the ``num_returns`` value can be -specified to wait for a different number. If a ``timeout`` argument is passed -in, it will block for at most that many milliseconds and may return a list with -fewer than ``num_returns`` elements. - -The ``ray.wait`` function returns two lists. The first list is a list of object -IDs of available objects (of length at most ``num_returns``), and the second -list is a list of the remaining object IDs, so the combination of these two -lists is equal to the list passed in to ``ray.wait`` (up to ordering). - -.. code-block:: python - - import time - import numpy as np - - @ray.remote - def f(n): - time.sleep(n) - return n - - # Start 3 tasks with different durations. - results = [f.remote(i) for i in range(3)] - # Block until 2 of them have finished. - ready_ids, remaining_ids = ray.wait(results, num_returns=2) - - # Start 5 tasks with different durations. - results = [f.remote(i) for i in range(5)] - # Block until 4 of them have finished or 2.5 seconds pass. - ready_ids, remaining_ids = ray.wait(results, num_returns=4, timeout=2500) - -It is easy to use this construct to create an infinite loop in which multiple -tasks are executing, and whenever one task finishes, a new one is launched. - -.. code-block:: python - - @ray.remote - def f(): - return 1 - - # Start 5 tasks. - remaining_ids = [f.remote() for i in range(5)] - # Whenever one task finishes, start a new one. - for _ in range(100): - ready_ids, remaining_ids = ray.wait(remaining_ids) - # Get the available object and do something with it. - print(ray.get(ready_ids)) - # Start a new task. - remaining_ids.append(f.remote()) - .. autofunction:: ray.wait -Viewing errors --------------- +.. autofunction:: ray.put -Keeping track of errors that occur in different processes throughout a cluster -can be challenging. There are a couple mechanisms to help with this. +.. autofunction:: ray.get_gpu_ids -1. If a task throws an exception, that exception will be printed in the - background of the driver process. +.. autofunction:: ray.get_resource_ids -2. If ``ray.get`` is called on an object ID whose parent task threw an exception - before creating the object, the exception will be re-raised by ``ray.get``. +.. autofunction:: ray.get_webui_url -The errors will also be accumulated in Redis and can be accessed with -``ray.error_info``. Normally, you shouldn't need to do this, but it is possible. +.. autofunction:: ray.shutdown -.. code-block:: python +.. autofunction:: ray.register_custom_serializer - @ray.remote - def f(): - raise Exception("This task failed!!") +.. autofunction:: ray.profile - f.remote() # An error message will be printed in the background. +.. autofunction:: ray.method - # Wait for the error to propagate to Redis. - import time - time.sleep(1) +The Ray Command Line API +------------------------ - ray.error_info() # This returns a list containing the error message. +.. click:: ray.scripts.scripts:start + :prog: ray start + :show-nested: -.. autofunction:: ray.error_info +.. click:: ray.scripts.scripts:stop + :prog: ray stop + :show-nested: + +.. click:: ray.scripts.scripts:create_or_update + :prog: ray create_or_update + :show-nested: + +.. click:: ray.scripts.scripts:teardown + :prog: ray teardown + :show-nested: + +.. click:: ray.scripts.scripts:get_head_ip + :prog: ray get_head_ip + :show-nested: diff --git a/doc/source/conf.py b/doc/source/conf.py index 4c2488ce3..3178715a5 100644 --- a/doc/source/conf.py +++ b/doc/source/conf.py @@ -72,6 +72,7 @@ sys.path.insert(0, os.path.abspath("../../python/")) extensions = [ 'sphinx.ext.autodoc', 'sphinx.ext.napoleon', + 'sphinx_click.ext', ] # Add any paths that contain templates here, relative to this directory. diff --git a/python/ray/__init__.py b/python/ray/__init__.py index a05e90c81..da38977d1 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -50,8 +50,8 @@ from ray.local_scheduler import ObjectID, _config # noqa: E402 from ray.worker import (error_info, init, connect, disconnect, get, put, wait, remote, profile, flush_profile_data, get_gpu_ids, get_resource_ids, get_webui_url, - register_custom_serializer) # noqa: E402 -from ray.worker import (SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, + register_custom_serializer, shutdown) # noqa: E402 +from ray.worker import (SCRIPT_MODE, WORKER_MODE, LOCAL_MODE, SILENT_MODE) # noqa: E402 from ray.worker import global_state # noqa: E402 # We import ray.actor because some code is run in actor.py which initializes @@ -67,8 +67,9 @@ __all__ = [ "error_info", "init", "connect", "disconnect", "get", "put", "wait", "remote", "profile", "flush_profile_data", "actor", "method", "get_gpu_ids", "get_resource_ids", "get_webui_url", - "register_custom_serializer", "SCRIPT_MODE", "WORKER_MODE", "PYTHON_MODE", - "SILENT_MODE", "global_state", "ObjectID", "_config", "__version__" + "register_custom_serializer", "shutdown", "SCRIPT_MODE", "WORKER_MODE", + "LOCAL_MODE", "SILENT_MODE", "global_state", "ObjectID", "_config", + "__version__" ] import ctypes # noqa: E402 diff --git a/python/ray/actor.py b/python/ray/actor.py index ec983855b..93ee1cec4 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -421,6 +421,24 @@ def export_actor_class(class_id, Class, actor_method_names, def method(*args, **kwargs): + """Annotate an actor method. + + .. code-block:: python + + @ray.remote + class Foo(object): + @ray.method(num_return_vals=2) + def bar(self): + return 1, 2 + + f = Foo.remote() + + _, _ = f.bar.remote() + + Args: + num_return_vals: The number of object IDs that should be returned by + invocations of this actor method. + """ assert len(args) == 0 assert len(kwargs) == 1 assert "num_return_vals" in kwargs @@ -588,10 +606,10 @@ class ActorClass(object): # updated to reflect the new invocation. actor_cursor = None - # Do not export the actor class or the actor if run in PYTHON_MODE + # Do not export the actor class or the actor if run in LOCAL_MODE # Instead, instantiate the actor locally and add it to the worker's # dictionary - if worker.mode == ray.PYTHON_MODE: + if worker.mode == ray.LOCAL_MODE: worker.actors[actor_id] = self._modified_class.__new__( self._modified_class) else: @@ -764,9 +782,9 @@ class ActorHandle(object): kwargs = {} args = signature.extend_args(function_signature, args, kwargs) - # Execute functions locally if Ray is run in PYTHON_MODE + # Execute functions locally if Ray is run in LOCAL_MODE # Copy args to prevent the function from mutating them. - if worker.mode == ray.PYTHON_MODE: + if worker.mode == ray.LOCAL_MODE: return getattr(worker.actors[self._ray_actor_id], method_name)(*copy.deepcopy(args)) @@ -963,7 +981,7 @@ def make_actor(cls, num_cpus, num_gpus, resources, actor_method_cpus, class Class(cls): def __ray_terminate__(self): worker = ray.worker.get_global_worker() - if worker.mode != ray.PYTHON_MODE: + if worker.mode != ray.LOCAL_MODE: # Disconnect the worker from the local scheduler. The point of # this is so that when the worker kills itself below, the local # scheduler won't push an error message to the driver. diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index 1bd254cf4..64514696a 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -125,8 +125,8 @@ class RemoteFunction(object): resources = ray.utils.resources_from_resource_arguments( self._num_cpus, self._num_gpus, self._resources, num_cpus, num_gpus, resources) - if worker.mode == ray.worker.PYTHON_MODE: - # In PYTHON_MODE, remote calls simply execute the function. + if worker.mode == ray.worker.LOCAL_MODE: + # In LOCAL_MODE, remote calls simply execute the function. # We copy the arguments to prevent the function call from # mutating them and to match the usual behavior of # immutable remote objects. diff --git a/python/ray/rllib/test/test_catalog.py b/python/ray/rllib/test/test_catalog.py index e3865c3e7..3e8a08990 100644 --- a/python/ray/rllib/test/test_catalog.py +++ b/python/ray/rllib/test/test_catalog.py @@ -29,7 +29,7 @@ class CustomModel(Model): class ModelCatalogTest(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def testGymPreprocessors(self): p1 = ModelCatalog.get_preprocessor(gym.make("CartPole-v0")) diff --git a/python/ray/rllib/test/test_filters.py b/python/ray/rllib/test/test_filters.py index 1147c1768..7cb7da6b5 100644 --- a/python/ray/rllib/test/test_filters.py +++ b/python/ray/rllib/test/test_filters.py @@ -78,7 +78,7 @@ class FilterManagerTest(unittest.TestCase): ray.init(num_cpus=1) def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def testSynchronize(self): """Synchronize applies filter buffer onto own filter""" diff --git a/python/ray/rllib/test/test_optimizers.py b/python/ray/rllib/test/test_optimizers.py index a64079dc7..c39327255 100644 --- a/python/ray/rllib/test/test_optimizers.py +++ b/python/ray/rllib/test/test_optimizers.py @@ -14,7 +14,7 @@ from ray.rllib.evaluation import SampleBatch class AsyncOptimizerTest(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def testBasic(self): ray.init(num_cpus=4) diff --git a/python/ray/rllib/tuned_examples/regression_tests/regression_test.py b/python/ray/rllib/tuned_examples/regression_tests/regression_test.py index 9ad1b5113..58433da57 100644 --- a/python/ray/rllib/tuned_examples/regression_tests/regression_test.py +++ b/python/ray/rllib/tuned_examples/regression_tests/regression_test.py @@ -43,7 +43,7 @@ class Regression(): raise NotImplementedError def teardown(self, *args): - ray.worker.cleanup() + ray.shutdown() def track_time(self, result): return result["time_total_s"] diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 7126f8634..c7c3d2b78 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -147,7 +147,7 @@ def cli(): "--use-raylet", is_flag=True, default=None, - help="use the raylet code path, this is not supported yet") + help="use the raylet code path") def start(node_ip_address, redis_address, redis_port, num_redis_shards, redis_max_clients, redis_shard_ports, object_manager_port, object_store_memory, num_workers, num_cpus, num_gpus, resources, diff --git a/python/ray/test/test_functions.py b/python/ray/test/test_functions.py index b2b7ac7d1..ade1c8183 100644 --- a/python/ray/test/test_functions.py +++ b/python/ray/test/test_functions.py @@ -90,12 +90,12 @@ def throw_exception_fct3(x): @ray.remote -def python_mode_f(): +def local_mode_f(): return np.array([0, 0]) @ray.remote -def python_mode_g(x): +def local_mode_g(x): x[0] = 1 return x diff --git a/python/ray/tune/test/trial_runner_test.py b/python/ray/tune/test/trial_runner_test.py index c72ceab68..0e2c61ddd 100644 --- a/python/ray/tune/test/trial_runner_test.py +++ b/python/ray/tune/test/trial_runner_test.py @@ -26,7 +26,7 @@ class TrainableFunctionApiTest(unittest.TestCase): ray.init(num_cpus=4, num_gpus=0) def tearDown(self): - ray.worker.cleanup() + ray.shutdown() _register_all() # re-register the evicted objects def testPinObject(self): @@ -366,7 +366,7 @@ class RunExperimentTest(unittest.TestCase): ray.init() def tearDown(self): - ray.worker.cleanup() + ray.shutdown() _register_all() # re-register the evicted objects def testDict(self): @@ -441,7 +441,7 @@ class VariantGeneratorTest(unittest.TestCase): ray.init() def tearDown(self): - ray.worker.cleanup() + ray.shutdown() _register_all() # re-register the evicted objects def testParseToTrials(self): @@ -575,7 +575,7 @@ class VariantGeneratorTest(unittest.TestCase): class TrialRunnerTest(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() _register_all() # re-register the evicted objects def testTrialStatus(self): diff --git a/python/ray/tune/test/trial_scheduler_test.py b/python/ray/tune/test/trial_scheduler_test.py index 97669626f..a2e0a52dd 100644 --- a/python/ray/tune/test/trial_scheduler_test.py +++ b/python/ray/tune/test/trial_scheduler_test.py @@ -29,7 +29,7 @@ class EarlyStoppingSuite(unittest.TestCase): ray.init() def tearDown(self): - ray.worker.cleanup() + ray.shutdown() _register_all() # re-register the evicted objects def basicSetup(self, rule): @@ -196,7 +196,7 @@ class HyperbandSuite(unittest.TestCase): ray.init() def tearDown(self): - ray.worker.cleanup() + ray.shutdown() _register_all() # re-register the evicted objects def schedulerSetup(self, num_trials): @@ -561,7 +561,7 @@ class PopulationBasedTestingSuite(unittest.TestCase): ray.init() def tearDown(self): - ray.worker.cleanup() + ray.shutdown() _register_all() # re-register the evicted objects def basicSetup(self, resample_prob=0.0, explore=None): @@ -781,7 +781,7 @@ class AsyncHyperBandSuite(unittest.TestCase): ray.init() def tearDown(self): - ray.worker.cleanup() + ray.shutdown() _register_all() # re-register the evicted objects def basicSetup(self, scheduler): diff --git a/python/ray/tune/test/tune_server_test.py b/python/ray/tune/test/tune_server_test.py index 395db08d0..c5be74bcd 100644 --- a/python/ray/tune/test/tune_server_test.py +++ b/python/ray/tune/test/tune_server_test.py @@ -51,7 +51,7 @@ class TuneServerSuite(unittest.TestCase): self.runner = None except Exception as e: print(e) - ray.worker.cleanup() + ray.shutdown() _register_all() def testAddTrial(self): diff --git a/python/ray/worker.py b/python/ray/worker.py index 181b58147..de2c92854 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -40,7 +40,7 @@ from ray.utils import ( SCRIPT_MODE = 0 WORKER_MODE = 1 -PYTHON_MODE = 2 +LOCAL_MODE = 2 SILENT_MODE = 3 LOG_POINT = 0 @@ -192,7 +192,7 @@ class Worker(object): function itself. This is the set of remote functions that can be executed by this worker. connected (bool): True if Ray has been started and False otherwise. - mode: The mode of the worker. One of SCRIPT_MODE, PYTHON_MODE, + mode: The mode of the worker. One of SCRIPT_MODE, LOCAL_MODE, SILENT_MODE, and WORKER_MODE. cached_remote_functions_and_actors: A list of information for exporting remote functions and actor classes definitions that were defined @@ -258,7 +258,7 @@ class Worker(object): The mode WORKER_MODE should be used if this Worker is not a driver. It will not print information about tasks. - The mode PYTHON_MODE should be used if this Worker is a driver and if + The mode LOCAL_MODE should be used if this Worker is a driver and if you want to run the driver in a manner equivalent to serial Python for debugging purposes. It will not send remote function calls to the scheduler and will insead execute them in a blocking fashion. @@ -268,7 +268,7 @@ class Worker(object): intentionally fail. Args: - mode: One of SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, and + mode: One of SCRIPT_MODE, WORKER_MODE, LOCAL_MODE, and SILENT_MODE. """ self.mode = mode @@ -1033,7 +1033,7 @@ class Worker(object): """The main loop a worker runs to receive and execute tasks.""" def exit(signum, frame): - cleanup(worker=self) + shutdown(worker=self) sys.exit(0) signal.signal(signal.SIGTERM, exit) @@ -1055,7 +1055,7 @@ def get_gpu_ids(): Returns: A list of GPU IDs. """ - if _mode() == PYTHON_MODE: + if _mode() == LOCAL_MODE: raise Exception("ray.get_gpu_ids() currently does not work in PYTHON " "MODE.") @@ -1080,16 +1080,18 @@ def get_gpu_ids(): def get_resource_ids(): """Get the IDs of the resources that are available to the worker. + This function is only supported in the raylet code path. + Returns: A dictionary mapping the name of a resource to a list of pairs, where - each pair consists of the ID of a resource and the fraction of that - resource reserved for this worker. + each pair consists of the ID of a resource and the fraction of that + resource reserved for this worker. """ if not global_worker.use_raylet: raise Exception("ray.get_resource_ids() is only supported in the " "raylet code path.") - if _mode() == PYTHON_MODE: + if _mode() == LOCAL_MODE: raise Exception( "ray.get_resource_ids() currently does not work in PYTHON " "MODE.") @@ -1118,7 +1120,7 @@ def get_webui_url(): Returns: The URL of the web UI as a string. """ - if _mode() == PYTHON_MODE: + if _mode() == LOCAL_MODE: raise Exception("ray.get_webui_url() currently does not work in " "PYTHON MODE.") return _webui_url_helper(global_worker.redis_client) @@ -1481,7 +1483,7 @@ def _init(address_info=None, object_store_memory: The amount of memory (in bytes) to start the object store with. driver_mode (bool): The mode in which to start the driver. This should - be one of ray.SCRIPT_MODE, ray.PYTHON_MODE, and ray.SILENT_MODE. + be one of ray.SCRIPT_MODE, ray.LOCAL_MODE, and ray.SILENT_MODE. redirect_worker_output: True if the stdout and stderr of worker processes should be redirected to files. redirect_output (bool): True if stdout and stderr for non-worker @@ -1507,8 +1509,7 @@ def _init(address_info=None, Store with hugetlbfs support. Requires plasma_directory. include_webui: Boolean flag indicating whether to start the web UI, which is a Jupyter notebook. - use_raylet: True if the new raylet code path should be used. This is - not supported yet. + use_raylet: True if the new raylet code path should be used. Returns: Address information about the started processes. @@ -1518,9 +1519,9 @@ def _init(address_info=None, arguments is passed in. """ check_main_thread() - if driver_mode not in [SCRIPT_MODE, PYTHON_MODE, SILENT_MODE]: + if driver_mode not in [SCRIPT_MODE, LOCAL_MODE, SILENT_MODE]: raise Exception("Driver_mode must be in [ray.SCRIPT_MODE, " - "ray.PYTHON_MODE, ray.SILENT_MODE].") + "ray.LOCAL_MODE, ray.SILENT_MODE].") if use_raylet is None and os.environ.get("RAY_USE_XRAY") == "1": # This environment variable is used in our testing setup. @@ -1536,8 +1537,8 @@ def _init(address_info=None, redis_address = address_info.get("redis_address") # Start any services that do not yet exist. - if driver_mode == PYTHON_MODE: - # If starting Ray in PYTHON_MODE, don't start any other processes. + if driver_mode == LOCAL_MODE: + # If starting Ray in LOCAL_MODE, don't start any other processes. pass elif start_ray_local: # In this case, we launch a scheduler, a new object store, and some @@ -1562,8 +1563,8 @@ def _init(address_info=None, num_cpus, num_gpus, resources, num_local_schedulers) # Start the scheduler, object store, and some workers. These will be - # killed by the call to cleanup(), which happens when the Python script - # exits. + # killed by the call to shutdown(), which happens when the Python + # script exits. address_info = services.start_ray_head( address_info=address_info, node_ip_address=node_ip_address, @@ -1621,9 +1622,9 @@ def _init(address_info=None, # Connect this driver to Redis, the object store, and the local scheduler. # Choose the first object store and local scheduler if there are multiple. - # The corresponding call to disconnect will happen in the call to cleanup() - # when the Python script exits. - if driver_mode == PYTHON_MODE: + # The corresponding call to disconnect will happen in the call to + # shutdown() when the Python script exits. + if driver_mode == LOCAL_MODE: driver_address_info = {} else: driver_address_info = { @@ -1651,22 +1652,23 @@ def _init(address_info=None, def init(redis_address=None, + num_cpus=None, + num_gpus=None, + resources=None, + object_store_memory=None, node_ip_address=None, object_id_seed=None, num_workers=None, driver_mode=SCRIPT_MODE, redirect_worker_output=False, redirect_output=True, - num_cpus=None, - num_gpus=None, - resources=None, + ignore_reinit_error=False, num_custom_resource=None, num_redis_shards=None, redis_max_clients=None, plasma_directory=None, huge_pages=False, include_webui=True, - object_store_memory=None, use_raylet=None): """Connect to an existing Ray cluster or start one and connect to it. @@ -1674,13 +1676,34 @@ def init(redis_address=None, just attach this driver to it, or we start all of the processes associated with a Ray cluster and attach to the newly started cluster. + To start Ray and all of the relevant processes, use this as follows: + + .. code-block:: python + + ray.init() + + To connect to an existing Ray cluster, use this as follows (substituting + in the appropriate address): + + .. code-block:: python + + ray.init(redis_address="123.45.67.89:6379") + Args: - node_ip_address (str): The IP address of the node that we are on. redis_address (str): The address of the Redis server to connect to. If this address is not provided, then this command will start Redis, a global scheduler, a local scheduler, a plasma store, a plasma manager, and some workers. It will also kill these processes when Python exits. + num_cpus (int): Number of cpus the user wishes all local schedulers to + be configured with. + num_gpus (int): Number of gpus the user wishes all local schedulers to + be configured with. + resources: A dictionary mapping the name of a resource to the quantity + of that resource available. + object_store_memory: The amount of memory (in bytes) to start the + object store with. + node_ip_address (str): The IP address of the node that we are on. object_id_seed (int): Used to seed the deterministic generation of object IDs. The same value can be used across multiple runs of the same job in order to generate the object IDs in a consistent @@ -1688,17 +1711,13 @@ def init(redis_address=None, num_workers (int): The number of workers to start. This is only provided if redis_address is not provided. driver_mode (bool): The mode in which to start the driver. This should - be one of ray.SCRIPT_MODE, ray.PYTHON_MODE, and ray.SILENT_MODE. + be one of ray.SCRIPT_MODE, ray.LOCAL_MODE, and ray.SILENT_MODE. redirect_worker_output: True if the stdout and stderr of worker processes should be redirected to files. redirect_output (bool): True if stdout and stderr for non-worker processes should be redirected to files and false otherwise. - num_cpus (int): Number of cpus the user wishes all local schedulers to - be configured with. - num_gpus (int): Number of gpus the user wishes all local schedulers to - be configured with. - resources: A dictionary mapping the name of a resource to the quantity - of that resource available. + ignore_reinit_error: True if we should suppress errors from calling + ray.init() a second time. num_redis_shards: The number of Redis shards to start in addition to the primary Redis shard. redis_max_clients: If provided, attempt to configure Redis with this @@ -1709,11 +1728,7 @@ def init(redis_address=None, Store with hugetlbfs support. Requires plasma_directory. include_webui: Boolean flag indicating whether to start the web UI, which is a Jupyter notebook. - object_store_memory: The amount of memory (in bytes) to start the - object store with. - use_raylet: True if the new raylet code path should be used. This is - not supported yet. - + use_raylet: True if the new raylet code path should be used. Returns: Address information about the started processes. @@ -1722,6 +1737,14 @@ def init(redis_address=None, Exception: An exception is raised if an inappropriate combination of arguments is passed in. """ + if global_worker.connected: + if ignore_reinit_error: + logger.error("Calling ray.init() again after it has already been " + "called.") + return + else: + raise Exception("Perhaps you called ray.init twice by accident?") + if use_raylet is None and os.environ.get("RAY_USE_XRAY") == "1": # This environment variable is used in our testing setup. logger.info("Detected environment variable 'RAY_USE_XRAY'.") @@ -1761,12 +1784,23 @@ _post_init_hooks = [] def cleanup(worker=global_worker): - """Disconnect the worker, and terminate any processes started in init. + raise DeprecationWarning( + "The function ray.worker.cleanup() has been deprecated. Instead, " + "please call ray.shutdown().") + + +def shutdown(worker=global_worker): + """Disconnect the worker, and terminate processes started by ray.init(). This will automatically run at the end when a Python process that uses Ray - exits. It is ok to run this twice in a row. Note that we manually call - services.cleanup() in the tests because we need to start and stop many - clusters in the tests, but the import and exit only happen once. + exits. It is ok to run this twice in a row. The primary use case for this + function is to cleanup state between tests. + + Note that this will clear any remote function definitions, actor + definitions, and existing actors, so if you wish to use any previously + defined remote functions or actors after calling ray.shutdown(), then you + need to redefine them. If they were defined in an imported module, then you + will need to reload the module. """ disconnect(worker) if hasattr(worker, "local_scheduler_client"): @@ -1792,7 +1826,7 @@ def cleanup(worker=global_worker): worker.set_mode(None) -atexit.register(cleanup) +atexit.register(shutdown) # Define a custom excepthook so that if the driver exits with an exception, we # can push that exception to Redis. @@ -2132,9 +2166,8 @@ def connect(info, object_id_seed: A seed to use to make the generation of object IDs deterministic. mode: The mode of the worker. One of SCRIPT_MODE, WORKER_MODE, - PYTHON_MODE, and SILENT_MODE. - use_raylet: True if the new raylet code path should be used. This is - not supported yet. + LOCAL_MODE, and SILENT_MODE. + use_raylet: True if the new raylet code path should be used. """ check_main_thread() # Do some basic checking to make sure we didn't call ray.init twice. @@ -2164,9 +2197,9 @@ def connect(info, # which is important because we will append to this field from multiple # threads. worker.events = [] - # If running Ray in PYTHON_MODE, there is no need to create call + # If running Ray in LOCAL_MODE, there is no need to create call # create_worker or to start the worker service. - if mode == PYTHON_MODE: + if mode == LOCAL_MODE: return # Set the node IP address. worker.node_ip_address = info["node_ip_address"] @@ -2466,7 +2499,7 @@ def register_custom_serializer(cls, this class. Args: - cls (type): The class that ray should serialize. + cls (type): The class that ray should use this custom serializer for. use_pickle (bool): If true, then objects of this class will be serialized using pickle. use_dict: If true, then objects of this class be serialized turning @@ -2646,8 +2679,12 @@ class RayLogSpanRaylet(object): def profile(event_type, extra_data=None, worker=global_worker): """Profile a span of time so that it appears in the timeline visualization. + Note that this only works in the raylet code path. + This function can be used as follows (both on the driver or within a task). + .. code-block:: python + with ray.profile("custom event", extra_data={'key': 'value'}): # Do some computation here. @@ -2752,13 +2789,17 @@ def get(object_ids, worker=global_worker): Returns: A Python object or a list of Python objects. + + Raises: + Exception: An exception is raised if the task that created the object + or that created one of the objects raised an exception. """ worker.check_connected() with profile("ray.get", worker=worker): check_main_thread() - if worker.mode == PYTHON_MODE: - # In PYTHON_MODE, ray.get is the identity operation (the input will + if worker.mode == LOCAL_MODE: + # In LOCAL_MODE, ray.get is the identity operation (the input will # actually be a value not an objectid). return object_ids if isinstance(object_ids, list): @@ -2790,8 +2831,8 @@ def put(value, worker=global_worker): with profile("ray.put", worker=worker): check_main_thread() - if worker.mode == PYTHON_MODE: - # In PYTHON_MODE, ray.put is the identity operation. + if worker.mode == LOCAL_MODE: + # In LOCAL_MODE, ray.put is the identity operation. return value object_id = worker.local_scheduler_client.compute_put_id( worker.current_task_id, worker.put_index, worker.use_raylet) @@ -2806,16 +2847,17 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker): If timeout is set, the function returns either when the requested number of IDs are ready or when the timeout is reached, whichever occurs first. If it is not set, the function simply waits until that number of objects is ready - and returns that exact number of object_ids. + and returns that exact number of object IDs. This method returns two lists. The first list consists of object IDs that - correspond to objects that are stored in the object store. The second list - corresponds to the rest of the object IDs (which may or may not be ready). + correspond to objects that are available in the object store. The second + list corresponds to the rest of the object IDs (which may or may not be + ready). - Ordering of the input list of object IDs is preserved: if A precedes B in - the input list, and both are in the ready list, then A will precede B in - the ready list. This also holds true if A and B are both in the remaining - list. + Ordering of the input list of object IDs is preserved. That is, if A + precedes B in the input list, and both are in the ready list, then A will + precede B in the ready list. This also holds true if A and B are both in + the remaining list. Args: object_ids (List[ObjectID]): List of object IDs for objects that may or @@ -2826,7 +2868,7 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker): Returns: A list of object IDs that are ready and a list of the remaining object - IDs. + IDs. """ if isinstance(object_ids, ray.ObjectID): @@ -2837,7 +2879,7 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker): raise TypeError("wait() expected a list of ObjectID, got {}".format( type(object_ids))) - if worker.mode != PYTHON_MODE: + if worker.mode != LOCAL_MODE: for object_id in object_ids: if not isinstance(object_id, ray.ObjectID): raise TypeError("wait() expected a list of ObjectID, " @@ -2848,9 +2890,9 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker): with profile("ray.wait", worker=worker): check_main_thread() - # When Ray is run in PYTHON_MODE, all functions are run immediately, + # When Ray is run in LOCAL_MODE, all functions are run immediately, # so all objects in object_id are ready. - if worker.mode == PYTHON_MODE: + if worker.mode == LOCAL_MODE: return object_ids[:num_returns], object_ids[num_returns:] # TODO(rkn): This is a temporary workaround for @@ -2953,6 +2995,53 @@ def make_decorator(num_return_vals=None, def remote(*args, **kwargs): + """Define a remote function or an actor class. + + This can be used with no arguments to define a remote function or actor as + follows: + + .. code-block:: python + + @ray.remote + def f(): + return 1 + + @ray.remote + class Foo(object): + def method(self): + return 1 + + It can also be used with specific keyword arguments: + + * **num_return_vals:** This is only for *remote functions*. It specifies + the number of object IDs returned by the remote function invocation. + * **num_cpus:** The quantity of CPU cores to reserve for this task or for + the lifetime of the actor. + * **num_gpus:** The quantity of GPUs to reserve for this task or for the + lifetime of the actor. + * **resources:** The quantity of various custom resources to reserve for + this task or for the lifetime of the actor. This is a dictionary mapping + strings (resource names) to numbers. + * **max_calls:** Only for *remote functions*. This specifies the maximum + number of times that a given worker can execute the given remote function + before it must exit (this can be used to address memory leaks in + third-party libraries or to reclaim resources that cannot easily be + released, e.g., GPU memory that was acquired by TensorFlow). By + default this is infinite. + + This can be done as follows: + + .. code-block:: python + + @ray.remote(num_gpus=1, max_calls=1, num_return_vals=2) + def f(): + return 1, 2 + + @ray.remote(num_cpus=2, resources={"CustomResource": 1}) + class Foo(object): + def method(self): + return 1 + """ worker = get_global_worker() if len(args) == 1 and len(kwargs) == 0 and callable(args[0]): diff --git a/test/actor_test.py b/test/actor_test.py index 2832385e4..3d4bf1372 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -17,7 +17,7 @@ import ray.test.test_utils class ActorAPI(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def testKeywordArgs(self): ray.init(num_workers=0, driver_mode=ray.SILENT_MODE) @@ -314,7 +314,7 @@ class ActorAPI(unittest.TestCase): class ActorMethods(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def testDefineActor(self): ray.init() @@ -480,7 +480,7 @@ class ActorMethods(unittest.TestCase): class ActorNesting(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def testRemoteFunctionWithinActor(self): # Make sure we can use remote funtions within actors. @@ -656,7 +656,7 @@ class ActorNesting(unittest.TestCase): class ActorInheritance(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def testInheritActorFromClass(self): # Make sure we can define an actor by inheriting from a regular class. @@ -688,7 +688,7 @@ class ActorInheritance(unittest.TestCase): class ActorSchedulingProperties(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def testRemoteFunctionsNotScheduledOnActors(self): # Make sure that regular remote functions are not scheduled on actors. @@ -715,7 +715,7 @@ class ActorSchedulingProperties(unittest.TestCase): class ActorsOnMultipleNodes(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def testActorsOnNodesWithNoCPUs(self): ray.init(num_cpus=0) @@ -774,7 +774,7 @@ class ActorsOnMultipleNodes(unittest.TestCase): class ActorsWithGPUs(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() @unittest.skipIf( os.environ.get('RAY_USE_NEW_GCS', False), "Crashing with new GCS API.") @@ -1276,7 +1276,7 @@ class ActorsWithGPUs(unittest.TestCase): "This test does not work with xray yet.") class ActorReconstruction(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() @unittest.skipIf( os.environ.get('RAY_USE_NEW_GCS', False), "Hanging with new GCS API.") @@ -1799,7 +1799,7 @@ class ActorReconstruction(unittest.TestCase): class DistributedActorHandles(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def setup_queue_actor(self): ray.init() @@ -1967,7 +1967,7 @@ class DistributedActorHandles(unittest.TestCase): class ActorPlacementAndResources(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() @unittest.skipIf( os.environ.get("RAY_USE_XRAY") == "1", diff --git a/test/array_test.py b/test/array_test.py index 07ccac99a..feace8c55 100644 --- a/test/array_test.py +++ b/test/array_test.py @@ -18,7 +18,7 @@ if sys.version_info >= (3, 0): class RemoteArrayTest(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def testMethods(self): for module in [ @@ -55,7 +55,7 @@ class RemoteArrayTest(unittest.TestCase): class DistributedArrayTest(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def testAssemble(self): for module in [ diff --git a/test/autoscaler_test.py b/test/autoscaler_test.py index 8e0c5a3f0..845a1c870 100644 --- a/test/autoscaler_test.py +++ b/test/autoscaler_test.py @@ -178,7 +178,7 @@ class AutoscalingTest(unittest.TestCase): def tearDown(self): del NODE_PROVIDERS["mock"] shutil.rmtree(self.tmpdir) - ray.worker.cleanup() + ray.shutdown() def waitFor(self, condition): for _ in range(50): diff --git a/test/component_failures_test.py b/test/component_failures_test.py index 1a4c95fc7..864033498 100644 --- a/test/component_failures_test.py +++ b/test/component_failures_test.py @@ -12,7 +12,7 @@ import pyarrow as pa class ComponentFailureTest(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() # This test checks that when a worker dies in the middle of a get, the # plasma store and manager will not die. diff --git a/test/credis_test.py b/test/credis_test.py index 63dffbb8b..9796f0d56 100644 --- a/test/credis_test.py +++ b/test/credis_test.py @@ -20,7 +20,7 @@ class CredisTest(unittest.TestCase): self.config = ray.init(num_workers=0) def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def test_credis_started(self): assert "redis_address" in self.config diff --git a/test/cython_test.py b/test/cython_test.py index c438c2f20..027dbafd8 100644 --- a/test/cython_test.py +++ b/test/cython_test.py @@ -19,7 +19,7 @@ class CythonTest(unittest.TestCase): ray.init() def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def assertEqualHelper(self, cython_func, expected, *args): self.assertEqual(get_ray_result(cython_func, *args), expected) diff --git a/test/failure_test.py b/test/failure_test.py index 24818bffa..bc74b78d0 100644 --- a/test/failure_test.py +++ b/test/failure_test.py @@ -32,7 +32,7 @@ def wait_for_errors(error_type, num_errors, timeout=10): class TaskStatusTest(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def testFailedTask(self): reload(test_functions) @@ -195,7 +195,7 @@ def temporary_helper_function(): class ActorTest(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def testFailedActorInit(self): ray.init(num_workers=0, driver_mode=ray.SILENT_MODE) @@ -267,7 +267,7 @@ class ActorTest(unittest.TestCase): class WorkerDeath(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def testWorkerRaisingException(self): ray.init(num_workers=1, driver_mode=ray.SILENT_MODE) @@ -370,7 +370,7 @@ class WorkerDeath(unittest.TestCase): "This test does not work with xray yet.") class PutErrorTest(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def testPutError1(self): store_size = 10**6 @@ -467,7 +467,7 @@ class PutErrorTest(unittest.TestCase): class ConfigurationTest(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def testVersionMismatch(self): ray_version = ray.__version__ @@ -482,7 +482,7 @@ class ConfigurationTest(unittest.TestCase): class WarningTest(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def testExportLargeObjects(self): import ray.ray_constants as ray_constants diff --git a/test/microbenchmarks.py b/test/microbenchmarks.py index bd358d3d9..a3b8aee59 100644 --- a/test/microbenchmarks.py +++ b/test/microbenchmarks.py @@ -17,7 +17,7 @@ if sys.version_info >= (3, 0): class MicroBenchmarkTest(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def testTiming(self): reload(test_functions) diff --git a/test/monitor_test.py b/test/monitor_test.py index 968b3a3df..e497b4347 100644 --- a/test/monitor_test.py +++ b/test/monitor_test.py @@ -60,7 +60,7 @@ class MonitorTest(unittest.TestCase): if (4, 2, summary_start[2] + 1) != StateSummary(): success.value = False - ray.worker.cleanup() + ray.shutdown() success = multiprocessing.Value('b', False) driver = multiprocessing.Process(target=Driver, args=(success, )) @@ -81,7 +81,7 @@ class MonitorTest(unittest.TestCase): # the global state. self.assertEqual((0, 1), StateSummary()[:2]) - ray.worker.cleanup() + ray.shutdown() subprocess.Popen(["ray", "stop"]).wait() @unittest.skipIf( diff --git a/test/multi_node_test.py b/test/multi_node_test.py index 4cf38ebdc..405010973 100644 --- a/test/multi_node_test.py +++ b/test/multi_node_test.py @@ -40,7 +40,7 @@ class MultiNodeTest(unittest.TestCase): self.redis_address = redis_address.split("\"")[0] def tearDown(self): - ray.worker.cleanup() + ray.shutdown() # Kill the Ray cluster. subprocess.Popen(["ray", "stop"]).wait() @@ -281,7 +281,7 @@ class StartRayScriptTest(unittest.TestCase): class MiscellaneousTest(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def testConnectingInLocalCase(self): address_info = ray.init(num_cpus=0) diff --git a/test/runtest.py b/test/runtest.py index 7cba4f010..27e017189 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -195,7 +195,7 @@ RAY_TEST_OBJECTS = BASE_OBJECTS + LIST_OBJECTS + TUPLE_OBJECTS + DICT_OBJECTS class SerializationTest(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def testRecursiveObjects(self): ray.init(num_workers=0) @@ -289,7 +289,7 @@ class SerializationTest(unittest.TestCase): class WorkerTest(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def testPythonWorkers(self): # Test the codepath for starting workers from the Python script, @@ -343,7 +343,7 @@ class APITest(unittest.TestCase): ray.init(**kwargs) def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def testCustomSerializers(self): self.init_ray(num_workers=1) @@ -1163,13 +1163,13 @@ class APITestSharded(APITest): ray.worker._init(**kwargs) -class PythonModeTest(unittest.TestCase): +class LocalModeTest(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() - def testPythonMode(self): + def testLocalMode(self): reload(test_functions) - ray.init(driver_mode=ray.PYTHON_MODE) + ray.init(driver_mode=ray.LOCAL_MODE) @ray.remote def f(): @@ -1186,10 +1186,10 @@ class PythonModeTest(unittest.TestCase): # Make sure objects are immutable, this example is why we need to copy # arguments before passing them into remote functions in python mode - aref = test_functions.python_mode_f.remote() + aref = test_functions.local_mode_f.remote() assert_equal(aref, np.array([0, 0])) - bref = test_functions.python_mode_g.remote(aref) - # Make sure python_mode_g does not mutate aref. + bref = test_functions.local_mode_g.remote(aref) + # Make sure local_mode_g does not mutate aref. assert_equal(aref, np.array([0, 0])) assert_equal(bref, np.array([1, 0])) @@ -1202,10 +1202,10 @@ class PythonModeTest(unittest.TestCase): assert_equal(ready, object_ids[:num_returns]) assert_equal(remaining, object_ids[num_returns:]) - # Test actors in PYTHON_MODE. + # Test actors in LOCAL_MODE. @ray.remote - class PythonModeTestClass(object): + class LocalModeTestClass(object): def __init__(self, array): self.array = array @@ -1219,7 +1219,7 @@ class PythonModeTest(unittest.TestCase): array[0] = -1 self.array = array - test_actor = PythonModeTestClass.remote(np.arange(10)) + test_actor = LocalModeTestClass.remote(np.arange(10)) # Remote actor functions should return by value assert_equal(test_actor.get_array.remote(), np.arange(10)) @@ -1244,7 +1244,7 @@ class PythonModeTest(unittest.TestCase): class ResourcesTest(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def testResourceConstraints(self): num_workers = 20 @@ -1852,7 +1852,7 @@ class CudaVisibleDevicesTest(unittest.TestCase): self.original_gpu_ids = os.environ.get("CUDA_VISIBLE_DEVICES", None) def tearDown(self): - ray.worker.cleanup() + ray.shutdown() # Reset the environment variable. if self.original_gpu_ids is not None: os.environ["CUDA_VISIBLE_DEVICES"] = self.original_gpu_ids @@ -1884,7 +1884,7 @@ class CudaVisibleDevicesTest(unittest.TestCase): class WorkerPoolTests(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def testNoWorkers(self): ray.init(num_workers=0) @@ -1950,7 +1950,7 @@ class WorkerPoolTests(unittest.TestCase): class SchedulingAlgorithm(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def attempt_to_load_balance(self, remote_function, @@ -2036,7 +2036,7 @@ def wait_for_num_objects(num_objects, timeout=10): "New GCS API doesn't have a Python API yet.") class GlobalStateAPI(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def testGlobalStateAPI(self): with self.assertRaises(Exception): diff --git a/test/stress_tests.py b/test/stress_tests.py index 340759140..2875d5b88 100644 --- a/test/stress_tests.py +++ b/test/stress_tests.py @@ -17,7 +17,7 @@ def ray_start_regular(): ray.init(num_cpus=10) yield None # The code after the yield will run as teardown code. - ray.worker.cleanup() + ray.shutdown() @pytest.fixture(params=[(1, 4), (4, 4)]) @@ -32,7 +32,7 @@ def ray_start_combination(request): num_cpus=10) yield num_local_schedulers, num_workers_per_scheduler # The code after the yield will run as teardown code. - ray.worker.cleanup() + ray.shutdown() def test_submitting_tasks(ray_start_combination): @@ -224,7 +224,7 @@ def ray_start_reconstruction(request): assert len(local_scheduler_ids) == num_local_schedulers + 1 # Clean up the Ray cluster. - ray.worker.cleanup() + ray.shutdown() @pytest.mark.skipif( @@ -529,7 +529,7 @@ def test_driver_put_errors(ray_start_reconstruction): # class WorkerPoolTests(unittest.TestCase): # # def tearDown(self): -# ray.worker.cleanup() +# ray.shutdown() # # def testBlockingTasks(self): # @ray.remote @@ -545,4 +545,4 @@ def test_driver_put_errors(ray_start_reconstruction): # # ray.init(num_workers=1) # ray.get([g.remote(i) for i in range(1000)]) -# ray.worker.cleanup() +# ray.shutdown() diff --git a/test/tensorflow_test.py b/test/tensorflow_test.py index ca3f92299..04261673f 100644 --- a/test/tensorflow_test.py +++ b/test/tensorflow_test.py @@ -95,7 +95,7 @@ class TrainActor(object): class TensorFlowTest(unittest.TestCase): def tearDown(self): - ray.worker.cleanup() + ray.shutdown() def testTensorFlowVariables(self): ray.init(num_workers=2) diff --git a/test/xray_test.py b/test/xray_test.py index fba65c1c2..13c9f2c17 100644 --- a/test/xray_test.py +++ b/test/xray_test.py @@ -14,7 +14,7 @@ def ray_start(): ray.init(num_cpus=1, use_raylet=True) yield None # The code after the yield will run as teardown code. - ray.worker.cleanup() + ray.shutdown() def test_basic_task_api(ray_start):