- first pass as rename and dependency update
[face-privacy-filter.git] / face_privacy_filter / transform_region.py
1 #! python
2 # -*- coding: utf-8 -*-
3 """
4 Wrapper for region processing task; wrapped in classifier for pipieline terminus
5 """
6 import cv2
7 import pandas as pd
8 import numpy as np
9 from sklearn.base import BaseEstimator, ClassifierMixin
10 import base64
11
12 # NOTE: If this class were built in another model (e.g. another vendor, class, etc), we would need to
13 #       *exactly match* the i/o for the upstream (detection) and downstream (this processing)
14 # from face_privacy_filter.transform_detect import RegionTransform
15
16 from face_privacy_filter.transform_detect import FaceDetectTransform
17
18
19 class RegionTransform(BaseEstimator, ClassifierMixin):
20     '''
21     A sklearn classifier mixin that manpulates image content based on input
22     '''
23     CASCADE_DEFAULT_FILE = "data/haarcascade_frontalface_alt.xml.gz"
24
25     def __init__(self, transform_mode="pixelate"):
26         self.transform_mode = transform_mode    # specific image processing mode to utilize
27
28     def get_params(self, deep=False):
29         return {'transform_mode': self.transform_mode}
30
31     @staticmethod
32     def generate_out_df(media_type="", bin_stream=b""):
33         return pd.DataFrame([[media_type, bin_stream]], columns=[FaceDetectTransform.COL_IMAGE_MIME, FaceDetectTransform.COL_IMAGE_DATA])
34
35     @staticmethod
36     def generate_in_df(idx=FaceDetectTransform.VAL_REGION_IMAGE_ID, x=0, y=0, w=0, h=0, image=0, bin_stream=b"", media=""):
37         return pd.DataFrame([], RegionTransform.generate_in_dict(idx=idx, x=x, y=y, h=h, w=w, image=image, bin_stream=bin_stream, media=media))
38
39     @staticmethod
40     def generate_in_dict(idx=FaceDetectTransform.VAL_REGION_IMAGE_ID, x=0, y=0, w=0, h=0, image=0, bin_stream=b"", media=""):
41         return FaceDetectTransform.generate_out_dict(idx=idx, x=x, y=y, h=h, w=w, image=image, bin_stream=bin_stream, media=media)
42
43     @property
44     def _acumos_type_in(self):
45         """Custom input type for this processing transformer"""
46         from acumos.modeling import List, create_namedtuple
47         # base input for detect is image itself
48         input_dict = RegionTransform.generate_in_dict()
49         tuple_types = [(k, type(input_dict[k])) for k in input_dict]
50         # base output for detect is several parts
51         DetectionRow = create_namedtuple('DetectionRow', tuple_types)
52         # represents a collection of flattened detect arrays
53         return List[DetectionRow]
54
55     @property
56     def _acumos_type_out(self):
57         """Custom input type for this processing transformer"""
58         from acumos.modeling import List, create_namedtuple
59         # base input for detect is image itself
60         ImageRow = create_namedtuple('ImageRow', [(FaceDetectTransform.COL_IMAGE_MIME, str),
61                                                   (FaceDetectTransform.COL_IMAGE_DATA, bytes)])
62         # represents a collection of flattened image arrays
63         return List[ImageRow]
64
65     def score(self, X, y=None):
66         return 0
67
68     def fit(self, X, y=None):
69         return self
70
71     def predict(self, X, y=None):
72         """
73         Assumes a numpy array of [[mime_type, binary_string] ... ]
74            where mime_type is an image-specifying mime type and binary_string is the raw image bytes
75         """
76
77         # group by image index first
78         #   decode image at region -1
79         #   collect all remaining regions, operate with each on input image
80         #   generate output image, send to output
81
82         dfReturn = None
83         image_region_list = RegionTransform.transform_raw_sample(X)
84         for image_data in image_region_list:
85             # print(image_data)
86             img = image_data['data']
87             for r in image_data['regions']:  # loop through regions
88                 x_max = min(r[0] + r[2], img.shape[1])
89                 y_max = min(r[1] + r[3], img.shape[0])
90                 if self.transform_mode == "pixelate":
91                     img[r[1]:y_max, r[0]:x_max] = \
92                         RegionTransform.pixelate_image(img[r[1]:y_max, r[0]:x_max])
93
94             # for now, we hard code to jpg output; TODO: add more encoding output (or try to match source?)
95             img_binary = cv2.imencode(".jpg", img)[1].tostring()
96             img_mime = 'image/jpeg'  # image_data['mime']
97
98             df = RegionTransform.generate_out_df(media_type=img_mime, bin_stream=img_binary)
99             if dfReturn is None:  # create an NP container for all images
100                 dfReturn = df.reindex_axis(self.output_names_, axis=1)
101             else:
102                 dfReturn = dfReturn.append(df, ignore_index=True)
103             print("IMAGE {:} found {:} total rows".format(image_data['image'], len(df)))
104         return dfReturn
105
106     @staticmethod
107     def transform_raw_sample(raw_sample):
108         """Method to transform raw samples into dict of image and regions"""
109         raw_sample.sort_values([FaceDetectTransform.COL_IMAGE_IDX], ascending=True, inplace=True)
110         groupImage = raw_sample.groupby(FaceDetectTransform.COL_IMAGE_IDX)
111         return_set = []
112
113         for nameG, rowsG in groupImage:
114             local_image = {'image': -1, 'data': b"", 'regions': [], 'mime': ''}
115             image_row = rowsG[rowsG[FaceDetectTransform.COL_REGION_IDX] == FaceDetectTransform.VAL_REGION_IMAGE_ID]
116             if len(image_row) < 1:  # must have at least one image set
117                 print("Error: RegionTransform could not find a valid image reference for image set {:}".format(nameG))
118                 continue
119             if not len(image_row[FaceDetectTransform.COL_IMAGE_DATA]):  # must have valid image data
120                 print("Error: RegionTransform expected image data, but found empty binary string {:}".format(nameG))
121                 continue
122             image_byte = image_row[FaceDetectTransform.COL_IMAGE_DATA][0]
123             if type(image_byte) == str:
124                 image_byte = image_byte.encode()
125             image_byte = bytearray(base64.b64decode(image_byte))
126             file_bytes = np.asarray(image_byte, dtype=np.uint8)
127             local_image['data'] = cv2.imdecode(file_bytes, cv2.IMREAD_COLOR)
128             local_image['image'] = nameG
129             local_image['mime'] = image_row[FaceDetectTransform.COL_IMAGE_MIME]
130
131             # now proceed to loop around regions detected
132             for index, row in rowsG.iterrows():
133                 if row[FaceDetectTransform.COL_REGION_IDX] != FaceDetectTransform.VAL_REGION_IMAGE_ID:  # skip bad regions
134                     local_image['regions'].append([row[FaceDetectTransform.COL_FACE_X], row[FaceDetectTransform.COL_FACE_Y],
135                                                    row[FaceDetectTransform.COL_FACE_W], row[FaceDetectTransform.COL_FACE_H]])
136             return_set.append(local_image)
137         return return_set
138
139     ################################################################
140     # image processing routines (using opencv)
141
142     # http://www.jeffreythompson.org/blog/2012/02/18/pixelate-and-posterize-in-processing/
143     @staticmethod
144     def pixelate_image(img, blockSize=None):
145         if not img.shape[0] or not img.shape[1]:
146             return img
147         if blockSize is None:
148             blockSize = round(max(img.shape[0], img.shape[2]) / 8)
149         ratio = (img.shape[1] / img.shape[0]) if img.shape[0] < img.shape[1] else (img.shape[0] / img.shape[1])
150         blockHeight = round(blockSize * ratio)  # so that we cover all image
151         for x in range(0, img.shape[0], blockSize):
152             for y in range(0, img.shape[1], blockHeight):
153                 max_x = min(x + blockSize, img.shape[0])
154                 max_y = min(y + blockSize, img.shape[1])
155                 fill_color = img[x, y]  # img[x:max_x, y:max_y].mean()
156                 img[x:max_x, y:max_y] = fill_color
157         return img