- fix i/o to match more consistent dataframe
[face-privacy-filter.git] / face_privacy_filter / transform_detect.py
1 #! python
2 # -*- coding: utf-8 -*-
3 """
4 Wrapper for face detection task; wrapped in classifier for pipeline terminus
5 """
6 import cv2
7 import os
8 import pandas as pd
9 import numpy as np
10 from sklearn.base import BaseEstimator, ClassifierMixin
11 import base64
12
13
class FaceDetectTransform(BaseEstimator, ClassifierMixin):
    """
    A sklearn estimator that detects faces and optionally outputs the original image.

    ``predict`` consumes a frame of encoded images (columns ``mime_type`` and
    ``image_binary``) and produces one row per detected face region.  When
    ``include_image`` is true, each input image additionally contributes one row
    whose region index is ``VAL_REGION_IMAGE_ID`` and which carries the original
    mime type and image data together with the image width/height.
    """
    # cascade shipped with the package, resolved relative to this module's directory
    CASCADE_DEFAULT_FILE = "data/haarcascade_frontalface_alt.xml.gz"
    # output column names
    COL_FACE_X = 'x'
    COL_FACE_Y = 'y'
    COL_FACE_W = 'w'
    COL_FACE_H = 'h'
    COL_REGION_IDX = 'region'
    COL_IMAGE_IDX = 'image'
    COL_IMAGE_MIME = 'mime_type'
    COL_IMAGE_DATA = 'image_binary'
    # region index used for the row that describes the whole image (not a face)
    VAL_REGION_IMAGE_ID = -1

    def __init__(self, cascade_path=None, include_image=True):
        self.include_image = include_image    # should output transform include image?
        self.cascade_path = cascade_path    # abs path outside of module
        self.cascade_obj = None  # late-load this component

    def get_params(self, deep=False):
        """Return the estimator parameters exposed to sklearn."""
        # NOTE(review): cascade_path is not reported here, so anything that rebuilds
        # the estimator from get_params() falls back to the packaged cascade --
        # confirm this omission is intentional.
        return {'include_image': self.include_image}

    @staticmethod
    def generate_in_df(path_image="", bin_stream=b""):
        """
        Build a single-row input frame from an image file or a raw byte stream.

        If ``path_image`` names an existing file, its contents replace ``bin_stream``.
        The mime type of the sample is always set to ``image/jpeg``.
        """
        if path_image and os.path.exists(path_image):
            # context manager guarantees the handle is closed after reading
            with open(path_image, 'rb') as image_file:
                bin_stream = image_file.read()
        return pd.DataFrame([['image/jpeg', bin_stream]],
                            columns=[FaceDetectTransform.COL_IMAGE_MIME, FaceDetectTransform.COL_IMAGE_DATA])

    @staticmethod
    def generate_out_image(row, path_image):
        """
        Write the image data of ``row`` to ``path_image``.

        ``row`` is indexed as ``row[COL_IMAGE_DATA][0]``, i.e. the first entry of its
        image-data column is what gets written.
        """
        with open(path_image, 'wb') as f:
            f.write(row[FaceDetectTransform.COL_IMAGE_DATA][0])

    @staticmethod
    def generate_out_dict(idx=VAL_REGION_IMAGE_ID, x=0, y=0, w=0, h=0, image=0, bin_stream=b"", media=""):
        """Return one output record (a dict keyed by the output column names)."""
        return {FaceDetectTransform.COL_IMAGE_IDX: image, FaceDetectTransform.COL_REGION_IDX: idx,
                FaceDetectTransform.COL_FACE_X: x, FaceDetectTransform.COL_FACE_Y: y,
                FaceDetectTransform.COL_FACE_W: w, FaceDetectTransform.COL_FACE_H: h,
                FaceDetectTransform.COL_IMAGE_MIME: media, FaceDetectTransform.COL_IMAGE_DATA: bin_stream}

    @staticmethod
    def suppress_image(df):
        """
        Blank the mime-type and image-data columns of ``df``.

        Both columns are set to None for every row; ``df`` is modified in place and
        also returned.
        """
        blank_cols = [FaceDetectTransform.COL_IMAGE_MIME, FaceDetectTransform.COL_IMAGE_DATA]
        df[blank_cols] = None
        return df

    @property
    def _type_in(self):
        """Custom input type for this processing transformer"""
        return {FaceDetectTransform.COL_IMAGE_MIME: str, FaceDetectTransform.COL_IMAGE_DATA: bytes}, "FaceImage"

    @property
    def _type_out(self):
        """Custom output type for this processing transformer"""
        # field types are derived from the defaults of an empty output record
        output_dict = FaceDetectTransform.generate_out_dict()
        return {k: type(output_dict[k]) for k in output_dict}, "DetectionFrames"

    def score(self, X, y=None):
        """Placeholder score; always returns 0."""
        return 0

    def fit(self, X, y=None):
        """No-op fit; returns self."""
        return self

    def predict(self, X, y=None):
        """
        Detect faces in every image of ``X``.

        ``X`` is indexed by column name and must provide ``image_binary`` (raw image
        bytes, or a base64-encoded string of them) and, when ``include_image`` is
        set, ``mime_type``.  Rows are processed in positional order, and the
        ``image`` column of the output holds that position.

        Returns a DataFrame with one row per detected face (plus one row per image
        when ``include_image`` is set), or None when ``X`` has no rows.
        """
        # if no model exists yet, create it
        if self.cascade_obj is None:
            cascade_file = self.cascade_path
            if cascade_file is None:   # none provided, load what came with the package
                module_dir = os.path.dirname(os.path.abspath(__file__))
                cascade_file = os.path.join(module_dir, FaceDetectTransform.CASCADE_DEFAULT_FILE)
            self.cascade_obj = cv2.CascadeClassifier(cascade_file)

        if len(X) == 0:
            return None

        # Iterate column values positionally: indexing a Series with [i] is a label
        # lookup and breaks (or picks the wrong row) when the index is not 0..n-1.
        image_blobs = list(X[FaceDetectTransform.COL_IMAGE_DATA])
        mime_types = list(X[FaceDetectTransform.COL_IMAGE_MIME]) if self.include_image else None

        # Collect plain dict records and build the frame once; DataFrame.append was
        # removed in pandas 2.0 and recopied the whole frame on every call.
        records = []
        for image_idx, raw_image in enumerate(image_blobs):
            image_byte = raw_image
            if isinstance(image_byte, str):   # string payloads are treated as base64
                image_byte = base64.b64decode(image_byte.encode())
            file_bytes = np.asarray(bytearray(image_byte), dtype=np.uint8)
            img = cv2.imdecode(file_bytes, cv2.IMREAD_COLOR)
            faces = self.detect_faces(img)

            if self.include_image:  # emit the image row first if that's requested
                records.append(FaceDetectTransform.generate_out_dict(
                    w=img.shape[1], h=img.shape[0], image=image_idx,
                    bin_stream=raw_image, media=mime_types[image_idx]))
            for region_idx, face_rect in enumerate(faces):  # walk through detected faces
                # cast to plain ints so the coordinate columns have one consistent dtype
                records.append(FaceDetectTransform.generate_out_dict(
                    region_idx, int(face_rect[0]), int(face_rect[1]),
                    int(face_rect[2]), int(face_rect[3]), image=image_idx))

        return pd.DataFrame(records)

    def detect_faces(self, img):
        """
        Run the cascade on ``img`` and return the detected rectangles.

        ``img`` is converted from BGR to grayscale before detection.  Each returned
        entry is indexed as (x, y, w, h) by ``predict``.  Returns an empty list when
        no cascade has been loaded.
        """
        if self.cascade_obj is None:
            return []
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

        faces = self.cascade_obj.detectMultiScale(
            gray,
            scaleFactor=1.1,
            minNeighbors=5,
            minSize=(30, 30),
            flags=cv2.CASCADE_SCALE_IMAGE
        )
        return faces