- serialize face cascade to avoid filesystem
[face-privacy-filter.git] / face_privacy_filter / transform_region.py
index 533efbd..e50726f 100644 (file)
@@ -4,19 +4,23 @@
 Wrapper for region processing task; wrapped in classifier for pipieline terminus
 """
 import cv2
-import os
 import pandas as pd
 import numpy as np
 from sklearn.base import BaseEstimator, ClassifierMixin
+import base64
 
 # NOTE: If this class were built in another model (e.g. another vendor, class, etc), we would need to
 #       *exactly match* the i/o for the upstream (detection) and downstream (this processing)
+# from face_privacy_filter.transform_detect import RegionTransform
+
 from face_privacy_filter.transform_detect import FaceDetectTransform
 
+
 class RegionTransform(BaseEstimator, ClassifierMixin):
     '''
     A sklearn classifier mixin that manpulates image content based on input
     '''
+    CASCADE_DEFAULT_FILE = "data/haarcascade_frontalface_alt.xml.gz"
 
     def __init__(self, transform_mode="pixelate"):
         self.transform_mode = transform_mode    # specific image processing mode to utilize
@@ -25,35 +29,31 @@ class RegionTransform(BaseEstimator, ClassifierMixin):
         return {'transform_mode': self.transform_mode}
 
     @staticmethod
-    def generate_out_df(media_type="", bin_stream=b""):
-        # munge stream and mimetype into input sample
-        return pd.DataFrame([[media_type, bin_stream]], columns=[FaceDetectTransform.COL_IMAGE_MIME, FaceDetectTransform.COL_IMAGE_DATA])
+    def output_names_():
+        return [FaceDetectTransform.COL_IMAGE_MIME, FaceDetectTransform.COL_IMAGE_DATA]
 
     @staticmethod
-    def generate_in_df(idx=FaceDetectTransform.VAL_REGION_IMAGE_ID, x=0, y=0, w=0, h=0, image=0, bin_stream=b"", media=""):
-        return pd.DataFrame([[idx,x,y,w,h,image,media,bin_stream]],
-                            columns=[FaceDetectTransform.COL_REGION_IDX, FaceDetectTransform.COL_FACE_X, FaceDetectTransform.COL_FACE_Y,
-                                     FaceDetectTransform.COL_FACE_W, FaceDetectTransform.COL_FACE_H,
-                                     FaceDetectTransform.COL_IMAGE_IDX, FaceDetectTransform.COL_IMAGE_MIME,
-                                     FaceDetectTransform.COL_IMAGE_DATA])
+    def generate_out_dict(bin_stream=b"", media=""):
+        return {FaceDetectTransform.COL_IMAGE_MIME: media, FaceDetectTransform.COL_IMAGE_DATA: bin_stream}
 
-    @property
-    def output_names_(self):
-        return [FaceDetectTransform.COL_IMAGE_MIME, FaceDetectTransform.COL_IMAGE_DATA]
+    @staticmethod
+    def generate_in_df(idx=FaceDetectTransform.VAL_REGION_IMAGE_ID, x=0, y=0, w=0, h=0, image=0, bin_stream=b"", media=""):
+        return pd.DataFrame([], RegionTransform.generate_in_dict(idx=idx, x=x, y=y, h=h, w=w, image=image, bin_stream=bin_stream, media=media))
 
-    @property
-    def output_types_(self):
-        list_name = self.output_names_
-        list_type = self.classes_
-        return [{list_name[i]:list_type[i]} for i in range(len(list_name))]
+    @staticmethod
+    def generate_in_dict(idx=FaceDetectTransform.VAL_REGION_IMAGE_ID, x=0, y=0, w=0, h=0, image=0, bin_stream=b"", media=""):
+        return FaceDetectTransform.generate_out_dict(idx=idx, x=x, y=y, h=h, w=w, image=image, bin_stream=bin_stream, media=media)
 
     @property
-    def n_outputs_(self):
-        return 8
+    def _type_in(self):
+        """Custom input type for this processing transformer"""
+        input_dict = RegionTransform.generate_in_dict()
+        return {k: type(input_dict[k]) for k in input_dict}, "DetectionFrames"
 
     @property
-    def classes_(self):
-        return [str, str]
+    def _type_out(self):
+        """Custom input type for this processing transformer"""
+        return {FaceDetectTransform.COL_IMAGE_MIME: str, FaceDetectTransform.COL_IMAGE_DATA: bytes}, "TransformedImage"
 
     def score(self, X, y=None):
         return 0
@@ -64,7 +64,7 @@ class RegionTransform(BaseEstimator, ClassifierMixin):
     def predict(self, X, y=None):
         """
         Assumes a numpy array of [[mime_type, binary_string] ... ]
