Merge pull request #843 from tonysyu/refactor/io

Refactor `io` package
This commit is contained in:
Johannes Schönberger
2014-01-13 03:53:44 -08:00
14 changed files with 498 additions and 326 deletions
+28 -46
View File
@@ -1,49 +1,34 @@
__doc__ = """Utilities to read and write images in various formats.
"""Utilities to read and write images in various formats.
The following plug-ins are available:
"""
from ._plugins import use as use_plugin
from ._plugins import available as plugins
from ._plugins import info as plugin_info
from ._plugins import configuration as plugin_order
from ._plugins import reset_plugins as _reset_plugins
from .manage_plugins import *
from .sift import *
from .collection import *
from ._io import *
from ._image_stack import *
from .video import *
available_plugins = plugins()
reset_plugins()
WRAP_LEN = 73
def _load_preferred_plugins():
# Load preferred plugin for each io function.
io_funcs = ['imsave', 'imshow', 'imread_collection', 'imread']
preferred_plugins = ['matplotlib', 'pil', 'qt', 'freeimage', 'null']
for func in io_funcs:
for plugin in preferred_plugins:
if plugin not in available_plugins:
continue
try:
use_plugin(plugin, kind=func)
break
except (ImportError, RuntimeError, OSError):
pass
def _separator(char, lengths):
return [char * separator_length for separator_length in lengths]
# Use PIL as the default imread plugin, since matplotlib (1.2.x)
# is buggy (flips PNGs around, returns bytes as floats, etc.)
try:
use_plugin('pil', 'imread')
except ImportError:
pass
def reset_plugins():
_reset_plugins()
_load_preferred_plugins()
def _format_plugin_info_table(info_table, column_lengths):
"""Add separators and column titles to plugin info table."""
info_table.insert(0, _separator('=', column_lengths))
info_table.insert(1, ('Plugin', 'Description'))
info_table.insert(2, _separator('-', column_lengths))
info_table.append(_separator('-', column_lengths))
def _update_doc(doc):
"""Add a list of plugins to the module docstring, formatted as
@@ -52,27 +37,24 @@ def _update_doc(doc):
"""
from textwrap import wrap
info = [(p, plugin_info(p)) for p in plugins() if not p == 'test']
col_1_len = max([len(n) for (n, _) in info])
wrap_len = 73
col_2_len = wrap_len - 1 - col_1_len
info_table = [(p, plugin_info(p).get('description', 'no description'))
for p in available_plugins if not p == 'test']
# Insert table header
info.insert(0, ('=' * col_1_len, {'description': '=' * col_2_len}))
info.insert(1, ('Plugin', {'description': 'Description'}))
info.insert(2, ('-' * col_1_len, {'description': '-' * col_2_len}))
info.append(('=' * col_1_len, {'description': '=' * col_2_len}))
name_length = max([len(n) for (n, _) in info_table])
description_length = WRAP_LEN - 1 - name_length
column_lengths = [name_length, description_length]
_format_plugin_info_table(info_table, column_lengths)
for (name, meta_data) in info:
wrapped_descr = wrap(meta_data.get('description', ''),
col_2_len)
doc += "%s %s\n" % (name.ljust(col_1_len),
'\n'.join(wrapped_descr))
for (name, plugin_description) in info_table:
description_lines = wrap(plugin_description, description_length)
name_column = [name]
name_column.extend(['' for _ in range(len(description_lines) - 1)])
for name, description in zip(name_column, description_lines):
doc += "%s %s\n" % (name.ljust(name_length), description)
doc = doc.strip()
return doc
__doc__ = _update_doc(__doc__)
reset_plugins()
__doc__ = _update_doc(__doc__)
+35
View File
@@ -0,0 +1,35 @@
import numpy as np
__all__ = ['image_stack', 'push', 'pop']
# Shared image queue
image_stack = []
def push(img):
"""Push an image onto the shared image stack.
Parameters
----------
img : ndarray
Image to push.
"""
if not isinstance(img, np.ndarray):
raise ValueError("Can only push ndarrays to the image stack.")
image_stack.append(img)
def pop():
"""Pop an image from the shared image stack.
Returns
-------
img : ndarray
Image popped from the stack.
"""
return image_stack.pop()
+4 -58
View File
@@ -1,34 +1,14 @@
__all__ = ['Image', 'imread', 'imread_collection', 'imsave', 'imshow', 'show',
'push', 'pop']
try:
from urllib.request import urlopen
except ImportError:
from urllib2 import urlopen
import os
import re
import tempfile
from io import BytesIO
import numpy as np
import six
from skimage.io._plugins import call as call_plugin
from skimage.io.manage_plugins import call_plugin
from skimage.color import rgb2grey
from .util import file_or_url_context
# Shared image queue
_image_stack = []
URL_REGEX = re.compile(r'http://|https://|ftp://|file://|file:\\')
def is_url(filename):
"""Return True if string is an http or ftp path."""
return (isinstance(filename, six.string_types) and
URL_REGEX.match(filename) is not None)
__all__ = ['Image', 'imread', 'imread_collection', 'imsave', 'imshow', 'show']
class Image(np.ndarray):
@@ -77,33 +57,6 @@ class Image(np.ndarray):
return return_str
def push(img):
"""Push an image onto the shared image stack.
Parameters
----------
img : ndarray
Image to push.
"""
if not isinstance(img, np.ndarray):
raise ValueError("Can only push ndarrays to the image stack.")
_image_stack.append(img)
def pop():
"""Pop an image from the shared image stack.
Returns
-------
img : ndarray
Image popped from the stack.
"""
return _image_stack.pop()
def imread(fname, as_grey=False, plugin=None, flatten=None,
**plugin_args):
"""Load an image from file.
@@ -140,14 +93,7 @@ def imread(fname, as_grey=False, plugin=None, flatten=None,
if flatten is not None:
as_grey = flatten
if is_url(fname):
_, ext = os.path.splitext(fname)
with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as f:
u = urlopen(fname)
f.write(u.read())
img = call_plugin('imread', f.name, plugin=plugin, **plugin_args)
os.remove(f.name)
else:
with file_or_url_context(fname) as fname:
img = call_plugin('imread', fname, plugin=plugin, **plugin_args)
if as_grey and getattr(img, 'ndim', 0) >= 3:
-1
View File
@@ -1 +0,0 @@
from .plugin import *
+1 -1
View File
@@ -1,3 +1,3 @@
[null]
description = Default plugin that does nothing
provides = imshow, imread, _app_show
provides = imshow, imread, imsave, _app_show
+6 -1
View File
@@ -1,4 +1,4 @@
__all__ = ['imshow', 'imread', '_app_show']
__all__ = ['imshow', 'imread', 'imsave', '_app_show']
import warnings
@@ -17,4 +17,9 @@ def imshow(*args, **kwargs):
def imread(*args, **kwargs):
warnings.warn(RuntimeWarning(message))
def imsave(*args, **kwargs):
warnings.warn(RuntimeWarning(message))
_app_show = imshow
@@ -2,25 +2,37 @@
"""
__all__ = ['use', 'available', 'call', 'info', 'configuration', 'reset_plugins']
try:
from configparser import ConfigParser
from configparser import ConfigParser # Python 3
except ImportError:
from ConfigParser import ConfigParser
from ConfigParser import ConfigParser # Python 2
import os.path
from glob import glob
__all__ = ['use_plugin', 'call_plugin', 'plugin_info', 'plugin_order',
'reset_plugins', 'find_available_plugins', 'available_plugins']
# The plugin store will save a list of *loaded* io functions for each io type
# (e.g. 'imread', 'imsave', etc.). Plugins are loaded as requested.
plugin_store = None
plugin_provides = {}
plugin_module_name = {}
plugin_meta_data = {}
preferred_plugins = {
# Default plugins for all types (overridden by specific types below).
'all': ['matplotlib', 'pil', 'qt', 'freeimage', 'null'],
# Use PIL as the default imread plugin, since matplotlib (1.2.x)
# is buggy (flips PNGs around, returns bytes as floats, etc.)
'imread': ['pil'],
}
def reset_plugins():
def _clear_plugins():
"""Clear the plugin state to the default, i.e., where no plugins are loaded
"""
@@ -30,8 +42,47 @@ def reset_plugins():
'imshow': [],
'imread_collection': [],
'_app_show': []}
_clear_plugins()
reset_plugins()
def _load_preferred_plugins():
# Load preferred plugin for each io function.
io_types = ['imsave', 'imshow', 'imread_collection', 'imread']
for p_type in io_types:
_set_plugin(p_type, preferred_plugins['all'])
plugin_types = (p for p in preferred_plugins.keys() if p != 'all')
for p_type in plugin_types:
_set_plugin(p_type, preferred_plugins[p_type])
def _set_plugin(plugin_type, plugin_list):
for plugin in plugin_list:
if plugin not in available_plugins:
continue
try:
use_plugin(plugin, kind=plugin_type)
break
except (ImportError, RuntimeError, OSError):
pass
def reset_plugins():
_clear_plugins()
_load_preferred_plugins()
def _parse_config_file(filename):
"""Return plugin name and meta-data dict from plugin config file."""
parser = ConfigParser()
parser.read(filename)
name = parser.sections()[0]
meta_data = {}
for opt in parser.options(name):
meta_data[opt] = parser.get(name, opt)
return name, meta_data
def _scan_plugins():
@@ -40,19 +91,13 @@ def _scan_plugins():
"""
pd = os.path.dirname(__file__)
ini = glob(os.path.join(pd, '*.ini'))
config_files = glob(os.path.join(pd, '_plugins', '*.ini'))
for f in ini:
cp = ConfigParser()
cp.read(f)
name = cp.sections()[0]
meta_data = {}
for opt in cp.options(name):
meta_data[opt] = cp.get(name, opt)
for filename in config_files:
name, meta_data = _parse_config_file(filename)
plugin_meta_data[name] = meta_data
provides = [s.strip() for s in cp.get(name, 'provides').split(',')]
provides = [s.strip() for s in meta_data['provides'].split(',')]
valid_provides = [p for p in provides if p in plugin_store]
for p in provides:
@@ -61,12 +106,45 @@ def _scan_plugins():
" Ignoring." % (name, p))
plugin_provides[name] = valid_provides
plugin_module_name[name] = os.path.basename(f)[:-4]
plugin_module_name[name] = os.path.basename(filename)[:-4]
_scan_plugins()
def call(kind, *args, **kwargs):
def find_available_plugins(loaded=False):
"""List available plugins.
Parameters
----------
loaded : bool
If True, show only those plugins currently loaded. By default,
all plugins are shown.
Returns
-------
p : dict
Dictionary with plugin names as keys and exposed functions as
values.
"""
active_plugins = set()
for plugin_func in plugin_store.values():
for plugin, func in plugin_func:
active_plugins.add(plugin)
d = {}
for plugin in plugin_provides:
if not loaded or plugin in active_plugins:
d[plugin] = [f for f in plugin_provides[plugin] \
if not f.startswith('_')]
return d
available_plugins = find_available_plugins()
def call_plugin(kind, *args, **kwargs):
"""Find the appropriate plugin of 'kind' and execute it.
Parameters
@@ -85,11 +163,11 @@ def call(kind, *args, **kwargs):
plugin_funcs = plugin_store[kind]
if len(plugin_funcs) == 0:
raise RuntimeError('''No suitable plugin registered for %s.
You may load I/O plugins with the `skimage.io.use_plugin`
command. A list of all available plugins can be found using
`skimage.io.plugins()`.''' % kind)
msg = ("No suitable plugin registered for %s.\n\n"
"You may load I/O plugins with the `skimage.io.use_plugin` "
"command. A list of all available plugins can be found using "
"`skimage.io.plugins()`.")
raise RuntimeError(msg % kind)
plugin = kwargs.pop('plugin', None)
if plugin is None:
@@ -105,7 +183,7 @@ command. A list of all available plugins can be found using
return func(*args, **kwargs)
def use(name, kind=None):
def use_plugin(name, kind=None):
"""Set the default plugin for a specified operation. The plugin
will be loaded if it hasn't been already.
@@ -119,15 +197,19 @@ def use(name, kind=None):
See Also
--------
plugins : List of available plugins
available_plugins : List of available plugins
Examples
--------
Use the Python Imaging Library to read images:
To use Matplotlib as the default image reader, you would write:
>>> from skimage.io import use_plugin
>>> use_plugin('pil', 'imread')
>>> from skimage import io
>>> io.use_plugin('matplotlib', 'imread')
To see a list of available plugins run ``io.available_plugins``. Note that
this lists plugins that are defined, but the full list may not be usable
if your system does not have the required libraries installed.
"""
if kind is None:
@@ -158,36 +240,6 @@ def use(name, kind=None):
plugin_store[k] = funcs
def available(loaded=False):
"""List available plugins.
Parameters
----------
loaded : bool
If True, show only those plugins currently loaded. By default,
all plugins are shown.
Returns
-------
p : dict
Dictionary with plugin names as keys and exposed functions as
values.
"""
active_plugins = set()
for plugin_func in plugin_store.values():
for plugin, func in plugin_func:
active_plugins.add(plugin)
d = {}
for plugin in plugin_provides:
if not loaded or plugin in active_plugins:
d[plugin] = [f for f in plugin_provides[plugin] \
if not f.startswith('_')]
return d
def _load(plugin):
"""Load the given plugin.
@@ -201,7 +253,7 @@ def _load(plugin):
plugins : List of available plugins
"""
if plugin in available(loaded=True):
if plugin in find_available_plugins(loaded=True):
return
if not plugin in plugin_module_name:
raise ValueError("Plugin %s not found." % plugin)
@@ -222,7 +274,7 @@ def _load(plugin):
store.append((plugin, func))
def info(plugin):
def plugin_info(plugin):
"""Return plugin meta-data.
Parameters
@@ -242,7 +294,7 @@ def info(plugin):
raise ValueError('No information on plugin "%s"' % plugin)
def configuration():
def plugin_order():
"""Return the currently preferred plugin order.
Returns
+45 -108
View File
@@ -1,88 +1,72 @@
import sys
import os.path
import numpy as np
from numpy.testing import (assert_raises,
assert_equal,
assert_array_almost_equal,
)
from numpy.testing.decorators import skipif
from numpy.testing import assert_raises, assert_equal, assert_allclose
from skimage import data_dir
from skimage.io import ImageCollection, MultiImage
from skimage.io.collection import alphanumeric_key
from skimage.io import Image as ioImage
import six
from skimage.io.collection import ImageCollection, alphanumeric_key
try:
from PIL import Image
except ImportError:
PIL_available = False
else:
PIL_available = True
def test_string_split():
test_string = 'z23a'
test_str_result = ['z', 23, 'a']
assert_equal(alphanumeric_key(test_string), test_str_result)
class TestAlphanumericKey():
def setUp(self):
self.test_string = 'z23a'
self.test_str_result = ['z', 23, 'a']
self.filenames = ['f9.10.png', 'f9.9.png', 'f10.10.png', 'f10.9.png',
'e9.png', 'e10.png', 'em.png']
self.sorted_filenames = \
['e9.png', 'e10.png', 'em.png', 'f9.9.png', 'f9.10.png',
'f10.9.png', 'f10.10.png']
def test_string_split(self):
assert_equal(alphanumeric_key(self.test_string), self.test_str_result)
def test_string_sort(self):
sorted_filenames = sorted(self.filenames, key=alphanumeric_key)
assert_equal(sorted_filenames, self.sorted_filenames)
def test_string_sort():
filenames = ['f9.10.png', 'f9.9.png', 'f10.10.png', 'f10.9.png',
'e9.png', 'e10.png', 'em.png']
sorted_filenames = ['e9.png', 'e10.png', 'em.png', 'f9.9.png',
'f9.10.png', 'f10.9.png', 'f10.10.png']
sorted_filenames = sorted(filenames, key=alphanumeric_key)
assert_equal(sorted_filenames, sorted_filenames)
class TestImageCollection():
pattern = [os.path.join(data_dir, pic) for pic in ['camera.png',
'color.png']]
pattern_matched = [os.path.join(data_dir, pic) for pic in
['camera.png', 'moon.png']]
pattern = [os.path.join(data_dir, pic)
for pic in ['camera.png', 'color.png']]
pattern_matched = [os.path.join(data_dir, pic)
for pic in ['camera.png', 'moon.png']]
def setUp(self):
self.collection = ImageCollection(self.pattern)
self.collection_matched = ImageCollection(self.pattern_matched)
# Generic image collection with images of different shapes.
self.images = ImageCollection(self.pattern)
# Image collection with images having shapes that match.
self.images_matched = ImageCollection(self.pattern_matched)
def test_len(self):
assert len(self.collection) == 2
assert len(self.images) == 2
def test_getitem(self):
num = len(self.collection)
num = len(self.images)
for i in range(-num, num):
assert type(self.collection[i]) is np.ndarray
assert_array_almost_equal(self.collection[0],
self.collection[-num])
assert type(self.images[i]) is np.ndarray
assert_allclose(self.images[0],
self.images[-num])
#assert_raises expects a callable, hence this do-very-little func
# assert_raises expects a callable, hence this thin wrapper function.
def return_img(n):
return self.collection[n]
return self.images[n]
assert_raises(IndexError, return_img, num)
assert_raises(IndexError, return_img, -num - 1)
def test_slicing(self):
assert type(self.collection[:]) is ImageCollection
assert len(self.collection[:]) == 2
assert len(self.collection[:1]) == 1
assert len(self.collection[1:]) == 1
assert_array_almost_equal(self.collection[0], self.collection[:1][0])
assert_array_almost_equal(self.collection[1], self.collection[1:][0])
assert_array_almost_equal(self.collection[1], self.collection[::-1][0])
assert_array_almost_equal(self.collection[0], self.collection[::-1][1])
assert type(self.images[:]) is ImageCollection
assert len(self.images[:]) == 2
assert len(self.images[:1]) == 1
assert len(self.images[1:]) == 1
assert_allclose(self.images[0], self.images[:1][0])
assert_allclose(self.images[1], self.images[1:][0])
assert_allclose(self.images[1], self.images[::-1][0])
assert_allclose(self.images[0], self.images[::-1][1])
def test_files_property(self):
assert isinstance(self.collection.files, list)
assert isinstance(self.images.files, list)
def set_files(f):
self.collection.files = f
self.images.files = f
assert_raises(AttributeError, set_files, 'newfiles')
def test_custom_load(self):
@@ -95,59 +79,12 @@ class TestImageCollection():
assert_equal(ic[1], (2, 'two'))
def test_concatenate(self):
ar = self.collection_matched.concatenate()
assert_equal(ar.shape, (len(self.collection_matched),) +
self.collection[0].shape)
assert_raises(ValueError, self.collection.concatenate)
array = self.images_matched.concatenate()
expected_shape = (len(self.images_matched),) + self.images[0].shape
assert_equal(array.shape, expected_shape)
class TestMultiImage():
def setUp(self):
# This multipage TIF file was created with imagemagick:
# convert im1.tif im2.tif -adjoin multipage.tif
if PIL_available:
self.img = MultiImage(os.path.join(data_dir, 'multipage.tif'))
@skipif(not PIL_available)
def test_len(self):
assert len(self.img) == 2
@skipif(not PIL_available)
def test_getitem(self):
num = len(self.img)
for i in range(-num, num):
assert type(self.img[i]) is np.ndarray
assert_array_almost_equal(self.img[0],
self.img[-num])
#assert_raises expects a callable, hence this do-very-little func
def return_img(n):
return self.img[n]
assert_raises(IndexError, return_img, num)
assert_raises(IndexError, return_img, -num - 1)
@skipif(not PIL_available)
def test_files_property(self):
assert isinstance(self.img.filename, six.string_types)
def set_filename(f):
self.img.filename = f
assert_raises(AttributeError, set_filename, 'newfile')
@skipif(not PIL_available)
def test_conserve_memory_property(self):
assert isinstance(self.img.conserve_memory, bool)
def set_mem(val):
self.img.conserve_memory = val
assert_raises(AttributeError, set_mem, True)
@skipif(not PIL_available)
def test_concatenate(self):
ar = self.img.concatenate()
assert_equal(ar.shape, (len(self.img),) +
self.img[0].shape)
def test_concatentate_mismatched_image_shapes(self):
assert_raises(ValueError, self.images.concatenate)
if __name__ == "__main__":
+18 -2
View File
@@ -1,7 +1,12 @@
from skimage.io import Image
from io import BytesIO
import numpy as np
from skimage import img_as_ubyte
from skimage.io import Image, imread
from numpy.testing import assert_equal, assert_array_equal
def test_tags():
f = Image([1, 2, 3], foo='bar', sigma='delta')
g = Image([3, 2, 1], sun='moon')
@@ -11,7 +16,18 @@ def test_tags():
assert_array_equal((g + 2).tags['sun'], 'moon')
assert_equal(h.tags, {})
def test_repr_png_roundtrip():
# Use RGB-like shape since some backends convert grayscale to RGB
original_array = 255 * np.ones((5, 5, 3), dtype=np.uint8)
image = Image(original_array)
array = imread(BytesIO(image._repr_png_()))
# Force output to ubyte range for plugin compatibility.
# For example, Matplotlib will return floats even if the image is uint8.
assert_array_equal(img_as_ubyte(array), original_array)
# Note that PIL breaks with `_repr_jpeg_`.
if __name__ == "__main__":
from numpy.testing import run_module_suite
run_module_suite()
+13
View File
@@ -4,6 +4,7 @@ from numpy.testing import assert_array_equal, raises, run_module_suite
import numpy as np
import skimage.io as io
from skimage.io.manage_plugins import plugin_store
from skimage import data_dir
@@ -28,5 +29,17 @@ def test_imread_url():
assert image.shape == (512, 512)
@raises(RuntimeError)
def test_imread_no_plugin():
# tweak data path so that file URI works on both unix and windows.
image_path = os.path.join(data_dir, 'lena.png')
plugins = plugin_store['imread']
plugin_store['imread'] = []
try:
io.imread(image_path)
finally:
plugin_store['imread'] = plugins
if __name__ == "__main__":
run_module_suite()
+69
View File
@@ -0,0 +1,69 @@
import os
import numpy as np
from numpy.testing.decorators import skipif
from numpy.testing import assert_raises, assert_equal, assert_allclose
from skimage import data_dir
from skimage.io.collection import MultiImage
try:
from PIL import Image
except ImportError:
PIL_available = False
else:
PIL_available = True
import six
class TestMultiImage():
def setUp(self):
# This multipage TIF file was created with imagemagick:
# convert im1.tif im2.tif -adjoin multipage.tif
if PIL_available:
self.img = MultiImage(os.path.join(data_dir, 'multipage.tif'))
@skipif(not PIL_available)
def test_len(self):
assert len(self.img) == 2
@skipif(not PIL_available)
def test_getitem(self):
num = len(self.img)
for i in range(-num, num):
assert type(self.img[i]) is np.ndarray
assert_allclose(self.img[0], self.img[-num])
# assert_raises expects a callable, hence this thin wrapper function.
def return_img(n):
return self.img[n]
assert_raises(IndexError, return_img, num)
assert_raises(IndexError, return_img, -num - 1)
@skipif(not PIL_available)
def test_files_property(self):
assert isinstance(self.img.filename, six.string_types)
def set_filename(f):
self.img.filename = f
assert_raises(AttributeError, set_filename, 'newfile')
@skipif(not PIL_available)
def test_conserve_memory_property(self):
assert isinstance(self.img.conserve_memory, bool)
def set_mem(val):
self.img.conserve_memory = val
assert_raises(AttributeError, set_mem, True)
@skipif(not PIL_available)
def test_concatenate(self):
array = self.img.concatenate()
assert_equal(array.shape, (len(self.img),) + self.img[0].shape)
if __name__ == "__main__":
from numpy.testing import run_module_suite
run_module_suite()
+35
View File
@@ -0,0 +1,35 @@
import os
import warnings
import numpy as np
from numpy.testing import raises
from skimage import io
from skimage import data_dir
@raises(Warning)
def test_null_imread():
path = os.path.join(data_dir, 'color.png')
with warnings.catch_warnings(): # Temporarily set warnings as errors.
warnings.filterwarnings('error')
io.imread(path, plugin='null')
@raises(Warning)
def test_null_imsave():
with warnings.catch_warnings(): # Temporarily set warnings as errors.
warnings.filterwarnings('error')
io.imsave('dummy.png', np.zeros((3, 3)), plugin='null')
@raises(Warning)
def test_null_imshow():
with warnings.catch_warnings(): # Temporarily set warnings as errors.
warnings.filterwarnings('error')
io.imshow(np.zeros((3, 3)), plugin='null')
if __name__ == '__main__':
from numpy.testing import run_module_suite
run_module_suite()
+95 -48
View File
@@ -1,7 +1,9 @@
from numpy.testing import *
from contextlib import contextmanager
from numpy.testing import assert_equal, raises
from skimage import io
from skimage.io._plugins import plugin
from skimage.io import manage_plugins
from numpy.testing.decorators import skipif
try:
@@ -19,70 +21,115 @@ except RuntimeError:
FI_available = False
def setup_module(self):
plugin.use('test') # see ../_plugins/test_plugin.py
def setup_module():
manage_plugins.use_plugin('test') # see ../_plugins/test_plugin.py
def teardown_module(self):
def teardown_module():
io.reset_plugins()
class TestPlugin:
def test_read(self):
io.imread('test.png', as_grey=True, dtype='i4', plugin='test')
@contextmanager
def protect_preferred_plugins():
"""Contexts where `preferred_plugins` can be modified w/o side-effects."""
preferred_plugins = manage_plugins.preferred_plugins.copy()
try:
yield
finally:
manage_plugins.preferred_plugins = preferred_plugins
def test_save(self):
io.imsave('test.png', [1, 2, 3], plugin='test')
def test_show(self):
io.imshow([1, 2, 3], plugin_arg=(1, 2), plugin='test')
def test_read():
io.imread('test.png', as_grey=True, dtype='i4', plugin='test')
def test_collection(self):
io.imread_collection('*.png', conserve_memory=False, plugin='test')
def test_use(self):
plugin.use('test')
plugin.use('test', 'imshow')
def test_save():
io.imsave('test.png', [1, 2, 3], plugin='test')
@raises(ValueError)
def test_failed_use(self):
plugin.use('asd')
@skipif(not PIL_available and not FI_available)
def test_use_priority(self):
plugin.use(priority_plugin)
plug, func = plugin.plugin_store['imread'][0]
assert_equal(plug, priority_plugin)
def test_show():
io.imshow([1, 2, 3], plugin_arg=(1, 2), plugin='test')
plugin.use('test')
plug, func = plugin.plugin_store['imread'][0]
assert_equal(plug, 'test')
@skipif(not PIL_available)
def test_use_priority_with_func(self):
plugin.use('pil')
plug, func = plugin.plugin_store['imread'][0]
assert_equal(plug, 'pil')
def test_collection():
io.imread_collection('*.png', conserve_memory=False, plugin='test')
plugin.use('test', 'imread')
plug, func = plugin.plugin_store['imread'][0]
assert_equal(plug, 'test')
plug, func = plugin.plugin_store['imsave'][0]
assert_equal(plug, 'pil')
def test_use():
manage_plugins.use_plugin('test')
manage_plugins.use_plugin('test', 'imshow')
plugin.use('test')
plug, func = plugin.plugin_store['imsave'][0]
assert_equal(plug, 'test')
def test_plugin_order(self):
p = io.plugin_order()
assert 'imread' in p
assert 'test' in p['imread']
@raises(ValueError)
def test_failed_use():
manage_plugins.use_plugin('asd')
@skipif(not PIL_available and not FI_available)
def test_use_priority():
manage_plugins.use_plugin(priority_plugin)
plug, func = manage_plugins.plugin_store['imread'][0]
assert_equal(plug, priority_plugin)
manage_plugins.use_plugin('test')
plug, func = manage_plugins.plugin_store['imread'][0]
assert_equal(plug, 'test')
@skipif(not PIL_available)
def test_use_priority_with_func():
manage_plugins.use_plugin('pil')
plug, func = manage_plugins.plugin_store['imread'][0]
assert_equal(plug, 'pil')
manage_plugins.use_plugin('test', 'imread')
plug, func = manage_plugins.plugin_store['imread'][0]
assert_equal(plug, 'test')
plug, func = manage_plugins.plugin_store['imsave'][0]
assert_equal(plug, 'pil')
manage_plugins.use_plugin('test')
plug, func = manage_plugins.plugin_store['imsave'][0]
assert_equal(plug, 'test')
def test_plugin_order():
p = io.plugin_order()
assert 'imread' in p
assert 'test' in p['imread']
def test_available():
assert 'qt' in io.available_plugins
assert 'test' in io.find_available_plugins(loaded=True)
def test_load_preferred_plugins_all():
from skimage.io._plugins import null_plugin
with protect_preferred_plugins():
manage_plugins.preferred_plugins = {'all': ['null']}
manage_plugins.reset_plugins()
for plugin_type in ('imread', 'imsave', 'imshow'):
plug, func = manage_plugins.plugin_store[plugin_type][0]
assert func == getattr(null_plugin, plugin_type)
def test_load_preferred_plugins_imread():
from skimage.io._plugins import null_plugin
with protect_preferred_plugins():
manage_plugins.preferred_plugins['imread'] = ['null']
manage_plugins.reset_plugins()
plug, func = manage_plugins.plugin_store['imread'][0]
assert func == null_plugin.imread
plug, func = manage_plugins.plugin_store['imshow'][0]
assert func != null_plugin.imshow
def test_available(self):
assert 'qt' in io.plugins()
assert 'test' in io.plugins(loaded=True)
if __name__ == "__main__":
from numpy.testing import run_module_suite
run_module_suite()
+36
View File
@@ -0,0 +1,36 @@
try:
from urllib.request import urlopen # Python 3
except ImportError:
from urllib2 import urlopen # Python 2
import os
import re
import tempfile
from contextlib import contextmanager
from skimage._shared import six
URL_REGEX = re.compile(r'http://|https://|ftp://|file://|file:\\')
def is_url(filename):
"""Return True if string is an http or ftp path."""
return (isinstance(filename, six.string_types) and
URL_REGEX.match(filename) is not None)
@contextmanager
def file_or_url_context(resource_name):
"""Yield name of file from the given resource (i.e. file or url)."""
if is_url(resource_name):
_, ext = os.path.splitext(resource_name)
with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as f:
u = urlopen(resource_name)
f.write(u.read())
try:
yield f.name
finally:
os.remove(f.name)
else:
yield resource_name