diff --git a/DEPENDS.txt b/DEPENDS.txt
index c7f91df5..e66d53cc 100644
--- a/DEPENDS.txt
+++ b/DEPENDS.txt
@@ -16,6 +16,7 @@ Runtime requirements
 * `NetworkX `__
 * `Pillow `__
   (or `PIL `__)
+* `dask array `__
 
 Known build errors
 ------------------
diff --git a/doc/release/release_dev.txt b/doc/release/release_dev.txt
index 1123eb77..039274f6 100644
--- a/doc/release/release_dev.txt
+++ b/doc/release/release_dev.txt
@@ -15,6 +15,7 @@ http://scikit-image.org
 
 New Features
 ------------
+- ``skimage.util.apply_parallel`` (#1493)
 
 
 
diff --git a/skimage/util/__init__.py b/skimage/util/__init__.py
index 4739bd5b..2500469b 100644
--- a/skimage/util/__init__.py
+++ b/skimage/util/__init__.py
@@ -2,6 +2,7 @@ from .dtype import (img_as_float, img_as_int, img_as_uint, img_as_ubyte,
                     img_as_bool, dtype_limits)
 from .shape import view_as_blocks, view_as_windows
 from .noise import random_noise
+from .apply_parallel import apply_parallel
 from .arraypad import pad, crop
 from ._regular_grid import regular_grid
 
@@ -20,4 +21,5 @@ __all__ = ['img_as_float',
            'crop',
            'random_noise',
            'regular_grid',
+           'apply_parallel',
            'unique_rows']
diff --git a/skimage/util/apply_parallel.py b/skimage/util/apply_parallel.py
new file mode 100644
index 00000000..fd50ba56
--- /dev/null
+++ b/skimage/util/apply_parallel.py
@@ -0,0 +1,125 @@
+from math import ceil
+from multiprocessing import cpu_count
+
+__all__ = ['apply_parallel']
+
+
+def _get_chunks(shape, ncpu):
+    """
+    Split the array into equal sized chunks based on the number of
+    available processors. The last chunk in each dimension absorbs the
+    remainder array elements if the number of cpus does not divide evenly into
+    the number of array elements.
+
+    Dimensions are split in order until the number of chunks reaches
+    ``ncpu``; any remaining dimensions are left whole. A dimension that is
+    too short to split is kept as a single chunk and does not count against
+    ``ncpu``.
+
+    >>> _get_chunks((4, 4), 4)
+    ((2, 2), (2, 2))
+    >>> _get_chunks((4, 4), 2)
+    ((2, 2), (4,))
+    >>> _get_chunks((5, 5), 2)
+    ((2, 3), (5,))
+    >>> _get_chunks((2, 4), 2)
+    ((1, 1), (4,))
+    >>> _get_chunks((1, 4, 4), 4)
+    ((1,), (2, 2), (2, 2))
+    >>> _get_chunks((), 4)
+    ()
+    """
+    if not shape:
+        # A 0-d array cannot be split, and taking the ``len(shape)``-th root
+        # below would divide by zero.
+        return ()
+
+    chunks = []
+    nchunks_per_dim = int(ceil(ncpu ** (1./len(shape))))
+
+    used_chunks = 1
+    for i in shape:
+        if used_chunks < ncpu:
+            regular_chunk = i // nchunks_per_dim
+            remainder_chunk = regular_chunk + (i % nchunks_per_dim)
+
+            if regular_chunk == 0:
+                chunk_lens = (remainder_chunk,)
+            else:
+                chunk_lens = ((regular_chunk,) * (nchunks_per_dim - 1) +
+                              (remainder_chunk,))
+        else:
+            chunk_lens = (i,)
+
+        chunks.append(chunk_lens)
+        # Count only the chunks actually created, so that a dimension too
+        # short to split does not use up the processor budget.
+        used_chunks *= len(chunk_lens)
+    return tuple(chunks)
+
+
+def apply_parallel(function, array, chunks=None, depth=0, mode=None,
+                   extra_arguments=(), extra_keywords=None):
+    """Map a function in parallel across an array.
+
+    Split an array into possibly overlapping chunks of a given depth and
+    boundary type, call the given function in parallel on the chunks, combine
+    the chunks and return the resulting array.
+
+    Parameters
+    ----------
+    function : function
+        Function to be mapped which takes an array as an argument.
+    array : numpy array
+        Array which the function will be applied to.
+    chunks : int, tuple, or tuple of tuples, optional
+        A single integer is interpreted as the length of one side of a square
+        chunk that should be tiled across the array. One tuple of length
+        ``array.ndim`` represents the shape of a chunk, and it is tiled across
+        the array. A tuple of ``array.ndim`` tuples gives, for each
+        dimension, the sequence of chunk sizes along that dimension. If
+        None, the array is broken up into chunks based on the number of
+        available cpus. More information about chunks is in the documentation
+        `here `_.
+    depth : int or dict, optional
+        Depth of the added boundary cells. An integer applies to every
+        dimension; a dictionary maps an axis index to the depth along that
+        axis. Defaults to zero.
+    mode : 'reflect', 'periodic', 'wrap', 'nearest', optional
+        Type of external boundary padding. 'wrap' is treated as a synonym of
+        'periodic'.
+    extra_arguments : tuple, optional
+        Tuple of arguments to be passed to the function.
+    extra_keywords : dictionary, optional
+        Dictionary of keyword arguments to be passed to the function.
+        Defaults to no keyword arguments.
+
+    Returns
+    -------
+    out : numpy array
+        Result of applying ``function`` to ``array`` chunk by chunk.
+
+    """
+    import dask.array as da
+
+    # ``None`` rather than ``{}`` as the default avoids sharing one mutable
+    # dictionary between calls.
+    if extra_keywords is None:
+        extra_keywords = {}
+
+    if chunks is None:
+        shape = array.shape
+        try:
+            ncpu = cpu_count()
+        except NotImplementedError:
+            ncpu = 4
+        chunks = _get_chunks(shape, ncpu)
+
+    if mode == 'wrap':
+        mode = 'periodic'
+
+    def wrapped_func(arr):
+        return function(arr, *extra_arguments, **extra_keywords)
+
+    darr = da.from_array(array, chunks=chunks)
+    return darr.map_overlap(wrapped_func, depth, boundary=mode).compute()
diff --git a/skimage/util/tests/test_apply_parallel.py b/skimage/util/tests/test_apply_parallel.py
new file mode 100644
index 00000000..a43f4338
--- /dev/null
+++ b/skimage/util/tests/test_apply_parallel.py
@@ -0,0 +1,61 @@
+from __future__ import absolute_import
+
+import numpy as np
+from numpy.testing import assert_array_almost_equal
+
+from skimage.filters import threshold_adaptive, gaussian_filter
+from skimage.util.apply_parallel import apply_parallel
+
+
+def test_apply_parallel():
+    # data
+    a = np.arange(144).reshape(12, 12).astype(float)
+
+    # apply the filter
+    expected1 = threshold_adaptive(a, 3)
+    result1 = apply_parallel(threshold_adaptive, a, chunks=(6, 6), depth=5,
+                             extra_arguments=(3,),
+                             extra_keywords={'mode': 'reflect'})
+
+    assert_array_almost_equal(result1, expected1)
+
+    def wrapped_gauss(arr):
+        return gaussian_filter(arr, 1, mode='reflect')
+
+    expected2 = gaussian_filter(a, 1, mode='reflect')
+    result2 = apply_parallel(wrapped_gauss, a, chunks=(6, 6), depth=5)
+
+    assert_array_almost_equal(result2, expected2)
+
+
+def test_no_chunks():
+    a = np.ones(1 * 4 * 8 * 9).reshape(1, 4, 8, 9)
+
+    def add_42(arr):
+        return arr + 42
+
+    expected = add_42(a)
+    result = apply_parallel(add_42, a)
+
+    assert_array_almost_equal(result, expected)
+
+
+def test_apply_parallel_wrap():
+    def wrapped(arr):
+        return gaussian_filter(arr, 1, mode='wrap')
+    a = np.arange(144).reshape(12, 12).astype(float)
+    expected = gaussian_filter(a, 1, mode='wrap')
+    result = apply_parallel(wrapped, a, chunks=(6, 6), depth=5, mode='wrap')
+
+    assert_array_almost_equal(result, expected)
+
+
+def test_apply_parallel_nearest():
+    def wrapped(arr):
+        return gaussian_filter(arr, 1, mode='nearest')
+    a = np.arange(144).reshape(12, 12).astype(float)
+    expected = gaussian_filter(a, 1, mode='nearest')
+    result = apply_parallel(wrapped, a, chunks=(6, 6), depth={0: 5, 1: 5},
+                            mode='nearest')
+
+    assert_array_almost_equal(result, expected)
diff --git a/tools/appveyor/requirements.txt b/tools/appveyor/requirements.txt
index 9cc4022f..58a4ecfa 100644
--- a/tools/appveyor/requirements.txt
+++ b/tools/appveyor/requirements.txt
@@ -13,5 +13,6 @@ scipy==0.14.0
 cython==0.20.2
 matplotlib==1.4.2
 pillow==2.6.1
+dask[array]>=0.5.0
 wheel
 nose
diff --git a/tools/travis_before_install.sh b/tools/travis_before_install.sh
index f98e48de..55e55c03 100755
--- a/tools/travis_before_install.sh
+++ b/tools/travis_before_install.sh
@@ -54,6 +54,8 @@ fi
 
 retry pip install $WHEELHOUSE -r requirements.txt
 
+retry pip install 'dask[array]>=0.5.0'
+
 # clean up disk space
 sudo apt-get clean
 sudo rm -rf /tmp/*