From 2a66529fb739aa2b8639b6cf0b5161ec4cd1e555 Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Sun, 29 Dec 2019 21:40:58 -0600 Subject: [PATCH] Add multiprocessing.Pool API (#6194) --- doc/source/index.rst | 1 + doc/source/multiprocessing.rst | 73 ++ .../experimental/multiprocessing/__init__.py | 5 + .../ray/experimental/multiprocessing/pool.py | 664 ++++++++++++++++++ python/ray/tests/BUILD | 8 + python/ray/tests/test_multiprocessing.py | 490 +++++++++++++ 6 files changed, 1241 insertions(+) create mode 100644 doc/source/multiprocessing.rst create mode 100644 python/ray/experimental/multiprocessing/__init__.py create mode 100644 python/ray/experimental/multiprocessing/pool.py create mode 100644 python/ray/tests/test_multiprocessing.py diff --git a/doc/source/index.rst b/doc/source/index.rst index 76b4f645e..e19e8194a 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -283,6 +283,7 @@ Getting Involved signals.rst async_api.rst serve.rst + multiprocessing.rst .. toctree:: :maxdepth: -1 diff --git a/doc/source/multiprocessing.rst b/doc/source/multiprocessing.rst new file mode 100644 index 000000000..af0129dac --- /dev/null +++ b/doc/source/multiprocessing.rst @@ -0,0 +1,73 @@ +multiprocessing.Pool API (Experimental) +======================================= + +.. warning:: + + Support for the multiprocessing.Pool API on Ray is an experimental feature, + so it may be changed at any time without warning. If you encounter any + bugs/shortcomings/incompatibilities, please file an `issue on GitHub`_. + Contributions are always welcome! + +.. _`issue on GitHub`: https://github.com/ray-project/ray/issues + +Ray supports running distributed python programs with the `multiprocessing.Pool API`_ +using `Ray Actors `__ instead of local processes. This makes it easy +to scale existing applications that use ``multiprocessing.Pool`` from a single node +to a cluster. + +.. _`multiprocessing.Pool API`: https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool + +Quickstart +---------- + +To get started, first `install Ray `__, then use +``ray.experimental.multiprocessing.Pool`` in place of ``multiprocessing.Pool``. +This will start a local Ray cluster the first time you create a ``Pool`` and +distribute your tasks across it. See the `Run on a Cluster`_ section below for +instructions to run on a multi-node Ray cluster instead. + +.. code-block:: python + + from ray.experimental.multiprocessing import Pool + + def f(index): + return index + + pool = Pool() + for result in pool.map(f, range(100)): + print(result) + +The full ``multiprocessing.Pool`` API is currently supported. Please see the +`multiprocessing documentation`_ for details. + +.. _`multiprocessing documentation`: https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool + +Run on a Cluster +---------------- + +This section assumes that you have a running Ray cluster. To start a Ray cluster, +please refer to the `cluster setup `__ instructions. + +To connect a ``Pool`` to a running Ray cluster, you can specify the address of the +head node in one of two ways: + +- By setting the ``RAY_ADDRESS`` environment variable. +- By passing the ``ray_address`` keyword argument to the ``Pool`` constructor. + +.. code-block:: python + + from ray.experimental.multiprocessing import Pool + + # Starts a new local Ray cluster. + pool = Pool() + + # Connects to a running Ray cluster, with the current node as the head node. + # Alternatively, set the environment variable RAY_ADDRESS="auto". + pool = Pool(ray_address="auto") + + # Connects to a running Ray cluster, with a remote node as the head node. + # Alternatively, set the environment variable RAY_ADDRESS=":". + pool = Pool(ray_address=":") + +You can also start Ray manually by calling ``ray.init()`` (with any of its supported +configuration options) before creating a ``Pool``. diff --git a/python/ray/experimental/multiprocessing/__init__.py b/python/ray/experimental/multiprocessing/__init__.py new file mode 100644 index 000000000..f9b741f1e --- /dev/null +++ b/python/ray/experimental/multiprocessing/__init__.py @@ -0,0 +1,5 @@ +from multiprocessing import TimeoutError + +from .pool import Pool + +__all__ = ["Pool", "TimeoutError"] diff --git a/python/ray/experimental/multiprocessing/pool.py b/python/ray/experimental/multiprocessing/pool.py new file mode 100644 index 000000000..f9184c1b6 --- /dev/null +++ b/python/ray/experimental/multiprocessing/pool.py @@ -0,0 +1,664 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from multiprocessing import TimeoutError +import os +import time +import random +import collections +import threading +import queue +import copy + +import ray + +RAY_ADDRESS_ENV = "RAY_ADDRESS" + + +# Helper function to divide a by b and round the result up. +def div_round_up(a, b): + return -(-a // b) + + +class PoolTaskError(Exception): + def __init__(self, underlying): + self.underlying = underlying + + +class ResultThread(threading.Thread): + def __init__(self, + object_ids, + callback=None, + error_callback=None, + total_object_ids=None): + threading.Thread.__init__(self) + self._got_error = False + self._object_ids = [] + self._num_ready = 0 + self._results = [] + self._ready_index_queue = queue.Queue() + self._callback = callback + self._error_callback = error_callback + self._total_object_ids = total_object_ids or len(object_ids) + self._indices = {} + # Thread-safe queue used to add ObjectIDs to fetch after creating + # this thread (used to lazily submit for imap and imap_unordered). + self._new_object_ids = queue.Queue() + for object_id in object_ids: + self._add_object_id(object_id) + + def _add_object_id(self, object_id): + self._indices[object_id] = len(self._object_ids) + self._object_ids.append(object_id) + self._results.append(None) + + def add_object_id(self, object_id): + self._new_object_ids.put(object_id) + + def run(self): + unready = copy.copy(self._object_ids) + while self._num_ready < self._total_object_ids: + # Get as many new IDs from the queue as possible without blocking, + # unless we have no IDs to wait on, in which case we block. + while True: + try: + block = len(unready) == 0 + new_object_id = self._new_object_ids.get(block=block) + self._add_object_id(new_object_id) + unready.append(new_object_id) + except queue.Empty: + # queue.Empty means no result was retrieved if block=False. + break + + [ready_id], unready = ray.wait(unready, num_returns=1) + batch = ray.get(ready_id) + for result in batch: + if isinstance(result, Exception): + self._got_error = True + if self._error_callback is not None: + self._error_callback(result) + elif self._callback is not None: + self._callback(result) + + self._num_ready += 1 + self._results[self._indices[ready_id]] = batch + self._ready_index_queue.put(self._indices[ready_id]) + + def got_error(self): + # Should only be called after the thread finishes. + return self._got_error + + def result(self, index): + # Should only be called on results that are ready. + return self._results[index] + + def results(self): + # Should only be called after the thread finishes. + return self._results + + def next_ready_index(self, timeout=None): + try: + return self._ready_index_queue.get(timeout=timeout) + except queue.Empty: + # queue.Queue signals a timeout by raising queue.Empty. + raise TimeoutError + + +class AsyncResult(object): + """An asynchronous interface to task results. + + This should not be constructed directly. + """ + + def __init__(self, + chunk_object_ids, + callback=None, + error_callback=None, + single_result=False): + self._single_result = single_result + self._result_thread = ResultThread(chunk_object_ids, callback, + error_callback) + self._result_thread.start() + + def wait(self, timeout=None): + """ + Returns once the result is ready or the timeout expires (does not + raise TimeoutError). + + Args: + timeout: timeout in milliseconds. + """ + + self._result_thread.join(timeout) + + def get(self, timeout=None): + self.wait(timeout) + if self._result_thread.is_alive(): + raise TimeoutError + + results = [] + for batch in self._result_thread.results(): + for result in batch: + if isinstance(result, PoolTaskError): + raise result.underlying + results.extend(batch) + + if self._single_result: + return results[0] + + return results + + def ready(self): + """ + Returns true if the result is ready, else false if the tasks are still + running. + """ + + return not self._result_thread.is_alive() + + def successful(self): + """ + Returns true if none of the submitted tasks errored, else false. Should + only be called once the result is ready (can be checked using `ready`). + """ + + if not self.ready(): + raise ValueError("{0!r} not ready".format(self)) + return not self._result_thread.got_error() + + +class IMapIterator(object): + """Base class for OrderedIMapIterator and UnorderedIMapIterator.""" + + def __init__(self, pool, func, iterable, chunksize=None): + self._next_chunk_index = 0 + # List of bools indicating if the given chunk is ready or not for all + # submitted chunks. Ordering mirrors that in the in the ResultThread. + self._submitted_chunks = [] + self._ready_objects = collections.deque() + if not hasattr(iterable, "__len__"): + iterable = [iterable] + self._iterator = iter(iterable) + self._chunksize = chunksize or pool._calculate_chunksize(iterable) + self._total_chunks = div_round_up(len(iterable), chunksize) + self._result_thread = ResultThread( + [], total_object_ids=self._total_chunks) + self._result_thread.start() + + for _ in range(len(pool._actor_pool)): + self._submit_next_chunk() + + def _submit_next_chunk(self): + # The full iterable has been submitted, so no-op. + if len(self._submitted_chunks) >= self._total_chunks: + return + + actor_index = len(self._submitted_chunks) % len(self._pool._actor_pool) + new_chunk_id = self._pool._submit_chunk(self._func, self._iterator, + self._chunksize, actor_index) + self._submitted_chunks.append(False) + self._result_thread.add_object_id(new_chunk_id) + + def __iter__(self): + return self + + def __next__(self): + return self.next() + + def next(self): + # Should be implemented by subclasses. + raise NotImplementedError + + +class OrderedIMapIterator(IMapIterator): + """Iterator to the results of tasks submitted using `imap`. + + The results are returned in the same order that they were submitted, even + if they don't finish in that order. Only one batch of tasks per actor + process is submitted at a time - the rest are submitted as results come in. + + Should not be constructed directly. + """ + + def next(self, timeout=None): + if len(self._ready_objects) == 0: + if self._next_chunk_index == self._total_chunks: + raise StopIteration + + while timeout is None or timeout > 0: + start = time.time() + index = self._result_thread.next_ready_index(timeout=timeout) + self._submit_next_chunk() + self._submitted_chunks[index] = True + if index == self._next_chunk_index: + break + if timeout is not None: + timeout = max(0, timeout - (time.time() - start)) + + while self._next_chunk_index < len( + self._submitted_chunks + ) and self._submitted_chunks[self._next_chunk_index]: + for result in self._result_thread.result( + self._next_chunk_index): + self._ready_objects.append(result) + self._next_chunk_index += 1 + + return self._ready_objects.popleft() + + +class UnorderedIMapIterator(IMapIterator): + """Iterator to the results of tasks submitted using `imap`. + + The results are returned in the order that they finish. Only one batch of + tasks per actor process is submitted at a time - the rest are submitted as + results come in. + + Should not be constructed directly. + """ + + def next(self, timeout=None): + if len(self._ready_objects) == 0: + if self._next_chunk_index == self._total_chunks: + raise StopIteration + + index = self._result_thread.next_ready_index(timeout=timeout) + self._submit_next_chunk() + + for result in self._result_thread.result(index): + self._ready_objects.append(result) + self._next_chunk_index += 1 + + return self._ready_objects.popleft() + + +@ray.remote +class PoolActor(object): + """Actor used to process tasks submitted to a Pool.""" + + def __init__(self, initializer=None, initargs=None): + if initializer: + initargs = initargs or () + initializer(*initargs) + + def ping(self): + # Used to wait for this actor to be initialized. + pass + + def run_batch(self, func, batch): + results = [] + for args, kwargs in batch: + args = args or () + kwargs = kwargs or {} + try: + results.append(func(*args, **kwargs)) + except Exception as e: + results.append(PoolTaskError(e)) + return results + + +# https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool +class Pool(object): + """A pool of actor processes that is used to process tasks in parallel. + + Args: + processes: number of actor processes to start in the pool. Defaults to + the number of cores in the Ray cluster if one is already running, + otherwise the number of cores on this machine. + initializer: function to be run in each actor when it starts up. + initargs: iterable of arguments to the initializer function. + maxtasksperchild: maximum number of tasks to run in each actor process. + After a process has executed this many tasks, it will be killed and + replaced with a new one. + ray_address: address of the Ray cluster to run on. If None, a new local + Ray cluster will be started on this machine. Otherwise, this will + be passed to `ray.init()` to connect to a running cluster. This may + also be specified using the `RAY_ADDRESS` environment variable. + """ + + def __init__(self, + processes=None, + initializer=None, + initargs=None, + maxtasksperchild=None, + ray_address=None): + self._closed = False + self._initializer = initializer + self._initargs = initargs + self._maxtasksperchild = maxtasksperchild or -1 + self._actor_deletion_ids = [] + + processes = self._init_ray(processes, ray_address) + self._start_actor_pool(processes) + + def _init_ray(self, processes=None, ray_address=None): + # Initialize ray. If ray is already initialized, we do nothing. + # Else, the priority is: + # ray_address argument > RAY_ADDRESS > start new local cluster. + if not ray.is_initialized(): + if ray_address is None and RAY_ADDRESS_ENV in os.environ: + ray_address = os.environ[RAY_ADDRESS_ENV] + + # Cluster mode. + if ray_address is not None: + print("Connecting to ray cluster at address='{}'".format( + ray_address)) + ray.init(address=ray_address) + # Local mode. + else: + print("Starting local ray cluster") + ray.init(num_cpus=processes) + + ray_cpus = int(ray.state.cluster_resources()["CPU"]) + if processes is None: + processes = ray_cpus + if processes <= 0: + raise ValueError("Processes in the pool must be >0.") + if ray_cpus < processes: + raise ValueError("Tried to start a pool with {} processes on an " + "existing ray cluster, but there are only {} " + "CPUs in the ray cluster.".format( + processes, ray_cpus)) + + return processes + + def _start_actor_pool(self, processes): + self._actor_pool = [self._new_actor_entry() for _ in range(processes)] + ray.get([actor.ping.remote() for actor, _ in self._actor_pool]) + + def _wait_for_stopping_actors(self, timeout=None): + if len(self._actor_deletion_ids) == 0: + return + if timeout is not None: + timeout = float(timeout) + + _, deleting = ray.wait( + self._actor_deletion_ids, + num_returns=len(self._actor_deletion_ids), + timeout=timeout) + self._actor_deletion_ids = deleting + + def _stop_actor(self, actor): + # Check and clean up any outstanding IDs corresponding to deletions. + self._wait_for_stopping_actors(timeout=0.0) + # The deletion task will block until the actor has finished executing + # all pending tasks. + self._actor_deletion_ids.append(actor.__ray_terminate__.remote()) + + def _new_actor_entry(self): + # NOTE(edoakes): The initializer function can't currently be used to + # modify the global namespace (e.g., import packages or set globals) + # due to a limitation in cloudpickle. + return (PoolActor.remote(self._initializer, self._initargs), 0) + + def _random_actor_index(self): + return random.randrange(len(self._actor_pool)) + + # Batch should be a list of tuples: (args, kwargs). + def _run_batch(self, actor_index, func, batch): + actor, count = self._actor_pool[actor_index] + object_id = actor.run_batch.remote(func, batch) + count += 1 + assert self._maxtasksperchild == -1 or count <= self._maxtasksperchild + if count == self._maxtasksperchild: + self._stop_actor(actor) + actor, count = self._new_actor_entry() + self._actor_pool[actor_index] = (actor, count) + return object_id + + def apply(self, func, args=None, kwargs=None): + """Run the given function on a random actor process and return the + result synchronously. + + Args: + func: function to run. + args: optional arguments to the function. + kwargs: optional keyword arguments to the function. + + Returns: + The result. + """ + + return self.apply_async(func, args, kwargs).get() + + def apply_async(self, + func, + args=None, + kwargs=None, + callback=None, + error_callback=None): + """Run the given function on a random actor process and return an + asynchronous interface to the result. + + Args: + func: function to run. + args: optional arguments to the function. + kwargs: optional keyword arguments to the function. + callback: callback to be executed on the result once it is finished + if it succeeds. + error_callback: callback to be executed the result once it is + finished if the task errors. The exception raised by the + task will be passed as the only argument to the callback. + + Returns: + AsyncResult containing the result. + """ + + self._check_running() + object_id = self._run_batch(self._random_actor_index(), func, + [(args, kwargs)]) + return AsyncResult( + [object_id], callback, error_callback, single_result=True) + + def _calculate_chunksize(self, iterable): + chunksize, extra = divmod(len(iterable), len(self._actor_pool) * 4) + if extra: + chunksize += 1 + return chunksize + + def _submit_chunk(self, + func, + iterator, + chunksize, + actor_index, + unpack_args=False): + chunk = [] + while len(chunk) < chunksize: + try: + args = next(iterator) + if not unpack_args: + args = (args, ) + chunk.append((args, {})) + except StopIteration: + break + + # Nothing to submit. The caller should prevent this. + assert len(chunk) > 0 + + return self._run_batch(actor_index, func, chunk) + + def _chunk_and_run(self, func, iterable, chunksize=None, + unpack_args=False): + if not hasattr(iterable, "__len__"): + iterable = [iterable] + + if chunksize is None: + chunksize = self._calculate_chunksize(iterable) + + iterator = iter(iterable) + chunk_object_ids = [] + while len(chunk_object_ids) * chunksize < len(iterable): + actor_index = len(chunk_object_ids) % len(self._actor_pool) + chunk_object_ids.append( + self._submit_chunk( + func, + iterator, + chunksize, + actor_index, + unpack_args=unpack_args)) + + return chunk_object_ids + + def _map_async(self, + func, + iterable, + chunksize=None, + unpack_args=False, + callback=None, + error_callback=None): + self._check_running() + object_ids = self._chunk_and_run( + func, iterable, chunksize=chunksize, unpack_args=unpack_args) + return AsyncResult(object_ids, callback, error_callback) + + def map(self, func, iterable, chunksize=None): + """Run the given function on each element in the iterable round-robin + on the actor processes and return the results synchronously. + + Args: + func: function to run. + iterable: iterable of objects to be passed as the sole argument to + func. + chunksize: number of tasks to submit as a batch to each actor + process. If unspecified, a suitable chunksize will be chosen. + + Returns: + A list of results. + """ + + return self._map_async( + func, iterable, chunksize=chunksize, unpack_args=False).get() + + def map_async(self, + func, + iterable, + chunksize=None, + callback=None, + error_callback=None): + """Run the given function on each element in the iterable round-robin + on the actor processes and return an asynchronous interface to the + results. + + Args: + func: function to run. + iterable: iterable of objects to be passed as the only argument to + func. + chunksize: number of tasks to submit as a batch to each actor + process. If unspecified, a suitable chunksize will be chosen. + callback: callback to be executed on each successful result once it + is finished. + error_callback: callback to be executed on each errored result once + it is finished. The exception raised by the task will be passed + as the only argument to the callback. + + Returns: + AsyncResult + """ + return self._map_async( + func, + iterable, + chunksize=chunksize, + unpack_args=False, + callback=callback, + error_callback=error_callback) + + def starmap(self, func, iterable, chunksize=None): + """Same as `map`, but unpacks each element of the iterable as the + arguments to func like: [func(*args) for args in iterable]. + """ + + return self._map_async( + func, iterable, chunksize=chunksize, unpack_args=True).get() + + def starmap_async(self, func, iterable, callback=None, + error_callback=None): + """Same as `map_async`, but unpacks each element of the iterable as the + arguments to func like: [func(*args) for args in iterable]. + """ + + return self._map_async( + func, + iterable, + unpack_args=True, + callback=callback, + error_callback=error_callback) + + def imap(self, func, iterable, chunksize=1): + """Same as `map`, but only submits one batch of tasks to each actor + process at a time. + + This can be useful if the iterable of arguments is very large or each + task's arguments consumes a large amount of resources. + + The results are returned in the order corresponding to their arguments + in the iterable. + + Returns: + OrderedIMapIterator + """ + + self._check_running() + return OrderedIMapIterator(self, func, iterable, chunksize=chunksize) + + def imap_unordered(self, func, iterable, chunksize=1): + """Same as `map`, but only submits one batch of tasks to each actor + process at a time. + + This can be useful if the iterable of arguments is very large or each + task's arguments consumes a large amount of resources. + + The results are returned in the order that they finish. + + Returns: + UnorderedIMapIterator + """ + + self._check_running() + return UnorderedIMapIterator(self, func, iterable, chunksize=chunksize) + + def _check_running(self): + if self._closed: + raise ValueError("Pool not running") + + def __enter__(self): + self._check_running() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.terminate() + + def close(self): + """Close the pool. + + Prevents any more tasks from being submitted on the pool but allows + outstanding work to finish. + """ + + for actor, _ in self._actor_pool: + self._stop_actor(actor) + self._closed = True + + def terminate(self): + """Close the pool. + + Prevents any more tasks from being submitted on the pool and stops + outstanding work. + """ + + if not self._closed: + self.close() + for actor, _ in self._actor_pool: + actor.__ray_kill__() + + def join(self): + """Wait for the actors in a closed pool to exit. + + If the pool was closed using `close`, this will return once all + outstanding work is completed. + + If the pool was closed using `terminate`, this will return quickly. + """ + + if not self._closed: + raise ValueError("Pool is still running") + self._wait_for_stopping_actors() diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 2767361c6..439d63ad0 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -243,6 +243,14 @@ py_test( deps = ["//:ray_lib"], ) +py_test( + name = "test_multiprocessing", + size = "medium", + srcs = ["test_multiprocessing.py"], + tags = ["exclusive"], + deps = ["//:ray_lib"], +) + py_test( name = "test_multi_node_2", size = "medium", diff --git a/python/ray/tests/test_multiprocessing.py b/python/ray/tests/test_multiprocessing.py new file mode 100644 index 000000000..46cdba274 --- /dev/null +++ b/python/ray/tests/test_multiprocessing.py @@ -0,0 +1,490 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import os +import pytest +import tempfile +import time +import random +import subprocess +from collections import defaultdict +import queue + +import ray +from ray.experimental.multiprocessing import Pool, TimeoutError + + +@pytest.fixture +def cleanup_only(): + yield None + ray.shutdown() + subprocess.check_output(["ray", "stop"]) + if "RAY_ADDRESS" in os.environ: + del os.environ["RAY_ADDRESS"] + + +@pytest.fixture +def pool(): + pool = Pool(processes=1) + yield pool + pool.terminate() + ray.shutdown() + + +@pytest.fixture +def pool_4_processes(): + pool = Pool(processes=4) + yield pool + pool.terminate() + ray.shutdown() + + +def test_initialize_ray(cleanup_only): + def getpid(args): + return os.getpid() + + def check_pool_size(pool, size): + args = [tuple() for _ in range(size)] + assert len(set(pool.map(getpid, args))) == size + + # Check that starting a pool starts ray if not initialized. + pool = Pool(processes=2) + assert ray.is_initialized() + assert int(ray.state.cluster_resources()["CPU"]) == 2 + check_pool_size(pool, 2) + ray.shutdown() + + # Check that starting a pool doesn't affect ray if there is a local + # ray cluster running. + ray.init(num_cpus=3) + assert ray.is_initialized() + pool = Pool(processes=2) + assert int(ray.state.cluster_resources()["CPU"]) == 3 + check_pool_size(pool, 2) + ray.shutdown() + + # Check that trying to start a pool on an existing ray cluster throws an + # error if there aren't enough CPUs for the number of processes. + ray.init(num_cpus=1) + assert ray.is_initialized() + with pytest.raises(ValueError): + Pool(processes=2) + assert int(ray.state.cluster_resources()["CPU"]) == 1 + ray.shutdown() + + # Use different numbers of CPUs to distinguish between starting a local + # ray cluster and connecting to an existing one. + init_cpus = 2 + start_cpus = 3 + + # Start a ray cluster in the background. + subprocess.check_output( + ["ray", "start", "--head", "--num-cpus={}".format(start_cpus)]) + + # Check that starting a pool still starts ray if RAY_ADDRESS not set. + pool = Pool(processes=init_cpus) + assert ray.is_initialized() + assert int(ray.state.cluster_resources()["CPU"]) == init_cpus + check_pool_size(pool, init_cpus) + ray.shutdown() + + # Check that starting a pool connects to a running ray cluster if + # ray_address is passed in. + pool = Pool(ray_address="auto") + assert ray.is_initialized() + assert int(ray.state.cluster_resources()["CPU"]) == start_cpus + check_pool_size(pool, start_cpus) + ray.shutdown() + + # Set RAY_ADDRESS, so pools should connect to the running ray cluster. + os.environ["RAY_ADDRESS"] = "auto" + + # Check that starting a pool connects to a running ray cluster if + # RAY_ADDRESS is set. + pool = Pool() + assert ray.is_initialized() + assert int(ray.state.cluster_resources()["CPU"]) == start_cpus + check_pool_size(pool, start_cpus) + ray.shutdown() + + # Check that trying to start a pool on an existing ray cluster throws an + # error if there aren't enough CPUs for the number of processes. + with pytest.raises(Exception): + Pool(processes=start_cpus + 1) + assert int(ray.state.cluster_resources()["CPU"]) == start_cpus + ray.shutdown() + + # Clean up the background ray cluster. + subprocess.check_output(["ray", "stop"]) + + +def test_initializer(cleanup_only): + def init(dirname): + with open(os.path.join(dirname, str(os.getpid())), "w") as f: + print("hello", file=f) + + with tempfile.TemporaryDirectory() as dirname: + num_processes = 4 + pool = Pool( + processes=num_processes, initializer=init, initargs=(dirname, )) + + assert len(os.listdir(dirname)) == 4 + pool.terminate() + + +def test_close(pool_4_processes): + def f(object_id): + return ray.get(object_id) + + object_id = ray.ObjectID.from_random() + result = pool_4_processes.map_async(f, [object_id for _ in range(4)]) + assert not result.ready() + pool_4_processes.close() + assert not result.ready() + + # Fulfill the object_id, causing the head of line tasks to finish. + ray.worker.global_worker.put_object("hello", object_id=object_id) + pool_4_processes.join() + + # close() shouldn't interrupt pending tasks, so check that they succeeded. + assert result.ready() + assert result.successful() + assert result.get() == ["hello"] * 4 + + +def test_terminate(pool_4_processes): + def f(object_id): + return ray.get(object_id) + + object_id = ray.ObjectID.from_random() + result = pool_4_processes.map_async(f, [object_id for _ in range(4)]) + assert not result.ready() + pool_4_processes.terminate() + + # terminate() should interrupt pending tasks, so check that join() returns + # even though the tasks should be blocked forever. + pool_4_processes.join() + result.wait(timeout=10) + assert result.ready() + assert not result.successful() + + +def test_apply(pool): + def f(arg1, arg2, kwarg1=None, kwarg2=None): + assert arg1 == 1 + assert arg2 == 2 + assert kwarg1 is None + assert kwarg2 == 3 + return 1 + + assert pool.apply(f, (1, 2), {"kwarg2": 3}) == 1 + with pytest.raises(AssertionError): + pool.apply(f, ( + 2, + 2, + ), {"kwarg2": 3}) + with pytest.raises(Exception): + pool.apply(f, (1, )) + with pytest.raises(Exception): + pool.apply(f, (1, 2), {"kwarg1": 3}) + + +def test_apply_async(pool): + def f(arg1, arg2, kwarg1=None, kwarg2=None): + assert arg1 == 1 + assert arg2 == 2 + assert kwarg1 is None + assert kwarg2 == 3 + return 1 + + assert pool.apply_async(f, (1, 2), {"kwarg2": 3}).get() == 1 + with pytest.raises(AssertionError): + pool.apply_async(f, ( + 2, + 2, + ), { + "kwarg2": 3 + }).get() + with pytest.raises(Exception): + pool.apply_async(f, (1, )).get() + with pytest.raises(Exception): + pool.apply_async(f, (1, 2), {"kwarg1": 3}).get() + + # Won't return until the input ObjectID is fulfilled. + def ten_over(input): + return 10 / ray.get(input[0]) + + # Generate a random ObjectID that will be fulfilled later. + object_id = ray.ObjectID.from_random() + result = pool.apply_async(ten_over, ([object_id], )) + result.wait(timeout=0.01) + assert not result.ready() + with pytest.raises(TimeoutError): + result.get(timeout=0.01) + + # Fulfill the ObjectID. + ray.worker.global_worker.put_object(10, object_id=object_id) + result.wait(timeout=10) + assert result.ready() + assert result.successful() + assert result.get() == 1 + + # Generate a random ObjectID that will be fulfilled later. + object_id = ray.ObjectID.from_random() + result = pool.apply_async(ten_over, ([object_id], )) + with pytest.raises(ValueError, match="not ready"): + result.successful() + + # Fulfill the ObjectID with 0, causing the task to fail (divide by zero). + ray.worker.global_worker.put_object(0, object_id=object_id) + result.wait(timeout=10) + assert result.ready() + assert not result.successful() + with pytest.raises(ZeroDivisionError): + result.get() + + +def test_map(pool_4_processes): + def f(index): + return index, os.getpid() + + results = pool_4_processes.map(f, range(1000)) + assert len(results) == 1000 + + pid_counts = defaultdict(int) + for i, (index, pid) in enumerate(results): + assert i == index + pid_counts[pid] += 1 + + # Check that the functions are spread somewhat evenly. + for count in pid_counts.values(): + assert count > 100 + + def bad_func(args): + raise Exception("test_map failure") + + with pytest.raises(Exception, match="test_map failure"): + pool_4_processes.map(bad_func, range(100)) + + +def test_map_async(pool_4_processes): + def f(args): + index = args[0] + ray.get(args[1]) + return index, os.getpid() + + # Generate a random ObjectID that will be fulfilled later. + object_id = ray.ObjectID.from_random() + async_result = pool_4_processes.map_async( + f, [(i, object_id) for i in range(1000)]) + assert not async_result.ready() + with pytest.raises(TimeoutError): + async_result.get(timeout=0.01) + async_result.wait(timeout=0.01) + + # Fulfill the object ID, finishing the tasks. + ray.worker.global_worker.put_object(0, object_id=object_id) + async_result.wait(timeout=10) + assert async_result.ready() + assert async_result.successful() + + results = async_result.get() + assert len(results) == 1000 + + pid_counts = defaultdict(int) + for i, (index, pid) in enumerate(results): + assert i == index + pid_counts[pid] += 1 + + # Check that the functions are spread somewhat evenly. + for count in pid_counts.values(): + assert count > 100 + + def bad_func(index): + if index == 50: + raise Exception("test_map_async failure") + + async_result = pool_4_processes.map_async(bad_func, range(100)) + async_result.wait(10) + assert async_result.ready() + assert not async_result.successful() + + with pytest.raises(Exception, match="test_map_async failure"): + async_result.get() + + +def test_starmap(pool): + def f(*args): + return args + + args = [tuple(range(i)) for i in range(100)] + assert pool.starmap(f, args) == args + + +def test_callbacks(pool_4_processes): + def f(args): + time.sleep(0.1 * random.random()) + index = args[0] + err_indices = args[1] + if index in err_indices: + raise Exception("intentional failure") + return index + + callback_queue = queue.Queue() + + def callback(result): + callback_queue.put(result) + + def error_callback(error): + callback_queue.put(error) + + # Will not error, check that callback is called. + result = pool_4_processes.apply_async(f, ((0, [1]), ), callback=callback) + assert callback_queue.get() == 0 + result.get() + + # Will error, check that error_callback is called. + result = pool_4_processes.apply_async( + f, ((0, [0]), ), error_callback=error_callback) + assert isinstance(callback_queue.get(), Exception) + with pytest.raises(Exception, match="intentional failure"): + result.get() + + # Test callbacks for map_async. + error_indices = [2, 50, 98] + result = pool_4_processes.map_async( + f, [(index, error_indices) for index in range(100)], + callback=callback, + error_callback=error_callback) + callback_results = [] + while len(callback_results) < 100: + callback_results.append(callback_queue.get()) + + assert result.ready() + assert not result.successful() + + # Check that callbacks were called on every result, error or not. + assert len(callback_results) == 100 + # Check that callbacks were processed in the order that the tasks finished. + # NOTE: this could be flaky if the calls happened to finish in order due + # to the random sleeps, but it's very unlikely. + assert not all(i in error_indices or i == result + for i, result in enumerate(callback_results)) + # Check that the correct callbacks were called on errors/successes. + assert all(index not in callback_results for index in error_indices) + assert [isinstance(result, Exception) + for result in callback_results].count(True) == len(error_indices) + + +def test_imap(pool_4_processes): + def f(args): + time.sleep(0.1 * random.random()) + index = args[0] + err_indices = args[1] + if index in err_indices: + raise Exception("intentional failure") + return index + + error_indices = [2, 50, 98] + result_iter = pool_4_processes.imap( + f, [(index, error_indices) for index in range(100)], chunksize=11) + for i in range(100): + result = result_iter.next() + if i in error_indices: + assert isinstance(result, Exception) + else: + assert result == i + + with pytest.raises(StopIteration): + result_iter.next() + + +def test_imap_unordered(pool_4_processes): + def f(args): + time.sleep(0.1 * random.random()) + index = args[0] + err_indices = args[1] + if index in err_indices: + raise Exception("intentional failure") + return index + + error_indices = [2, 50, 98] + in_order = [] + num_errors = 0 + result_iter = pool_4_processes.imap_unordered( + f, [(index, error_indices) for index in range(100)], chunksize=11) + for i in range(100): + result = result_iter.next() + if isinstance(result, Exception): + in_order.append(True) + num_errors += 1 + else: + in_order.append(result == i) + + # Check that the results didn't come back all in order. + # NOTE: this could be flaky if the calls happened to finish in order due + # to the random sleeps, but it's very unlikely. + assert not all(in_order) + assert num_errors == len(error_indices) + + with pytest.raises(StopIteration): + result_iter.next() + + +def test_imap_timeout(pool_4_processes): + def f(args): + time.sleep(0.1 * random.random()) + index = args[0] + wait_index = args[1] + object_id = args[2] + if index == wait_index: + ray.get(object_id) + return index + + wait_index = 23 + object_id = ray.ObjectID.from_random() + result_iter = pool_4_processes.imap( + f, [(index, wait_index, object_id) for index in range(100)]) + for i in range(100): + if i == wait_index: + with pytest.raises(TimeoutError): + result = result_iter.next(timeout=0.1) + ray.worker.global_worker.put_object(None, object_id=object_id) + + result = result_iter.next() + assert result == i + + with pytest.raises(StopIteration): + result_iter.next() + + wait_index = 23 + object_id = ray.ObjectID.from_random() + result_iter = pool_4_processes.imap_unordered( + f, [(index, wait_index, object_id) for index in range(100)], + chunksize=11) + in_order = [] + for i in range(100): + try: + result = result_iter.next(timeout=1) + except TimeoutError: + ray.worker.global_worker.put_object(None, object_id=object_id) + result = result_iter.next() + + in_order.append(result == i) + + # Check that the results didn't come back all in order. + # NOTE: this could be flaky if the calls happened to finish in order due + # to the random sleeps, but it's very unlikely. + assert not all(in_order) + + with pytest.raises(StopIteration): + result_iter.next() + + +def test_maxtasksperchild(cleanup_only): + def f(args): + return os.getpid() + + pool = Pool(5, maxtasksperchild=1) + assert len(set(pool.map(f, range(20)))) == 20