mirror of
https://github.com/wassname/ray.git
synced 2026-06-30 10:33:24 +08:00
[doc] Document experimental signal API. (#4019)
* [doc] Document signal API. * minor * resolve conflicts
This commit is contained in:
@@ -66,6 +66,7 @@ Ray comes with libraries that accelerate deep learning and reinforcement learnin
|
||||
actors.rst
|
||||
using-ray-with-gpus.rst
|
||||
webui.rst
|
||||
signals.rst
|
||||
async_api.rst
|
||||
|
||||
.. toctree::
|
||||
|
||||
@@ -0,0 +1,168 @@
|
||||
Signal API (Experimental)
|
||||
=========================
|
||||
|
||||
This experimental API allows tasks and actors to generate signals which can
|
||||
be received by other tasks and actors. In addition, task failures and actor
|
||||
method failures generate error signals. The error signals enable applications
|
||||
to detect failures and potentially recover from failures.
|
||||
|
||||
.. autofunction:: ray.experimental.signal.send
|
||||
|
||||
Here is a simple example of a remote function that sends a user-defined signal.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
import ray.experimental.signal as signal
|
||||
|
||||
# Define an application level signal.
|
||||
class UserSignal(signal.Signal):
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
|
||||
def get_value(self):
|
||||
return self.value
|
||||
|
||||
# Define a remote function that sends a user-defined signal.
|
||||
@ray.remote
|
||||
def send_signal(value):
|
||||
signal.send(UserSignal(value))
|
||||
|
||||
.. autofunction:: ray.experimental.signal.receive
|
||||
|
||||
Here is a simple example of how to receive signals from an actor or task identified
|
||||
by ``a``. Note that an actor is identified by its handle, and a task by one of its
|
||||
object ID return values.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
import ray.experimental.signal as signal
|
||||
|
||||
# This returns a possibly empty list of all signals that have been sent by 'a'
|
||||
# since the last invocation of signal.receive from within this process. If 'a'
|
||||
# did not send any signals, then this will wait for up to 10 seconds to receive
|
||||
# a signal from 'a'.
|
||||
signal_list = signal.receive([a], timeout=10)
|
||||
|
||||
.. autofunction:: ray.experimental.signal.reset
|
||||
|
||||
|
||||
Example: Sending a user signal
|
||||
------------------------------
|
||||
|
||||
The code below shows a simple example in which a task, called ``send_signal()``,
|
||||
sends a user signal and the driver gets it by invoking ``signal.receive()``.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
import ray.experimental.signal as signal
|
||||
|
||||
# Define a user signal.
|
||||
class UserSignal(signal.Signal):
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
|
||||
def get_value(self):
|
||||
return self.value
|
||||
|
||||
@ray.remote
|
||||
def send_signal(value):
|
||||
signal.send(UserSignal(value))
|
||||
return
|
||||
|
||||
signal_value = 'simple signal'
|
||||
object_id = send_signal.remote(signal_value)
|
||||
# Wait up to 10sec to receive a signal from the task. Note the task is
|
||||
# identified by the object_id it returns.
|
||||
result_list = signal.receive([object_id], timeout=10)
|
||||
# Print signal values. This should print "simple signal".
|
||||
# Note that result_list[0] is the signal we expect from the task.
|
||||
# The signal is a tuple where the first element is the first object ID
|
||||
# returned by the task and the second element is the signal object.
|
||||
print(result_list[0][1].get_value())
|
||||
|
||||
Example: Getting an error signal
|
||||
---------------------------------
|
||||
|
||||
This is a simple example in which a driver gets an error signal caused
|
||||
by the failure of ``task()``.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
@ray.remote
|
||||
def task():
|
||||
raise Exception('exception message')
|
||||
|
||||
object_id = task.remote()
|
||||
try:
|
||||
ray.get(object_id)
|
||||
except Exception as e:
|
||||
pass
|
||||
finally:
|
||||
result_list = signal.receive([object_id], timeout=10)
|
||||
# Expected signal is 'ErrorSignal'.
|
||||
assert type(result_list[0][1]) == signal.ErrorSignal
|
||||
# Print the error.
|
||||
print(result_list[0][1].get_error())
|
||||
|
||||
|
||||
Example: Sending signals between multiple actors
|
||||
------------------------------------------------
|
||||
|
||||
This is a more involved example in which two actors ``a1`` and ``a2`` each
|
||||
generate five signals, and another actor ``b`` waits to receive all signals
|
||||
generated by ``a1`` and ``a2``. Note that ``b`` recursively calls
|
||||
its own method ``get_signals()`` until it gets all signals it expects.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
@ray.remote
|
||||
class ActorSendSignals(object):
|
||||
def send_signals(self, value, count):
|
||||
for i in range(count):
|
||||
signal.send(UserSignal(value + str(i)))
|
||||
|
||||
@ray.remote
|
||||
class ActorGetAllSignals(object):
|
||||
def __init__(self, num_expected_signals, *source_ids):
|
||||
self.received_signals = []
|
||||
self.num_expected_signals = num_expected_signals
|
||||
self.source_ids = source_ids
|
||||
|
||||
def register_handle(self, handle):
|
||||
self.this_actor = handle
|
||||
|
||||
def get_signals(self):
|
||||
new_signals = signal.receive(self.source_ids, timeout=10)
|
||||
self.received_signals.extend(new_signals)
|
||||
if len(self.received_signals) < self.num_expected_signals:
|
||||
self.this_actor.get_signals.remote()
|
||||
|
||||
def get_count(self):
|
||||
return len(self.received_signals)
|
||||
|
||||
# Create two actors to send signals.
|
||||
a1 = ActorSendSignals.remote()
|
||||
a2 = ActorSendSignals.remote()
|
||||
signal_value = 'simple signal'
|
||||
count = 5
|
||||
# Each actor sends five signals.
|
||||
a1.send_signals.remote(signal_value, count)
|
||||
a2.send_signals.remote(signal_value, count)
|
||||
|
||||
# Create an actor that waits for all five signals sent by each actor.
|
||||
b = ActorGetAllSignals.remote(2 * count, *[a1, a2])
|
||||
# Give the actor its own handle, so it can recursively call itself
|
||||
# to get all signals from a1 and a2. This enables the actor
|
||||
# to execute other methods if needed.
|
||||
ray.get(b.register_handle.remote(b))
|
||||
b.get_signals.remote()
|
||||
# Print total number of signals. This should be 2*count = 10.
|
||||
print(ray.get(b.get_count.remote()))
|
||||
|
||||
Note
|
||||
----
|
||||
|
||||
A failed actor (e.g., an actor that crashed) generates an error message only
|
||||
when another actor or task invokes one of its methods.
|
||||
|
||||
Please `let us know <https://github.com/ray-project/ray/issues>`__ about any issues you encounter.
|
||||
Reference in New Issue
Block a user