Change ray.worker.cleanup -> ray.shutdown and improve API documentation. (#2374)

* Change ray.worker.cleanup -> ray.shutdown and improve API documentation.

* Deprecate ray.worker.cleanup() gracefully.

* Fix linting
This commit is contained in:
Robert Nishihara
2018-07-12 14:00:00 -05:00
committed by Philipp Moritz
parent b316afeb43
commit 515da7721a
30 changed files with 282 additions and 407 deletions
+5 -4
View File
@@ -50,8 +50,8 @@ from ray.local_scheduler import ObjectID, _config # noqa: E402
from ray.worker import (error_info, init, connect, disconnect, get, put, wait,
remote, profile, flush_profile_data, get_gpu_ids,
get_resource_ids, get_webui_url,
register_custom_serializer) # noqa: E402
from ray.worker import (SCRIPT_MODE, WORKER_MODE, PYTHON_MODE,
register_custom_serializer, shutdown) # noqa: E402
from ray.worker import (SCRIPT_MODE, WORKER_MODE, LOCAL_MODE,
SILENT_MODE) # noqa: E402
from ray.worker import global_state # noqa: E402
# We import ray.actor because some code is run in actor.py which initializes
@@ -67,8 +67,9 @@ __all__ = [
"error_info", "init", "connect", "disconnect", "get", "put", "wait",
"remote", "profile", "flush_profile_data", "actor", "method",
"get_gpu_ids", "get_resource_ids", "get_webui_url",
"register_custom_serializer", "SCRIPT_MODE", "WORKER_MODE", "PYTHON_MODE",
"SILENT_MODE", "global_state", "ObjectID", "_config", "__version__"
"register_custom_serializer", "shutdown", "SCRIPT_MODE", "WORKER_MODE",
"LOCAL_MODE", "SILENT_MODE", "global_state", "ObjectID", "_config",
"__version__"
]
import ctypes # noqa: E402
+23 -5
View File
@@ -421,6 +421,24 @@ def export_actor_class(class_id, Class, actor_method_names,
def method(*args, **kwargs):
"""Annotate an actor method.
.. code-block:: python
@ray.remote
class Foo(object):
@ray.method(num_return_vals=2)
def bar(self):
return 1, 2
f = Foo.remote()
_, _ = f.bar.remote()
Args:
num_return_vals: The number of object IDs that should be returned by
invocations of this actor method.
"""
assert len(args) == 0
assert len(kwargs) == 1
assert "num_return_vals" in kwargs
@@ -588,10 +606,10 @@ class ActorClass(object):
# updated to reflect the new invocation.
actor_cursor = None
# Do not export the actor class or the actor if run in PYTHON_MODE
# Do not export the actor class or the actor if run in LOCAL_MODE
# Instead, instantiate the actor locally and add it to the worker's
# dictionary
if worker.mode == ray.PYTHON_MODE:
if worker.mode == ray.LOCAL_MODE:
worker.actors[actor_id] = self._modified_class.__new__(
self._modified_class)
else:
@@ -764,9 +782,9 @@ class ActorHandle(object):
kwargs = {}
args = signature.extend_args(function_signature, args, kwargs)
# Execute functions locally if Ray is run in PYTHON_MODE
# Execute functions locally if Ray is run in LOCAL_MODE
# Copy args to prevent the function from mutating them.
if worker.mode == ray.PYTHON_MODE:
if worker.mode == ray.LOCAL_MODE:
return getattr(worker.actors[self._ray_actor_id],
method_name)(*copy.deepcopy(args))
@@ -963,7 +981,7 @@ def make_actor(cls, num_cpus, num_gpus, resources, actor_method_cpus,
class Class(cls):
def __ray_terminate__(self):
worker = ray.worker.get_global_worker()
if worker.mode != ray.PYTHON_MODE:
if worker.mode != ray.LOCAL_MODE:
# Disconnect the worker from the local scheduler. The point of
# this is so that when the worker kills itself below, the local
# scheduler won't push an error message to the driver.
+2 -2
View File
@@ -125,8 +125,8 @@ class RemoteFunction(object):
resources = ray.utils.resources_from_resource_arguments(
self._num_cpus, self._num_gpus, self._resources, num_cpus,
num_gpus, resources)
if worker.mode == ray.worker.PYTHON_MODE:
# In PYTHON_MODE, remote calls simply execute the function.
if worker.mode == ray.worker.LOCAL_MODE:
# In LOCAL_MODE, remote calls simply execute the function.
# We copy the arguments to prevent the function call from
# mutating them and to match the usual behavior of
# immutable remote objects.
+1 -1
View File
@@ -29,7 +29,7 @@ class CustomModel(Model):
class ModelCatalogTest(unittest.TestCase):
def tearDown(self):
ray.worker.cleanup()
ray.shutdown()
def testGymPreprocessors(self):
p1 = ModelCatalog.get_preprocessor(gym.make("CartPole-v0"))
+1 -1
View File
@@ -78,7 +78,7 @@ class FilterManagerTest(unittest.TestCase):
ray.init(num_cpus=1)
def tearDown(self):
ray.worker.cleanup()
ray.shutdown()
def testSynchronize(self):
"""Synchronize applies filter buffer onto own filter"""
+1 -1
View File
@@ -14,7 +14,7 @@ from ray.rllib.evaluation import SampleBatch
class AsyncOptimizerTest(unittest.TestCase):
def tearDown(self):
ray.worker.cleanup()
ray.shutdown()
def testBasic(self):
ray.init(num_cpus=4)
@@ -43,7 +43,7 @@ class Regression():
raise NotImplementedError
def teardown(self, *args):
ray.worker.cleanup()
ray.shutdown()
def track_time(self, result):
return result["time_total_s"]
+1 -1
View File
@@ -147,7 +147,7 @@ def cli():
"--use-raylet",
is_flag=True,
default=None,
help="use the raylet code path, this is not supported yet")
help="use the raylet code path")
def start(node_ip_address, redis_address, redis_port, num_redis_shards,
redis_max_clients, redis_shard_ports, object_manager_port,
object_store_memory, num_workers, num_cpus, num_gpus, resources,
+2 -2
View File
@@ -90,12 +90,12 @@ def throw_exception_fct3(x):
@ray.remote
def python_mode_f():
def local_mode_f():
return np.array([0, 0])
@ray.remote
def python_mode_g(x):
def local_mode_g(x):
x[0] = 1
return x
+4 -4
View File
@@ -26,7 +26,7 @@ class TrainableFunctionApiTest(unittest.TestCase):
ray.init(num_cpus=4, num_gpus=0)
def tearDown(self):
ray.worker.cleanup()
ray.shutdown()
_register_all() # re-register the evicted objects
def testPinObject(self):
@@ -366,7 +366,7 @@ class RunExperimentTest(unittest.TestCase):
ray.init()
def tearDown(self):
ray.worker.cleanup()
ray.shutdown()
_register_all() # re-register the evicted objects
def testDict(self):
@@ -441,7 +441,7 @@ class VariantGeneratorTest(unittest.TestCase):
ray.init()
def tearDown(self):
ray.worker.cleanup()
ray.shutdown()
_register_all() # re-register the evicted objects
def testParseToTrials(self):
@@ -575,7 +575,7 @@ class VariantGeneratorTest(unittest.TestCase):
class TrialRunnerTest(unittest.TestCase):
def tearDown(self):
ray.worker.cleanup()
ray.shutdown()
_register_all() # re-register the evicted objects
def testTrialStatus(self):
+4 -4
View File
@@ -29,7 +29,7 @@ class EarlyStoppingSuite(unittest.TestCase):
ray.init()
def tearDown(self):
ray.worker.cleanup()
ray.shutdown()
_register_all() # re-register the evicted objects
def basicSetup(self, rule):
@@ -196,7 +196,7 @@ class HyperbandSuite(unittest.TestCase):
ray.init()
def tearDown(self):
ray.worker.cleanup()
ray.shutdown()
_register_all() # re-register the evicted objects
def schedulerSetup(self, num_trials):
@@ -561,7 +561,7 @@ class PopulationBasedTestingSuite(unittest.TestCase):
ray.init()
def tearDown(self):
ray.worker.cleanup()
ray.shutdown()
_register_all() # re-register the evicted objects
def basicSetup(self, resample_prob=0.0, explore=None):
@@ -781,7 +781,7 @@ class AsyncHyperBandSuite(unittest.TestCase):
ray.init()
def tearDown(self):
ray.worker.cleanup()
ray.shutdown()
_register_all() # re-register the evicted objects
def basicSetup(self, scheduler):
+1 -1
View File
@@ -51,7 +51,7 @@ class TuneServerSuite(unittest.TestCase):
self.runner = None
except Exception as e:
print(e)
ray.worker.cleanup()
ray.shutdown()
_register_all()
def testAddTrial(self):
+154 -65
View File
@@ -40,7 +40,7 @@ from ray.utils import (
SCRIPT_MODE = 0
WORKER_MODE = 1
PYTHON_MODE = 2
LOCAL_MODE = 2
SILENT_MODE = 3
LOG_POINT = 0
@@ -192,7 +192,7 @@ class Worker(object):
function itself. This is the set of remote functions that can be
executed by this worker.
connected (bool): True if Ray has been started and False otherwise.
mode: The mode of the worker. One of SCRIPT_MODE, PYTHON_MODE,
mode: The mode of the worker. One of SCRIPT_MODE, LOCAL_MODE,
SILENT_MODE, and WORKER_MODE.
cached_remote_functions_and_actors: A list of information for exporting
remote functions and actor classes definitions that were defined
@@ -258,7 +258,7 @@ class Worker(object):
The mode WORKER_MODE should be used if this Worker is not a driver. It
will not print information about tasks.
The mode PYTHON_MODE should be used if this Worker is a driver and if
The mode LOCAL_MODE should be used if this Worker is a driver and if
you want to run the driver in a manner equivalent to serial Python for
debugging purposes. It will not send remote function calls to the
scheduler and will instead execute them in a blocking fashion.
@@ -268,7 +268,7 @@ class Worker(object):
intentionally fail.
Args:
mode: One of SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, and
mode: One of SCRIPT_MODE, WORKER_MODE, LOCAL_MODE, and
SILENT_MODE.
"""
self.mode = mode
@@ -1033,7 +1033,7 @@ class Worker(object):
"""The main loop a worker runs to receive and execute tasks."""
def exit(signum, frame):
cleanup(worker=self)
shutdown(worker=self)
sys.exit(0)
signal.signal(signal.SIGTERM, exit)
@@ -1055,7 +1055,7 @@ def get_gpu_ids():
Returns:
A list of GPU IDs.
"""
if _mode() == PYTHON_MODE:
if _mode() == LOCAL_MODE:
raise Exception("ray.get_gpu_ids() currently does not work in PYTHON "
"MODE.")
@@ -1080,16 +1080,18 @@ def get_gpu_ids():
def get_resource_ids():
"""Get the IDs of the resources that are available to the worker.
This function is only supported in the raylet code path.
Returns:
A dictionary mapping the name of a resource to a list of pairs, where
each pair consists of the ID of a resource and the fraction of that
resource reserved for this worker.
each pair consists of the ID of a resource and the fraction of that
resource reserved for this worker.
"""
if not global_worker.use_raylet:
raise Exception("ray.get_resource_ids() is only supported in the "
"raylet code path.")
if _mode() == PYTHON_MODE:
if _mode() == LOCAL_MODE:
raise Exception(
"ray.get_resource_ids() currently does not work in PYTHON "
"MODE.")
@@ -1118,7 +1120,7 @@ def get_webui_url():
Returns:
The URL of the web UI as a string.
"""
if _mode() == PYTHON_MODE:
if _mode() == LOCAL_MODE:
raise Exception("ray.get_webui_url() currently does not work in "
"PYTHON MODE.")
return _webui_url_helper(global_worker.redis_client)
@@ -1481,7 +1483,7 @@ def _init(address_info=None,
object_store_memory: The amount of memory (in bytes) to start the
object store with.
driver_mode (int): The mode in which to start the driver. This should
be one of ray.SCRIPT_MODE, ray.PYTHON_MODE, and ray.SILENT_MODE.
be one of ray.SCRIPT_MODE, ray.LOCAL_MODE, and ray.SILENT_MODE.
redirect_worker_output: True if the stdout and stderr of worker
processes should be redirected to files.
redirect_output (bool): True if stdout and stderr for non-worker
@@ -1507,8 +1509,7 @@ def _init(address_info=None,
Store with hugetlbfs support. Requires plasma_directory.
include_webui: Boolean flag indicating whether to start the web
UI, which is a Jupyter notebook.
use_raylet: True if the new raylet code path should be used. This is
not supported yet.
use_raylet: True if the new raylet code path should be used.
Returns:
Address information about the started processes.
@@ -1518,9 +1519,9 @@ def _init(address_info=None,
arguments is passed in.
"""
check_main_thread()
if driver_mode not in [SCRIPT_MODE, PYTHON_MODE, SILENT_MODE]:
if driver_mode not in [SCRIPT_MODE, LOCAL_MODE, SILENT_MODE]:
raise Exception("Driver_mode must be in [ray.SCRIPT_MODE, "
"ray.PYTHON_MODE, ray.SILENT_MODE].")
"ray.LOCAL_MODE, ray.SILENT_MODE].")
if use_raylet is None and os.environ.get("RAY_USE_XRAY") == "1":
# This environment variable is used in our testing setup.
@@ -1536,8 +1537,8 @@ def _init(address_info=None,
redis_address = address_info.get("redis_address")
# Start any services that do not yet exist.
if driver_mode == PYTHON_MODE:
# If starting Ray in PYTHON_MODE, don't start any other processes.
if driver_mode == LOCAL_MODE:
# If starting Ray in LOCAL_MODE, don't start any other processes.
pass
elif start_ray_local:
# In this case, we launch a scheduler, a new object store, and some
@@ -1562,8 +1563,8 @@ def _init(address_info=None,
num_cpus, num_gpus, resources, num_local_schedulers)
# Start the scheduler, object store, and some workers. These will be
# killed by the call to cleanup(), which happens when the Python script
# exits.
# killed by the call to shutdown(), which happens when the Python
# script exits.
address_info = services.start_ray_head(
address_info=address_info,
node_ip_address=node_ip_address,
@@ -1621,9 +1622,9 @@ def _init(address_info=None,
# Connect this driver to Redis, the object store, and the local scheduler.
# Choose the first object store and local scheduler if there are multiple.
# The corresponding call to disconnect will happen in the call to cleanup()
# when the Python script exits.
if driver_mode == PYTHON_MODE:
# The corresponding call to disconnect will happen in the call to
# shutdown() when the Python script exits.
if driver_mode == LOCAL_MODE:
driver_address_info = {}
else:
driver_address_info = {
@@ -1651,22 +1652,23 @@ def _init(address_info=None,
def init(redis_address=None,
num_cpus=None,
num_gpus=None,
resources=None,
object_store_memory=None,
node_ip_address=None,
object_id_seed=None,
num_workers=None,
driver_mode=SCRIPT_MODE,
redirect_worker_output=False,
redirect_output=True,
num_cpus=None,
num_gpus=None,
resources=None,
ignore_reinit_error=False,
num_custom_resource=None,
num_redis_shards=None,
redis_max_clients=None,
plasma_directory=None,
huge_pages=False,
include_webui=True,
object_store_memory=None,
use_raylet=None):
"""Connect to an existing Ray cluster or start one and connect to it.
@@ -1674,13 +1676,34 @@ def init(redis_address=None,
just attach this driver to it, or we start all of the processes associated
with a Ray cluster and attach to the newly started cluster.
To start Ray and all of the relevant processes, use this as follows:
.. code-block:: python
ray.init()
To connect to an existing Ray cluster, use this as follows (substituting
in the appropriate address):
.. code-block:: python
ray.init(redis_address="123.45.67.89:6379")
Args:
node_ip_address (str): The IP address of the node that we are on.
redis_address (str): The address of the Redis server to connect to. If
this address is not provided, then this command will start Redis, a
global scheduler, a local scheduler, a plasma store, a plasma
manager, and some workers. It will also kill these processes when
Python exits.
num_cpus (int): Number of cpus the user wishes all local schedulers to
be configured with.
num_gpus (int): Number of gpus the user wishes all local schedulers to
be configured with.
resources: A dictionary mapping the name of a resource to the quantity
of that resource available.
object_store_memory: The amount of memory (in bytes) to start the
object store with.
node_ip_address (str): The IP address of the node that we are on.
object_id_seed (int): Used to seed the deterministic generation of
object IDs. The same value can be used across multiple runs of the
same job in order to generate the object IDs in a consistent
@@ -1688,17 +1711,13 @@ def init(redis_address=None,
num_workers (int): The number of workers to start. This is only
provided if redis_address is not provided.
driver_mode (int): The mode in which to start the driver. This should
be one of ray.SCRIPT_MODE, ray.PYTHON_MODE, and ray.SILENT_MODE.
be one of ray.SCRIPT_MODE, ray.LOCAL_MODE, and ray.SILENT_MODE.
redirect_worker_output: True if the stdout and stderr of worker
processes should be redirected to files.
redirect_output (bool): True if stdout and stderr for non-worker
processes should be redirected to files and false otherwise.
num_cpus (int): Number of cpus the user wishes all local schedulers to
be configured with.
num_gpus (int): Number of gpus the user wishes all local schedulers to
be configured with.
resources: A dictionary mapping the name of a resource to the quantity
of that resource available.
ignore_reinit_error: True if we should suppress errors from calling
ray.init() a second time.
num_redis_shards: The number of Redis shards to start in addition to
the primary Redis shard.
redis_max_clients: If provided, attempt to configure Redis with this
@@ -1709,11 +1728,7 @@ def init(redis_address=None,
Store with hugetlbfs support. Requires plasma_directory.
include_webui: Boolean flag indicating whether to start the web
UI, which is a Jupyter notebook.
object_store_memory: The amount of memory (in bytes) to start the
object store with.
use_raylet: True if the new raylet code path should be used. This is
not supported yet.
use_raylet: True if the new raylet code path should be used.
Returns:
Address information about the started processes.
@@ -1722,6 +1737,14 @@ def init(redis_address=None,
Exception: An exception is raised if an inappropriate combination of
arguments is passed in.
"""
if global_worker.connected:
if ignore_reinit_error:
logger.error("Calling ray.init() again after it has already been "
"called.")
return
else:
raise Exception("Perhaps you called ray.init twice by accident?")
if use_raylet is None and os.environ.get("RAY_USE_XRAY") == "1":
# This environment variable is used in our testing setup.
logger.info("Detected environment variable 'RAY_USE_XRAY'.")
@@ -1761,12 +1784,23 @@ _post_init_hooks = []
def cleanup(worker=global_worker):
"""Disconnect the worker, and terminate any processes started in init.
raise DeprecationWarning(
"The function ray.worker.cleanup() has been deprecated. Instead, "
"please call ray.shutdown().")
def shutdown(worker=global_worker):
"""Disconnect the worker, and terminate processes started by ray.init().
This will automatically run at the end when a Python process that uses Ray
exits. It is ok to run this twice in a row. Note that we manually call
services.cleanup() in the tests because we need to start and stop many
clusters in the tests, but the import and exit only happen once.
exits. It is ok to run this twice in a row. The primary use case for this
function is to clean up state between tests.
Note that this will clear any remote function definitions, actor
definitions, and existing actors, so if you wish to use any previously
defined remote functions or actors after calling ray.shutdown(), then you
need to redefine them. If they were defined in an imported module, then you
will need to reload the module.
"""
disconnect(worker)
if hasattr(worker, "local_scheduler_client"):
@@ -1792,7 +1826,7 @@ def cleanup(worker=global_worker):
worker.set_mode(None)
atexit.register(cleanup)
atexit.register(shutdown)
# Define a custom excepthook so that if the driver exits with an exception, we
# can push that exception to Redis.
@@ -2132,9 +2166,8 @@ def connect(info,
object_id_seed: A seed to use to make the generation of object IDs
deterministic.
mode: The mode of the worker. One of SCRIPT_MODE, WORKER_MODE,
PYTHON_MODE, and SILENT_MODE.
use_raylet: True if the new raylet code path should be used. This is
not supported yet.
LOCAL_MODE, and SILENT_MODE.
use_raylet: True if the new raylet code path should be used.
"""
check_main_thread()
# Do some basic checking to make sure we didn't call ray.init twice.
@@ -2164,9 +2197,9 @@ def connect(info,
# which is important because we will append to this field from multiple
# threads.
worker.events = []
# If running Ray in PYTHON_MODE, there is no need to create call
# If running Ray in LOCAL_MODE, there is no need to call
# create_worker or to start the worker service.
if mode == PYTHON_MODE:
if mode == LOCAL_MODE:
return
# Set the node IP address.
worker.node_ip_address = info["node_ip_address"]
@@ -2466,7 +2499,7 @@ def register_custom_serializer(cls,
this class.
Args:
cls (type): The class that ray should serialize.
cls (type): The class that ray should use this custom serializer for.
use_pickle (bool): If true, then objects of this class will be
serialized using pickle.
use_dict: If true, then objects of this class will be serialized by turning
@@ -2646,8 +2679,12 @@ class RayLogSpanRaylet(object):
def profile(event_type, extra_data=None, worker=global_worker):
"""Profile a span of time so that it appears in the timeline visualization.
Note that this only works in the raylet code path.
This function can be used as follows (either on the driver or within a task).
.. code-block:: python
with ray.profile("custom event", extra_data={'key': 'value'}):
# Do some computation here.
@@ -2752,13 +2789,17 @@ def get(object_ids, worker=global_worker):
Returns:
A Python object or a list of Python objects.
Raises:
Exception: An exception is raised if the task that created the object
or that created one of the objects raised an exception.
"""
worker.check_connected()
with profile("ray.get", worker=worker):
check_main_thread()
if worker.mode == PYTHON_MODE:
# In PYTHON_MODE, ray.get is the identity operation (the input will
if worker.mode == LOCAL_MODE:
# In LOCAL_MODE, ray.get is the identity operation (the input will
# actually be a value not an objectid).
return object_ids
if isinstance(object_ids, list):
@@ -2790,8 +2831,8 @@ def put(value, worker=global_worker):
with profile("ray.put", worker=worker):
check_main_thread()
if worker.mode == PYTHON_MODE:
# In PYTHON_MODE, ray.put is the identity operation.
if worker.mode == LOCAL_MODE:
# In LOCAL_MODE, ray.put is the identity operation.
return value
object_id = worker.local_scheduler_client.compute_put_id(
worker.current_task_id, worker.put_index, worker.use_raylet)
@@ -2806,16 +2847,17 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker):
If timeout is set, the function returns either when the requested number of
IDs are ready or when the timeout is reached, whichever occurs first. If it
is not set, the function simply waits until that number of objects is ready
and returns that exact number of object_ids.
and returns that exact number of object IDs.
This method returns two lists. The first list consists of object IDs that
correspond to objects that are stored in the object store. The second list
corresponds to the rest of the object IDs (which may or may not be ready).
correspond to objects that are available in the object store. The second
list corresponds to the rest of the object IDs (which may or may not be
ready).
Ordering of the input list of object IDs is preserved: if A precedes B in
the input list, and both are in the ready list, then A will precede B in
the ready list. This also holds true if A and B are both in the remaining
list.
Ordering of the input list of object IDs is preserved. That is, if A
precedes B in the input list, and both are in the ready list, then A will
precede B in the ready list. This also holds true if A and B are both in
the remaining list.
Args:
object_ids (List[ObjectID]): List of object IDs for objects that may or
@@ -2826,7 +2868,7 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker):
Returns:
A list of object IDs that are ready and a list of the remaining object
IDs.
IDs.
"""
if isinstance(object_ids, ray.ObjectID):
@@ -2837,7 +2879,7 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker):
raise TypeError("wait() expected a list of ObjectID, got {}".format(
type(object_ids)))
if worker.mode != PYTHON_MODE:
if worker.mode != LOCAL_MODE:
for object_id in object_ids:
if not isinstance(object_id, ray.ObjectID):
raise TypeError("wait() expected a list of ObjectID, "
@@ -2848,9 +2890,9 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker):
with profile("ray.wait", worker=worker):
check_main_thread()
# When Ray is run in PYTHON_MODE, all functions are run immediately,
# When Ray is run in LOCAL_MODE, all functions are run immediately,
# so all objects in object_id are ready.
if worker.mode == PYTHON_MODE:
if worker.mode == LOCAL_MODE:
return object_ids[:num_returns], object_ids[num_returns:]
# TODO(rkn): This is a temporary workaround for
@@ -2953,6 +2995,53 @@ def make_decorator(num_return_vals=None,
def remote(*args, **kwargs):
"""Define a remote function or an actor class.
This can be used with no arguments to define a remote function or actor as
follows:
.. code-block:: python
@ray.remote
def f():
return 1
@ray.remote
class Foo(object):
def method(self):
return 1
It can also be used with specific keyword arguments:
* **num_return_vals:** This is only for *remote functions*. It specifies
the number of object IDs returned by the remote function invocation.
* **num_cpus:** The quantity of CPU cores to reserve for this task or for
the lifetime of the actor.
* **num_gpus:** The quantity of GPUs to reserve for this task or for the
lifetime of the actor.
* **resources:** The quantity of various custom resources to reserve for
this task or for the lifetime of the actor. This is a dictionary mapping
strings (resource names) to numbers.
* **max_calls:** Only for *remote functions*. This specifies the maximum
number of times that a given worker can execute the given remote function
before it must exit (this can be used to address memory leaks in
third-party libraries or to reclaim resources that cannot easily be
released, e.g., GPU memory that was acquired by TensorFlow). By
default this is infinite.
This can be done as follows:
.. code-block:: python
@ray.remote(num_gpus=1, max_calls=1, num_return_vals=2)
def f():
return 1, 2
@ray.remote(num_cpus=2, resources={"CustomResource": 1})
class Foo(object):
def method(self):
return 1
"""
worker = get_global_worker()
if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):