Merge pull request #1493 from cowlicks/chunk-map

Add parallel function mapping
This commit is contained in:
Stefan van der Walt
2015-05-21 14:09:07 -07:00
7 changed files with 164 additions and 0 deletions
+1
View File
@@ -16,6 +16,7 @@ Runtime requirements
* `NetworkX <https://networkx.github.io>`__
* `Pillow <https://pypi.python.org/pypi/Pillow>`__
(or `PIL <http://www.pythonware.com/products/pil/>`__)
* `dask array <http://dask.pydata.org/en/latest/>`__
Known build errors
------------------
+1
View File
@@ -15,6 +15,7 @@ http://scikit-image.org
New Features
------------
- ``skimage.util.apply_parallel`` (#1493)
+2
View File
@@ -2,6 +2,7 @@ from .dtype import (img_as_float, img_as_int, img_as_uint, img_as_ubyte,
img_as_bool, dtype_limits)
from .shape import view_as_blocks, view_as_windows
from .noise import random_noise
from .apply_parallel import apply_parallel
from .arraypad import pad, crop
from ._regular_grid import regular_grid
@@ -20,4 +21,5 @@ __all__ = ['img_as_float',
'crop',
'random_noise',
'regular_grid',
'apply_parallel',
'unique_rows']
+96
View File
@@ -0,0 +1,96 @@
from math import ceil
from multiprocessing import cpu_count
__all__ = ['apply_parallel']
def _get_chunks(shape, ncpu):
"""
Split the array into equal sized chunks based on the number of
available processors. The last chunk in each dimension absorbs the
remainder array elements if the number of cpus does not divide evenly into
the number of array elements.
>>> _get_chunks((4, 4), 4)
((2, 2), (2, 2))
>>> _get_chunks((4, 4), 2)
((2, 2), (4,))
>>> _get_chunks((5, 5), 2)
((2, 3), (5,))
>>> _get_chunks((2, 4), 2)
((1, 1), (4,))
"""
chunks = []
nchunks_per_dim = int(ceil(ncpu ** (1./len(shape))))
used_chunks = 1
for i in shape:
if used_chunks < ncpu:
regular_chunk = i // nchunks_per_dim
remainder_chunk = regular_chunk + (i % nchunks_per_dim)
if regular_chunk == 0:
chunk_lens = (remainder_chunk,)
else:
chunk_lens = ((regular_chunk,) * (nchunks_per_dim - 1) +
(remainder_chunk,))
else:
chunk_lens = (i,)
chunks.append(chunk_lens)
used_chunks *= nchunks_per_dim
return tuple(chunks)
def apply_parallel(function, array, chunks=None, depth=0, mode=None,
extra_arguments=(), extra_keywords={}):
"""Map a function in parallel across an array.
Split an array into possibly overlapping chunks of a given depth and
boundary type, call the given function in parallel on the chunks, combine
the chunks and return the resulting array.
Parameters
----------
function : function
Function to be mapped which takes an array as an argument.
array : numpy array
Array which the function will be applied to.
chunks : int, tuple, or tuple of tuples, optional
A single integer is interpreted as the length of one side of a square
chunk that should be tiled across the array. One tuple of length
``array.ndim`` represents the shape of a chunk, and it is tiled across
the array. A list of tuples of length ``ndim``, where each sub-tuple
is a sequence of chunk sizes along the corresponding dimension. If
None, the array is broken up into chunks based on the number of
available cpus. More information about chunks is in the documentation
`here <https://dask.pydata.org/en/latest/array-design.html>`_.
depth : int, optional
Integer equal to the depth of the added boundary cells. Defaults to
zero.
mode : 'reflect', 'periodic', 'wrap', 'nearest', optional
type of external boundary padding
extra_arguments : tuple, optional
Tuple of arguments to be passed to the function.
extra_keywords : dictionary, optional
Dictionary of keyword arguments to be passed to the function.
"""
import dask.array as da
if chunks is None:
shape = array.shape
try:
ncpu = cpu_count()
except NotImplementedError:
ncpu = 4
chunks = _get_chunks(shape, ncpu)
if mode == 'wrap':
mode = 'periodic'
def wrapped_func(arr):
return function(arr, *extra_arguments, **extra_keywords)
darr = da.from_array(array, chunks=chunks)
return darr.map_overlap(wrapped_func, depth, boundary=mode).compute()
+61
View File
@@ -0,0 +1,61 @@
from __future__ import absolute_import
import numpy as np
from numpy.testing import assert_array_almost_equal
from skimage.filters import threshold_adaptive, gaussian_filter
from skimage.util.apply_parallel import apply_parallel
def test_apply_parallel():
# data
a = np.arange(144).reshape(12, 12).astype(float)
# apply the filter
expected1 = threshold_adaptive(a, 3)
result1 = apply_parallel(threshold_adaptive, a, chunks=(6, 6), depth=5,
extra_arguments=(3,),
extra_keywords={'mode': 'reflect'})
assert_array_almost_equal(result1, expected1)
def wrapped_gauss(arr):
return gaussian_filter(arr, 1, mode='reflect')
expected2 = gaussian_filter(a, 1, mode='reflect')
result2 = apply_parallel(wrapped_gauss, a, chunks=(6, 6), depth=5)
assert_array_almost_equal(result2, expected2)
def test_no_chunks():
a = np.ones(1 * 4 * 8 * 9).reshape(1, 4, 8, 9)
def add_42(arr):
return arr + 42
expected = add_42(a)
result = apply_parallel(add_42, a)
assert_array_almost_equal(result, expected)
def test_apply_parallel_wrap():
def wrapped(arr):
return gaussian_filter(arr, 1, mode='wrap')
a = np.arange(144).reshape(12, 12).astype(float)
expected = gaussian_filter(a, 1, mode='wrap')
result = apply_parallel(wrapped, a, chunks=(6, 6), depth=5, mode='wrap')
assert_array_almost_equal(result, expected)
def test_apply_parallel_nearest():
def wrapped(arr):
return gaussian_filter(arr, 1, mode='nearest')
a = np.arange(144).reshape(12, 12).astype(float)
expected = gaussian_filter(a, 1, mode='nearest')
result = apply_parallel(wrapped, a, chunks=(6, 6), depth={0: 5, 1: 5},
mode='nearest')
assert_array_almost_equal(result, expected)
+1
View File
@@ -13,5 +13,6 @@ scipy==0.14.0
cython==0.20.2
matplotlib==1.4.2
pillow==2.6.1
dask[array]>=0.5.0
wheel
nose
+2
View File
@@ -54,6 +54,8 @@ fi
retry pip install $WHEELHOUSE -r requirements.txt
pip install 'dask[array]>=0.5.0'
# clean up disk space
sudo apt-get clean
sudo rm -rf /tmp/*