[Experimental] Queue: replace polling with async actor (#10120)

This commit is contained in:
architkulkarni
2020-08-19 09:55:42 -07:00
committed by GitHub
parent 2cbe29a7fa
commit de46464aa3
2 changed files with 95 additions and 120 deletions
+44 -74
View File
@@ -1,5 +1,4 @@
from collections import deque
import time
import asyncio
import ray
@@ -16,11 +15,10 @@ class Queue:
"""Queue implementation on Ray.
Args:
maxsize (int): maximum size of the queue. If zero, size is unboundend.
maxsize (int): maximum size of the queue. If zero, size is unbounded.
"""
def __init__(self, maxsize=0):
self.maxsize = maxsize
self.actor = _QueueActor.remote(maxsize)
def __len__(self):
@@ -45,70 +43,49 @@ class Queue:
def put(self, item, block=True, timeout=None):
"""Adds an item to the queue.
Uses polling if block=True, so there is no guarantee of order if
multiple producers put to the same full queue.
There is no guarantee of order if multiple producers put to the same
full queue.
Raises:
Full if the queue is full and blocking is False.
Full if the queue is full, blocking is True, and it timed out.
ValueError if timeout is negative.
"""
if self.maxsize <= 0:
self.actor.put.remote(item)
elif not block:
if not ray.get(self.actor.put.remote(item)):
if not block:
try:
ray.get(self.actor.put_nowait.remote(item))
except asyncio.QueueFull:
raise Full
elif timeout is None:
# Polling
# Use a not_full condition variable or promise?
while not ray.get(self.actor.put.remote(item)):
# Consider adding time.sleep here
pass
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = time.time() + timeout
# Polling
# Use a condition variable or switch to promise?
success = False
while not success and time.time() < endtime:
success = ray.get(self.actor.put.remote(item))
if not success:
raise Full
if timeout is not None and timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
ray.get(self.actor.put.remote(item, timeout))
def get(self, block=True, timeout=None):
"""Gets an item from the queue.
Uses polling if block=True, so there is no guarantee of order if
multiple consumers get from the same empty queue.
There is no guarantee of order if multiple consumers get from the
same empty queue.
Returns:
The next item in the queue.
Raises:
Empty if the queue is empty and blocking is False.
Empty if the queue is empty, blocking is True, and it timed out.
ValueError if timeout is negative.
"""
if not block:
success, item = ray.get(self.actor.get.remote())
if not success:
try:
return ray.get(self.actor.get_nowait.remote())
except asyncio.QueueEmpty:
raise Empty
elif timeout is None:
# Polling
# Use a not_empty condition variable or return a promise?
success, item = ray.get(self.actor.get.remote())
while not success:
# Consider adding time.sleep here
success, item = ray.get(self.actor.get.remote())
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = time.time() + timeout
# Polling
# Use a not_full condition variable or return a promise?
success = False
while not success and time.time() < endtime:
success, item = ray.get(self.actor.get.remote())
if not success:
raise Empty
return item
if timeout is not None and timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
return ray.get(self.actor.get.remote(timeout))
def put_nowait(self, item):
"""Equivalent to put(item, block=False).
@@ -119,7 +96,7 @@ class Queue:
return self.put(item, block=False)
def get_nowait(self):
"""Equivalent to get(item, block=False).
"""Equivalent to get(block=False).
Raises:
Empty if the queue is empty.
@@ -130,38 +107,31 @@ class Queue:
@ray.remote
class _QueueActor:
def __init__(self, maxsize):
self.maxsize = maxsize
self._init(maxsize)
self.queue = asyncio.Queue(maxsize)
def qsize(self):
return self._qsize()
return self.queue.qsize()
def empty(self):
return not self._qsize()
return self.queue.empty()
def full(self):
return 0 < self.maxsize <= self._qsize()
return self.queue.full()
def put(self, item):
if self.maxsize > 0 and self._qsize() >= self.maxsize:
return False
self._put(item)
return True
async def put(self, item, timeout=None):
try:
await asyncio.wait_for(self.queue.put(item), timeout)
except asyncio.TimeoutError:
raise Full
def get(self):
if not self._qsize():
return False, None
return True, self._get()
async def get(self, timeout=None):
try:
return await asyncio.wait_for(self.queue.get(), timeout)
except asyncio.TimeoutError:
raise Empty
# Override these for different queue implementations
def _init(self, maxsize):
self.queue = deque()
def put_nowait(self, item):
self.queue.put_nowait(item)
def _qsize(self):
return len(self.queue)
def _put(self, item):
self.queue.append(item)
def _get(self):
return self.queue.popleft()
def get_nowait(self):
return self.queue.get_nowait()
+51 -46
View File
@@ -1,22 +1,21 @@
import pytest
import time
import ray
from ray.exceptions import RayTimeoutError
from ray.experimental.queue import Queue, Empty, Full
def test_queue(ray_start_regular):
@ray.remote
def get_async(queue, block, timeout, sleep):
time.sleep(sleep)
return queue.get(block, timeout)
@ray.remote
def async_get(queue):
return queue.get(block=True)
@ray.remote
def put_async(queue, item, block, timeout, sleep):
time.sleep(sleep)
queue.put(item, block, timeout)
# Test simple usage.
@ray.remote
def async_put(queue, item):
return queue.put(item, block=True)
def test_simple_usage(ray_start_regular):
q = Queue()
@@ -28,21 +27,30 @@ def test_queue(ray_start_regular):
for item in items:
assert item == q.get()
# Test asynchronous usage.
def test_get(ray_start_regular):
q = Queue()
items = set(range(10))
producers = [ # noqa
put_async.remote(q, item, True, None, 0.5) for item in items
]
consumers = [get_async.remote(q, True, None, 0) for _ in items]
item = 0
q.put(item)
assert q.get(block=False) == item
result = set(ray.get(consumers))
item = 1
q.put(item)
assert q.get(timeout=0.2) == item
assert items == result
with pytest.raises(ValueError):
q.get(timeout=-1)
# Test put.
with pytest.raises(Empty):
q.get_nowait()
with pytest.raises(Empty):
q.get(timeout=0.2)
def test_put(ray_start_regular):
q = Queue(1)
@@ -64,40 +72,37 @@ def test_queue(ray_start_regular):
with pytest.raises(Full):
q.put(1, timeout=0.2)
q.get()
q.put(1)
get_id = get_async.remote(q, False, None, 0.2)
q.put(2)
assert ray.get(get_id) == 1
# Test get.
def test_async_get(ray_start_regular):
q = Queue()
item = 0
q.put(item)
assert q.get(block=False) == item
item = 1
q.put(item)
assert q.get(timeout=0.2) == item
with pytest.raises(ValueError):
q.get(timeout=-1)
future = async_get.remote(q)
with pytest.raises(Empty):
q.get_nowait()
with pytest.raises(Empty):
q.get(timeout=0.2)
with pytest.raises(RayTimeoutError):
ray.get(future, timeout=0.1) # task not canceled on timeout.
item = 0
put_async.remote(q, item, True, None, 0.2)
assert q.get() == item
q.put(1)
assert ray.get(future) == 1
# Test qsize.
def test_async_put(ray_start_regular):
q = Queue(1)
q.put(1)
future = async_put.remote(q, 2)
with pytest.raises(Full):
q.put_nowait(3)
with pytest.raises(RayTimeoutError):
ray.get(future, timeout=0.1) # task not canceled on timeout.
assert q.get() == 1
assert q.get() == 2
def test_qsize(ray_start_regular):
q = Queue()