face_privacy_filter/transform_region.py
#! python
# -*- coding: utf-8 -*-
"""
Wrapper for the region-processing task; wrapped in a classifier for the pipeline terminus
"""
import cv2
import pandas as pd
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
import base64

# NOTE: If this class were paired with another model (e.g. from another vendor, another class, etc.), we would
#       need to *exactly match* the i/o for the upstream (detection) and downstream (this processing) steps
from face_privacy_filter.transform_detect import FaceDetectTransform

class RegionTransform(BaseEstimator, ClassifierMixin):
    '''
    A sklearn classifier mixin that manipulates image content based on input
    '''

    def __init__(self, transform_mode="pixelate"):
        self.transform_mode = transform_mode    # specific image-processing mode to utilize

    def get_params(self, deep=False):
        return {'transform_mode': self.transform_mode}

    @staticmethod
    def generate_out_df(media_type="", bin_stream=b""):
        # munge the binary stream and mime type into a single output row
        bin_stream = base64.b64encode(bin_stream)
        if isinstance(bin_stream, bytes):
            bin_stream = bin_stream.decode()
        return pd.DataFrame([[media_type, bin_stream]], columns=[FaceDetectTransform.COL_IMAGE_MIME, FaceDetectTransform.COL_IMAGE_DATA])

    @staticmethod
    def generate_in_df(idx=FaceDetectTransform.VAL_REGION_IMAGE_ID, x=0, y=0, w=0, h=0, image=0, bin_stream=b"", media=""):
        # helper to build a single input row in the column order expected by predict()
        return pd.DataFrame([[idx, x, y, w, h, image, media, bin_stream]],
                            columns=[FaceDetectTransform.COL_REGION_IDX, FaceDetectTransform.COL_FACE_X, FaceDetectTransform.COL_FACE_Y,
                                     FaceDetectTransform.COL_FACE_W, FaceDetectTransform.COL_FACE_H,
                                     FaceDetectTransform.COL_IMAGE_IDX, FaceDetectTransform.COL_IMAGE_MIME,
                                     FaceDetectTransform.COL_IMAGE_DATA])

    @property
    def output_names_(self):
        return [FaceDetectTransform.COL_IMAGE_MIME, FaceDetectTransform.COL_IMAGE_DATA]

    @property
    def output_types_(self):
        list_name = self.output_names_
        list_type = self.classes_
        return [{list_name[i]: list_type[i]} for i in range(len(list_name))]

    @property
    def n_outputs_(self):
        return len(self.output_names_)

    @property
    def classes_(self):
        return [str, str]

    def score(self, X, y=None):
        return 0

    def fit(self, X, y=None):
        return self

    def predict(self, X, y=None):
        """
        Assumes a dataframe of detection rows in the format produced by FaceDetectTransform:
           one row per image (region index == VAL_REGION_IMAGE_ID, i.e. -1) carrying the mime type
           and base64-encoded image bytes, plus one row per detected face region [x, y, w, h]
        """

        # group by image index first
        #   decode the image carried by the region == -1 row
        #   collect all remaining regions, operate with each on the input image
        #   re-encode the processed image and emit it as one output row

        dfReturn = None
        image_region_list = RegionTransform.transform_raw_sample(X)
        for image_data in image_region_list:
            img = image_data['data']
            for r in image_data['regions']:  # loop through regions as [x, y, w, h]
                x_max = min(r[0] + r[2], img.shape[1])
                y_max = min(r[1] + r[3], img.shape[0])
                if self.transform_mode == "pixelate":
                    img[r[1]:y_max, r[0]:x_max] = \
                        RegionTransform.pixelate_image(img[r[1]:y_max, r[0]:x_max])

            # for now, we hard-code jpg output; TODO: add more encoding outputs (or try to match the source?)
            img_binary = cv2.imencode(".jpg", img)[1].tobytes()
            img_mime = 'image/jpeg'  # image_data['mime']

            df = RegionTransform.generate_out_df(media_type=img_mime, bin_stream=img_binary)
            if dfReturn is None:  # create a dataframe container for all images
                dfReturn = df.reindex(columns=self.output_names_)
            else:
                dfReturn = pd.concat([dfReturn, df], ignore_index=True)
            print("IMAGE {:} found {:} total rows".format(image_data['image'], len(df)))
        return dfReturn

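    # For reference, a sketch of the detection frame that predict() / transform_raw_sample() consume
    # (column names are the FaceDetectTransform constants used in generate_in_df above; the example
    # values here are invented for illustration):
    #   - one row per image with COL_REGION_IDX == VAL_REGION_IMAGE_ID (-1), carrying COL_IMAGE_MIME
    #     (e.g. 'image/jpeg') and the base64-encoded bytes in COL_IMAGE_DATA
    #   - one row per detected face with COL_FACE_X/Y/W/H set (e.g. 115, 78, 64, 64) and the same
    #     COL_IMAGE_IDX as its source image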
    @staticmethod
    def transform_raw_sample(raw_sample):
        """Transform raw detection rows into a list of dicts, one per image, each holding the decoded image and its regions"""
        raw_sample = raw_sample.sort_values([FaceDetectTransform.COL_IMAGE_IDX], ascending=True)
        groupImage = raw_sample.groupby(FaceDetectTransform.COL_IMAGE_IDX)
        return_set = []

        for nameG, rowsG in groupImage:
            local_image = {'image': -1, 'data': b"", 'regions': [], 'mime': ''}
            image_row = rowsG[rowsG[FaceDetectTransform.COL_REGION_IDX] == FaceDetectTransform.VAL_REGION_IMAGE_ID]
            if len(image_row) < 1:  # must have at least one image row per image set
                print("Error: RegionTransform could not find a valid image reference for image set {:}".format(nameG))
                continue
            image_byte = image_row[FaceDetectTransform.COL_IMAGE_DATA].iloc[0]
            if not len(image_byte):  # must have valid (non-empty) image data
                print("Error: RegionTransform expected image data, but found an empty binary string {:}".format(nameG))
                continue
            if isinstance(image_byte, str):
                image_byte = image_byte.encode()
            image_byte = bytearray(base64.b64decode(image_byte))
            file_bytes = np.asarray(image_byte, dtype=np.uint8)
            local_image['data'] = cv2.imdecode(file_bytes, cv2.IMREAD_COLOR)
            local_image['image'] = nameG
            local_image['mime'] = image_row[FaceDetectTransform.COL_IMAGE_MIME].iloc[0]

            # now proceed to loop over the detected regions
            for index, row in rowsG.iterrows():
                if row[FaceDetectTransform.COL_REGION_IDX] != FaceDetectTransform.VAL_REGION_IMAGE_ID:  # skip the whole-image row
                    local_image['regions'].append([row[FaceDetectTransform.COL_FACE_X], row[FaceDetectTransform.COL_FACE_Y],
                                                   row[FaceDetectTransform.COL_FACE_W], row[FaceDetectTransform.COL_FACE_H]])
            return_set.append(local_image)
        return return_set

    ################################################################
    # image processing routines (using opencv)

    # http://www.jeffreythompson.org/blog/2012/02/18/pixelate-and-posterize-in-processing/
    @staticmethod
    def pixelate_image(img, blockSize=None):
        # degrade a region by filling each block with the color of its top-left pixel
        if not img.shape[0] or not img.shape[1]:
            return img
        if blockSize is None:
            blockSize = max(1, round(max(img.shape[0], img.shape[1]) / 8))
        ratio = (img.shape[1] / img.shape[0]) if img.shape[0] < img.shape[1] else (img.shape[0] / img.shape[1])
        blockHeight = max(1, round(blockSize * ratio))  # stretch the block so that we cover the whole image
        for x in range(0, img.shape[0], blockSize):
            for y in range(0, img.shape[1], blockHeight):
                max_x = min(x + blockSize, img.shape[0])
                max_y = min(y + blockHeight, img.shape[1])
                fill_color = img[x, y]  # img[x:max_x, y:max_y].mean()
                img[x:max_x, y:max_y] = fill_color
        return img

# RegionTransform.__module__ = '__main__'
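
# ----------------------------------------------------------------------------
# Minimal, illustrative usage sketch: build a two-row input by hand with
# generate_in_df() and pixelate one face region.  The image path and the 64x64
# region below are invented values for illustration only; in the real pipeline
# this frame would come from the upstream FaceDetectTransform (detection) step.
if __name__ == '__main__':
    with open('face_test.jpg', 'rb') as f:                      # hypothetical local test image
        image_b64 = base64.b64encode(f.read())                  # the data column is base64-encoded
    df_image = RegionTransform.generate_in_df(bin_stream=image_b64, media='image/jpeg')
    df_region = RegionTransform.generate_in_df(idx=0, x=10, y=10, w=64, h=64)
    df_in = pd.concat([df_image, df_region], ignore_index=True)
    df_out = RegionTransform(transform_mode="pixelate").predict(df_in)
    print(df_out)                                               # one row: [mime type, base64 image]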