-           where mime_type is an image-specifying mime type and binary_string is the raw image bytes       
+           where mime_type is an image-specifying mime type and binary_string is the raw image bytes
         """
 
         # group by image index first
@@ -72,15 +72,14 @@ class RegionTransform(BaseEstimator, ClassifierMixin):
         #   collect all remaining regions, operate with each on input image
         #   generate output image, send to output
 
-        dfReturn = None
         image_region_list = RegionTransform.transform_raw_sample(X)
+        listData = []
         for image_data in image_region_list:
-            #print(image_data)
             img = image_data['data']
             for r in image_data['regions']:  # loop through regions
-                x_max = min(r[0]+r[2], img.shape[1])
-                y_max = min(r[1]+r[3], img.shape[0])
-                if self.transform_mode=="pixelate":
+                x_max = min(r[0] + r[2], img.shape[1])
+                y_max = min(r[1] + r[3], img.shape[0])
+                if self.transform_mode == "pixelate":
                     img[r[1]:y_max, r[0]:x_max] = \
                         RegionTransform.pixelate_image(img[r[1]:y_max, r[0]:x_max])
 
@@ -88,13 +87,9 @@ class RegionTransform(BaseEstimator, ClassifierMixin):
             img_binary = cv2.imencode(".jpg", img)[1].tostring()
             img_mime = 'image/jpeg'  # image_data['mime']
 
-            df = RegionTransform.generate_out_df(media_type=img_mime, bin_stream=img_binary)
-            if dfReturn is None:  # create an NP container for all images
-                dfReturn = df.reindex_axis(self.output_names_, axis=1)
-            else:
-                dfReturn = dfReturn.append(df, ignore_index=True)
-            print("IMAGE {:} found {:} total rows".format(image_data['image'], len(df)))
-        return dfReturn
+            listData.append(RegionTransform.generate_out_dict(media=img_mime, bin_stream=img_binary))
+            print("IMAGE {:} found {:} total rows".format(image_data['image'], len(image_data['regions'])))
+        return pd.DataFrame(listData, columns=RegionTransform.output_names_())
 
     @staticmethod
     def transform_raw_sample(raw_sample):
@@ -105,21 +100,27 @@ class RegionTransform(BaseEstimator, ClassifierMixin):
 
         for nameG, rowsG in groupImage:
             local_image = {'image': -1, 'data': b"", 'regions': [], 'mime': ''}
-            image_row = rowsG[rowsG[FaceDetectTransform.COL_REGION_IDX]==FaceDetectTransform.VAL_REGION_IMAGE_ID]
+            image_row = rowsG[rowsG[FaceDetectTransform.COL_REGION_IDX] == FaceDetectTransform.VAL_REGION_IMAGE_ID]
             if len(image_row) < 1:  # must have at least one image set
                 print("Error: RegionTransform could not find a valid image reference for image set {:}".format(nameG))
                 continue
             if not len(image_row[FaceDetectTransform.COL_IMAGE_DATA]):  # must have valid image data
                 print("Error: RegionTransform expected image data, but found empty binary string {:}".format(nameG))
                 continue
-            file_bytes = np.asarray(bytearray(FaceDetectTransform.read_byte_arrays(image_row[FaceDetectTransform.COL_IMAGE_DATA][0])), dtype=np.uint8)
+            image_byte = image_row[FaceDetectTransform.COL_IMAGE_DATA][0]
+            if type(image_byte) == str:
+                image_byte = image_byte.encode()
+                image_byte = bytearray(base64.b64decode(image_byte))
+            else:
+                image_byte = bytearray(image_byte)
+            file_bytes = np.asarray(image_byte, dtype=np.uint8)
             local_image['data'] = cv2.imdecode(file_bytes, cv2.IMREAD_COLOR)
             local_image['image'] = nameG
             local_image['mime'] = image_row[FaceDetectTransform.COL_IMAGE_MIME]
 
             # now proceed to loop around regions detected
             for index, row in rowsG.iterrows():
-                if row[FaceDetectTransform.COL_REGION_IDX]!=FaceDetectTransform.VAL_REGION_IMAGE_ID:  # skip bad regions
+                if row[FaceDetectTransform.COL_REGION_IDX] != FaceDetectTransform.VAL_REGION_IMAGE_ID:  # skip bad regions
                     local_image['regions'].append([row[FaceDetectTransform.COL_FACE_X], row[FaceDetectTransform.COL_FACE_Y],
                                                    row[FaceDetectTransform.COL_FACE_W], row[FaceDetectTransform.COL_FACE_H]])
             return_set.append(local_image)
@@ -139,10 +140,8 @@ class RegionTransform(BaseEstimator, ClassifierMixin):
         blockHeight = round(blockSize * ratio)  # so that we cover all image
         for x in range(0, img.shape[0], blockSize):
             for y in range(0, img.shape[1], blockHeight):
-                max_x = min(x+blockSize, img.shape[0])
-                max_y = min(y+blockSize, img.shape[1])
-                fill_color = img[x,y] # img[x:max_x, y:max_y].mean()
+                max_x = min(x + blockSize, img.shape[0])
+                max_y = min(y + blockSize, img.shape[1])
+                fill_color = img[x, y]  # img[x:max_x, y:max_y].mean()
                 img[x:max_x, y:max_y] = fill_color
         return img
-
-# RegionTransform.__module__ = '__main__'