Merged in hotfix/refactor_docs (pull request #4)
[face-privacy-filter.git] / face_privacy_filter / transform_detect.py
1 #! python
2 # -*- coding: utf-8 -*-
3 """
4 Wrapper for face detection task; wrapped in classifier for pipieline terminus
5 """
6 import cv2
7 import os
8 import pandas as pd
9 import numpy as np
10 from sklearn.base import BaseEstimator, ClassifierMixin
11 import base64
12
13 import gzip
14 import sys
15 if sys.version_info[0] < 3:
16     from cStringIO import StringIO as BytesIO
17 else:
18     from io import BytesIO as BytesIO
19
20
21 class FaceDetectTransform(BaseEstimator, ClassifierMixin):
22     '''
23     A sklearn transformer mixin that detects faces and optionally outputa the original detected image
24     '''
25     CASCADE_DEFAULT_FILE = "data/haarcascade_frontalface_alt.xml.gz"
26     COL_FACE_X = 'x'
27     COL_FACE_Y = 'y'
28     COL_FACE_W = 'w'
29     COL_FACE_H = 'h'
30     COL_REGION_IDX = 'region'
31     COL_IMAGE_IDX = 'image'
32     COL_IMAGE_MIME = 'mime_type'
33     COL_IMAGE_DATA = 'image_binary'
34     VAL_REGION_IMAGE_ID = -1
35
36     def __init__(self, cascade_path=None, cascade_stream=None, include_image=True):
37         self.include_image = include_image    # should output transform include image?
38         self.cascade_obj = None  # late-load this component
39         self.cascade_stream = cascade_stream    # compressed binary final for cascade data
40         if self.cascade_stream is None:
41             if cascade_path is None:   # default/included data?
42                 pathRoot = os.path.dirname(os.path.abspath(__file__))
43                 cascade_path = os.path.join(pathRoot, FaceDetectTransform.CASCADE_DEFAULT_FILE)
44             raw_stream = b""
45             with open(cascade_path, 'rb') as f:
46                 raw_stream = f.read()
47                 self.cascade_stream = {'name': os.path.basename(cascade_path),
48                                        'data': FaceDetectTransform.string_compress(raw_stream)}
49
50     @staticmethod
51     def string_compress(string_data):
52         out_data = BytesIO()
53         with gzip.GzipFile(fileobj=out_data, mode="wb") as f:
54             f.write(string_data)
55         return out_data.getvalue()
56
57     @staticmethod
58     def string_decompress(compressed_data):
59         in_data = BytesIO(compressed_data)
60         ret_str = None
61         with gzip.GzipFile(fileobj=in_data, mode="rb") as f:
62             ret_str = f.read()
63         return ret_str
64
65     def get_params(self, deep=False):
66         return {'include_image': self.include_image, 'cascade_stream': self.cascade_stream}
67
68     @staticmethod
69     def generate_in_df(path_image="", bin_stream=b""):
70         # munge stream and mimetype into input sample
71         if path_image and os.path.exists(path_image):
72             bin_stream = open(path_image, 'rb').read()
73         return pd.DataFrame([['image/jpeg', bin_stream]], columns=[FaceDetectTransform.COL_IMAGE_MIME, FaceDetectTransform.COL_IMAGE_DATA])
74
75     @staticmethod
76     def generate_out_image(row, path_image):
77         # take image row and output to disk
78         with open(path_image, 'wb') as f:
79             f.write(row[FaceDetectTransform.COL_IMAGE_DATA][0])
80
81     @staticmethod
82     def output_names_():
83         return [FaceDetectTransform.COL_IMAGE_IDX, FaceDetectTransform.COL_REGION_IDX,
84                 FaceDetectTransform.COL_FACE_X, FaceDetectTransform.COL_FACE_Y,
85                 FaceDetectTransform.COL_FACE_W, FaceDetectTransform.COL_FACE_H,
86                 FaceDetectTransform.COL_IMAGE_MIME, FaceDetectTransform.COL_IMAGE_DATA]
87
88     @staticmethod
89     def generate_out_dict(idx=VAL_REGION_IMAGE_ID, x=0, y=0, w=0, h=0, image=0, bin_stream=b"", media=""):
90         return dict(zip(FaceDetectTransform.output_names_(), [image, idx, x, y, w, h, media, bin_stream]))
91
92     @staticmethod
93     def suppress_image(df):
94         blank_cols = [FaceDetectTransform.COL_IMAGE_MIME, FaceDetectTransform.COL_IMAGE_DATA]
95         # set columns that aren't in our known column list to empty strings; search where face index==-1 (no face)
96         df[blank_cols] = None
97         return df
98
99     @property
100     def _type_in(self):
101         """Custom input type for this processing transformer"""
102         return {FaceDetectTransform.COL_IMAGE_MIME: str, FaceDetectTransform.COL_IMAGE_DATA: bytes}, "FaceImage"
103
104     @property
105     def _type_out(self):
106         """Custom input type for this processing transformer"""
107         output_dict = FaceDetectTransform.generate_out_dict()
108         return {k: type(output_dict[k]) for k in output_dict}, "DetectionFrames"
109
110     def score(self, X, y=None):
111         return 0
112
113     def fit(self, X, y=None):
114         return self
115
116     def load_cascade(self):
117         # if no model exists yet, create it; return False for deserialize required
118         if self.cascade_obj is None:
119             if self.cascade_stream is not None:
120                 import tempfile
121                 with tempfile.TemporaryDirectory() as tdir:
122                     cascade_data = FaceDetectTransform.string_decompress(self.cascade_stream['data'])
123                     cascade_path = os.path.join(tdir, self.cascade_stream['name'])
124                     with open(cascade_path, 'wb') as f:
125                         f.write(cascade_data)
126                     self.cascade_obj = cv2.CascadeClassifier(cascade_path)
127             return False
128         return True
129
130     def predict(self, X, y=None):
131         """
132         Assumes a numpy array of [[mime_type, binary_string] ... ]
133            where mime_type is an image-specifying mime type and binary_string is the raw image bytes
134         """
135         self.load_cascade()  # JIT load model
136         dfReturn = None
137         listData = []
138         for image_idx in range(len(X)):
139             image_byte = X[FaceDetectTransform.COL_IMAGE_DATA][image_idx]
140             if type(image_byte) == str:
141                 image_byte = image_byte.encode()
142                 image_byte = base64.b64decode(image_byte)
143             image_byte = bytearray(image_byte)
144             file_bytes = np.asarray(image_byte, dtype=np.uint8)
145             img = cv2.imdecode(file_bytes, cv2.IMREAD_COLOR)
146             # img = cv2.imread(image_set[1])
147             faces = self.detect_faces(img)
148
149             if self.include_image:  # create and append the image if that's requested
150                 listData.append(FaceDetectTransform.generate_out_dict(w=img.shape[1], h=img.shape[0], image=image_idx,
151                                                                       media=X[FaceDetectTransform.COL_IMAGE_MIME][image_idx],
152                                                                       bin_stream=X[FaceDetectTransform.COL_IMAGE_DATA][image_idx]))
153             for idxF in range(len(faces)):  # walk through detected faces
154                 face_rect = faces[idxF]
155                 listData.append(FaceDetectTransform.generate_out_dict(idxF, x=face_rect[0], y=face_rect[1],
156                                                                       w=face_rect[2], h=face_rect[3], image=image_idx))
157             # print("IMAGE {:} found {:} total rows".format(image_idx, len(df)))
158
159         return pd.DataFrame(listData, columns=FaceDetectTransform.output_names_())  # start with empty DF for this image
160
161     def detect_faces(self, img):
162         if self.cascade_obj is None:
163             return []
164         gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
165
166         faces = self.cascade_obj.detectMultiScale(
167             gray,
168             scaleFactor=1.1,
169             minNeighbors=5,
170             minSize=(30, 30),
171             flags=cv2.CASCADE_SCALE_IMAGE
172         )
173
174         # Draw a rectangle around the faces
175         # for (x, y, w, h) in faces:
176         #    cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
177         return faces