mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 20:53:14 +08:00
Export remote functions when first used and also fix bug in which rem… (#4844)
* Export remote functions when first used and also fix bug in which remote functions and actor classes are not exported from workers during subsequent ray sessions. * Documentation update * Fix tests. * Fix grammar
This commit is contained in:
committed by
Philipp Moritz
parent
4e281ba938
commit
49fe894e22
@@ -66,32 +66,6 @@ listens for the addition of remote functions to the centralized control state.
|
||||
When a new remote function is added, the thread fetches the pickled remote
|
||||
function, unpickles it, and can then execute that function.
|
||||
|
||||
Notes and limitations
|
||||
~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
- Because we export remote functions as soon as they are defined, that means
|
||||
that remote functions can't close over variables that are defined after the
|
||||
remote function is defined. For example, the following code gives an error.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
@ray.remote
|
||||
def f(x):
|
||||
return helper(x)
|
||||
|
||||
def helper(x):
|
||||
return x + 1
|
||||
|
||||
If you call ``f.remote(0)``, it will give an error of the form.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
Traceback (most recent call last):
|
||||
File "<ipython-input-3-12a5beeb2306>", line 3, in f
|
||||
NameError: name 'helper' is not defined
|
||||
|
||||
On the other hand, if ``helper`` is defined before ``f``, then it will work.
|
||||
|
||||
Calling a remote function
|
||||
-------------------------
|
||||
|
||||
|
||||
+11
-7
@@ -186,9 +186,12 @@ class ActorClass(object):
|
||||
task.
|
||||
_resources: The default resources required by the actor creation task.
|
||||
_actor_method_cpus: The number of CPUs required by actor method tasks.
|
||||
_last_export_session: The index of the last session in which the remote
|
||||
function was exported. This is used to determine if we need to
|
||||
export the remote function again.
|
||||
_last_driver_id_exported_for: The ID of the driver ID of the last Ray
|
||||
session during which this actor class definition was exported. This
|
||||
is an imperfect mechanism used to determine if we need to export
|
||||
the remote function again. It is imperfect in the sense that the
|
||||
actor class definition could be exported multiple times by
|
||||
different workers.
|
||||
_actor_methods: The actor methods.
|
||||
_method_decorators: Optional decorators that should be applied to the
|
||||
method invocation function before invoking the actor methods. These
|
||||
@@ -209,7 +212,7 @@ class ActorClass(object):
|
||||
self._num_cpus = num_cpus
|
||||
self._num_gpus = num_gpus
|
||||
self._resources = resources
|
||||
self._last_export_session = None
|
||||
self._last_driver_id_exported_for = None
|
||||
|
||||
self._actor_methods = inspect.getmembers(
|
||||
self._modified_class, ray.utils.is_function_or_method)
|
||||
@@ -342,12 +345,13 @@ class ActorClass(object):
|
||||
*copy.deepcopy(args), **copy.deepcopy(kwargs))
|
||||
else:
|
||||
# Export the actor.
|
||||
if (self._last_export_session is None
|
||||
or self._last_export_session < worker._session_index):
|
||||
if (self._last_driver_id_exported_for is None
|
||||
or self._last_driver_id_exported_for !=
|
||||
worker.task_driver_id):
|
||||
# If this actor class was exported in a previous session, we
|
||||
# need to export this function again, because current GCS
|
||||
# doesn't have it.
|
||||
self._last_export_session = worker._session_index
|
||||
self._last_driver_id_exported_for = worker.task_driver_id
|
||||
worker.function_actor_manager.export_actor_class(
|
||||
self._modified_class, self._actor_method_names)
|
||||
|
||||
|
||||
@@ -342,7 +342,7 @@ class FunctionActorManager(object):
|
||||
# and export it later.
|
||||
self._functions_to_export.append(remote_function)
|
||||
return
|
||||
if self._worker.mode != ray.worker.SCRIPT_MODE:
|
||||
if self._worker.mode == ray.worker.LOCAL_MODE:
|
||||
# Don't need to export if the worker is not a driver.
|
||||
return
|
||||
self._do_export(remote_function)
|
||||
|
||||
@@ -43,9 +43,12 @@ class RemoteFunction(object):
|
||||
return the resulting ObjectIDs. For an example, see
|
||||
"test_decorated_function" in "python/ray/tests/test_basic.py".
|
||||
_function_signature: The function signature.
|
||||
_last_export_session: The index of the last session in which the remote
|
||||
function was exported. This is used to determine if we need to
|
||||
export the remote function again.
|
||||
_last_driver_id_exported_for: The ID of the driver ID of the last Ray
|
||||
session during which this remote function definition was exported.
|
||||
This is an imperfect mechanism used to determine if we need to
|
||||
export the remote function again. It is imperfect in the sense that
|
||||
the actor class definition could be exported multiple times by
|
||||
different workers.
|
||||
"""
|
||||
|
||||
def __init__(self, function, num_cpus, num_gpus, resources,
|
||||
@@ -69,10 +72,7 @@ class RemoteFunction(object):
|
||||
self._function_signature = ray.signature.extract_signature(
|
||||
self._function)
|
||||
|
||||
# Export the function.
|
||||
worker = ray.worker.get_global_worker()
|
||||
self._last_export_session = worker._session_index
|
||||
worker.function_actor_manager.export(self)
|
||||
self._last_driver_id_exported_for = None
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
raise Exception("Remote functions cannot be called directly. Instead "
|
||||
@@ -111,10 +111,11 @@ class RemoteFunction(object):
|
||||
worker = ray.worker.get_global_worker()
|
||||
worker.check_connected()
|
||||
|
||||
if self._last_export_session < worker._session_index:
|
||||
if (self._last_driver_id_exported_for is None
|
||||
or self._last_driver_id_exported_for != worker.task_driver_id):
|
||||
# If this function was exported in a previous session, we need to
|
||||
# export this function again, because current GCS doesn't have it.
|
||||
self._last_export_session = worker._session_index
|
||||
self._last_driver_id_exported_for = worker.task_driver_id
|
||||
worker.function_actor_manager.export(self)
|
||||
|
||||
kwargs = {} if kwargs is None else kwargs
|
||||
|
||||
@@ -303,6 +303,23 @@ def test_complex_serialization(ray_start_regular):
|
||||
assert_equal(obj, ray.get(ray.put(obj)))
|
||||
|
||||
|
||||
def test_nested_functions(ray_start_regular):
|
||||
# Make sure that remote functions can use other values that are defined
|
||||
# after the remote function but before the first function invocation.
|
||||
@ray.remote
|
||||
def f():
|
||||
return g(), ray.get(h.remote())
|
||||
|
||||
def g():
|
||||
return 1
|
||||
|
||||
@ray.remote
|
||||
def h():
|
||||
return 2
|
||||
|
||||
assert ray.get(f.remote()) == (1, 2)
|
||||
|
||||
|
||||
def test_ray_recursive_objects(ray_start_regular):
|
||||
class ClassA(object):
|
||||
pass
|
||||
@@ -2968,3 +2985,17 @@ def test_export_after_shutdown(ray_start_regular):
|
||||
ray.get(f.remote())
|
||||
a = Actor.remote()
|
||||
ray.get(a.method.remote())
|
||||
|
||||
ray.shutdown()
|
||||
|
||||
# Start Ray again and make sure that these definitions can be exported from
|
||||
# workers.
|
||||
ray.init(num_cpus=2)
|
||||
|
||||
@ray.remote
|
||||
def export_definitions_from_worker(remote_function, actor_class):
|
||||
ray.get(remote_function.remote())
|
||||
actor_handle = actor_class.remote()
|
||||
ray.get(actor_handle.method.remote())
|
||||
|
||||
ray.get(export_definitions_from_worker.remote(f, Actor))
|
||||
|
||||
@@ -95,7 +95,15 @@ def temporary_helper_function():
|
||||
# fail when it is unpickled.
|
||||
@ray.remote
|
||||
def g():
|
||||
return module.temporary_python_file()
|
||||
try:
|
||||
module.temporary_python_file()
|
||||
except Exception:
|
||||
# This test is not concerned with the error from running this
|
||||
# function. Only from unpickling the remote function.
|
||||
pass
|
||||
|
||||
# Invoke the function so that the definition is exported.
|
||||
g.remote()
|
||||
|
||||
wait_for_errors(ray_constants.REGISTER_REMOTE_FUNCTION_PUSH_ERROR, 2)
|
||||
errors = relevant_errors(ray_constants.REGISTER_REMOTE_FUNCTION_PUSH_ERROR)
|
||||
@@ -499,6 +507,9 @@ def test_export_large_objects(ray_start_regular):
|
||||
def f():
|
||||
large_object
|
||||
|
||||
# Invoke the function so that the definition is exported.
|
||||
f.remote()
|
||||
|
||||
# Make sure that a warning is generated.
|
||||
wait_for_errors(ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR, 1)
|
||||
|
||||
|
||||
@@ -46,13 +46,6 @@ def _test_cleanup_on_driver_exit(num_redis_shards):
|
||||
# Two new objects.
|
||||
ray.get(ray.put(1111))
|
||||
ray.get(ray.put(1111))
|
||||
attempts = 0
|
||||
while (2, 1, summary_start[2]) != StateSummary():
|
||||
time.sleep(0.1)
|
||||
attempts += 1
|
||||
if attempts == max_attempts_before_failing:
|
||||
success.value = False
|
||||
break
|
||||
|
||||
@ray.remote
|
||||
def f():
|
||||
@@ -61,7 +54,7 @@ def _test_cleanup_on_driver_exit(num_redis_shards):
|
||||
|
||||
# 1 new function.
|
||||
attempts = 0
|
||||
while (2, 1, summary_start[2] + 1) != StateSummary():
|
||||
while (2, 1, summary_start[2]) != StateSummary():
|
||||
time.sleep(0.1)
|
||||
attempts += 1
|
||||
if attempts == max_attempts_before_failing:
|
||||
|
||||
Reference in New Issue
Block a user