diff --git a/doc/source/advanced.rst b/doc/source/advanced.rst index c89e3131d..14fa2c61a 100644 --- a/doc/source/advanced.rst +++ b/doc/source/advanced.rst @@ -98,12 +98,12 @@ Message passing using Ray Queue ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Sometimes just using one signal to synchronize is not enough. If you need to send data among many tasks or -actors, you can use ``ray.experimental.queue.Queue`` (`source code `_). +actors, you can use :ref:`ray.util.queue.Queue `. .. code-block:: python import ray - from ray.experimental.queue import Queue + from ray.util.queue import Queue ray.init() # You can pass this object around to different tasks/actors @@ -228,7 +228,7 @@ Ray supports resource specific accelerator types. The `accelerator_type` field c .. code-block:: python from ray.accelerators import NVIDIA_TESLA_V100 - + @ray.remote(num_gpus=1, accelerator_type=NVIDIA_TESLA_V100) def train(data): return "This function was run on a node with a Tesla V100 GPU" diff --git a/doc/source/package-ref.rst b/doc/source/package-ref.rst index 42c5ed2be..b05daec0b 100644 --- a/doc/source/package-ref.rst +++ b/doc/source/package-ref.rst @@ -87,6 +87,14 @@ ray.util.ActorPool .. autoclass:: ray.util.ActorPool :members: +ray.util.queue.Queue +~~~~~~~~~~~~~~~~~~~~ + +.. _ray-queue-ref: + +.. autoclass:: ray.util.queue.Queue + :members: + .. _ray-nodes-ref: ray.nodes diff --git a/python/ray/experimental/queue.py b/python/ray/experimental/queue.py index cc6f6bd05..b5acc95a9 100644 --- a/python/ray/experimental/queue.py +++ b/python/ray/experimental/queue.py @@ -1,137 +1,11 @@ -import asyncio +import warnings -import ray +from ray.util.queue import Empty, Full, Queue +warnings.warn( + DeprecationWarning( + "ray.experimental.queue has been moved to ray.util.queue. " + "Please update your import path."), + stacklevel=2) -class Empty(Exception): - pass - - -class Full(Exception): - pass - - -class Queue: - """Queue implementation on Ray. - - Args: - maxsize (int): maximum size of the queue. If zero, size is unbounded. - """ - - def __init__(self, maxsize=0): - self.actor = _QueueActor.remote(maxsize) - - def __len__(self): - return self.size() - - def size(self): - """The size of the queue.""" - return ray.get(self.actor.qsize.remote()) - - def qsize(self): - """The size of the queue.""" - return self.size() - - def empty(self): - """Whether the queue is empty.""" - return ray.get(self.actor.empty.remote()) - - def full(self): - """Whether the queue is full.""" - return ray.get(self.actor.full.remote()) - - def put(self, item, block=True, timeout=None): - """Adds an item to the queue. - - There is no guarantee of order if multiple producers put to the same - full queue. - - Raises: - Full if the queue is full and blocking is False. - Full if the queue is full, blocking is True, and it timed out. - ValueError if timeout is negative. - """ - if not block: - try: - ray.get(self.actor.put_nowait.remote(item)) - except asyncio.QueueFull: - raise Full - else: - if timeout is not None and timeout < 0: - raise ValueError("'timeout' must be a non-negative number") - else: - ray.get(self.actor.put.remote(item, timeout)) - - def get(self, block=True, timeout=None): - """Gets an item from the queue. - - There is no guarantee of order if multiple consumers get from the - same empty queue. - - Returns: - The next item in the queue. - - Raises: - Empty if the queue is empty and blocking is False. - Empty if the queue is empty, blocking is True, and it timed out. - ValueError if timeout is negative. - """ - if not block: - try: - return ray.get(self.actor.get_nowait.remote()) - except asyncio.QueueEmpty: - raise Empty - else: - if timeout is not None and timeout < 0: - raise ValueError("'timeout' must be a non-negative number") - else: - return ray.get(self.actor.get.remote(timeout)) - - def put_nowait(self, item): - """Equivalent to put(item, block=False). - - Raises: - Full if the queue is full. - """ - return self.put(item, block=False) - - def get_nowait(self): - """Equivalent to get(block=False). - - Raises: - Empty if the queue is empty. - """ - return self.get(block=False) - - -@ray.remote -class _QueueActor: - def __init__(self, maxsize): - self.queue = asyncio.Queue(maxsize) - - def qsize(self): - return self.queue.qsize() - - def empty(self): - return self.queue.empty() - - def full(self): - return self.queue.full() - - async def put(self, item, timeout=None): - try: - await asyncio.wait_for(self.queue.put(item), timeout) - except asyncio.TimeoutError: - raise Full - - async def get(self, timeout=None): - try: - return await asyncio.wait_for(self.queue.get(), timeout) - except asyncio.TimeoutError: - raise Empty - - def put_nowait(self, item): - self.queue.put_nowait(item) - - def get_nowait(self): - return self.queue.get_nowait() +__all__ = ["Empty", "Full", "Queue"] diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index f467ac7c6..d79a18359 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -88,6 +88,7 @@ py_test_module_list( "test_mini.py", "test_node_manager.py", "test_numba.py", + "test_queue.py", "test_ray_init.py", "test_serialization.py", "test_tempfile.py", diff --git a/python/ray/tests/test_queue.py b/python/ray/tests/test_queue.py index cfb254884..a3f39ec4b 100644 --- a/python/ray/tests/test_queue.py +++ b/python/ray/tests/test_queue.py @@ -2,7 +2,7 @@ import pytest import ray from ray.exceptions import GetTimeoutError -from ray.experimental.queue import Queue, Empty, Full +from ray.util.queue import Queue, Empty, Full @ray.remote @@ -15,7 +15,7 @@ def async_put(queue, item): return queue.put(item, block=True) -def test_simple_usage(ray_start_regular): +def test_simple_usage(ray_start_regular_shared): q = Queue() @@ -28,7 +28,7 @@ def test_simple_usage(ray_start_regular): assert item == q.get() -def test_get(ray_start_regular): +def test_get(ray_start_regular_shared): q = Queue() @@ -50,7 +50,7 @@ def test_get(ray_start_regular): q.get(timeout=0.2) -def test_put(ray_start_regular): +def test_put(ray_start_regular_shared): q = Queue(1) @@ -73,7 +73,7 @@ def test_put(ray_start_regular): q.put(1, timeout=0.2) -def test_async_get(ray_start_regular): +def test_async_get(ray_start_regular_shared): q = Queue() future = async_get.remote(q) @@ -87,7 +87,7 @@ def test_async_get(ray_start_regular): assert ray.get(future) == 1 -def test_async_put(ray_start_regular): +def test_async_put(ray_start_regular_shared): q = Queue(1) q.put(1) future = async_put.remote(q, 2) @@ -102,7 +102,7 @@ def test_async_put(ray_start_regular): assert q.get() == 2 -def test_qsize(ray_start_regular): +def test_qsize(ray_start_regular_shared): q = Queue() diff --git a/python/ray/util/queue.py b/python/ray/util/queue.py new file mode 100644 index 000000000..cc6f6bd05 --- /dev/null +++ b/python/ray/util/queue.py @@ -0,0 +1,137 @@ +import asyncio + +import ray + + +class Empty(Exception): + pass + + +class Full(Exception): + pass + + +class Queue: + """Queue implementation on Ray. + + Args: + maxsize (int): maximum size of the queue. If zero, size is unbounded. + """ + + def __init__(self, maxsize=0): + self.actor = _QueueActor.remote(maxsize) + + def __len__(self): + return self.size() + + def size(self): + """The size of the queue.""" + return ray.get(self.actor.qsize.remote()) + + def qsize(self): + """The size of the queue.""" + return self.size() + + def empty(self): + """Whether the queue is empty.""" + return ray.get(self.actor.empty.remote()) + + def full(self): + """Whether the queue is full.""" + return ray.get(self.actor.full.remote()) + + def put(self, item, block=True, timeout=None): + """Adds an item to the queue. + + There is no guarantee of order if multiple producers put to the same + full queue. + + Raises: + Full if the queue is full and blocking is False. + Full if the queue is full, blocking is True, and it timed out. + ValueError if timeout is negative. + """ + if not block: + try: + ray.get(self.actor.put_nowait.remote(item)) + except asyncio.QueueFull: + raise Full + else: + if timeout is not None and timeout < 0: + raise ValueError("'timeout' must be a non-negative number") + else: + ray.get(self.actor.put.remote(item, timeout)) + + def get(self, block=True, timeout=None): + """Gets an item from the queue. + + There is no guarantee of order if multiple consumers get from the + same empty queue. + + Returns: + The next item in the queue. + + Raises: + Empty if the queue is empty and blocking is False. + Empty if the queue is empty, blocking is True, and it timed out. + ValueError if timeout is negative. + """ + if not block: + try: + return ray.get(self.actor.get_nowait.remote()) + except asyncio.QueueEmpty: + raise Empty + else: + if timeout is not None and timeout < 0: + raise ValueError("'timeout' must be a non-negative number") + else: + return ray.get(self.actor.get.remote(timeout)) + + def put_nowait(self, item): + """Equivalent to put(item, block=False). + + Raises: + Full if the queue is full. + """ + return self.put(item, block=False) + + def get_nowait(self): + """Equivalent to get(block=False). + + Raises: + Empty if the queue is empty. + """ + return self.get(block=False) + + +@ray.remote +class _QueueActor: + def __init__(self, maxsize): + self.queue = asyncio.Queue(maxsize) + + def qsize(self): + return self.queue.qsize() + + def empty(self): + return self.queue.empty() + + def full(self): + return self.queue.full() + + async def put(self, item, timeout=None): + try: + await asyncio.wait_for(self.queue.put(item), timeout) + except asyncio.TimeoutError: + raise Full + + async def get(self, timeout=None): + try: + return await asyncio.wait_for(self.queue.get(), timeout) + except asyncio.TimeoutError: + raise Empty + + def put_nowait(self, item): + self.queue.put_nowait(item) + + def get_nowait(self): + return self.queue.get_nowait()