21a44dc174a34c7fd8aaeda128ce7138f1dae941
[face-privacy-filter.git] / face_privacy_filter / transform_detect.py
1 #! python
2 # -*- coding: utf-8 -*-
3 # ===============LICENSE_START=======================================================
4 # Acumos Apache-2.0
5 # ===================================================================================
6 # Copyright (C) 2017-2018 AT&T Intellectual Property & Tech Mahindra. All rights reserved.
7 # ===================================================================================
8 # This Acumos software file is distributed by AT&T and Tech Mahindra
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this file except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 # http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # This file is distributed on an "AS IS" BASIS,
16 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 # See the License for the specific language governing permissions and
18 # limitations under the License.
19 # ===============LICENSE_END=========================================================
20 """
21 Wrapper for face detection task; wrapped in classifier for pipieline terminus
22 """
23 import cv2
24 import os
25 import pandas as pd
26 import numpy as np
27 from sklearn.base import BaseEstimator, ClassifierMixin
28 import base64
29
30 import gzip
31 import sys
32 if sys.version_info[0] < 3:
33     from cStringIO import StringIO as BytesIO
34 else:
35     from io import BytesIO as BytesIO
36
37
38 class FaceDetectTransform(BaseEstimator, ClassifierMixin):
39     '''
40     A sklearn transformer mixin that detects faces and optionally outputa the original detected image
41     '''
42     CASCADE_DEFAULT_FILE = "data/haarcascade_frontalface_alt.xml.gz"
43     COL_FACE_X = 'x'
44     COL_FACE_Y = 'y'
45     COL_FACE_W = 'w'
46     COL_FACE_H = 'h'
47     COL_REGION_IDX = 'region'
48     COL_IMAGE_IDX = 'image'
49     COL_IMAGE_MIME = 'mime_type'
50     COL_IMAGE_DATA = 'image_binary'
51     VAL_REGION_IMAGE_ID = -1
52
53     def __init__(self, cascade_path=None, cascade_stream=None, include_image=True):
54         self.include_image = include_image    # should output transform include image?
55         self.cascade_obj = None  # late-load this component
56         self.cascade_stream = cascade_stream    # compressed binary final for cascade data
57         if self.cascade_stream is None:
58             if cascade_path is None:   # default/included data?
59                 pathRoot = os.path.dirname(os.path.abspath(__file__))
60                 cascade_path = os.path.join(pathRoot, FaceDetectTransform.CASCADE_DEFAULT_FILE)
61             raw_stream = b""
62             with open(cascade_path, 'rb') as f:
63                 raw_stream = f.read()
64                 self.cascade_stream = {'name': os.path.basename(cascade_path),
65                                        'data': FaceDetectTransform.string_compress(raw_stream)}
66
67     @staticmethod
68     def string_compress(string_data):
69         out_data = BytesIO()
70         with gzip.GzipFile(fileobj=out_data, mode="wb") as f:
71             f.write(string_data)
72         return out_data.getvalue()
73
74     @staticmethod
75     def string_decompress(compressed_data):
76         in_data = BytesIO(compressed_data)
77         ret_str = None
78         with gzip.GzipFile(fileobj=in_data, mode="rb") as f:
79             ret_str = f.read()
80         return ret_str
81
82     def get_params(self, deep=False):
83         return {'include_image': self.include_image, 'cascade_stream': self.cascade_stream}
84
85     @staticmethod
86     def generate_in_df(path_image="", bin_stream=b""):
87         # munge stream and mimetype into input sample
88         if path_image and os.path.exists(path_image):
89             bin_stream = open(path_image, 'rb').read()
90         return pd.DataFrame([['image/jpeg', bin_stream]], columns=[FaceDetectTransform.COL_IMAGE_MIME, FaceDetectTransform.COL_IMAGE_DATA])
91
92     @staticmethod
93     def generate_out_image(row, path_image):
94         # take image row and output to disk
95         with open(path_image, 'wb') as f:
96             f.write(row[FaceDetectTransform.COL_IMAGE_DATA][0])
97
98     @staticmethod
99     def output_names_():
100         return [FaceDetectTransform.COL_IMAGE_IDX, FaceDetectTransform.COL_REGION_IDX,
101                 FaceDetectTransform.COL_FACE_X, FaceDetectTransform.COL_FACE_Y,
102                 FaceDetectTransform.COL_FACE_W, FaceDetectTransform.COL_FACE_H,
103                 FaceDetectTransform.COL_IMAGE_MIME, FaceDetectTransform.COL_IMAGE_DATA]
104
105     @staticmethod
106     def generate_out_dict(idx=VAL_REGION_IMAGE_ID, x=0, y=0, w=0, h=0, image=0, bin_stream=b"", media=""):
107         return dict(zip(FaceDetectTransform.output_names_(), [image, idx, x, y, w, h, media, bin_stream]))
108
109     @staticmethod
110     def suppress_image(df):
111         blank_cols = [FaceDetectTransform.COL_IMAGE_MIME, FaceDetectTransform.COL_IMAGE_DATA]
112         # set columns that aren't in our known column list to empty strings; search where face index==-1 (no face)
113         df[blank_cols] = None
114         return df
115
116     @property
117     def _type_in(self):
118         """Custom input type for this processing transformer"""
119         return [(FaceDetectTransform.COL_IMAGE_MIME, str), (FaceDetectTransform.COL_IMAGE_DATA, bytes)], "Image"
120
121     @property
122     def _type_out(self):
123         """Custom input type for this processing transformer"""
124         output_dict = FaceDetectTransform.generate_out_dict()
125         return [(k, type(output_dict[k])) for k in output_dict], "DetectionFrame"
126
127     def score(self, X, y=None):
128         return 0
129
130     def fit(self, X, y=None):
131         return self
132
133     def load_cascade(self):
134         # if no model exists yet, create it; return False for deserialize required
135         if self.cascade_obj is None:
136             if self.cascade_stream is not None:
137                 import tempfile
138                 with tempfile.TemporaryDirectory() as tdir:
139                     cascade_data = FaceDetectTransform.string_decompress(self.cascade_stream['data'])
140                     cascade_path = os.path.join(tdir, self.cascade_stream['name'])
141                     with open(cascade_path, 'wb') as f:
142                         f.write(cascade_data)
143                     self.cascade_obj = cv2.CascadeClassifier(cascade_path)
144             return False
145         return True
146
147     def predict(self, X, y=None):
148         """
149         Assumes a numpy array of [[mime_type, binary_string] ... ]
150            where mime_type is an image-specifying mime type and binary_string is the raw image bytes
151         """
152         self.load_cascade()  # JIT load model
153         listData = []
154         for image_idx in range(len(X)):
155             image_byte = X[FaceDetectTransform.COL_IMAGE_DATA][image_idx]
156             if type(image_byte) == str:
157                 image_byte = image_byte.encode()
158                 image_byte = base64.b64decode(image_byte)
159             image_byte = bytearray(image_byte)
160             file_bytes = np.asarray(image_byte, dtype=np.uint8)
161             img = cv2.imdecode(file_bytes, cv2.IMREAD_COLOR)
162             # img = cv2.imread(image_set[1])
163             faces = self.detect_faces(img)
164
165             if self.include_image:  # create and append the image if that's requested
166                 listData.append(FaceDetectTransform.generate_out_dict(w=img.shape[1], h=img.shape[0], image=image_idx,
167                                                                       media=X[FaceDetectTransform.COL_IMAGE_MIME][image_idx],
168                                                                       bin_stream=X[FaceDetectTransform.COL_IMAGE_DATA][image_idx]))
169             for idxF in range(len(faces)):  # walk through detected faces
170                 face_rect = faces[idxF]
171                 listData.append(FaceDetectTransform.generate_out_dict(idxF, x=face_rect[0], y=face_rect[1],
172                                                                       w=face_rect[2], h=face_rect[3], image=image_idx))
173             # print("IMAGE {:} found {:} total rows".format(image_idx, len(df)))
174
175         return pd.DataFrame(listData, columns=FaceDetectTransform.output_names_())  # start with empty DF for this image
176
177     def detect_faces(self, img):
178         if self.cascade_obj is None:
179             return []
180         gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
181
182         faces = self.cascade_obj.detectMultiScale(
183             gray,
184             scaleFactor=1.1,
185             minNeighbors=5,
186             minSize=(30, 30),
187             flags=cv2.CASCADE_SCALE_IMAGE
188         )
189
190         # Draw a rectangle around the faces
191         # for (x, y, w, h) in faces:
192         #    cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
193         return faces