diff --git a/python/ray/test/test_queue.py b/python/ray/test/test_queue.py index 42c9d8834..2e4e7aa6d 100644 --- a/python/ray/test/test_queue.py +++ b/python/ray/test/test_queue.py @@ -2,32 +2,35 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import time import pytest +import time import ray - from ray.experimental.queue import Queue, Empty, Full -def setup_module(): - if not ray.worker.global_worker.connected: - ray.init() +@pytest.fixture +def ray_start(): + # Start the Ray process. + ray.init(num_cpus=1) + yield None + # The code after the yield will run as teardown code. + ray.shutdown() -@ray.remote -def get_async(queue, block, timeout, sleep): - time.sleep(sleep) - return queue.get(block, timeout) +def test_queue(ray_start): + @ray.remote + def get_async(queue, block, timeout, sleep): + time.sleep(sleep) + return queue.get(block, timeout) + @ray.remote + def put_async(queue, item, block, timeout, sleep): + time.sleep(sleep) + queue.put(item, block, timeout) -@ray.remote -def put_async(queue, item, block, timeout, sleep): - time.sleep(sleep) - queue.put(item, block, timeout) + # Test simple usage. - -def test_simple_use(): q = Queue() items = list(range(10)) @@ -38,8 +41,8 @@ def test_simple_use(): for item in items: assert item == q.get() + # Test asynchronous usage. -def test_async(): q = Queue() items = set(range(10)) @@ -52,8 +55,8 @@ def test_async(): assert items == result + # Test put. -def test_put(): q = Queue(1) item = 0 @@ -82,8 +85,8 @@ def test_put(): assert ray.get(get_id) == 1 + # Test get. -def test_get(): q = Queue() item = 0 @@ -107,8 +110,8 @@ def test_get(): put_async.remote(q, item, True, None, 0.2) assert q.get() == item + # Test qsize. -def test_qsize(): q = Queue() items = list(range(10))