From dc5a27dac0af8bb8e98923b78f2a642d3c449ff4 Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Fri, 14 Feb 2020 16:17:05 -0800 Subject: [PATCH] Move ray.experimental.multiprocessing to ray.util.multiprocessing (#7149) --- doc/source/multiprocessing.rst | 10 +- .../ray/experimental/multiprocessing/pool.py | 672 +---------------- python/ray/tests/test_multiprocessing.py | 55 +- python/ray/util/__init__.py | 0 python/ray/util/multiprocessing/__init__.py | 5 + python/ray/util/multiprocessing/pool.py | 676 ++++++++++++++++++ 6 files changed, 724 insertions(+), 694 deletions(-) create mode 100644 python/ray/util/__init__.py create mode 100644 python/ray/util/multiprocessing/__init__.py create mode 100644 python/ray/util/multiprocessing/pool.py diff --git a/doc/source/multiprocessing.rst b/doc/source/multiprocessing.rst index 21b7335f0..bd28ebff3 100644 --- a/doc/source/multiprocessing.rst +++ b/doc/source/multiprocessing.rst @@ -1,5 +1,5 @@ -multiprocessing.Pool API (Experimental) -======================================= +multiprocessing.Pool API +======================== .. warning:: @@ -21,14 +21,14 @@ Quickstart ---------- To get started, first `install Ray `__, then use -``ray.experimental.multiprocessing.Pool`` in place of ``multiprocessing.Pool``. +``ray.util.multiprocessing.Pool`` in place of ``multiprocessing.Pool``. This will start a local Ray cluster the first time you create a ``Pool`` and distribute your tasks across it. See the `Run on a Cluster`_ section below for instructions to run on a multi-node Ray cluster instead. .. code-block:: python - from ray.experimental.multiprocessing import Pool + from ray.util.multiprocessing import Pool def f(index): return index @@ -59,7 +59,7 @@ head node in one of two ways: .. code-block:: python - from ray.experimental.multiprocessing import Pool + from ray.util.multiprocessing import Pool # Starts a new local Ray cluster. 
pool = Pool() diff --git a/python/ray/experimental/multiprocessing/pool.py b/python/ray/experimental/multiprocessing/pool.py index d3e4dbbd6..bf0e442ff 100644 --- a/python/ray/experimental/multiprocessing/pool.py +++ b/python/ray/experimental/multiprocessing/pool.py @@ -1,671 +1,5 @@ -import logging -from multiprocessing import TimeoutError -import os -import time -import random -import collections -import threading -import queue -import copy +from ray.util import multiprocessing -import ray -logger = logging.getLogger(__name__) - -RAY_ADDRESS_ENV = "RAY_ADDRESS" - - -# Helper function to divide a by b and round the result up. -def div_round_up(a, b): - return -(-a // b) - - -class PoolTaskError(Exception): - def __init__(self, underlying): - self.underlying = underlying - - -class ResultThread(threading.Thread): - def __init__(self, - object_ids, - callback=None, - error_callback=None, - total_object_ids=None): - threading.Thread.__init__(self) - self._got_error = False - self._object_ids = [] - self._num_ready = 0 - self._results = [] - self._ready_index_queue = queue.Queue() - self._callback = callback - self._error_callback = error_callback - self._total_object_ids = total_object_ids or len(object_ids) - self._indices = {} - # Thread-safe queue used to add ObjectIDs to fetch after creating - # this thread (used to lazily submit for imap and imap_unordered). - self._new_object_ids = queue.Queue() - for object_id in object_ids: - self._add_object_id(object_id) - - def _add_object_id(self, object_id): - self._indices[object_id] = len(self._object_ids) - self._object_ids.append(object_id) - self._results.append(None) - - def add_object_id(self, object_id): - self._new_object_ids.put(object_id) - - def run(self): - unready = copy.copy(self._object_ids) - while self._num_ready < self._total_object_ids: - # Get as many new IDs from the queue as possible without blocking, - # unless we have no IDs to wait on, in which case we block. 
- while True: - try: - block = len(unready) == 0 - new_object_id = self._new_object_ids.get(block=block) - self._add_object_id(new_object_id) - unready.append(new_object_id) - except queue.Empty: - # queue.Empty means no result was retrieved if block=False. - break - - [ready_id], unready = ray.wait(unready, num_returns=1) - batch = ray.get(ready_id) - for result in batch: - if isinstance(result, Exception): - self._got_error = True - if self._error_callback is not None: - self._error_callback(result) - elif self._callback is not None: - self._callback(result) - - self._num_ready += 1 - self._results[self._indices[ready_id]] = batch - self._ready_index_queue.put(self._indices[ready_id]) - - def got_error(self): - # Should only be called after the thread finishes. - return self._got_error - - def result(self, index): - # Should only be called on results that are ready. - return self._results[index] - - def results(self): - # Should only be called after the thread finishes. - return self._results - - def next_ready_index(self, timeout=None): - try: - return self._ready_index_queue.get(timeout=timeout) - except queue.Empty: - # queue.Queue signals a timeout by raising queue.Empty. - raise TimeoutError - - -class AsyncResult: - """An asynchronous interface to task results. - - This should not be constructed directly. - """ - - def __init__(self, - chunk_object_ids, - callback=None, - error_callback=None, - single_result=False): - self._single_result = single_result - self._result_thread = ResultThread(chunk_object_ids, callback, - error_callback) - self._result_thread.start() - - def wait(self, timeout=None): - """ - Returns once the result is ready or the timeout expires (does not - raise TimeoutError). - - Args: - timeout: timeout in milliseconds. 
- """ - - self._result_thread.join(timeout) - - def get(self, timeout=None): - self.wait(timeout) - if self._result_thread.is_alive(): - raise TimeoutError - - results = [] - for batch in self._result_thread.results(): - for result in batch: - if isinstance(result, PoolTaskError): - raise result.underlying - results.extend(batch) - - if self._single_result: - return results[0] - - return results - - def ready(self): - """ - Returns true if the result is ready, else false if the tasks are still - running. - """ - - return not self._result_thread.is_alive() - - def successful(self): - """ - Returns true if none of the submitted tasks errored, else false. Should - only be called once the result is ready (can be checked using `ready`). - """ - - if not self.ready(): - raise ValueError("{0!r} not ready".format(self)) - return not self._result_thread.got_error() - - -class IMapIterator: - """Base class for OrderedIMapIterator and UnorderedIMapIterator.""" - - def __init__(self, pool, func, iterable, chunksize=None): - self._pool = pool - self._func = func - self._next_chunk_index = 0 - # List of bools indicating if the given chunk is ready or not for all - # submitted chunks. Ordering mirrors that in the in the ResultThread. - self._submitted_chunks = [] - self._ready_objects = collections.deque() - if not hasattr(iterable, "__len__"): - iterable = [iterable] - self._iterator = iter(iterable) - self._chunksize = chunksize or pool._calculate_chunksize(iterable) - self._total_chunks = div_round_up(len(iterable), chunksize) - self._result_thread = ResultThread( - [], total_object_ids=self._total_chunks) - self._result_thread.start() - - for _ in range(len(self._pool._actor_pool)): - self._submit_next_chunk() - - def _submit_next_chunk(self): - # The full iterable has been submitted, so no-op. 
- if len(self._submitted_chunks) >= self._total_chunks: - return - - actor_index = len(self._submitted_chunks) % len(self._pool._actor_pool) - new_chunk_id = self._pool._submit_chunk(self._func, self._iterator, - self._chunksize, actor_index) - self._submitted_chunks.append(False) - self._result_thread.add_object_id(new_chunk_id) - - def __iter__(self): - return self - - def __next__(self): - return self.next() - - def next(self): - # Should be implemented by subclasses. - raise NotImplementedError - - -class OrderedIMapIterator(IMapIterator): - """Iterator to the results of tasks submitted using `imap`. - - The results are returned in the same order that they were submitted, even - if they don't finish in that order. Only one batch of tasks per actor - process is submitted at a time - the rest are submitted as results come in. - - Should not be constructed directly. - """ - - def next(self, timeout=None): - if len(self._ready_objects) == 0: - if self._next_chunk_index == self._total_chunks: - raise StopIteration - - while timeout is None or timeout > 0: - start = time.time() - index = self._result_thread.next_ready_index(timeout=timeout) - self._submit_next_chunk() - self._submitted_chunks[index] = True - if index == self._next_chunk_index: - break - if timeout is not None: - timeout = max(0, timeout - (time.time() - start)) - - while self._next_chunk_index < len( - self._submitted_chunks - ) and self._submitted_chunks[self._next_chunk_index]: - for result in self._result_thread.result( - self._next_chunk_index): - self._ready_objects.append(result) - self._next_chunk_index += 1 - - return self._ready_objects.popleft() - - -class UnorderedIMapIterator(IMapIterator): - """Iterator to the results of tasks submitted using `imap`. - - The results are returned in the order that they finish. Only one batch of - tasks per actor process is submitted at a time - the rest are submitted as - results come in. - - Should not be constructed directly. 
- """ - - def next(self, timeout=None): - if len(self._ready_objects) == 0: - if self._next_chunk_index == self._total_chunks: - raise StopIteration - - index = self._result_thread.next_ready_index(timeout=timeout) - self._submit_next_chunk() - - for result in self._result_thread.result(index): - self._ready_objects.append(result) - self._next_chunk_index += 1 - - return self._ready_objects.popleft() - - -@ray.remote(num_cpus=1) -class PoolActor: - """Actor used to process tasks submitted to a Pool.""" - - def __init__(self, initializer=None, initargs=None): - if initializer: - initargs = initargs or () - initializer(*initargs) - - def ping(self): - # Used to wait for this actor to be initialized. - pass - - def run_batch(self, func, batch): - results = [] - for args, kwargs in batch: - args = args or () - kwargs = kwargs or {} - try: - results.append(func(*args, **kwargs)) - except Exception as e: - results.append(PoolTaskError(e)) - return results - - -# https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool -class Pool: - """A pool of actor processes that is used to process tasks in parallel. - - Args: - processes: number of actor processes to start in the pool. Defaults to - the number of cores in the Ray cluster if one is already running, - otherwise the number of cores on this machine. - initializer: function to be run in each actor when it starts up. - initargs: iterable of arguments to the initializer function. - maxtasksperchild: maximum number of tasks to run in each actor process. - After a process has executed this many tasks, it will be killed and - replaced with a new one. - ray_address: address of the Ray cluster to run on. If None, a new local - Ray cluster will be started on this machine. Otherwise, this will - be passed to `ray.init()` to connect to a running cluster. This may - also be specified using the `RAY_ADDRESS` environment variable. 
- """ - - def __init__(self, - processes=None, - initializer=None, - initargs=None, - maxtasksperchild=None, - context=None, - ray_address=None): - self._closed = False - self._initializer = initializer - self._initargs = initargs - self._maxtasksperchild = maxtasksperchild or -1 - self._actor_deletion_ids = [] - - if context: - logger.warning("The 'context' argument is not supported using " - "ray. Please refer to the documentation for how " - "to control ray initialization.") - - processes = self._init_ray(processes, ray_address) - self._start_actor_pool(processes) - - def _init_ray(self, processes=None, ray_address=None): - # Initialize ray. If ray is already initialized, we do nothing. - # Else, the priority is: - # ray_address argument > RAY_ADDRESS > start new local cluster. - if not ray.is_initialized(): - if ray_address is None and RAY_ADDRESS_ENV in os.environ: - ray_address = os.environ[RAY_ADDRESS_ENV] - - # Cluster mode. - if ray_address is not None: - logger.info("Connecting to ray cluster at address='{}'".format( - ray_address)) - ray.init(address=ray_address) - # Local mode. 
- else: - logger.info("Starting local ray cluster") - ray.init(num_cpus=processes) - - ray_cpus = int(ray.state.cluster_resources()["CPU"]) - if processes is None: - processes = ray_cpus - if processes <= 0: - raise ValueError("Processes in the pool must be >0.") - if ray_cpus < processes: - raise ValueError("Tried to start a pool with {} processes on an " - "existing ray cluster, but there are only {} " - "CPUs in the ray cluster.".format( - processes, ray_cpus)) - - return processes - - def _start_actor_pool(self, processes): - self._actor_pool = [self._new_actor_entry() for _ in range(processes)] - ray.get([actor.ping.remote() for actor, _ in self._actor_pool]) - - def _wait_for_stopping_actors(self, timeout=None): - if len(self._actor_deletion_ids) == 0: - return - if timeout is not None: - timeout = float(timeout) - - _, deleting = ray.wait( - self._actor_deletion_ids, - num_returns=len(self._actor_deletion_ids), - timeout=timeout) - self._actor_deletion_ids = deleting - - def _stop_actor(self, actor): - # Check and clean up any outstanding IDs corresponding to deletions. - self._wait_for_stopping_actors(timeout=0.0) - # The deletion task will block until the actor has finished executing - # all pending tasks. - self._actor_deletion_ids.append(actor.__ray_terminate__.remote()) - - def _new_actor_entry(self): - # NOTE(edoakes): The initializer function can't currently be used to - # modify the global namespace (e.g., import packages or set globals) - # due to a limitation in cloudpickle. - return (PoolActor.remote(self._initializer, self._initargs), 0) - - def _random_actor_index(self): - return random.randrange(len(self._actor_pool)) - - # Batch should be a list of tuples: (args, kwargs). 
- def _run_batch(self, actor_index, func, batch): - actor, count = self._actor_pool[actor_index] - object_id = actor.run_batch.remote(func, batch) - count += 1 - assert self._maxtasksperchild == -1 or count <= self._maxtasksperchild - if count == self._maxtasksperchild: - self._stop_actor(actor) - actor, count = self._new_actor_entry() - self._actor_pool[actor_index] = (actor, count) - return object_id - - def apply(self, func, args=None, kwargs=None): - """Run the given function on a random actor process and return the - result synchronously. - - Args: - func: function to run. - args: optional arguments to the function. - kwargs: optional keyword arguments to the function. - - Returns: - The result. - """ - - return self.apply_async(func, args, kwargs).get() - - def apply_async(self, - func, - args=None, - kwargs=None, - callback=None, - error_callback=None): - """Run the given function on a random actor process and return an - asynchronous interface to the result. - - Args: - func: function to run. - args: optional arguments to the function. - kwargs: optional keyword arguments to the function. - callback: callback to be executed on the result once it is finished - if it succeeds. - error_callback: callback to be executed the result once it is - finished if the task errors. The exception raised by the - task will be passed as the only argument to the callback. - - Returns: - AsyncResult containing the result. 
- """ - - self._check_running() - object_id = self._run_batch(self._random_actor_index(), func, - [(args, kwargs)]) - return AsyncResult( - [object_id], callback, error_callback, single_result=True) - - def _calculate_chunksize(self, iterable): - chunksize, extra = divmod(len(iterable), len(self._actor_pool) * 4) - if extra: - chunksize += 1 - return chunksize - - def _submit_chunk(self, - func, - iterator, - chunksize, - actor_index, - unpack_args=False): - chunk = [] - while len(chunk) < chunksize: - try: - args = next(iterator) - if not unpack_args: - args = (args, ) - chunk.append((args, {})) - except StopIteration: - break - - # Nothing to submit. The caller should prevent this. - assert len(chunk) > 0 - - return self._run_batch(actor_index, func, chunk) - - def _chunk_and_run(self, func, iterable, chunksize=None, - unpack_args=False): - if not hasattr(iterable, "__len__"): - iterable = [iterable] - - if chunksize is None: - chunksize = self._calculate_chunksize(iterable) - - iterator = iter(iterable) - chunk_object_ids = [] - while len(chunk_object_ids) * chunksize < len(iterable): - actor_index = len(chunk_object_ids) % len(self._actor_pool) - chunk_object_ids.append( - self._submit_chunk( - func, - iterator, - chunksize, - actor_index, - unpack_args=unpack_args)) - - return chunk_object_ids - - def _map_async(self, - func, - iterable, - chunksize=None, - unpack_args=False, - callback=None, - error_callback=None): - self._check_running() - object_ids = self._chunk_and_run( - func, iterable, chunksize=chunksize, unpack_args=unpack_args) - return AsyncResult(object_ids, callback, error_callback) - - def map(self, func, iterable, chunksize=None): - """Run the given function on each element in the iterable round-robin - on the actor processes and return the results synchronously. - - Args: - func: function to run. - iterable: iterable of objects to be passed as the sole argument to - func. 
- chunksize: number of tasks to submit as a batch to each actor - process. If unspecified, a suitable chunksize will be chosen. - - Returns: - A list of results. - """ - - return self._map_async( - func, iterable, chunksize=chunksize, unpack_args=False).get() - - def map_async(self, - func, - iterable, - chunksize=None, - callback=None, - error_callback=None): - """Run the given function on each element in the iterable round-robin - on the actor processes and return an asynchronous interface to the - results. - - Args: - func: function to run. - iterable: iterable of objects to be passed as the only argument to - func. - chunksize: number of tasks to submit as a batch to each actor - process. If unspecified, a suitable chunksize will be chosen. - callback: callback to be executed on each successful result once it - is finished. - error_callback: callback to be executed on each errored result once - it is finished. The exception raised by the task will be passed - as the only argument to the callback. - - Returns: - AsyncResult - """ - return self._map_async( - func, - iterable, - chunksize=chunksize, - unpack_args=False, - callback=callback, - error_callback=error_callback) - - def starmap(self, func, iterable, chunksize=None): - """Same as `map`, but unpacks each element of the iterable as the - arguments to func like: [func(*args) for args in iterable]. - """ - - return self._map_async( - func, iterable, chunksize=chunksize, unpack_args=True).get() - - def starmap_async(self, func, iterable, callback=None, - error_callback=None): - """Same as `map_async`, but unpacks each element of the iterable as the - arguments to func like: [func(*args) for args in iterable]. - """ - - return self._map_async( - func, - iterable, - unpack_args=True, - callback=callback, - error_callback=error_callback) - - def imap(self, func, iterable, chunksize=1): - """Same as `map`, but only submits one batch of tasks to each actor - process at a time. 
- - This can be useful if the iterable of arguments is very large or each - task's arguments consumes a large amount of resources. - - The results are returned in the order corresponding to their arguments - in the iterable. - - Returns: - OrderedIMapIterator - """ - - self._check_running() - return OrderedIMapIterator(self, func, iterable, chunksize=chunksize) - - def imap_unordered(self, func, iterable, chunksize=1): - """Same as `map`, but only submits one batch of tasks to each actor - process at a time. - - This can be useful if the iterable of arguments is very large or each - task's arguments consumes a large amount of resources. - - The results are returned in the order that they finish. - - Returns: - UnorderedIMapIterator - """ - - self._check_running() - return UnorderedIMapIterator(self, func, iterable, chunksize=chunksize) - - def _check_running(self): - if self._closed: - raise ValueError("Pool not running") - - def __enter__(self): - self._check_running() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.terminate() - - def close(self): - """Close the pool. - - Prevents any more tasks from being submitted on the pool but allows - outstanding work to finish. - """ - - for actor, _ in self._actor_pool: - self._stop_actor(actor) - self._closed = True - - def terminate(self): - """Close the pool. - - Prevents any more tasks from being submitted on the pool and stops - outstanding work. - """ - - if not self._closed: - self.close() - for actor, _ in self._actor_pool: - actor.__ray_kill__() - - def join(self): - """Wait for the actors in a closed pool to exit. - - If the pool was closed using `close`, this will return once all - outstanding work is completed. - - If the pool was closed using `terminate`, this will return quickly. 
- """ - - if not self._closed: - raise ValueError("Pool is still running") - self._wait_for_stopping_actors() +class Pool(multiprocessing.Pool): + pass # moved to util package diff --git a/python/ray/tests/test_multiprocessing.py b/python/ray/tests/test_multiprocessing.py index ad271d391..389d58231 100644 --- a/python/ray/tests/test_multiprocessing.py +++ b/python/ray/tests/test_multiprocessing.py @@ -1,21 +1,18 @@ import os +import sys import pytest import tempfile import time import random -import subprocess from collections import defaultdict import queue import ray -from ray.experimental.multiprocessing import Pool, TimeoutError +from ray.util.multiprocessing import Pool, TimeoutError -@pytest.fixture -def cleanup_only(): - yield None - ray.shutdown() - subprocess.check_output(["ray", "stop"]) +def teardown_function(function): + # Delete environment variable if set. if "RAY_ADDRESS" in os.environ: del os.environ["RAY_ADDRESS"] @@ -36,7 +33,7 @@ def pool_4_processes(): ray.shutdown() -def test_initialize_ray(cleanup_only): +def test_ray_init(shutdown_only): def getpid(args): return os.getpid() @@ -69,14 +66,27 @@ def test_initialize_ray(cleanup_only): assert int(ray.state.cluster_resources()["CPU"]) == 1 ray.shutdown() + +@pytest.mark.parametrize( + "ray_start_cluster", [{ + "num_cpus": 1, + "num_nodes": 1, + "do_init": False, + }], + indirect=True) +def test_connect_to_ray(ray_start_cluster): + def getpid(args): + return os.getpid() + + def check_pool_size(pool, size): + args = [tuple() for _ in range(size)] + assert len(set(pool.map(getpid, args))) == size + + address = ray_start_cluster.address # Use different numbers of CPUs to distinguish between starting a local # ray cluster and connecting to an existing one. + start_cpus = 1 # Set in fixture. init_cpus = 2 - start_cpus = 3 - - # Start a ray cluster in the background. 
- subprocess.check_output( - ["ray", "start", "--head", "--num-cpus={}".format(start_cpus)]) # Check that starting a pool still starts ray if RAY_ADDRESS not set. pool = Pool(processes=init_cpus) @@ -87,14 +97,14 @@ def test_initialize_ray(cleanup_only): # Check that starting a pool connects to a running ray cluster if # ray_address is passed in. - pool = Pool(ray_address="auto") + pool = Pool(ray_address=address) assert ray.is_initialized() assert int(ray.state.cluster_resources()["CPU"]) == start_cpus check_pool_size(pool, start_cpus) ray.shutdown() # Set RAY_ADDRESS, so pools should connect to the running ray cluster. - os.environ["RAY_ADDRESS"] = "auto" + os.environ["RAY_ADDRESS"] = address # Check that starting a pool connects to a running ray cluster if # RAY_ADDRESS is set. @@ -111,11 +121,8 @@ def test_initialize_ray(cleanup_only): assert int(ray.state.cluster_resources()["CPU"]) == start_cpus ray.shutdown() - # Clean up the background ray cluster. - subprocess.check_output(["ray", "stop"]) - -def test_initializer(cleanup_only): +def test_initializer(shutdown_only): def init(dirname): with open(os.path.join(dirname, str(os.getpid())), "w") as f: print("hello", file=f) @@ -144,6 +151,7 @@ def test_close(pool_4_processes): pool_4_processes.join() # close() shouldn't interrupt pending tasks, so check that they succeeded. 
+ result.wait(timeout=10) assert result.ready() assert result.successful() assert result.get() == ["hello"] * 4 @@ -164,6 +172,8 @@ def test_terminate(pool_4_processes): result.wait(timeout=10) assert result.ready() assert not result.successful() + with pytest.raises(ray.exceptions.RayError): + result.get() def test_apply(pool): @@ -478,9 +488,14 @@ def test_imap_timeout(pool_4_processes): result_iter.next() -def test_maxtasksperchild(cleanup_only): +def test_maxtasksperchild(shutdown_only): def f(args): return os.getpid() pool = Pool(5, maxtasksperchild=1) assert len(set(pool.map(f, range(20)))) == 20 + + +if __name__ == "__main__": + import pytest + sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/util/__init__.py b/python/ray/util/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/ray/util/multiprocessing/__init__.py b/python/ray/util/multiprocessing/__init__.py new file mode 100644 index 000000000..f9b741f1e --- /dev/null +++ b/python/ray/util/multiprocessing/__init__.py @@ -0,0 +1,5 @@ +from multiprocessing import TimeoutError + +from .pool import Pool + +__all__ = ["Pool", "TimeoutError"] diff --git a/python/ray/util/multiprocessing/pool.py b/python/ray/util/multiprocessing/pool.py new file mode 100644 index 000000000..ed7dea655 --- /dev/null +++ b/python/ray/util/multiprocessing/pool.py @@ -0,0 +1,676 @@ +import logging +from multiprocessing import TimeoutError +import os +import time +import random +import collections +import threading +import queue +import copy + +import ray + +logger = logging.getLogger(__name__) + +RAY_ADDRESS_ENV = "RAY_ADDRESS" + + +# Helper function to divide a by b and round the result up. 
+def div_round_up(a, b): + return -(-a // b) + + +class PoolTaskError(Exception): + def __init__(self, underlying): + self.underlying = underlying + + +class ResultThread(threading.Thread): + def __init__(self, + object_ids, + callback=None, + error_callback=None, + total_object_ids=None): + threading.Thread.__init__(self) + self._got_error = False + self._object_ids = [] + self._num_ready = 0 + self._results = [] + self._ready_index_queue = queue.Queue() + self._callback = callback + self._error_callback = error_callback + self._total_object_ids = total_object_ids or len(object_ids) + self._indices = {} + # Thread-safe queue used to add ObjectIDs to fetch after creating + # this thread (used to lazily submit for imap and imap_unordered). + self._new_object_ids = queue.Queue() + for object_id in object_ids: + self._add_object_id(object_id) + + def _add_object_id(self, object_id): + self._indices[object_id] = len(self._object_ids) + self._object_ids.append(object_id) + self._results.append(None) + + def add_object_id(self, object_id): + self._new_object_ids.put(object_id) + + def run(self): + unready = copy.copy(self._object_ids) + while self._num_ready < self._total_object_ids: + # Get as many new IDs from the queue as possible without blocking, + # unless we have no IDs to wait on, in which case we block. + while True: + try: + block = len(unready) == 0 + new_object_id = self._new_object_ids.get(block=block) + self._add_object_id(new_object_id) + unready.append(new_object_id) + except queue.Empty: + # queue.Empty means no result was retrieved if block=False. 
+ break + + [ready_id], unready = ray.wait(unready, num_returns=1) + try: + batch = ray.get(ready_id) + except ray.exceptions.RayError as e: + batch = [e] + for result in batch: + if isinstance(result, Exception): + self._got_error = True + if self._error_callback is not None: + self._error_callback(result) + elif self._callback is not None: + self._callback(result) + + self._num_ready += 1 + self._results[self._indices[ready_id]] = batch + self._ready_index_queue.put(self._indices[ready_id]) + + def got_error(self): + # Should only be called after the thread finishes. + return self._got_error + + def result(self, index): + # Should only be called on results that are ready. + return self._results[index] + + def results(self): + # Should only be called after the thread finishes. + return self._results + + def next_ready_index(self, timeout=None): + try: + return self._ready_index_queue.get(timeout=timeout) + except queue.Empty: + # queue.Queue signals a timeout by raising queue.Empty. + raise TimeoutError + + +class AsyncResult: + """An asynchronous interface to task results. + + This should not be constructed directly. + """ + + def __init__(self, + chunk_object_ids, + callback=None, + error_callback=None, + single_result=False): + self._single_result = single_result + self._result_thread = ResultThread(chunk_object_ids, callback, + error_callback) + self._result_thread.start() + + def wait(self, timeout=None): + """ + Returns once the result is ready or the timeout expires (does not + raise TimeoutError). + + Args: + timeout: timeout in milliseconds. 
+ """ + + self._result_thread.join(timeout) + + def get(self, timeout=None): + self.wait(timeout) + if self._result_thread.is_alive(): + raise TimeoutError + + results = [] + for batch in self._result_thread.results(): + for result in batch: + if isinstance(result, PoolTaskError): + raise result.underlying + elif isinstance(result, Exception): + raise result + results.extend(batch) + + if self._single_result: + return results[0] + + return results + + def ready(self): + """ + Returns true if the result is ready, else false if the tasks are still + running. + """ + + return not self._result_thread.is_alive() + + def successful(self): + """ + Returns true if none of the submitted tasks errored, else false. Should + only be called once the result is ready (can be checked using `ready`). + """ + + if not self.ready(): + raise ValueError("{0!r} not ready".format(self)) + return not self._result_thread.got_error() + + +class IMapIterator: + """Base class for OrderedIMapIterator and UnorderedIMapIterator.""" + + def __init__(self, pool, func, iterable, chunksize=None): + self._pool = pool + self._func = func + self._next_chunk_index = 0 + # List of bools indicating if the given chunk is ready or not for all + # submitted chunks. Ordering mirrors that in the in the ResultThread. + self._submitted_chunks = [] + self._ready_objects = collections.deque() + if not hasattr(iterable, "__len__"): + iterable = [iterable] + self._iterator = iter(iterable) + self._chunksize = chunksize or pool._calculate_chunksize(iterable) + self._total_chunks = div_round_up(len(iterable), chunksize) + self._result_thread = ResultThread( + [], total_object_ids=self._total_chunks) + self._result_thread.start() + + for _ in range(len(self._pool._actor_pool)): + self._submit_next_chunk() + + def _submit_next_chunk(self): + # The full iterable has been submitted, so no-op. 
+ if len(self._submitted_chunks) >= self._total_chunks: + return + + actor_index = len(self._submitted_chunks) % len(self._pool._actor_pool) + new_chunk_id = self._pool._submit_chunk(self._func, self._iterator, + self._chunksize, actor_index) + self._submitted_chunks.append(False) + self._result_thread.add_object_id(new_chunk_id) + + def __iter__(self): + return self + + def __next__(self): + return self.next() + + def next(self): + # Should be implemented by subclasses. + raise NotImplementedError + + +class OrderedIMapIterator(IMapIterator): + """Iterator to the results of tasks submitted using `imap`. + + The results are returned in the same order that they were submitted, even + if they don't finish in that order. Only one batch of tasks per actor + process is submitted at a time - the rest are submitted as results come in. + + Should not be constructed directly. + """ + + def next(self, timeout=None): + if len(self._ready_objects) == 0: + if self._next_chunk_index == self._total_chunks: + raise StopIteration + + while timeout is None or timeout > 0: + start = time.time() + index = self._result_thread.next_ready_index(timeout=timeout) + self._submit_next_chunk() + self._submitted_chunks[index] = True + if index == self._next_chunk_index: + break + if timeout is not None: + timeout = max(0, timeout - (time.time() - start)) + + while self._next_chunk_index < len( + self._submitted_chunks + ) and self._submitted_chunks[self._next_chunk_index]: + for result in self._result_thread.result( + self._next_chunk_index): + self._ready_objects.append(result) + self._next_chunk_index += 1 + + return self._ready_objects.popleft() + + +class UnorderedIMapIterator(IMapIterator): + """Iterator to the results of tasks submitted using `imap`. + + The results are returned in the order that they finish. Only one batch of + tasks per actor process is submitted at a time - the rest are submitted as + results come in. + + Should not be constructed directly. 
@ray.remote(num_cpus=1)
class PoolActor:
    """Actor used to process tasks submitted to a Pool."""

    def __init__(self, initializer=None, initargs=None):
        # Run the optional per-actor initializer once at start-up.
        if not initializer:
            return
        initializer(*(initargs or ()))

    def ping(self):
        # Used to wait for this actor to be initialized.
        return None

    def run_batch(self, func, batch):
        """Apply func to every (args, kwargs) pair in batch.

        Returns a list with one entry per pair, in order: the return value
        of func, or a PoolTaskError wrapping the exception it raised.
        """

        def run_one(args, kwargs):
            positional = args or ()
            keyword = kwargs or {}
            try:
                return func(*positional, **keyword)
            except Exception as e:
                # Errors are returned, not raised, so that one failing task
                # does not discard the rest of the batch.
                return PoolTaskError(e)

        return [run_one(args, kwargs) for args, kwargs in batch]
# https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool
class Pool:
    """A pool of actor processes that is used to process tasks in parallel.

    Args:
        processes: number of actor processes to start in the pool. Defaults to
            the number of cores in the Ray cluster if one is already running,
            otherwise the number of cores on this machine.
        initializer: function to be run in each actor when it starts up.
        initargs: iterable of arguments to the initializer function.
        maxtasksperchild: maximum number of tasks to run in each actor process.
            After a process has executed this many tasks, it will be killed and
            replaced with a new one.
        context: unsupported; a warning is logged if it is provided.
        ray_address: address of the Ray cluster to run on. If None, a new local
            Ray cluster will be started on this machine. Otherwise, this will
            be passed to `ray.init()` to connect to a running cluster. This may
            also be specified using the `RAY_ADDRESS` environment variable.
    """

    def __init__(self,
                 processes=None,
                 initializer=None,
                 initargs=None,
                 maxtasksperchild=None,
                 context=None,
                 ray_address=None):
        self._closed = False
        self._initializer = initializer
        self._initargs = initargs
        # -1 is the sentinel for "no per-actor task limit".
        self._maxtasksperchild = maxtasksperchild or -1
        # Object IDs of in-flight actor termination tasks.
        self._actor_deletion_ids = []

        if context:
            logger.warning("The 'context' argument is not supported using "
                           "ray. Please refer to the documentation for how "
                           "to control ray initialization.")

        processes = self._init_ray(processes, ray_address)
        self._start_actor_pool(processes)

    def _init_ray(self, processes=None, ray_address=None):
        """Make sure ray is initialized and return the pool size to use.

        Raises:
            ValueError: if `processes` is not positive or exceeds the number
                of CPUs in the ray cluster.
        """
        # Reject an invalid explicit size before a cluster is started with it.
        if processes is not None and processes <= 0:
            raise ValueError("Processes in the pool must be >0.")

        # Initialize ray. If ray is already initialized, we do nothing.
        # Else, the priority is:
        # ray_address argument > RAY_ADDRESS > start new local cluster.
        if not ray.is_initialized():
            if ray_address is None and RAY_ADDRESS_ENV in os.environ:
                ray_address = os.environ[RAY_ADDRESS_ENV]

            # Cluster mode.
            if ray_address is not None:
                logger.info("Connecting to ray cluster at address='{}'".format(
                    ray_address))
                ray.init(address=ray_address)
            # Local mode.
            else:
                logger.info("Starting local ray cluster")
                ray.init(num_cpus=processes)

        ray_cpus = int(ray.state.cluster_resources()["CPU"])
        if processes is None:
            processes = ray_cpus
        # Still needed for the case where the size defaulted to ray_cpus.
        if processes <= 0:
            raise ValueError("Processes in the pool must be >0.")
        if ray_cpus < processes:
            raise ValueError("Tried to start a pool with {} processes on an "
                             "existing ray cluster, but there are only {} "
                             "CPUs in the ray cluster.".format(
                                 processes, ray_cpus))

        return processes

    def _start_actor_pool(self, processes):
        """Create `processes` actors and block until each answers a ping."""
        self._actor_pool = [self._new_actor_entry() for _ in range(processes)]
        ray.get([actor.ping.remote() for actor, _ in self._actor_pool])

    def _wait_for_stopping_actors(self, timeout=None):
        """Wait up to `timeout` seconds for pending actor terminations.

        Termination tasks that have not finished remain tracked.
        """
        if len(self._actor_deletion_ids) == 0:
            return
        if timeout is not None:
            timeout = float(timeout)

        _, deleting = ray.wait(
            self._actor_deletion_ids,
            num_returns=len(self._actor_deletion_ids),
            timeout=timeout)
        self._actor_deletion_ids = deleting

    def _stop_actor(self, actor):
        # Check and clean up any outstanding IDs corresponding to deletions.
        self._wait_for_stopping_actors(timeout=0.0)
        # The deletion task will block until the actor has finished executing
        # all pending tasks.
        self._actor_deletion_ids.append(actor.__ray_terminate__.remote())

    def _new_actor_entry(self):
        # NOTE(edoakes): The initializer function can't currently be used to
        # modify the global namespace (e.g., import packages or set globals)
        # due to a limitation in cloudpickle.
        # Each entry is (actor handle, number of batches submitted to it).
        return (PoolActor.remote(self._initializer, self._initargs), 0)

    def _random_actor_index(self):
        return random.randrange(len(self._actor_pool))

    # Batch should be a list of tuples: (args, kwargs).
    def _run_batch(self, actor_index, func, batch):
        """Submit `batch` to the actor at `actor_index`; return its object ID.

        The actor is stopped and replaced once it has been sent
        `maxtasksperchild` batches.
        """
        actor, count = self._actor_pool[actor_index]
        object_id = actor.run_batch.remote(func, batch)
        count += 1
        assert self._maxtasksperchild == -1 or count <= self._maxtasksperchild
        if count == self._maxtasksperchild:
            self._stop_actor(actor)
            actor, count = self._new_actor_entry()
        self._actor_pool[actor_index] = (actor, count)
        return object_id

    def apply(self, func, args=None, kwargs=None):
        """Run the given function on a random actor process and return the
        result synchronously.

        Args:
            func: function to run.
            args: optional arguments to the function.
            kwargs: optional keyword arguments to the function.

        Returns:
            The result.
        """

        return self.apply_async(func, args, kwargs).get()

    def apply_async(self,
                    func,
                    args=None,
                    kwargs=None,
                    callback=None,
                    error_callback=None):
        """Run the given function on a random actor process and return an
        asynchronous interface to the result.

        Args:
            func: function to run.
            args: optional arguments to the function.
            kwargs: optional keyword arguments to the function.
            callback: callback to be executed on the result once it is finished
                if it succeeds.
            error_callback: callback to be executed on the result once it is
                finished if the task errors. The exception raised by the
                task will be passed as the only argument to the callback.

        Returns:
            AsyncResult containing the result.
        """

        self._check_running()
        object_id = self._run_batch(self._random_actor_index(), func,
                                    [(args, kwargs)])
        return AsyncResult(
            [object_id], callback, error_callback, single_result=True)

    def _calculate_chunksize(self, iterable):
        """Return a default chunksize: about four chunks per actor."""
        chunksize, extra = divmod(len(iterable), len(self._actor_pool) * 4)
        if extra:
            chunksize += 1
        return chunksize

    def _submit_chunk(self,
                      func,
                      iterator,
                      chunksize,
                      actor_index,
                      unpack_args=False):
        """Take up to `chunksize` items from `iterator` and submit them as one
        batch to the actor at `actor_index`. Returns the batch's object ID.
        """
        chunk = []
        while len(chunk) < chunksize:
            try:
                args = next(iterator)
            except StopIteration:
                break
            if not unpack_args:
                args = (args, )
            chunk.append((args, {}))

        # Nothing to submit. The caller should prevent this.
        assert len(chunk) > 0

        return self._run_batch(actor_index, func, chunk)

    def _chunk_and_run(self, func, iterable, chunksize=None,
                       unpack_args=False):
        """Split `iterable` into chunks and submit them round-robin.

        Returns the list of object IDs, one per submitted chunk.
        """
        if not hasattr(iterable, "__len__"):
            # Chunking needs len(), so length-less iterables (e.g. generators)
            # are materialized. Wrapping them in a list (the old behaviour)
            # passed the generator itself to func as the only argument.
            try:
                iterable = list(iterable)
            except TypeError:
                # Not iterable at all: keep treating it as a single argument.
                iterable = [iterable]

        if chunksize is None:
            chunksize = self._calculate_chunksize(iterable)

        iterator = iter(iterable)
        chunk_object_ids = []
        while len(chunk_object_ids) * chunksize < len(iterable):
            actor_index = len(chunk_object_ids) % len(self._actor_pool)
            chunk_object_ids.append(
                self._submit_chunk(
                    func,
                    iterator,
                    chunksize,
                    actor_index,
                    unpack_args=unpack_args))

        return chunk_object_ids

    def _map_async(self,
                   func,
                   iterable,
                   chunksize=None,
                   unpack_args=False,
                   callback=None,
                   error_callback=None):
        """Shared implementation of the map/starmap family."""
        self._check_running()
        object_ids = self._chunk_and_run(
            func, iterable, chunksize=chunksize, unpack_args=unpack_args)
        return AsyncResult(object_ids, callback, error_callback)

    def map(self, func, iterable, chunksize=None):
        """Run the given function on each element in the iterable round-robin
        on the actor processes and return the results synchronously.

        Args:
            func: function to run.
            iterable: iterable of objects to be passed as the sole argument to
                func.
            chunksize: number of tasks to submit as a batch to each actor
                process. If unspecified, a suitable chunksize will be chosen.

        Returns:
            A list of results.
        """

        return self._map_async(
            func, iterable, chunksize=chunksize, unpack_args=False).get()

    def map_async(self,
                  func,
                  iterable,
                  chunksize=None,
                  callback=None,
                  error_callback=None):
        """Run the given function on each element in the iterable round-robin
        on the actor processes and return an asynchronous interface to the
        results.

        Args:
            func: function to run.
            iterable: iterable of objects to be passed as the only argument to
                func.
            chunksize: number of tasks to submit as a batch to each actor
                process. If unspecified, a suitable chunksize will be chosen.
            callback: callback to be executed on each successful result once it
                is finished.
            error_callback: callback to be executed on each errored result once
                it is finished. The exception raised by the task will be passed
                as the only argument to the callback.

        Returns:
            AsyncResult
        """
        return self._map_async(
            func,
            iterable,
            chunksize=chunksize,
            unpack_args=False,
            callback=callback,
            error_callback=error_callback)

    def starmap(self, func, iterable, chunksize=None):
        """Same as `map`, but unpacks each element of the iterable as the
        arguments to func like: [func(*args) for args in iterable].
        """

        return self._map_async(
            func, iterable, chunksize=chunksize, unpack_args=True).get()

    def starmap_async(self,
                      func,
                      iterable,
                      callback=None,
                      error_callback=None,
                      chunksize=None):
        """Same as `map_async`, but unpacks each element of the iterable as the
        arguments to func like: [func(*args) for args in iterable].

        `chunksize` is accepted as a trailing parameter so that existing
        positional callers keep working.
        """

        return self._map_async(
            func,
            iterable,
            chunksize=chunksize,
            unpack_args=True,
            callback=callback,
            error_callback=error_callback)

    def imap(self, func, iterable, chunksize=1):
        """Same as `map`, but only submits one batch of tasks to each actor
        process at a time.

        This can be useful if the iterable of arguments is very large or each
        task's arguments consumes a large amount of resources.

        The results are returned in the order corresponding to their arguments
        in the iterable.

        Returns:
            OrderedIMapIterator
        """

        self._check_running()
        return OrderedIMapIterator(self, func, iterable, chunksize=chunksize)

    def imap_unordered(self, func, iterable, chunksize=1):
        """Same as `map`, but only submits one batch of tasks to each actor
        process at a time.

        This can be useful if the iterable of arguments is very large or each
        task's arguments consumes a large amount of resources.

        The results are returned in the order that they finish.

        Returns:
            UnorderedIMapIterator
        """

        self._check_running()
        return UnorderedIMapIterator(self, func, iterable, chunksize=chunksize)

    def _check_running(self):
        """Raise ValueError if the pool has been closed."""
        if self._closed:
            raise ValueError("Pool not running")

    def __enter__(self):
        self._check_running()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()

    def close(self):
        """Close the pool.

        Prevents any more tasks from being submitted on the pool but allows
        outstanding work to finish. Calling this on an already-closed pool is
        a no-op.
        """

        if self._closed:
            # Don't submit a second termination task to every actor.
            return
        for actor, _ in self._actor_pool:
            self._stop_actor(actor)
        self._closed = True

    def terminate(self):
        """Close the pool.

        Prevents any more tasks from being submitted on the pool and stops
        outstanding work.
        """

        if not self._closed:
            self.close()
        for actor, _ in self._actor_pool:
            actor.__ray_kill__()

    def join(self):
        """Wait for the actors in a closed pool to exit.

        If the pool was closed using `close`, this will return once all
        outstanding work is completed.

        If the pool was closed using `terminate`, this will return quickly.

        Raises:
            ValueError: if the pool has not been closed.
        """

        if not self._closed:
            raise ValueError("Pool is still running")
        self._wait_for_stopping_actors()