Add multiprocessing.Pool API (#6194)

This commit is contained in:
Edward Oakes
2019-12-29 21:40:58 -06:00
committed by GitHub
parent e2bc489a18
commit 2a66529fb7
6 changed files with 1241 additions and 0 deletions
@@ -0,0 +1,5 @@
from multiprocessing import TimeoutError
from .pool import Pool
__all__ = ["Pool", "TimeoutError"]
@@ -0,0 +1,664 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from multiprocessing import TimeoutError
import os
import time
import random
import collections
import threading
import queue
import copy
import ray
RAY_ADDRESS_ENV = "RAY_ADDRESS"
# Helper function to divide a by b and round the result up.
def div_round_up(a, b):
return -(-a // b)
class PoolTaskError(Exception):
def __init__(self, underlying):
self.underlying = underlying
class ResultThread(threading.Thread):
def __init__(self,
object_ids,
callback=None,
error_callback=None,
total_object_ids=None):
threading.Thread.__init__(self)
self._got_error = False
self._object_ids = []
self._num_ready = 0
self._results = []
self._ready_index_queue = queue.Queue()
self._callback = callback
self._error_callback = error_callback
self._total_object_ids = total_object_ids or len(object_ids)
self._indices = {}
# Thread-safe queue used to add ObjectIDs to fetch after creating
# this thread (used to lazily submit for imap and imap_unordered).
self._new_object_ids = queue.Queue()
for object_id in object_ids:
self._add_object_id(object_id)
def _add_object_id(self, object_id):
self._indices[object_id] = len(self._object_ids)
self._object_ids.append(object_id)
self._results.append(None)
def add_object_id(self, object_id):
self._new_object_ids.put(object_id)
def run(self):
unready = copy.copy(self._object_ids)
while self._num_ready < self._total_object_ids:
# Get as many new IDs from the queue as possible without blocking,
# unless we have no IDs to wait on, in which case we block.
while True:
try:
block = len(unready) == 0
new_object_id = self._new_object_ids.get(block=block)
self._add_object_id(new_object_id)
unready.append(new_object_id)
except queue.Empty:
# queue.Empty means no result was retrieved if block=False.
break
[ready_id], unready = ray.wait(unready, num_returns=1)
batch = ray.get(ready_id)
for result in batch:
if isinstance(result, Exception):
self._got_error = True
if self._error_callback is not None:
self._error_callback(result)
elif self._callback is not None:
self._callback(result)
self._num_ready += 1
self._results[self._indices[ready_id]] = batch
self._ready_index_queue.put(self._indices[ready_id])
def got_error(self):
# Should only be called after the thread finishes.
return self._got_error
def result(self, index):
# Should only be called on results that are ready.
return self._results[index]
def results(self):
# Should only be called after the thread finishes.
return self._results
def next_ready_index(self, timeout=None):
try:
return self._ready_index_queue.get(timeout=timeout)
except queue.Empty:
# queue.Queue signals a timeout by raising queue.Empty.
raise TimeoutError
class AsyncResult(object):
"""An asynchronous interface to task results.
This should not be constructed directly.
"""
def __init__(self,
chunk_object_ids,
callback=None,
error_callback=None,
single_result=False):
self._single_result = single_result
self._result_thread = ResultThread(chunk_object_ids, callback,
error_callback)
self._result_thread.start()
def wait(self, timeout=None):
"""
Returns once the result is ready or the timeout expires (does not
raise TimeoutError).
Args:
timeout: timeout in milliseconds.
"""
self._result_thread.join(timeout)
def get(self, timeout=None):
self.wait(timeout)
if self._result_thread.is_alive():
raise TimeoutError
results = []
for batch in self._result_thread.results():
for result in batch:
if isinstance(result, PoolTaskError):
raise result.underlying
results.extend(batch)
if self._single_result:
return results[0]
return results
def ready(self):
"""
Returns true if the result is ready, else false if the tasks are still
running.
"""
return not self._result_thread.is_alive()
def successful(self):
"""
Returns true if none of the submitted tasks errored, else false. Should
only be called once the result is ready (can be checked using `ready`).
"""
if not self.ready():
raise ValueError("{0!r} not ready".format(self))
return not self._result_thread.got_error()
class IMapIterator(object):
"""Base class for OrderedIMapIterator and UnorderedIMapIterator."""
def __init__(self, pool, func, iterable, chunksize=None):
self._next_chunk_index = 0
# List of bools indicating if the given chunk is ready or not for all
# submitted chunks. Ordering mirrors that in the in the ResultThread.
self._submitted_chunks = []
self._ready_objects = collections.deque()
if not hasattr(iterable, "__len__"):
iterable = [iterable]
self._iterator = iter(iterable)
self._chunksize = chunksize or pool._calculate_chunksize(iterable)
self._total_chunks = div_round_up(len(iterable), chunksize)
self._result_thread = ResultThread(
[], total_object_ids=self._total_chunks)
self._result_thread.start()
for _ in range(len(pool._actor_pool)):
self._submit_next_chunk()
def _submit_next_chunk(self):
# The full iterable has been submitted, so no-op.
if len(self._submitted_chunks) >= self._total_chunks:
return
actor_index = len(self._submitted_chunks) % len(self._pool._actor_pool)
new_chunk_id = self._pool._submit_chunk(self._func, self._iterator,
self._chunksize, actor_index)
self._submitted_chunks.append(False)
self._result_thread.add_object_id(new_chunk_id)
def __iter__(self):
return self
def __next__(self):
return self.next()
def next(self):
# Should be implemented by subclasses.
raise NotImplementedError
class OrderedIMapIterator(IMapIterator):
"""Iterator to the results of tasks submitted using `imap`.
The results are returned in the same order that they were submitted, even
if they don't finish in that order. Only one batch of tasks per actor
process is submitted at a time - the rest are submitted as results come in.
Should not be constructed directly.
"""
def next(self, timeout=None):
if len(self._ready_objects) == 0:
if self._next_chunk_index == self._total_chunks:
raise StopIteration
while timeout is None or timeout > 0:
start = time.time()
index = self._result_thread.next_ready_index(timeout=timeout)
self._submit_next_chunk()
self._submitted_chunks[index] = True
if index == self._next_chunk_index:
break
if timeout is not None:
timeout = max(0, timeout - (time.time() - start))
while self._next_chunk_index < len(
self._submitted_chunks
) and self._submitted_chunks[self._next_chunk_index]:
for result in self._result_thread.result(
self._next_chunk_index):
self._ready_objects.append(result)
self._next_chunk_index += 1
return self._ready_objects.popleft()
class UnorderedIMapIterator(IMapIterator):
"""Iterator to the results of tasks submitted using `imap`.
The results are returned in the order that they finish. Only one batch of
tasks per actor process is submitted at a time - the rest are submitted as
results come in.
Should not be constructed directly.
"""
def next(self, timeout=None):
if len(self._ready_objects) == 0:
if self._next_chunk_index == self._total_chunks:
raise StopIteration
index = self._result_thread.next_ready_index(timeout=timeout)
self._submit_next_chunk()
for result in self._result_thread.result(index):
self._ready_objects.append(result)
self._next_chunk_index += 1
return self._ready_objects.popleft()
@ray.remote
class PoolActor(object):
"""Actor used to process tasks submitted to a Pool."""
def __init__(self, initializer=None, initargs=None):
if initializer:
initargs = initargs or ()
initializer(*initargs)
def ping(self):
# Used to wait for this actor to be initialized.
pass
def run_batch(self, func, batch):
results = []
for args, kwargs in batch:
args = args or ()
kwargs = kwargs or {}
try:
results.append(func(*args, **kwargs))
except Exception as e:
results.append(PoolTaskError(e))
return results
# https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool
class Pool(object):
"""A pool of actor processes that is used to process tasks in parallel.
Args:
processes: number of actor processes to start in the pool. Defaults to
the number of cores in the Ray cluster if one is already running,
otherwise the number of cores on this machine.
initializer: function to be run in each actor when it starts up.
initargs: iterable of arguments to the initializer function.
maxtasksperchild: maximum number of tasks to run in each actor process.
After a process has executed this many tasks, it will be killed and
replaced with a new one.
ray_address: address of the Ray cluster to run on. If None, a new local
Ray cluster will be started on this machine. Otherwise, this will
be passed to `ray.init()` to connect to a running cluster. This may
also be specified using the `RAY_ADDRESS` environment variable.
"""
def __init__(self,
processes=None,
initializer=None,
initargs=None,
maxtasksperchild=None,
ray_address=None):
self._closed = False
self._initializer = initializer
self._initargs = initargs
self._maxtasksperchild = maxtasksperchild or -1
self._actor_deletion_ids = []
processes = self._init_ray(processes, ray_address)
self._start_actor_pool(processes)
def _init_ray(self, processes=None, ray_address=None):
# Initialize ray. If ray is already initialized, we do nothing.
# Else, the priority is:
# ray_address argument > RAY_ADDRESS > start new local cluster.
if not ray.is_initialized():
if ray_address is None and RAY_ADDRESS_ENV in os.environ:
ray_address = os.environ[RAY_ADDRESS_ENV]
# Cluster mode.
if ray_address is not None:
print("Connecting to ray cluster at address='{}'".format(
ray_address))
ray.init(address=ray_address)
# Local mode.
else:
print("Starting local ray cluster")
ray.init(num_cpus=processes)
ray_cpus = int(ray.state.cluster_resources()["CPU"])
if processes is None:
processes = ray_cpus
if processes <= 0:
raise ValueError("Processes in the pool must be >0.")
if ray_cpus < processes:
raise ValueError("Tried to start a pool with {} processes on an "
"existing ray cluster, but there are only {} "
"CPUs in the ray cluster.".format(
processes, ray_cpus))
return processes
def _start_actor_pool(self, processes):
self._actor_pool = [self._new_actor_entry() for _ in range(processes)]
ray.get([actor.ping.remote() for actor, _ in self._actor_pool])
def _wait_for_stopping_actors(self, timeout=None):
if len(self._actor_deletion_ids) == 0:
return
if timeout is not None:
timeout = float(timeout)
_, deleting = ray.wait(
self._actor_deletion_ids,
num_returns=len(self._actor_deletion_ids),
timeout=timeout)
self._actor_deletion_ids = deleting
def _stop_actor(self, actor):
# Check and clean up any outstanding IDs corresponding to deletions.
self._wait_for_stopping_actors(timeout=0.0)
# The deletion task will block until the actor has finished executing
# all pending tasks.
self._actor_deletion_ids.append(actor.__ray_terminate__.remote())
def _new_actor_entry(self):
# NOTE(edoakes): The initializer function can't currently be used to
# modify the global namespace (e.g., import packages or set globals)
# due to a limitation in cloudpickle.
return (PoolActor.remote(self._initializer, self._initargs), 0)
def _random_actor_index(self):
return random.randrange(len(self._actor_pool))
# Batch should be a list of tuples: (args, kwargs).
def _run_batch(self, actor_index, func, batch):
actor, count = self._actor_pool[actor_index]
object_id = actor.run_batch.remote(func, batch)
count += 1
assert self._maxtasksperchild == -1 or count <= self._maxtasksperchild
if count == self._maxtasksperchild:
self._stop_actor(actor)
actor, count = self._new_actor_entry()
self._actor_pool[actor_index] = (actor, count)
return object_id
def apply(self, func, args=None, kwargs=None):
"""Run the given function on a random actor process and return the
result synchronously.
Args:
func: function to run.
args: optional arguments to the function.
kwargs: optional keyword arguments to the function.
Returns:
The result.
"""
return self.apply_async(func, args, kwargs).get()
def apply_async(self,
func,
args=None,
kwargs=None,
callback=None,
error_callback=None):
"""Run the given function on a random actor process and return an
asynchronous interface to the result.
Args:
func: function to run.
args: optional arguments to the function.
kwargs: optional keyword arguments to the function.
callback: callback to be executed on the result once it is finished
if it succeeds.
error_callback: callback to be executed the result once it is
finished if the task errors. The exception raised by the
task will be passed as the only argument to the callback.
Returns:
AsyncResult containing the result.
"""
self._check_running()
object_id = self._run_batch(self._random_actor_index(), func,
[(args, kwargs)])
return AsyncResult(
[object_id], callback, error_callback, single_result=True)
def _calculate_chunksize(self, iterable):
chunksize, extra = divmod(len(iterable), len(self._actor_pool) * 4)
if extra:
chunksize += 1
return chunksize
def _submit_chunk(self,
func,
iterator,
chunksize,
actor_index,
unpack_args=False):
chunk = []
while len(chunk) < chunksize:
try:
args = next(iterator)
if not unpack_args:
args = (args, )
chunk.append((args, {}))
except StopIteration:
break
# Nothing to submit. The caller should prevent this.
assert len(chunk) > 0
return self._run_batch(actor_index, func, chunk)
def _chunk_and_run(self, func, iterable, chunksize=None,
unpack_args=False):
if not hasattr(iterable, "__len__"):
iterable = [iterable]
if chunksize is None:
chunksize = self._calculate_chunksize(iterable)
iterator = iter(iterable)
chunk_object_ids = []
while len(chunk_object_ids) * chunksize < len(iterable):
actor_index = len(chunk_object_ids) % len(self._actor_pool)
chunk_object_ids.append(
self._submit_chunk(
func,
iterator,
chunksize,
actor_index,
unpack_args=unpack_args))
return chunk_object_ids
def _map_async(self,
func,
iterable,
chunksize=None,
unpack_args=False,
callback=None,
error_callback=None):
self._check_running()
object_ids = self._chunk_and_run(
func, iterable, chunksize=chunksize, unpack_args=unpack_args)
return AsyncResult(object_ids, callback, error_callback)
def map(self, func, iterable, chunksize=None):
"""Run the given function on each element in the iterable round-robin
on the actor processes and return the results synchronously.
Args:
func: function to run.
iterable: iterable of objects to be passed as the sole argument to
func.
chunksize: number of tasks to submit as a batch to each actor
process. If unspecified, a suitable chunksize will be chosen.
Returns:
A list of results.
"""
return self._map_async(
func, iterable, chunksize=chunksize, unpack_args=False).get()
def map_async(self,
func,
iterable,
chunksize=None,
callback=None,
error_callback=None):
"""Run the given function on each element in the iterable round-robin
on the actor processes and return an asynchronous interface to the
results.
Args:
func: function to run.
iterable: iterable of objects to be passed as the only argument to
func.
chunksize: number of tasks to submit as a batch to each actor
process. If unspecified, a suitable chunksize will be chosen.
callback: callback to be executed on each successful result once it
is finished.
error_callback: callback to be executed on each errored result once
it is finished. The exception raised by the task will be passed
as the only argument to the callback.
Returns:
AsyncResult
"""
return self._map_async(
func,
iterable,
chunksize=chunksize,
unpack_args=False,
callback=callback,
error_callback=error_callback)
def starmap(self, func, iterable, chunksize=None):
"""Same as `map`, but unpacks each element of the iterable as the
arguments to func like: [func(*args) for args in iterable].
"""
return self._map_async(
func, iterable, chunksize=chunksize, unpack_args=True).get()
def starmap_async(self, func, iterable, callback=None,
error_callback=None):
"""Same as `map_async`, but unpacks each element of the iterable as the
arguments to func like: [func(*args) for args in iterable].
"""
return self._map_async(
func,
iterable,
unpack_args=True,
callback=callback,
error_callback=error_callback)
def imap(self, func, iterable, chunksize=1):
"""Same as `map`, but only submits one batch of tasks to each actor
process at a time.
This can be useful if the iterable of arguments is very large or each
task's arguments consumes a large amount of resources.
The results are returned in the order corresponding to their arguments
in the iterable.
Returns:
OrderedIMapIterator
"""
self._check_running()
return OrderedIMapIterator(self, func, iterable, chunksize=chunksize)
def imap_unordered(self, func, iterable, chunksize=1):
"""Same as `map`, but only submits one batch of tasks to each actor
process at a time.
This can be useful if the iterable of arguments is very large or each
task's arguments consumes a large amount of resources.
The results are returned in the order that they finish.
Returns:
UnorderedIMapIterator
"""
self._check_running()
return UnorderedIMapIterator(self, func, iterable, chunksize=chunksize)
def _check_running(self):
if self._closed:
raise ValueError("Pool not running")
def __enter__(self):
self._check_running()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.terminate()
def close(self):
"""Close the pool.
Prevents any more tasks from being submitted on the pool but allows
outstanding work to finish.
"""
for actor, _ in self._actor_pool:
self._stop_actor(actor)
self._closed = True
def terminate(self):
"""Close the pool.
Prevents any more tasks from being submitted on the pool and stops
outstanding work.
"""
if not self._closed:
self.close()
for actor, _ in self._actor_pool:
actor.__ray_kill__()
def join(self):
"""Wait for the actors in a closed pool to exit.
If the pool was closed using `close`, this will return once all
outstanding work is completed.
If the pool was closed using `terminate`, this will return quickly.
"""
if not self._closed:
raise ValueError("Pool is still running")
self._wait_for_stopping_actors()
+8
View File
@@ -243,6 +243,14 @@ py_test(
deps = ["//:ray_lib"],
)
py_test(
name = "test_multiprocessing",
size = "medium",
srcs = ["test_multiprocessing.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
py_test(
name = "test_multi_node_2",
size = "medium",
+490
View File
@@ -0,0 +1,490 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import pytest
import tempfile
import time
import random
import subprocess
from collections import defaultdict
import queue
import ray
from ray.experimental.multiprocessing import Pool, TimeoutError
@pytest.fixture
def cleanup_only():
yield None
ray.shutdown()
subprocess.check_output(["ray", "stop"])
if "RAY_ADDRESS" in os.environ:
del os.environ["RAY_ADDRESS"]
@pytest.fixture
def pool():
pool = Pool(processes=1)
yield pool
pool.terminate()
ray.shutdown()
@pytest.fixture
def pool_4_processes():
pool = Pool(processes=4)
yield pool
pool.terminate()
ray.shutdown()
def test_initialize_ray(cleanup_only):
def getpid(args):
return os.getpid()
def check_pool_size(pool, size):
args = [tuple() for _ in range(size)]
assert len(set(pool.map(getpid, args))) == size
# Check that starting a pool starts ray if not initialized.
pool = Pool(processes=2)
assert ray.is_initialized()
assert int(ray.state.cluster_resources()["CPU"]) == 2
check_pool_size(pool, 2)
ray.shutdown()
# Check that starting a pool doesn't affect ray if there is a local
# ray cluster running.
ray.init(num_cpus=3)
assert ray.is_initialized()
pool = Pool(processes=2)
assert int(ray.state.cluster_resources()["CPU"]) == 3
check_pool_size(pool, 2)
ray.shutdown()
# Check that trying to start a pool on an existing ray cluster throws an
# error if there aren't enough CPUs for the number of processes.
ray.init(num_cpus=1)
assert ray.is_initialized()
with pytest.raises(ValueError):
Pool(processes=2)
assert int(ray.state.cluster_resources()["CPU"]) == 1
ray.shutdown()
# Use different numbers of CPUs to distinguish between starting a local
# ray cluster and connecting to an existing one.
init_cpus = 2
start_cpus = 3
# Start a ray cluster in the background.
subprocess.check_output(
["ray", "start", "--head", "--num-cpus={}".format(start_cpus)])
# Check that starting a pool still starts ray if RAY_ADDRESS not set.
pool = Pool(processes=init_cpus)
assert ray.is_initialized()
assert int(ray.state.cluster_resources()["CPU"]) == init_cpus
check_pool_size(pool, init_cpus)
ray.shutdown()
# Check that starting a pool connects to a running ray cluster if
# ray_address is passed in.
pool = Pool(ray_address="auto")
assert ray.is_initialized()
assert int(ray.state.cluster_resources()["CPU"]) == start_cpus
check_pool_size(pool, start_cpus)
ray.shutdown()
# Set RAY_ADDRESS, so pools should connect to the running ray cluster.
os.environ["RAY_ADDRESS"] = "auto"
# Check that starting a pool connects to a running ray cluster if
# RAY_ADDRESS is set.
pool = Pool()
assert ray.is_initialized()
assert int(ray.state.cluster_resources()["CPU"]) == start_cpus
check_pool_size(pool, start_cpus)
ray.shutdown()
# Check that trying to start a pool on an existing ray cluster throws an
# error if there aren't enough CPUs for the number of processes.
with pytest.raises(Exception):
Pool(processes=start_cpus + 1)
assert int(ray.state.cluster_resources()["CPU"]) == start_cpus
ray.shutdown()
# Clean up the background ray cluster.
subprocess.check_output(["ray", "stop"])
def test_initializer(cleanup_only):
def init(dirname):
with open(os.path.join(dirname, str(os.getpid())), "w") as f:
print("hello", file=f)
with tempfile.TemporaryDirectory() as dirname:
num_processes = 4
pool = Pool(
processes=num_processes, initializer=init, initargs=(dirname, ))
assert len(os.listdir(dirname)) == 4
pool.terminate()
def test_close(pool_4_processes):
def f(object_id):
return ray.get(object_id)
object_id = ray.ObjectID.from_random()
result = pool_4_processes.map_async(f, [object_id for _ in range(4)])
assert not result.ready()
pool_4_processes.close()
assert not result.ready()
# Fulfill the object_id, causing the head of line tasks to finish.
ray.worker.global_worker.put_object("hello", object_id=object_id)
pool_4_processes.join()
# close() shouldn't interrupt pending tasks, so check that they succeeded.
assert result.ready()
assert result.successful()
assert result.get() == ["hello"] * 4
def test_terminate(pool_4_processes):
def f(object_id):
return ray.get(object_id)
object_id = ray.ObjectID.from_random()
result = pool_4_processes.map_async(f, [object_id for _ in range(4)])
assert not result.ready()
pool_4_processes.terminate()
# terminate() should interrupt pending tasks, so check that join() returns
# even though the tasks should be blocked forever.
pool_4_processes.join()
result.wait(timeout=10)
assert result.ready()
assert not result.successful()
def test_apply(pool):
def f(arg1, arg2, kwarg1=None, kwarg2=None):
assert arg1 == 1
assert arg2 == 2
assert kwarg1 is None
assert kwarg2 == 3
return 1
assert pool.apply(f, (1, 2), {"kwarg2": 3}) == 1
with pytest.raises(AssertionError):
pool.apply(f, (
2,
2,
), {"kwarg2": 3})
with pytest.raises(Exception):
pool.apply(f, (1, ))
with pytest.raises(Exception):
pool.apply(f, (1, 2), {"kwarg1": 3})
def test_apply_async(pool):
def f(arg1, arg2, kwarg1=None, kwarg2=None):
assert arg1 == 1
assert arg2 == 2
assert kwarg1 is None
assert kwarg2 == 3
return 1
assert pool.apply_async(f, (1, 2), {"kwarg2": 3}).get() == 1
with pytest.raises(AssertionError):
pool.apply_async(f, (
2,
2,
), {
"kwarg2": 3
}).get()
with pytest.raises(Exception):
pool.apply_async(f, (1, )).get()
with pytest.raises(Exception):
pool.apply_async(f, (1, 2), {"kwarg1": 3}).get()
# Won't return until the input ObjectID is fulfilled.
def ten_over(input):
return 10 / ray.get(input[0])
# Generate a random ObjectID that will be fulfilled later.
object_id = ray.ObjectID.from_random()
result = pool.apply_async(ten_over, ([object_id], ))
result.wait(timeout=0.01)
assert not result.ready()
with pytest.raises(TimeoutError):
result.get(timeout=0.01)
# Fulfill the ObjectID.
ray.worker.global_worker.put_object(10, object_id=object_id)
result.wait(timeout=10)
assert result.ready()
assert result.successful()
assert result.get() == 1
# Generate a random ObjectID that will be fulfilled later.
object_id = ray.ObjectID.from_random()
result = pool.apply_async(ten_over, ([object_id], ))
with pytest.raises(ValueError, match="not ready"):
result.successful()
# Fulfill the ObjectID with 0, causing the task to fail (divide by zero).
ray.worker.global_worker.put_object(0, object_id=object_id)
result.wait(timeout=10)
assert result.ready()
assert not result.successful()
with pytest.raises(ZeroDivisionError):
result.get()
def test_map(pool_4_processes):
def f(index):
return index, os.getpid()
results = pool_4_processes.map(f, range(1000))
assert len(results) == 1000
pid_counts = defaultdict(int)
for i, (index, pid) in enumerate(results):
assert i == index
pid_counts[pid] += 1
# Check that the functions are spread somewhat evenly.
for count in pid_counts.values():
assert count > 100
def bad_func(args):
raise Exception("test_map failure")
with pytest.raises(Exception, match="test_map failure"):
pool_4_processes.map(bad_func, range(100))
def test_map_async(pool_4_processes):
def f(args):
index = args[0]
ray.get(args[1])
return index, os.getpid()
# Generate a random ObjectID that will be fulfilled later.
object_id = ray.ObjectID.from_random()
async_result = pool_4_processes.map_async(
f, [(i, object_id) for i in range(1000)])
assert not async_result.ready()
with pytest.raises(TimeoutError):
async_result.get(timeout=0.01)
async_result.wait(timeout=0.01)
# Fulfill the object ID, finishing the tasks.
ray.worker.global_worker.put_object(0, object_id=object_id)
async_result.wait(timeout=10)
assert async_result.ready()
assert async_result.successful()
results = async_result.get()
assert len(results) == 1000
pid_counts = defaultdict(int)
for i, (index, pid) in enumerate(results):
assert i == index
pid_counts[pid] += 1
# Check that the functions are spread somewhat evenly.
for count in pid_counts.values():
assert count > 100
def bad_func(index):
if index == 50:
raise Exception("test_map_async failure")
async_result = pool_4_processes.map_async(bad_func, range(100))
async_result.wait(10)
assert async_result.ready()
assert not async_result.successful()
with pytest.raises(Exception, match="test_map_async failure"):
async_result.get()
def test_starmap(pool):
def f(*args):
return args
args = [tuple(range(i)) for i in range(100)]
assert pool.starmap(f, args) == args
def test_callbacks(pool_4_processes):
def f(args):
time.sleep(0.1 * random.random())
index = args[0]
err_indices = args[1]
if index in err_indices:
raise Exception("intentional failure")
return index
callback_queue = queue.Queue()
def callback(result):
callback_queue.put(result)
def error_callback(error):
callback_queue.put(error)
# Will not error, check that callback is called.
result = pool_4_processes.apply_async(f, ((0, [1]), ), callback=callback)
assert callback_queue.get() == 0
result.get()
# Will error, check that error_callback is called.
result = pool_4_processes.apply_async(
f, ((0, [0]), ), error_callback=error_callback)
assert isinstance(callback_queue.get(), Exception)
with pytest.raises(Exception, match="intentional failure"):
result.get()
# Test callbacks for map_async.
error_indices = [2, 50, 98]
result = pool_4_processes.map_async(
f, [(index, error_indices) for index in range(100)],
callback=callback,
error_callback=error_callback)
callback_results = []
while len(callback_results) < 100:
callback_results.append(callback_queue.get())
assert result.ready()
assert not result.successful()
# Check that callbacks were called on every result, error or not.
assert len(callback_results) == 100
# Check that callbacks were processed in the order that the tasks finished.
# NOTE: this could be flaky if the calls happened to finish in order due
# to the random sleeps, but it's very unlikely.
assert not all(i in error_indices or i == result
for i, result in enumerate(callback_results))
# Check that the correct callbacks were called on errors/successes.
assert all(index not in callback_results for index in error_indices)
assert [isinstance(result, Exception)
for result in callback_results].count(True) == len(error_indices)
def test_imap(pool_4_processes):
def f(args):
time.sleep(0.1 * random.random())
index = args[0]
err_indices = args[1]
if index in err_indices:
raise Exception("intentional failure")
return index
error_indices = [2, 50, 98]
result_iter = pool_4_processes.imap(
f, [(index, error_indices) for index in range(100)], chunksize=11)
for i in range(100):
result = result_iter.next()
if i in error_indices:
assert isinstance(result, Exception)
else:
assert result == i
with pytest.raises(StopIteration):
result_iter.next()
def test_imap_unordered(pool_4_processes):
def f(args):
time.sleep(0.1 * random.random())
index = args[0]
err_indices = args[1]
if index in err_indices:
raise Exception("intentional failure")
return index
error_indices = [2, 50, 98]
in_order = []
num_errors = 0
result_iter = pool_4_processes.imap_unordered(
f, [(index, error_indices) for index in range(100)], chunksize=11)
for i in range(100):
result = result_iter.next()
if isinstance(result, Exception):
in_order.append(True)
num_errors += 1
else:
in_order.append(result == i)
# Check that the results didn't come back all in order.
# NOTE: this could be flaky if the calls happened to finish in order due
# to the random sleeps, but it's very unlikely.
assert not all(in_order)
assert num_errors == len(error_indices)
with pytest.raises(StopIteration):
result_iter.next()
def test_imap_timeout(pool_4_processes):
def f(args):
time.sleep(0.1 * random.random())
index = args[0]
wait_index = args[1]
object_id = args[2]
if index == wait_index:
ray.get(object_id)
return index
wait_index = 23
object_id = ray.ObjectID.from_random()
result_iter = pool_4_processes.imap(
f, [(index, wait_index, object_id) for index in range(100)])
for i in range(100):
if i == wait_index:
with pytest.raises(TimeoutError):
result = result_iter.next(timeout=0.1)
ray.worker.global_worker.put_object(None, object_id=object_id)
result = result_iter.next()
assert result == i
with pytest.raises(StopIteration):
result_iter.next()
wait_index = 23
object_id = ray.ObjectID.from_random()
result_iter = pool_4_processes.imap_unordered(
f, [(index, wait_index, object_id) for index in range(100)],
chunksize=11)
in_order = []
for i in range(100):
try:
result = result_iter.next(timeout=1)
except TimeoutError:
ray.worker.global_worker.put_object(None, object_id=object_id)
result = result_iter.next()
in_order.append(result == i)
# Check that the results didn't come back all in order.
# NOTE: this could be flaky if the calls happened to finish in order due
# to the random sleeps, but it's very unlikely.
assert not all(in_order)
with pytest.raises(StopIteration):
result_iter.next()
def test_maxtasksperchild(cleanup_only):
def f(args):
return os.getpid()
pool = Pool(5, maxtasksperchild=1)
assert len(set(pool.map(f, range(20)))) == 20