From b0bf5450c28be0d6ee143d73211ddd059e933323 Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Tue, 3 Mar 2020 15:07:59 -0600 Subject: [PATCH] Fix flaky multiprocessing tests (#7413) --- python/ray/tests/test_multiprocessing.py | 15 +++++++++++++++ python/ray/util/multiprocessing/pool.py | 9 +++++---- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/python/ray/tests/test_multiprocessing.py b/python/ray/tests/test_multiprocessing.py index da8647197..2af155a4d 100644 --- a/python/ray/tests/test_multiprocessing.py +++ b/python/ray/tests/test_multiprocessing.py @@ -23,6 +23,7 @@ def pool(): pool = Pool(processes=1) yield pool pool.terminate() + pool.join() ray.shutdown() @@ -31,6 +32,7 @@ def pool_4_processes(): pool = Pool(processes=4) yield pool pool.terminate() + pool.join() ray.shutdown() @@ -47,6 +49,8 @@ def test_ray_init(shutdown_only): assert ray.is_initialized() assert int(ray.state.cluster_resources()["CPU"]) == 2 check_pool_size(pool, 2) + pool.terminate() + pool.join() ray.shutdown() # Check that starting a pool doesn't affect ray if there is a local @@ -56,6 +60,8 @@ def test_ray_init(shutdown_only): pool = Pool(processes=2) assert int(ray.state.cluster_resources()["CPU"]) == 3 check_pool_size(pool, 2) + pool.terminate() + pool.join() ray.shutdown() # Check that trying to start a pool on an existing ray cluster throws an @@ -94,6 +100,8 @@ def test_connect_to_ray(ray_start_cluster): assert ray.is_initialized() assert int(ray.state.cluster_resources()["CPU"]) == init_cpus check_pool_size(pool, init_cpus) + pool.terminate() + pool.join() ray.shutdown() # Check that starting a pool connects to a running ray cluster if @@ -102,6 +110,8 @@ def test_connect_to_ray(ray_start_cluster): assert ray.is_initialized() assert int(ray.state.cluster_resources()["CPU"]) == start_cpus check_pool_size(pool, start_cpus) + pool.terminate() + pool.join() ray.shutdown() # Set RAY_ADDRESS, so pools should connect to the running ray cluster. @@ -113,6 +123,8 @@ def test_connect_to_ray(ray_start_cluster): assert ray.is_initialized() assert int(ray.state.cluster_resources()["CPU"]) == start_cpus check_pool_size(pool, start_cpus) + pool.terminate() + pool.join() ray.shutdown() # Check that trying to start a pool on an existing ray cluster throws an @@ -135,6 +147,7 @@ def test_initializer(shutdown_only): assert len(os.listdir(dirname)) == 4 pool.terminate() + pool.join() def test_close(pool_4_processes): @@ -492,6 +505,8 @@ def test_maxtasksperchild(shutdown_only): pool = Pool(5, maxtasksperchild=1) assert len(set(pool.map(f, range(20)))) == 20 + pool.terminate() + pool.join() if __name__ == "__main__": diff --git a/python/ray/util/multiprocessing/pool.py b/python/ray/util/multiprocessing/pool.py index ac0afc993..cf3ce37e2 100644 --- a/python/ray/util/multiprocessing/pool.py +++ b/python/ray/util/multiprocessing/pool.py @@ -31,7 +31,7 @@ class ResultThread(threading.Thread): callback=None, error_callback=None, total_object_ids=None): - threading.Thread.__init__(self) + threading.Thread.__init__(self, daemon=True) self._got_error = False self._object_ids = [] self._num_ready = 0 @@ -232,13 +232,14 @@ class OrderedIMapIterator(IMapIterator): if self._next_chunk_index == self._total_chunks: raise StopIteration - while timeout is None or timeout > 0: + # This loop will break when the next index in order is ready or + # self._result_thread.next_ready_index() raises a timeout. + index = -1 + while index != self._next_chunk_index: start = time.time() index = self._result_thread.next_ready_index(timeout=timeout) self._submit_next_chunk() self._submitted_chunks[index] = True - if index == self._next_chunk_index: - break if timeout is not None: timeout = max(0, timeout - (time.time() - start))