2 # -*- coding: utf-8 -*-
4 Wrapper for region processing task; wrapped in classifier for pipieline terminus
9 from sklearn.base import BaseEstimator, ClassifierMixin
12 # NOTE: If this class were built in another model (e.g. another vendor, class, etc), we would need to
13 # *exactly match* the i/o for the upstream (detection) and downstream (this processing)
14 # from face_privacy_filter.transform_detect import RegionTransform
16 from face_privacy_filter.transform_detect import FaceDetectTransform
19 class RegionTransform(BaseEstimator, ClassifierMixin):
21 A sklearn classifier mixin that manpulates image content based on input
23 CASCADE_DEFAULT_FILE = "data/haarcascade_frontalface_alt.xml.gz"
25 def __init__(self, transform_mode="pixelate"):
26 self.transform_mode = transform_mode # specific image processing mode to utilize
28 def get_params(self, deep=False):
29 return {'transform_mode': self.transform_mode}
33 return [FaceDetectTransform.COL_IMAGE_MIME, FaceDetectTransform.COL_IMAGE_DATA]
36 def generate_out_dict(bin_stream=b"", media=""):
37 return {FaceDetectTransform.COL_IMAGE_MIME: media, FaceDetectTransform.COL_IMAGE_DATA: bin_stream}
40 def generate_in_df(idx=FaceDetectTransform.VAL_REGION_IMAGE_ID, x=0, y=0, w=0, h=0, image=0, bin_stream=b"", media=""):
41 return pd.DataFrame([], RegionTransform.generate_in_dict(idx=idx, x=x, y=y, h=h, w=w, image=image, bin_stream=bin_stream, media=media))
44 def generate_in_dict(idx=FaceDetectTransform.VAL_REGION_IMAGE_ID, x=0, y=0, w=0, h=0, image=0, bin_stream=b"", media=""):
45 return FaceDetectTransform.generate_out_dict(idx=idx, x=x, y=y, h=h, w=w, image=image, bin_stream=bin_stream, media=media)
49 """Custom input type for this processing transformer"""
50 input_dict = RegionTransform.generate_in_dict()
51 return {k: type(input_dict[k]) for k in input_dict}, "DetectionFrames"
55 """Custom input type for this processing transformer"""
56 return {FaceDetectTransform.COL_IMAGE_MIME: str, FaceDetectTransform.COL_IMAGE_DATA: bytes}, "TransformedImage"
58 def score(self, X, y=None):
61 def fit(self, X, y=None):
64 def predict(self, X, y=None):
66 Assumes a numpy array of [[mime_type, binary_string] ... ]
67 where mime_type is an image-specifying mime type and binary_string is the raw image bytes
70 # group by image index first
71 # decode image at region -1
72 # collect all remaining regions, operate with each on input image
73 # generate output image, send to output
75 image_region_list = RegionTransform.transform_raw_sample(X)
77 for image_data in image_region_list:
78 img = image_data['data']
79 for r in image_data['regions']: # loop through regions
80 x_max = min(r[0] + r[2], img.shape[1])
81 y_max = min(r[1] + r[3], img.shape[0])
82 if self.transform_mode == "pixelate":
83 img[r[1]:y_max, r[0]:x_max] = \
84 RegionTransform.pixelate_image(img[r[1]:y_max, r[0]:x_max])
86 # for now, we hard code to jpg output; TODO: add more encoding output (or try to match source?)
87 img_binary = cv2.imencode(".jpg", img)[1].tostring()
88 img_mime = 'image/jpeg' # image_data['mime']
90 listData.append(RegionTransform.generate_out_dict(media=img_mime, bin_stream=img_binary))
91 print("IMAGE {:} found {:} total rows".format(image_data['image'], len(image_data['regions'])))
92 return pd.DataFrame(listData, columns=RegionTransform.output_names_())
95 def transform_raw_sample(raw_sample):
96 """Method to transform raw samples into dict of image and regions"""
97 raw_sample.sort_values([FaceDetectTransform.COL_IMAGE_IDX], ascending=True, inplace=True)
98 groupImage = raw_sample.groupby(FaceDetectTransform.COL_IMAGE_IDX)
101 for nameG, rowsG in groupImage:
102 local_image = {'image': -1, 'data': b"", 'regions': [], 'mime': ''}
103 image_row = rowsG[rowsG[FaceDetectTransform.COL_REGION_IDX] == FaceDetectTransform.VAL_REGION_IMAGE_ID]
104 if len(image_row) < 1: # must have at least one image set
105 print("Error: RegionTransform could not find a valid image reference for image set {:}".format(nameG))
107 if not len(image_row[FaceDetectTransform.COL_IMAGE_DATA]): # must have valid image data
108 print("Error: RegionTransform expected image data, but found empty binary string {:}".format(nameG))
110 image_byte = image_row[FaceDetectTransform.COL_IMAGE_DATA][0]
111 if type(image_byte) == str:
112 image_byte = image_byte.encode()
113 image_byte = bytearray(base64.b64decode(image_byte))
115 image_byte = bytearray(image_byte)
116 file_bytes = np.asarray(image_byte, dtype=np.uint8)
117 local_image['data'] = cv2.imdecode(file_bytes, cv2.IMREAD_COLOR)
118 local_image['image'] = nameG
119 local_image['mime'] = image_row[FaceDetectTransform.COL_IMAGE_MIME]
121 # now proceed to loop around regions detected
122 for index, row in rowsG.iterrows():
123 if row[FaceDetectTransform.COL_REGION_IDX] != FaceDetectTransform.VAL_REGION_IMAGE_ID: # skip bad regions
124 local_image['regions'].append([row[FaceDetectTransform.COL_FACE_X], row[FaceDetectTransform.COL_FACE_Y],
125 row[FaceDetectTransform.COL_FACE_W], row[FaceDetectTransform.COL_FACE_H]])
126 return_set.append(local_image)
129 ################################################################
130 # image processing routines (using opencv)
132 # http://www.jeffreythompson.org/blog/2012/02/18/pixelate-and-posterize-in-processing/
134 def pixelate_image(img, blockSize=None):
135 if not img.shape[0] or not img.shape[1]:
137 if blockSize is None:
138 blockSize = round(max(img.shape[0], img.shape[2]) / 8)
139 ratio = (img.shape[1] / img.shape[0]) if img.shape[0] < img.shape[1] else (img.shape[0] / img.shape[1])
140 blockHeight = round(blockSize * ratio) # so that we cover all image
141 for x in range(0, img.shape[0], blockSize):
142 for y in range(0, img.shape[1], blockHeight):
143 max_x = min(x + blockSize, img.shape[0])
144 max_y = min(y + blockSize, img.shape[1])
145 fill_color = img[x, y] # img[x:max_x, y:max_y].mean()
146 img[x:max_x, y:max_y] = fill_color