from sklearn.base import BaseEstimator, ClassifierMixin
import base64
+import gzip
+import sys
+if sys.version_info[0] < 3:
+ from cStringIO import StringIO as BytesIO
+else:
+ from io import BytesIO as BytesIO
+
class FaceDetectTransform(BaseEstimator, ClassifierMixin):
'''
COL_IMAGE_DATA = 'image_binary'
VAL_REGION_IMAGE_ID = -1
- def __init__(self, cascade_path=None, include_image=True):
+ def __init__(self, cascade_path=None, cascade_stream=None, include_image=True):
self.include_image = include_image # should output transform include image?
- self.cascade_path = cascade_path # abs path outside of module
self.cascade_obj = None # late-load this component
+ self.cascade_stream = cascade_stream # compressed binary file for cascade data
+ if self.cascade_stream is None:
+ if cascade_path is None: # default/included data?
+ pathRoot = os.path.dirname(os.path.abspath(__file__))
+ cascade_path = os.path.join(pathRoot, FaceDetectTransform.CASCADE_DEFAULT_FILE)
+ raw_stream = b""
+ with open(cascade_path, 'rb') as f:
+ raw_stream = f.read()
+ self.cascade_stream = {'name': os.path.basename(cascade_path),
+ 'data': FaceDetectTransform.string_compress(raw_stream)}
+
+ @staticmethod
+ def string_compress(string_data):
+ out_data = BytesIO()
+ with gzip.GzipFile(fileobj=out_data, mode="wb") as f:
+ f.write(string_data)
+ return out_data.getvalue()
+
+ @staticmethod
+ def string_decompress(compressed_data):
+ in_data = BytesIO(compressed_data)
+ ret_str = None
+ with gzip.GzipFile(fileobj=in_data, mode="rb") as f:
+ ret_str = f.read()
+ return ret_str
def get_params(self, deep=False):
- return {'include_image': self.include_image}
+ return {'include_image': self.include_image, 'cascade_stream': self.cascade_stream}
@staticmethod
def generate_in_df(path_image="", bin_stream=b""):
with open(path_image, 'wb') as f:
f.write(row[FaceDetectTransform.COL_IMAGE_DATA][0])
+ @staticmethod
+ def output_names_():
+ return [FaceDetectTransform.COL_IMAGE_IDX, FaceDetectTransform.COL_REGION_IDX,
+ FaceDetectTransform.COL_FACE_X, FaceDetectTransform.COL_FACE_Y,
+ FaceDetectTransform.COL_FACE_W, FaceDetectTransform.COL_FACE_H,
+ FaceDetectTransform.COL_IMAGE_MIME, FaceDetectTransform.COL_IMAGE_DATA]
+
@staticmethod
def generate_out_dict(idx=VAL_REGION_IMAGE_ID, x=0, y=0, w=0, h=0, image=0, bin_stream=b"", media=""):
- return {FaceDetectTransform.COL_IMAGE_IDX: image, FaceDetectTransform.COL_REGION_IDX: idx,
- FaceDetectTransform.COL_FACE_X: x, FaceDetectTransform.COL_FACE_Y: y,
- FaceDetectTransform.COL_FACE_W: w, FaceDetectTransform.COL_FACE_H: h,
- FaceDetectTransform.COL_IMAGE_MIME: media, FaceDetectTransform.COL_IMAGE_DATA: bin_stream}
+ return dict(zip(FaceDetectTransform.output_names_(), [image, idx, x, y, w, h, media, bin_stream]))
@staticmethod
def suppress_image(df):
def fit(self, X, y=None):
return self
+ def load_cascade(self):
+ # if no model exists yet, create it; return False for deserialize required
+ if self.cascade_obj is None:
+ if self.cascade_stream is not None:
+ import tempfile
+ with tempfile.TemporaryDirectory() as tdir:
+ cascade_data = FaceDetectTransform.string_decompress(self.cascade_stream['data'])
+ cascade_path = os.path.join(tdir, self.cascade_stream['name'])
+ with open(cascade_path, 'wb') as f:
+ f.write(cascade_data)
+ self.cascade_obj = cv2.CascadeClassifier(cascade_path)
+ return False
+ return True
+
def predict(self, X, y=None):
"""
Assumes a numpy array of [[mime_type, binary_string] ... ]
where mime_type is an image-specifying mime type and binary_string is the raw image bytes
"""
- # if no model exists yet, create it
- if self.cascade_obj is None:
- if self.cascade_path is not None:
- self.cascade_obj = cv2.CascadeClassifier(self.cascade_path)
- else: # none provided, load what came with the package
- pathRoot = os.path.dirname(os.path.abspath(__file__))
- pathFile = os.path.join(pathRoot, FaceDetectTransform.CASCADE_DEFAULT_FILE)
- self.cascade_obj = cv2.CascadeClassifier(pathFile)
-
+ self.load_cascade() # JIT load model
dfReturn = None
+ listData = []
for image_idx in range(len(X)):
image_byte = X[FaceDetectTransform.COL_IMAGE_DATA][image_idx]
if type(image_byte) == str:
# img = cv2.imread(image_set[1])
faces = self.detect_faces(img)
- df = pd.DataFrame() # start with empty DF for this image
if self.include_image: # create and append the image if that's requested
- dict_image = FaceDetectTransform.generate_out_dict(w=img.shape[1], h=img.shape[0], image=image_idx)
- dict_image[FaceDetectTransform.COL_IMAGE_MIME] = X[FaceDetectTransform.COL_IMAGE_MIME][image_idx]
- dict_image[FaceDetectTransform.COL_IMAGE_DATA] = X[FaceDetectTransform.COL_IMAGE_DATA][image_idx]
- df = pd.DataFrame([dict_image])
+ listData.append(FaceDetectTransform.generate_out_dict(w=img.shape[1], h=img.shape[0], image=image_idx,
+ media=X[FaceDetectTransform.COL_IMAGE_MIME][image_idx],
+ bin_stream=X[FaceDetectTransform.COL_IMAGE_DATA][image_idx]))
for idxF in range(len(faces)): # walk through detected faces
face_rect = faces[idxF]
- df = df.append(pd.DataFrame([FaceDetectTransform.generate_out_dict(idxF, face_rect[0], face_rect[1],
- face_rect[2], face_rect[3], image=image_idx)]),
- ignore_index=True)
- if dfReturn is None: # create an NP container for all image samples + features
- dfReturn = df # df.reindex_axis(self.output_names_, axis=1)
- else:
- dfReturn = dfReturn.append(df, ignore_index=True)
+ listData.append(FaceDetectTransform.generate_out_dict(idxF, x=face_rect[0], y=face_rect[1],
+ w=face_rect[2], h=face_rect[3], image=image_idx))
# print("IMAGE {:} found {:} total rows".format(image_idx, len(df)))
- return dfReturn
+ return pd.DataFrame(listData, columns=FaceDetectTransform.output_names_()) # assemble all collected rows into one DataFrame
def detect_faces(self, img):
if self.cascade_obj is None:
return {'transform_mode': self.transform_mode}
@staticmethod
- def generate_out_df(media_type="", bin_stream=b""):
- return pd.DataFrame([[media_type, bin_stream]], columns=[FaceDetectTransform.COL_IMAGE_MIME, FaceDetectTransform.COL_IMAGE_DATA])
+ def output_names_():
+ return [FaceDetectTransform.COL_IMAGE_MIME, FaceDetectTransform.COL_IMAGE_DATA]
+
+ @staticmethod
+ def generate_out_dict(bin_stream=b"", media=""):
+ return {FaceDetectTransform.COL_IMAGE_MIME: media, FaceDetectTransform.COL_IMAGE_DATA: bin_stream}
@staticmethod
def generate_in_df(idx=FaceDetectTransform.VAL_REGION_IMAGE_ID, x=0, y=0, w=0, h=0, image=0, bin_stream=b"", media=""):
# collect all remaining regions, operate with each on input image
# generate output image, send to output
- dfReturn = None
image_region_list = RegionTransform.transform_raw_sample(X)
+ listData = []
for image_data in image_region_list:
- # print(image_data)
img = image_data['data']
for r in image_data['regions']: # loop through regions
x_max = min(r[0] + r[2], img.shape[1])
img_binary = cv2.imencode(".jpg", img)[1].tostring()
img_mime = 'image/jpeg' # image_data['mime']
- df = RegionTransform.generate_out_df(media_type=img_mime, bin_stream=img_binary)
- if dfReturn is None: # create an NP container for all images
- dfReturn = df.reindex_axis(self.output_names_, axis=1)
- else:
- dfReturn = dfReturn.append(df, ignore_index=True)
- print("IMAGE {:} found {:} total rows".format(image_data['image'], len(df)))
- return dfReturn
+ listData.append(RegionTransform.generate_out_dict(media=img_mime, bin_stream=img_binary))
+ print("IMAGE {:} found {:} total rows".format(image_data['image'], len(image_data['regions'])))
+ return pd.DataFrame(listData, columns=RegionTransform.output_names_())
@staticmethod
def transform_raw_sample(raw_sample):
image_byte = image_row[FaceDetectTransform.COL_IMAGE_DATA][0]
if type(image_byte) == str:
image_byte = image_byte.encode()
- image_byte = bytearray(base64.b64decode(image_byte))
+ image_byte = bytearray(base64.b64decode(image_byte))
+ else:
+ image_byte = bytearray(image_byte)
file_bytes = np.asarray(image_byte, dtype=np.uint8)
local_image['data'] = cv2.imdecode(file_bytes, cv2.IMREAD_COLOR)
local_image['image'] = nameG