From f5c46c7765556c14cfd907f44aacc4304f11d63c Mon Sep 17 00:00:00 2001 From: Peter Schafhalter Date: Mon, 16 Jul 2018 16:26:20 -0700 Subject: [PATCH] Add queue data structures (#2261) --- .travis.yml | 1 + python/ray/experimental/queue.py | 164 +++++++++++++++++++++++++++++++ python/ray/test/test_queue.py | 132 +++++++++++++++++++++++++ 3 files changed, 297 insertions(+) create mode 100644 python/ray/experimental/queue.py create mode 100644 python/ray/test/test_queue.py diff --git a/.travis.yml b/.travis.yml index 7e8da64f2..21a1ec213 100644 --- a/.travis.yml +++ b/.travis.yml @@ -185,6 +185,7 @@ script: - python python/ray/local_scheduler/test/test.py - python python/ray/global_scheduler/test/test.py + - python -m pytest python/ray/test/test_queue.py - python -m pytest test/xray_test.py - python test/runtest.py diff --git a/python/ray/experimental/queue.py b/python/ray/experimental/queue.py new file mode 100644 index 000000000..39961db08 --- /dev/null +++ b/python/ray/experimental/queue.py @@ -0,0 +1,164 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from collections import deque +import time + +import ray + + +class Empty(Exception): + pass + + +class Full(Exception): + pass + + +class Queue(object): + """Queue implementation on Ray. + + Args: + maxsize (int): maximum size of the queue. If zero, size is unboundend. + """ + + def __init__(self, maxsize=0): + self.maxsize = maxsize + self.actor = _QueueActor.remote(maxsize) + + def qsize(self): + """The size of the queue.""" + return ray.get(self.actor.qsize.remote()) + + def empty(self): + """Whether the queue is empty.""" + return ray.get(self.actor.qsize.remote()) + + def full(self): + """Whether the queue is full.""" + return ray.get(self.actor.full.remote()) + + def put(self, item, block=True, timeout=None): + """Adds an item to the queue. + + Uses polling if block=True, so there is no guarantee of order if + multiple producers put to the same full queue. + + Raises: + Full if the queue is full and blocking is False. + """ + if self.maxsize <= 0: + self.actor.put.remote(item) + elif not block: + if not ray.get(self.actor.put.remote(item)): + raise Full + elif timeout is None: + # Polling + # Use a not_full condition variable or promise? + while not ray.get(self.actor.put.remote(item)): + # Consider adding time.sleep here + pass + elif timeout < 0: + raise ValueError("'timeout' must be a non-negative number") + else: + endtime = time.time() + timeout + # Polling + # Use a condition variable or switch to promise? + success = False + while not success and time.time() < endtime: + success = ray.get(self.actor.put.remote(item)) + if not success: + raise Full + + def get(self, block=True, timeout=None): + """Gets an item from the queue. + + Uses polling if block=True, so there is no guarantee of order if + multiple consumers get from the same empty queue. + + Returns: + The next item in the queue. + + Raises: + Empty if the queue is empty and blocking is False. + """ + if not block: + success, item = ray.get(self.actor.get.remote()) + if not success: + raise Empty + elif timeout is None: + # Polling + # Use a not_empty condition variable or return a promise? + success, item = ray.get(self.actor.get.remote()) + while not success: + # Consider adding time.sleep here + success, item = ray.get(self.actor.get.remote()) + elif timeout < 0: + raise ValueError("'timeout' must be a non-negative number") + else: + endtime = time.time() + timeout + # Polling + # Use a not_full condition variable or return a promise? + success = False + while not success and time.time() < endtime: + success, item = ray.get(self.actor.get.remote()) + if not success: + raise Empty + return item + + def put_nowait(self, item): + """Equivalent to put(item, block=False). + + Raises: + Full if the queue is full. + """ + return self.put(item, block=False) + + def get_nowait(self): + """Equivalent to get(item, block=False). + + Raises: + Empty if the queue is empty. + """ + return self.get(block=False) + + +@ray.remote +class _QueueActor(object): + def __init__(self, maxsize): + self.maxsize = maxsize + self._init(maxsize) + + def qsize(self): + return self._qsize() + + def empty(self): + return not self._qsize() + + def full(self): + return 0 < self.maxsize <= self._qsize() + + def put(self, item): + if self.maxsize > 0 and self._qsize() >= self.maxsize: + return False + self._put(item) + return True + + def get(self): + if not self._qsize(): + return False, None + return True, self._get() + + # Override these for different queue implementations + def _init(self, maxsize): + self.queue = deque() + + def _qsize(self): + return len(self.queue) + + def _put(self, item): + self.queue.append(item) + + def _get(self): + return self.queue.popleft() diff --git a/python/ray/test/test_queue.py b/python/ray/test/test_queue.py new file mode 100644 index 000000000..c93c8c553 --- /dev/null +++ b/python/ray/test/test_queue.py @@ -0,0 +1,132 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import time +import pytest + +import ray + +from ray.experimental.queue import Queue, Empty, Full + + +def start_ray(): + if not ray.worker.global_worker.connected: + ray.init() + + +@ray.remote +def get_async(queue, block, timeout, sleep): + time.sleep(sleep) + return queue.get(block, timeout) + + +@ray.remote +def put_async(queue, item, block, timeout, sleep): + time.sleep(sleep) + queue.put(item, block, timeout) + + +def test_simple_use(): + start_ray() + q = Queue() + + items = list(range(10)) + + for item in items: + q.put(item) + + for item in items: + assert item == q.get() + + +def test_async(): + start_ray() + q = Queue() + + items = set(range(10)) + producers = [ # noqa + put_async.remote(q, item, True, None, 0.5) for item in items + ] + consumers = [get_async.remote(q, True, None, 0) for _ in items] + + result = set(ray.get(consumers)) + + assert items == result + + +def test_put(): + start_ray() + q = Queue(1) + + item = 0 + q.put(item, block=False) + assert q.get() == item + + item = 1 + q.put(item, timeout=0.2) + assert q.get() == item + + with pytest.raises(ValueError): + q.put(0, timeout=-1) + + q.put(0) + with pytest.raises(Full): + q.put_nowait(1) + + with pytest.raises(Full): + q.put(1, timeout=0.2) + + q.get() + q.put(1) + + get_id = get_async.remote(q, False, None, 0.2) + q.put(2) + + assert ray.get(get_id) == 1 + + +def test_get(): + start_ray() + q = Queue() + + item = 0 + q.put(item) + assert q.get(block=False) == item + + item = 1 + q.put(item) + assert q.get(timeout=0.2) == item + + with pytest.raises(ValueError): + q.get(timeout=-1) + + with pytest.raises(Empty): + q.get_nowait() + + with pytest.raises(Empty): + q.get(timeout=0.2) + + item = 0 + put_async.remote(q, item, True, None, 0.2) + assert q.get() == item + + +def test_qsize(): + start_ray() + q = Queue() + + items = list(range(10)) + size = 0 + + assert q.qsize() == size + + for item in items: + q.put(item) + size += 1 + assert q.qsize() == size + + for item in items: + assert q.get() == item + size -= 1 + assert q.qsize() == size