diff --git a/python/ray/experimental/queue.py b/python/ray/experimental/queue.py index d0adcd6c7..cc6f6bd05 100644 --- a/python/ray/experimental/queue.py +++ b/python/ray/experimental/queue.py @@ -1,5 +1,4 @@ -from collections import deque -import time +import asyncio import ray @@ -16,11 +15,10 @@ class Queue: """Queue implementation on Ray. Args: - maxsize (int): maximum size of the queue. If zero, size is unboundend. + maxsize (int): maximum size of the queue. If zero, size is unbounded. """ def __init__(self, maxsize=0): - self.maxsize = maxsize self.actor = _QueueActor.remote(maxsize) def __len__(self): @@ -45,70 +43,49 @@ class Queue: def put(self, item, block=True, timeout=None): """Adds an item to the queue. - Uses polling if block=True, so there is no guarantee of order if - multiple producers put to the same full queue. + There is no guarantee of order if multiple producers put to the same + full queue. Raises: Full if the queue is full and blocking is False. + Full if the queue is full, blocking is True, and it timed out. + ValueError if timeout is negative. """ - if self.maxsize <= 0: - self.actor.put.remote(item) - elif not block: - if not ray.get(self.actor.put.remote(item)): + if not block: + try: + ray.get(self.actor.put_nowait.remote(item)) + except asyncio.QueueFull: raise Full - elif timeout is None: - # Polling - # Use a not_full condition variable or promise? - while not ray.get(self.actor.put.remote(item)): - # Consider adding time.sleep here - pass - elif timeout < 0: - raise ValueError("'timeout' must be a non-negative number") else: - endtime = time.time() + timeout - # Polling - # Use a condition variable or switch to promise? - success = False - while not success and time.time() < endtime: - success = ray.get(self.actor.put.remote(item)) - if not success: - raise Full + if timeout is not None and timeout < 0: + raise ValueError("'timeout' must be a non-negative number") + else: + ray.get(self.actor.put.remote(item, timeout)) def get(self, block=True, timeout=None): """Gets an item from the queue. - Uses polling if block=True, so there is no guarantee of order if - multiple consumers get from the same empty queue. + There is no guarantee of order if multiple consumers get from the + same empty queue. Returns: The next item in the queue. Raises: Empty if the queue is empty and blocking is False. + Empty if the queue is empty, blocking is True, and it timed out. + ValueError if timeout is negative. """ if not block: - success, item = ray.get(self.actor.get.remote()) - if not success: + try: + return ray.get(self.actor.get_nowait.remote()) + except asyncio.QueueEmpty: raise Empty - elif timeout is None: - # Polling - # Use a not_empty condition variable or return a promise? - success, item = ray.get(self.actor.get.remote()) - while not success: - # Consider adding time.sleep here - success, item = ray.get(self.actor.get.remote()) - elif timeout < 0: - raise ValueError("'timeout' must be a non-negative number") else: - endtime = time.time() + timeout - # Polling - # Use a not_full condition variable or return a promise? - success = False - while not success and time.time() < endtime: - success, item = ray.get(self.actor.get.remote()) - if not success: - raise Empty - return item + if timeout is not None and timeout < 0: + raise ValueError("'timeout' must be a non-negative number") + else: + return ray.get(self.actor.get.remote(timeout)) def put_nowait(self, item): """Equivalent to put(item, block=False). @@ -119,7 +96,7 @@ class Queue: return self.put(item, block=False) def get_nowait(self): - """Equivalent to get(item, block=False). + """Equivalent to get(block=False). Raises: Empty if the queue is empty. @@ -130,38 +107,31 @@ class Queue: @ray.remote class _QueueActor: def __init__(self, maxsize): - self.maxsize = maxsize - self._init(maxsize) + self.queue = asyncio.Queue(maxsize) def qsize(self): - return self._qsize() + return self.queue.qsize() def empty(self): - return not self._qsize() + return self.queue.empty() def full(self): - return 0 < self.maxsize <= self._qsize() + return self.queue.full() - def put(self, item): - if self.maxsize > 0 and self._qsize() >= self.maxsize: - return False - self._put(item) - return True + async def put(self, item, timeout=None): + try: + await asyncio.wait_for(self.queue.put(item), timeout) + except asyncio.TimeoutError: + raise Full - def get(self): - if not self._qsize(): - return False, None - return True, self._get() + async def get(self, timeout=None): + try: + return await asyncio.wait_for(self.queue.get(), timeout) + except asyncio.TimeoutError: + raise Empty - # Override these for different queue implementations - def _init(self, maxsize): - self.queue = deque() + def put_nowait(self, item): + self.queue.put_nowait(item) - def _qsize(self): - return len(self.queue) - - def _put(self, item): - self.queue.append(item) - - def _get(self): - return self.queue.popleft() + def get_nowait(self): + return self.queue.get_nowait() diff --git a/python/ray/tests/test_queue.py b/python/ray/tests/test_queue.py index 247bcb2b1..72bbb821c 100644 --- a/python/ray/tests/test_queue.py +++ b/python/ray/tests/test_queue.py @@ -1,22 +1,21 @@ import pytest -import time import ray +from ray.exceptions import RayTimeoutError from ray.experimental.queue import Queue, Empty, Full -def test_queue(ray_start_regular): - @ray.remote - def get_async(queue, block, timeout, sleep): - time.sleep(sleep) - return queue.get(block, timeout) +@ray.remote +def async_get(queue): + return queue.get(block=True) - @ray.remote - def put_async(queue, item, block, timeout, sleep): - time.sleep(sleep) - queue.put(item, block, timeout) - # Test simple usage. +@ray.remote +def async_put(queue, item): + return queue.put(item, block=True) + + +def test_simple_usage(ray_start_regular): q = Queue() @@ -28,21 +27,30 @@ def test_queue(ray_start_regular): for item in items: assert item == q.get() - # Test asynchronous usage. + +def test_get(ray_start_regular): q = Queue() - items = set(range(10)) - producers = [ # noqa - put_async.remote(q, item, True, None, 0.5) for item in items - ] - consumers = [get_async.remote(q, True, None, 0) for _ in items] + item = 0 + q.put(item) + assert q.get(block=False) == item - result = set(ray.get(consumers)) + item = 1 + q.put(item) + assert q.get(timeout=0.2) == item - assert items == result + with pytest.raises(ValueError): + q.get(timeout=-1) - # Test put. + with pytest.raises(Empty): + q.get_nowait() + + with pytest.raises(Empty): + q.get(timeout=0.2) + + +def test_put(ray_start_regular): q = Queue(1) @@ -64,40 +72,37 @@ def test_queue(ray_start_regular): with pytest.raises(Full): q.put(1, timeout=0.2) - q.get() - q.put(1) - - get_id = get_async.remote(q, False, None, 0.2) - q.put(2) - - assert ray.get(get_id) == 1 - - # Test get. +def test_async_get(ray_start_regular): q = Queue() - - item = 0 - q.put(item) - assert q.get(block=False) == item - - item = 1 - q.put(item) - assert q.get(timeout=0.2) == item - - with pytest.raises(ValueError): - q.get(timeout=-1) + future = async_get.remote(q) with pytest.raises(Empty): q.get_nowait() - with pytest.raises(Empty): - q.get(timeout=0.2) + with pytest.raises(RayTimeoutError): + ray.get(future, timeout=0.1) # task not canceled on timeout. - item = 0 - put_async.remote(q, item, True, None, 0.2) - assert q.get() == item + q.put(1) + assert ray.get(future) == 1 - # Test qsize. + +def test_async_put(ray_start_regular): + q = Queue(1) + q.put(1) + future = async_put.remote(q, 2) + + with pytest.raises(Full): + q.put_nowait(3) + + with pytest.raises(RayTimeoutError): + ray.get(future, timeout=0.1) # task not canceled on timeout. + + assert q.get() == 1 + assert q.get() == 2 + + +def test_qsize(ray_start_regular): q = Queue()