- add first pass for testing server, does not act
[face-privacy-filter.git] / face_privacy_filter / transform_detect.py
1 #! python
2 # -*- coding: utf-8 -*-
3 """
4 Wrapper for face detection task; wrapped in classifier for pipeline terminus
5 """
6 import cv2
7 import os
8 import pandas as pd
9 import numpy as np
10 from sklearn.base import BaseEstimator, ClassifierMixin
11
class FaceDetectTransform(BaseEstimator, ClassifierMixin):
    '''
    A sklearn-style estimator that detects faces in encoded images and optionally
    outputs the original image alongside the detected regions.

    ``predict`` produces one row per detected face; when ``include_image`` is set it
    also produces one extra row per input image (region index -1) that carries the
    image dimensions, mime type and raw bytes.
    '''
    CASCADE_DEFAULT_FILE = "data/haarcascade_frontalface_alt.xml.gz"
    COL_FACE_X = 'x'
    COL_FACE_Y = 'y'
    COL_FACE_W = 'w'
    COL_FACE_H = 'h'
    COL_FACE_IDX = 'region'
    COL_IMAGE_IDX = 'image'
    COL_IMAGE_MIME = 'mime_type'
    COL_IMAGE_DATA = 'binary_stream'

    def __init__(self, cascade_path=None, include_image=True):
        """
        :param cascade_path: path of a cascade file to load; when None the file named by
            CASCADE_DEFAULT_FILE (relative to this module) is used
        :param include_image: when true, predict() adds a row holding the source image
        """
        self.include_image = include_image    # should output transform include image?
        self.cascade_path = cascade_path    # abs path outside of module
        self.cascade_obj = None # late-load this component

    def get_params(self, deep=False):
        """Return the estimator parameters; only ``include_image`` is reported (``cascade_path`` is not)."""
        return {'include_image': self.include_image}

    @staticmethod
    def generate_in_df(path_image="", bin_stream=b""):
        """
        Build a single-row input frame with the mime-type and binary-stream columns.

        If ``path_image`` names an existing file its bytes replace ``bin_stream``;
        otherwise ``bin_stream`` is used as given.  The mime type is always 'image/jpeg'.
        """
        # munge stream and mimetype into input sample
        if path_image and os.path.exists(path_image):
            # context manager guarantees the file handle is released
            with open(path_image, 'rb') as image_file:
                bin_stream = image_file.read()
        return pd.DataFrame([['image/jpeg', bin_stream]],
                            columns=[FaceDetectTransform.COL_IMAGE_MIME, FaceDetectTransform.COL_IMAGE_DATA])

    def generate_out_dict(self, idx=-1, x=0, y=0, w=0, h=0, image=0):
        """
        Build one output row as a dict keyed by the output column names.

        The default region index of -1 marks a row that is not a detected face;
        the mime-type and binary-stream fields start out as empty strings.
        """
        return {FaceDetectTransform.COL_FACE_IDX: idx, FaceDetectTransform.COL_FACE_X: x,
                FaceDetectTransform.COL_FACE_Y: y, FaceDetectTransform.COL_FACE_W: w, FaceDetectTransform.COL_FACE_H: h,
                FaceDetectTransform.COL_IMAGE_IDX: image,
                FaceDetectTransform.COL_IMAGE_MIME: '', FaceDetectTransform.COL_IMAGE_DATA: ''}

    @staticmethod
    def suppress_image(df):
        """
        Blank out every non-geometry column on rows whose region index is -1.

        The frame is modified in place and also returned.
        """
        keep_col = [FaceDetectTransform.COL_FACE_X, FaceDetectTransform.COL_FACE_Y,
                    FaceDetectTransform.COL_FACE_W, FaceDetectTransform.COL_FACE_H,
                    FaceDetectTransform.COL_FACE_IDX, FaceDetectTransform.COL_IMAGE_IDX]
        blank_cols = [col for col in df.columns if col not in keep_col]
        # set columns that aren't in our known column list to empty strings; search where face index==-1 (no face)
        df.loc[df[FaceDetectTransform.COL_FACE_IDX]==-1,blank_cols] = ""
        return df

    @property
    def output_names_(self):
        """Ordered list of the output column names."""
        return [FaceDetectTransform.COL_FACE_IDX, FaceDetectTransform.COL_FACE_X, FaceDetectTransform.COL_FACE_Y,
                 FaceDetectTransform.COL_FACE_W, FaceDetectTransform.COL_FACE_H,
                 FaceDetectTransform.COL_IMAGE_IDX, FaceDetectTransform.COL_IMAGE_MIME, FaceDetectTransform.COL_IMAGE_DATA]

    @property
    def output_types_(self):
        """List of single-entry dicts mapping each output column name to its Python type."""
        return [{name: col_type} for name, col_type in zip(self.output_names_, self.classes_)]

    @property
    def n_outputs_(self):
        """Number of output columns."""
        return len(self.output_names_)

    @property
    def classes_(self):
        """Python types of the output columns, in the same order as ``output_names_``."""
        return [int, int, int, int, int, int, str, str]

    def score(self, X, y=None):
        """Scoring is not meaningful for this estimator; always returns 0."""
        return 0

    def fit(self, X, y=None):
        """No training is performed; returns self."""
        return self

    def _load_cascade(self):
        # Create the OpenCV cascade classifier on first use and cache it on the instance.
        if self.cascade_obj is not None:
            return
        if self.cascade_path is not None:
            path_file = self.cascade_path
        else:   # none provided, load what came with the package
            path_root = os.path.dirname(os.path.abspath(__file__))
            path_file = os.path.join(path_root, FaceDetectTransform.CASCADE_DEFAULT_FILE)
        self.cascade_obj = cv2.CascadeClassifier(path_file)

    def predict(self, X, y=None):
        """
        Detect faces in every image of X and return the regions as a DataFrame.

        X must support lookup by column name (for example the frame produced by
        ``generate_in_df``): the 'binary_stream' column holds the raw encoded image
        bytes and, when ``include_image`` is set, the 'mime_type' column is read too.

        :return: DataFrame with the columns of ``output_names_``; one row per detected
            face plus, when ``include_image`` is set, one row per image (region -1)
            carrying the image width/height, mime type and bytes.  An empty X yields
            an empty frame with the same columns.
        """
        # if no model exists yet, create it
        self._load_cascade()

        # Iterate columns positionally so a frame with a non-default index still works.
        image_streams = X[FaceDetectTransform.COL_IMAGE_DATA]
        mime_types = list(X[FaceDetectTransform.COL_IMAGE_MIME]) if self.include_image else None

        rows = []   # collect plain dicts and build the frame once at the end
        for image_idx, image_bytes in enumerate(image_streams):
            file_bytes = np.asarray(bytearray(image_bytes), dtype=np.uint8)
            # NOTE(review): imdecode is documented to give an empty result for undecodable
            # data; the calls below will then raise -- confirm whether that should be handled.
            img = cv2.imdecode(file_bytes, cv2.IMREAD_COLOR)
            faces = self.detect_faces(img)

            rows_before = len(rows)
            if self.include_image:  # create and append the image if that's requested
                # decoded image arrays are (rows, cols, channels), i.e. (height, width, ...)
                dict_image = self.generate_out_dict(w=img.shape[1], h=img.shape[0], image=image_idx)
                dict_image[FaceDetectTransform.COL_IMAGE_MIME] = mime_types[image_idx]
                dict_image[FaceDetectTransform.COL_IMAGE_DATA] = image_bytes
                rows.append(dict_image)
            for idxF, face_rect in enumerate(faces):  # walk through detected faces
                rows.append(self.generate_out_dict(idxF, face_rect[0], face_rect[1],
                                                   face_rect[2], face_rect[3], image=image_idx))
            print("IMAGE {:} found {:} total rows".format(image_idx, len(rows) - rows_before))

        return pd.DataFrame(rows, columns=self.output_names_)

    def detect_faces(self, img):
        """
        Run the cascade classifier on a BGR image and return the detected rectangles.

        Each rectangle is indexable as (x, y, w, h).  Returns an empty list when no
        classifier has been loaded yet.
        """
        if self.cascade_obj is None:
            return []
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

        faces = self.cascade_obj.detectMultiScale(
            gray,
            scaleFactor=1.1,
            minNeighbors=5,
            minSize=(30, 30),
            flags=cv2.CASCADE_SCALE_IMAGE
        )
        return faces