mirror of
https://github.com/wassname/IndicoIo-python.git
synced 2026-06-27 16:10:34 +08:00
Merge pull request #62 from IndicoDataSolutions/Chris/refactor-utils
REFACTOR: indicio.utils.__init__.py to multiple utils modules
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
import requests
|
||||
|
||||
from indicoio.utils import image_preprocess, api_handler
|
||||
from indicoio.utils.image import image_preprocess
|
||||
from indicoio.utils.api import api_handler
|
||||
|
||||
def facial_features(image, cloud=None, batch=False, api_key=None, **kwargs):
|
||||
"""
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import requests
|
||||
|
||||
from indicoio.utils import api_handler, image_preprocess
|
||||
from indicoio.utils.api import api_handler
|
||||
from indicoio.utils.image import image_preprocess
|
||||
import indicoio.config as config
|
||||
|
||||
def fer(image, cloud=None, batch=False, api_key=None, **kwargs):
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from indicoio.utils import api_handler
|
||||
from indicoio.utils.api import api_handler
|
||||
import indicoio.config as config
|
||||
|
||||
def language(text, cloud=None, batch=False, api_key=None, **kwargs):
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from indicoio.utils import api_handler
|
||||
from indicoio.utils.api import api_handler
|
||||
|
||||
def political(text, cloud=None, batch=False, api_key=None, **kwargs):
|
||||
"""
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from indicoio.utils import api_handler
|
||||
from indicoio.utils.api import api_handler
|
||||
import indicoio.config as config
|
||||
|
||||
def text_tags(text, cloud=None, batch=False, api_key=None, **kwargs):
|
||||
|
||||
+5
-154
@@ -1,44 +1,9 @@
|
||||
import inspect, json, getpass, os.path, base64, StringIO, re, warnings
|
||||
import requests
|
||||
from PIL import Image
|
||||
|
||||
from indicoio.utils.errors import IndicoError
|
||||
from indicoio import JSON_HEADERS
|
||||
from indicoio import config
|
||||
|
||||
B64_PATTERN = re.compile("^([A-Za-z0-9+/]{4})*([A-Za-z0-9+/]{4}|[A-Za-z0-9+/]{3}=|[A-Za-z0-9+/]{2}==)")
|
||||
|
||||
|
||||
def api_handler(arg, cloud, api, url_params = {"batch":False, "api_key":None}, **kwargs):
|
||||
data = {'data': arg}
|
||||
data.update(**kwargs)
|
||||
json_data = json.dumps(data)
|
||||
if not cloud:
|
||||
cloud=config.cloud
|
||||
|
||||
if cloud:
|
||||
host = "%s.indico.domains" % cloud
|
||||
else:
|
||||
# default to indico public cloud
|
||||
host = config.PUBLIC_API_HOST
|
||||
|
||||
url = config.url_protocol + "//%s/%s" % (host, api)
|
||||
url = url + "/batch" if url_params.get("batch", False) else url
|
||||
url += "?key=%s" % (url_params.get("api_key", None) or config.api_key)
|
||||
if "apis" in url_params:
|
||||
url += "&apis=%s" % ",".join(url_params["apis"])
|
||||
|
||||
response = requests.post(url, data=json_data, headers=JSON_HEADERS)
|
||||
if response.status_code == 503 and cloud != None:
|
||||
raise IndicoError("Private cloud '%s' does not include api '%s'" % (cloud, api))
|
||||
|
||||
json_results = response.json()
|
||||
results = json_results.get('results', False)
|
||||
if results is False:
|
||||
error = json_results.get('error')
|
||||
raise IndicoError(error)
|
||||
return results
|
||||
"""
|
||||
Basic utility classes and functions
|
||||
"""
|
||||
import inspect
|
||||
|
||||
from indicoio.utils.errors import DataStructureException
|
||||
|
||||
class TypeCheck(object):
|
||||
"""
|
||||
@@ -67,120 +32,6 @@ class TypeCheck(object):
|
||||
return check_args
|
||||
|
||||
|
||||
class DataStructureException(Exception):
|
||||
"""
|
||||
If a non-accepted datastructure is passed, throws an exception
|
||||
"""
|
||||
def __init__(self, callback, passed_structure, accepted_structures):
|
||||
self.callback = callback.__name__
|
||||
self.structure = str(type(passed_structure))
|
||||
self.accepted = [str(structure) for structure in accepted_structures]
|
||||
|
||||
def __str__(self):
|
||||
return """
|
||||
function %s does not accept %s, accepted types are: %s
|
||||
""" % (self.callback, self.structure, str(self.accepted))
|
||||
|
||||
|
||||
def image_preprocess(image, size=(48,48), batch=False):
|
||||
"""
|
||||
Takes an image and prepares it for sending to the api including
|
||||
resizing and image data/structure standardizing.
|
||||
"""
|
||||
if batch:
|
||||
return [image_preprocess(img, batch=False) for img in image]
|
||||
|
||||
if isinstance(image, basestring):
|
||||
b64_str = re.sub('^data:image/.+;base64,', '', image)
|
||||
if os.path.isfile(image):
|
||||
# check type of element
|
||||
outImage = Image.open(image)
|
||||
elif B64_PATTERN.match(b64_str) is not None:
|
||||
return b64_str
|
||||
else:
|
||||
raise IndicoError("Snose tring provided must be a valid filepath or base64 encoded string")
|
||||
|
||||
elif isinstance(image, list): # image passed in is a list and not np.array
|
||||
warnings.warn(
|
||||
"Input as lists of pixels will be deprecated in the next major update",
|
||||
DeprecationWarning
|
||||
)
|
||||
outImage = process_list_image(image)
|
||||
elif isinstance(image, Image.Image):
|
||||
outImage = image
|
||||
elif type(image).__name__ == "ndarray": # image is from numpy/scipy
|
||||
out_image = Image.fromarray(image)
|
||||
else:
|
||||
raise IndicoError("Image must be a filepath, base64 encoded string, or a numpy array")
|
||||
|
||||
# image resizing
|
||||
outImage = outImage.resize(size)
|
||||
|
||||
# convert to base64
|
||||
temp_output = StringIO.StringIO()
|
||||
outImage.save(temp_output, format='PNG')
|
||||
temp_output.seek(0)
|
||||
output_s = temp_output.read()
|
||||
|
||||
return base64.b64encode(output_s)
|
||||
|
||||
|
||||
def get_list_dimensions(_list):
|
||||
"""
|
||||
Takes a nested list and returns the size of each dimension followed
|
||||
by the element type in the list
|
||||
"""
|
||||
if isinstance(_list, list) or isinstance(_list, tuple):
|
||||
return [len(_list)] + get_list_dimensions(_list[0])
|
||||
return []
|
||||
|
||||
|
||||
def get_element_type(_list, dimens):
|
||||
"""
|
||||
Given the dimensions of a nested list and the list, returns the type of the
|
||||
elements in the inner list.
|
||||
"""
|
||||
elem = _list
|
||||
for _ in xrange(len(dimens)):
|
||||
elem = elem[0]
|
||||
return type(elem)
|
||||
|
||||
|
||||
def process_list_image(_list):
|
||||
"""
|
||||
Processes list to be [[(int, int, int), ...]]
|
||||
"""
|
||||
# Check if list is empty
|
||||
if not _list:
|
||||
return _list
|
||||
|
||||
dimens = get_list_dimensions(_list)
|
||||
data_type = get_element_type(_list, dimens)
|
||||
|
||||
seq_obj = []
|
||||
|
||||
outImage = Image.new("RGB", (dimens[0], dimens[1]))
|
||||
for i in xrange(dimens[0]):
|
||||
for j in xrange(dimens[1]):
|
||||
elem = _list[i][j]
|
||||
if len(dimens) >= 3:
|
||||
#RGB(A)
|
||||
if data_type == float:
|
||||
seq_obj.append((int(elem[0] * 255), int(elem[1] * 255), int(elem[2] * 255)))
|
||||
else:
|
||||
seq_obj.append(elem[0:3])
|
||||
elif data_type == float:
|
||||
#Grayscale 0 - 1.0f
|
||||
seq_obj.append((int(elem * 255), ) * 3)
|
||||
else:
|
||||
#Grayscale 0 - 255
|
||||
seq_obj.append((elem, ) * 3)
|
||||
|
||||
#Needs to be 0 - 255 in flattened list of (R, G, B)
|
||||
outImage.putdata(data = seq_obj)
|
||||
|
||||
return outImage
|
||||
|
||||
|
||||
def is_url(data, batch=False):
|
||||
if batch and isinstance(data[0], basestring):
|
||||
|
||||
@@ -0,0 +1,39 @@
|
||||
"""
|
||||
Handles making requests to the IndicoApi Server
|
||||
"""
|
||||
|
||||
import json, requests
|
||||
|
||||
from indicoio.utils.errors import IndicoError, DataStructureException
|
||||
from indicoio import JSON_HEADERS
|
||||
from indicoio import config
|
||||
|
||||
def api_handler(arg, cloud, api, url_params = {"batch":False, "api_key":None}, **kwargs):
|
||||
data = {'data': arg}
|
||||
data.update(**kwargs)
|
||||
json_data = json.dumps(data)
|
||||
if not cloud:
|
||||
cloud=config.cloud
|
||||
|
||||
if cloud:
|
||||
host = "%s.indico.domains" % cloud
|
||||
else:
|
||||
# default to indico public cloud
|
||||
host = config.PUBLIC_API_HOST
|
||||
|
||||
url = config.url_protocol + "//%s/%s" % (host, api)
|
||||
url = url + "/batch" if url_params.get("batch", False) else url
|
||||
url += "?key=%s" % (url_params.get("api_key", None) or config.api_key)
|
||||
if "apis" in url_params:
|
||||
url += "&apis=%s" % ",".join(url_params["apis"])
|
||||
|
||||
response = requests.post(url, data=json_data, headers=JSON_HEADERS)
|
||||
if response.status_code == 503 and cloud != None:
|
||||
raise IndicoError("Private cloud '%s' does not include api '%s'" % (cloud, api))
|
||||
|
||||
json_results = response.json()
|
||||
results = json_results.get('results', False)
|
||||
if results is False:
|
||||
error = json_results.get('error')
|
||||
raise IndicoError(error)
|
||||
return results
|
||||
@@ -1,2 +1,20 @@
|
||||
"""
|
||||
Contains Indico Custom Errors
|
||||
"""
|
||||
|
||||
class IndicoError(ValueError):
|
||||
pass
|
||||
|
||||
class DataStructureException(Exception):
|
||||
"""
|
||||
If a non-accepted datastructure is passed, throws an exception
|
||||
"""
|
||||
def __init__(self, callback, passed_structure, accepted_structures):
|
||||
self.callback = callback.__name__
|
||||
self.structure = str(type(passed_structure))
|
||||
self.accepted = [str(structure) for structure in accepted_structures]
|
||||
|
||||
def __str__(self):
|
||||
return """
|
||||
function %s does not accept %s, accepted types are: %s
|
||||
""" % (self.callback, self.structure, str(self.accepted))
|
||||
|
||||
@@ -0,0 +1,110 @@
|
||||
"""
|
||||
Image Utils
|
||||
Handles preprocessing images before they are sent to the server
|
||||
"""
|
||||
import os.path, base64, StringIO, re, warnings
|
||||
|
||||
from PIL import Image
|
||||
|
||||
from indicoio.utils.errors import IndicoError, DataStructureException
|
||||
|
||||
B64_PATTERN = re.compile("^([A-Za-z0-9+/]{4})*([A-Za-z0-9+/]{4}|[A-Za-z0-9+/]{3}=|[A-Za-z0-9+/]{2}==)")
|
||||
|
||||
def image_preprocess(image, size=(48,48), batch=False):
|
||||
"""
|
||||
Takes an image and prepares it for sending to the api including
|
||||
resizing and image data/structure standardizing.
|
||||
"""
|
||||
if batch:
|
||||
return [image_preprocess(img, batch=False) for img in image]
|
||||
|
||||
if isinstance(image, basestring):
|
||||
b64_str = re.sub('^data:image/.+;base64,', '', image)
|
||||
if os.path.isfile(image):
|
||||
# check type of element
|
||||
outImage = Image.open(image)
|
||||
elif B64_PATTERN.match(b64_str) is not None:
|
||||
return b64_str
|
||||
else:
|
||||
raise IndicoError("Snose tring provided must be a valid filepath or base64 encoded string")
|
||||
|
||||
elif isinstance(image, list): # image passed in is a list and not np.array
|
||||
warnings.warn(
|
||||
"Input as lists of pixels will be deprecated in the next major update",
|
||||
DeprecationWarning
|
||||
)
|
||||
outImage = process_list_image(image)
|
||||
elif isinstance(image, Image.Image):
|
||||
outImage = image
|
||||
elif type(image).__name__ == "ndarray": # image is from numpy/scipy
|
||||
out_image = Image.fromarray(image)
|
||||
else:
|
||||
raise IndicoError("Image must be a filepath, base64 encoded string, or a numpy array")
|
||||
|
||||
# image resizing
|
||||
outImage = outImage.resize(size)
|
||||
|
||||
# convert to base64
|
||||
temp_output = StringIO.StringIO()
|
||||
outImage.save(temp_output, format='PNG')
|
||||
temp_output.seek(0)
|
||||
output_s = temp_output.read()
|
||||
|
||||
return base64.b64encode(output_s)
|
||||
|
||||
|
||||
def get_list_dimensions(_list):
|
||||
"""
|
||||
Takes a nested list and returns the size of each dimension followed
|
||||
by the element type in the list
|
||||
"""
|
||||
if isinstance(_list, list) or isinstance(_list, tuple):
|
||||
return [len(_list)] + get_list_dimensions(_list[0])
|
||||
return []
|
||||
|
||||
|
||||
def get_element_type(_list, dimens):
|
||||
"""
|
||||
Given the dimensions of a nested list and the list, returns the type of the
|
||||
elements in the inner list.
|
||||
"""
|
||||
elem = _list
|
||||
for _ in xrange(len(dimens)):
|
||||
elem = elem[0]
|
||||
return type(elem)
|
||||
|
||||
|
||||
def process_list_image(_list):
|
||||
"""
|
||||
Processes list to be [[(int, int, int), ...]]
|
||||
"""
|
||||
# Check if list is empty
|
||||
if not _list:
|
||||
return _list
|
||||
|
||||
dimens = get_list_dimensions(_list)
|
||||
data_type = get_element_type(_list, dimens)
|
||||
|
||||
seq_obj = []
|
||||
|
||||
outImage = Image.new("RGB", (dimens[0], dimens[1]))
|
||||
for i in xrange(dimens[0]):
|
||||
for j in xrange(dimens[1]):
|
||||
elem = _list[i][j]
|
||||
if len(dimens) >= 3:
|
||||
#RGB(A)
|
||||
if data_type == float:
|
||||
seq_obj.append((int(elem[0] * 255), int(elem[1] * 255), int(elem[2] * 255)))
|
||||
else:
|
||||
seq_obj.append(elem[0:3])
|
||||
elif data_type == float:
|
||||
#Grayscale 0 - 1.0f
|
||||
seq_obj.append((int(elem * 255), ) * 3)
|
||||
else:
|
||||
#Grayscale 0 - 255
|
||||
seq_obj.append((elem, ) * 3)
|
||||
|
||||
#Needs to be 0 - 255 in flattened list of (R, G, B)
|
||||
outImage.putdata(data = seq_obj)
|
||||
|
||||
return outImage
|
||||
@@ -1,6 +1,7 @@
|
||||
from indicoio.config import TEXT_APIS, IMAGE_APIS, API_NAMES
|
||||
from indicoio.utils.api import api_handler
|
||||
from indicoio.utils.image import image_preprocess
|
||||
from indicoio.utils.errors import IndicoError
|
||||
from indicoio.utils import api_handler, image_preprocess
|
||||
|
||||
|
||||
CLIENT_SERVER_MAP = dict((api, api.strip().replace("_", "").lower()) for api in API_NAMES)
|
||||
|
||||
Reference in New Issue
Block a user