diff --git a/doc/source/index.rst b/doc/source/index.rst index 342e9475d..f308cbea4 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -66,6 +66,7 @@ Ray comes with libraries that accelerate deep learning and reinforcement learnin actors.rst using-ray-with-gpus.rst webui.rst + signals.rst async_api.rst .. toctree:: diff --git a/doc/source/signals.rst b/doc/source/signals.rst new file mode 100644 index 000000000..76f9984d6 --- /dev/null +++ b/doc/source/signals.rst @@ -0,0 +1,168 @@ +Signal API (Experimental) +========================= + +This experimental API allows tasks and actors to generate signals which can +be received by other tasks and actors. In addition, task failures and actor +method failures generate error signals. The error signals enable applications +to detect failures and potentially recover from failures. + +.. autofunction:: ray.experimental.signal.send + +Here is a simple example of a remote function that sends a user-defined signal. + +.. code-block:: python + + import ray.experimental.signal as signal + + # Define an application level signal. + class UserSignal(signal.Signal): + def __init__(self, value): + self.value = value + + def get_value(self): + return self.value + + # Define a remote function that sends a user-defined signal. + @ray.remote + def send_signal(value): + signal.send(UserSignal(value)) + +.. autofunction:: ray.experimental.signal.receive + +Here is a simple example of how to receive signals from an actor or task identified +by ``a``. Note that an actor is identified by its handle, and a task by one of its +object ID return values. + +.. code-block:: python + + import ray.experimental.signal as signal + + # This returns a possibly empty list of all signals that have been sent by 'a' + # since the last invocation of signal.receive from within this process. If 'a' + # did not send any signals, then this will wait for up to 10 seconds to receive + # a signal from 'a'. + signal_list = signal.receive([a], timeout=10) + +.. autofunction:: ray.experimental.signal.reset + + +Example: sending a user signal +------------------------------ + +The code below show a simple example in which a task, called ``send_signal()`` +sends a user signal and the driver gets it by invoking ``signal.receive()``. + +.. code-block:: python + + import ray.experimental.signal as signal + + # Define a user signal. + class UserSignal(signal.Signal): + def __init__(self, value): + self.value = value + + def get_value(self): + return self.value + + @ray.remote + def send_signal(value): + signal.send(UserSignal(value)) + return + + signal_value = 'simple signal' + object_id = send_signal.remote(signal_value) + # Wait up to 10sec to receive a signal from the task. Note the task is + # identified by the object_id it returns. + result_list = signal.receive([object_id], timeout=10) + # Print signal values. This should print "simple_signal". + # Note that result_list[0] is the signal we expect from the task. + # The signal is a tuple where the first element is the first object ID + # returned by the task and the second element is the signal object. + print(result_list[0][1].get_value()) + +Example: Getting an error signals +--------------------------------- + +This is a simple example in which a driver gets an error signal caused +by the failure of ``task()``. + +.. code-block:: python + + @ray.remote + def task(): + raise Exception('exception message') + + object_id = task.remote() + try: + ray.get(object_id) + except Exception as e: + pass + finally: + result_list = signal.receive([object_id], timeout=10) + # Expected signal is 'ErrorSignal'. + assert type(result_list[0][1]) == signal.ErrorSignal + # Print the error. + print(result_list[0][1].get_error()) + + +Example: Sending signals between multiple actors +------------------------------------------------ + +This is a more involved example in which two actors ``a1`` and ``a2`` each +generate five signals, and another actor ``b`` waits to receive all signals +generated by ``a1`` and ``a2``, respectively. Note that ``b`` recursively calls +its own method ``get_signals()`` until it gets all signals it expects. + +.. code-block:: python + + @ray.remote + class ActorSendSignals(object): + def send_signals(self, value, count): + for i in range(count): + signal.send(UserSignal(value + str(i))) + + @ray.remote + class ActorGetAllSignals(object): + def __init__(self, num_expected_signals, *source_ids): + self.received_signals = [] + self.num_expected_signals = num_expected_signals + self.source_ids = source_ids + + def register_handle(self, handle): + self.this_actor = handle + + def get_signals(self): + new_signals = signal.receive(self.source_ids, timeout=10) + self.received_signals.extend(new_signals) + if len(self.received_signals) < self.num_expected_signals: + self.this_actor.get_signals.remote() + + def get_count(self): + return len(self.received_signals) + + # Create two actors to send signals. + a1 = ActorSendSignals.remote() + a2 = ActorSendSignals.remote() + signal_value = 'simple signal' + count = 5 + # Each actor sends five signals. + a1.send_signals.remote(signal_value, count) + a2.send_signals.remote(signal_value, count) + + # Create an actor that waits for all five signals sent by each actor. + b = ActorGetAllSignals.remote(2 * count, *[a1, a2]) + # Provide actor to its own handle, so it can recursively call itself + # to get all signals from a1, and a2, respectively. This enables the actor + # execute other methods if needed. + ray.get(b.register_handle.remote(b)) + b.get_signals.remote() + # Print total number of signals. This should be 2*count = 10. + print(ray.get(b.get_count.remote())) + +Note +---- + +A failed actor (e.g., an actor that crashed) generates an error message only +when another actor or task invokes one of its methods. + +Please `let us know `__ any issues you encounter.