Promote ray.experimental.queue to ray.util (#10624)

This commit is contained in:
Simon Mo
2020-09-08 12:56:53 -07:00
committed by GitHub
parent 8d5b8b2956
commit fdd3acd492
6 changed files with 164 additions and 144 deletions
+8 -134
View File
@@ -1,137 +1,11 @@
import asyncio
import warnings
import ray
from ray.util.queue import Empty, Full, Queue
warnings.warn(
DeprecationWarning(
"ray.experimental.queue has been moved to ray.util.queue. "
"Please update your import path."),
stacklevel=2)
class Empty(Exception):
pass
class Full(Exception):
pass
class Queue:
"""Queue implementation on Ray.
Args:
maxsize (int): maximum size of the queue. If zero, size is unbounded.
"""
def __init__(self, maxsize=0):
self.actor = _QueueActor.remote(maxsize)
def __len__(self):
return self.size()
def size(self):
"""The size of the queue."""
return ray.get(self.actor.qsize.remote())
def qsize(self):
"""The size of the queue."""
return self.size()
def empty(self):
"""Whether the queue is empty."""
return ray.get(self.actor.empty.remote())
def full(self):
"""Whether the queue is full."""
return ray.get(self.actor.full.remote())
def put(self, item, block=True, timeout=None):
"""Adds an item to the queue.
There is no guarantee of order if multiple producers put to the same
full queue.
Raises:
Full if the queue is full and blocking is False.
Full if the queue is full, blocking is True, and it timed out.
ValueError if timeout is negative.
"""
if not block:
try:
ray.get(self.actor.put_nowait.remote(item))
except asyncio.QueueFull:
raise Full
else:
if timeout is not None and timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
ray.get(self.actor.put.remote(item, timeout))
def get(self, block=True, timeout=None):
"""Gets an item from the queue.
There is no guarantee of order if multiple consumers get from the
same empty queue.
Returns:
The next item in the queue.
Raises:
Empty if the queue is empty and blocking is False.
Empty if the queue is empty, blocking is True, and it timed out.
ValueError if timeout is negative.
"""
if not block:
try:
return ray.get(self.actor.get_nowait.remote())
except asyncio.QueueEmpty:
raise Empty
else:
if timeout is not None and timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
return ray.get(self.actor.get.remote(timeout))
def put_nowait(self, item):
"""Equivalent to put(item, block=False).
Raises:
Full if the queue is full.
"""
return self.put(item, block=False)
def get_nowait(self):
"""Equivalent to get(block=False).
Raises:
Empty if the queue is empty.
"""
return self.get(block=False)
@ray.remote
class _QueueActor:
def __init__(self, maxsize):
self.queue = asyncio.Queue(maxsize)
def qsize(self):
return self.queue.qsize()
def empty(self):
return self.queue.empty()
def full(self):
return self.queue.full()
async def put(self, item, timeout=None):
try:
await asyncio.wait_for(self.queue.put(item), timeout)
except asyncio.TimeoutError:
raise Full
async def get(self, timeout=None):
try:
return await asyncio.wait_for(self.queue.get(), timeout)
except asyncio.TimeoutError:
raise Empty
def put_nowait(self, item):
self.queue.put_nowait(item)
def get_nowait(self):
return self.queue.get_nowait()
__all__ = ["Empty", "Full", "Queue"]
+1
View File
@@ -88,6 +88,7 @@ py_test_module_list(
"test_mini.py",
"test_node_manager.py",
"test_numba.py",
"test_queue.py",
"test_ray_init.py",
"test_serialization.py",
"test_tempfile.py",
+7 -7
View File
@@ -2,7 +2,7 @@ import pytest
import ray
from ray.exceptions import GetTimeoutError
from ray.experimental.queue import Queue, Empty, Full
from ray.util.queue import Queue, Empty, Full
@ray.remote
@@ -15,7 +15,7 @@ def async_put(queue, item):
return queue.put(item, block=True)
def test_simple_usage(ray_start_regular):
def test_simple_usage(ray_start_regular_shared):
q = Queue()
@@ -28,7 +28,7 @@ def test_simple_usage(ray_start_regular):
assert item == q.get()
def test_get(ray_start_regular):
def test_get(ray_start_regular_shared):
q = Queue()
@@ -50,7 +50,7 @@ def test_get(ray_start_regular):
q.get(timeout=0.2)
def test_put(ray_start_regular):
def test_put(ray_start_regular_shared):
q = Queue(1)
@@ -73,7 +73,7 @@ def test_put(ray_start_regular):
q.put(1, timeout=0.2)
def test_async_get(ray_start_regular):
def test_async_get(ray_start_regular_shared):
q = Queue()
future = async_get.remote(q)
@@ -87,7 +87,7 @@ def test_async_get(ray_start_regular):
assert ray.get(future) == 1
def test_async_put(ray_start_regular):
def test_async_put(ray_start_regular_shared):
q = Queue(1)
q.put(1)
future = async_put.remote(q, 2)
@@ -102,7 +102,7 @@ def test_async_put(ray_start_regular):
assert q.get() == 2
def test_qsize(ray_start_regular):
def test_qsize(ray_start_regular_shared):
q = Queue()
+137
View File
@@ -0,0 +1,137 @@
import asyncio
import ray
class Empty(Exception):
pass
class Full(Exception):
pass
class Queue:
"""Queue implementation on Ray.
Args:
maxsize (int): maximum size of the queue. If zero, size is unbounded.
"""
def __init__(self, maxsize=0):
self.actor = _QueueActor.remote(maxsize)
def __len__(self):
return self.size()
def size(self):
"""The size of the queue."""
return ray.get(self.actor.qsize.remote())
def qsize(self):
"""The size of the queue."""
return self.size()
def empty(self):
"""Whether the queue is empty."""
return ray.get(self.actor.empty.remote())
def full(self):
"""Whether the queue is full."""
return ray.get(self.actor.full.remote())
def put(self, item, block=True, timeout=None):
"""Adds an item to the queue.
There is no guarantee of order if multiple producers put to the same
full queue.
Raises:
Full if the queue is full and blocking is False.
Full if the queue is full, blocking is True, and it timed out.
ValueError if timeout is negative.
"""
if not block:
try:
ray.get(self.actor.put_nowait.remote(item))
except asyncio.QueueFull:
raise Full
else:
if timeout is not None and timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
ray.get(self.actor.put.remote(item, timeout))
def get(self, block=True, timeout=None):
"""Gets an item from the queue.
There is no guarantee of order if multiple consumers get from the
same empty queue.
Returns:
The next item in the queue.
Raises:
Empty if the queue is empty and blocking is False.
Empty if the queue is empty, blocking is True, and it timed out.
ValueError if timeout is negative.
"""
if not block:
try:
return ray.get(self.actor.get_nowait.remote())
except asyncio.QueueEmpty:
raise Empty
else:
if timeout is not None and timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
return ray.get(self.actor.get.remote(timeout))
def put_nowait(self, item):
"""Equivalent to put(item, block=False).
Raises:
Full if the queue is full.
"""
return self.put(item, block=False)
def get_nowait(self):
"""Equivalent to get(block=False).
Raises:
Empty if the queue is empty.
"""
return self.get(block=False)
@ray.remote
class _QueueActor:
def __init__(self, maxsize):
self.queue = asyncio.Queue(maxsize)
def qsize(self):
return self.queue.qsize()
def empty(self):
return self.queue.empty()
def full(self):
return self.queue.full()
async def put(self, item, timeout=None):
try:
await asyncio.wait_for(self.queue.put(item), timeout)
except asyncio.TimeoutError:
raise Full
async def get(self, timeout=None):
try:
return await asyncio.wait_for(self.queue.get(), timeout)
except asyncio.TimeoutError:
raise Empty
def put_nowait(self, item):
self.queue.put_nowait(item)
def get_nowait(self):
return self.queue.get_nowait()