2 # -*- coding: utf-8 -*-
4 Wrapper for face detection task; wrapped in classifier for pipieline terminus
10 from sklearn.base import BaseEstimator, ClassifierMixin
13 class FaceDetectTransform(BaseEstimator, ClassifierMixin):
15 A sklearn transformer mixin that detects faces and optionally outputa the original detected image
17 CASCADE_DEFAULT_FILE = "data/haarcascade_frontalface_alt.xml.gz"
22 COL_REGION_IDX = 'region'
23 COL_IMAGE_IDX = 'image'
24 COL_IMAGE_MIME = 'mime_type'
25 COL_IMAGE_DATA = 'base64_data'
26 VAL_REGION_IMAGE_ID = -1
28 def __init__(self, cascade_path=None, include_image=True):
29 self.include_image = include_image # should output transform include image?
30 self.cascade_path = cascade_path # abs path outside of module
31 self.cascade_obj = None # late-load this component
33 def get_params(self, deep=False):
34 return {'include_image': self.include_image}
37 def generate_in_df(path_image="", bin_stream=b""):
38 # munge stream and mimetype into input sample
39 if path_image and os.path.exists(path_image):
40 bin_stream = open(path_image, 'rb').read()
41 bin_stream = base64.b64encode(bin_stream)
42 if type(bin_stream) == bytes:
43 bin_stream = bin_stream.decode()
44 return pd.DataFrame([['image/jpeg', bin_stream]], columns=[FaceDetectTransform.COL_IMAGE_MIME, FaceDetectTransform.COL_IMAGE_DATA])
47 def generate_out_image(row, path_image):
48 # take image row and output to disk
49 with open(path_image, 'wb') as f:
50 f.write(row[FaceDetectTransform.COL_IMAGE_DATA][0])
53 def generate_out_dict(idx=VAL_REGION_IMAGE_ID, x=0, y=0, w=0, h=0, image=0):
54 return {FaceDetectTransform.COL_REGION_IDX: idx, FaceDetectTransform.COL_FACE_X: x,
55 FaceDetectTransform.COL_FACE_Y: y, FaceDetectTransform.COL_FACE_W: w, FaceDetectTransform.COL_FACE_H: h,
56 FaceDetectTransform.COL_IMAGE_IDX: image,
57 FaceDetectTransform.COL_IMAGE_MIME: '', FaceDetectTransform.COL_IMAGE_DATA: ''}
60 def suppress_image(df):
61 keep_col = [FaceDetectTransform.COL_FACE_X, FaceDetectTransform.COL_FACE_Y,
62 FaceDetectTransform.COL_FACE_W, FaceDetectTransform.COL_FACE_H,
63 FaceDetectTransform.COL_FACE_W, FaceDetectTransform.COL_FACE_H,
64 FaceDetectTransform.COL_REGION_IDX, FaceDetectTransform.COL_IMAGE_IDX]
65 blank_cols = [col for col in df.columns if col not in keep_col]
66 # set columns that aren't in our known column list to empty strings; search where face index==-1 (no face)
67 df.loc[df[FaceDetectTransform.COL_REGION_IDX]==FaceDetectTransform.VAL_REGION_IMAGE_ID,blank_cols] = ""
71 def output_names_(self):
72 return [FaceDetectTransform.COL_REGION_IDX, FaceDetectTransform.COL_FACE_X, FaceDetectTransform.COL_FACE_Y,
73 FaceDetectTransform.COL_FACE_W, FaceDetectTransform.COL_FACE_H,
74 FaceDetectTransform.COL_IMAGE_IDX, FaceDetectTransform.COL_IMAGE_MIME, FaceDetectTransform.COL_IMAGE_DATA]
77 def output_types_(self):
78 list_name = self.output_names_
79 list_type = self.classes_
80 return [{list_name[i]:list_type[i]} for i in range(len(list_name))]
88 return [int, int, int, int, int, int, str, str]
90 def score(self, X, y=None):
93 def fit(self, X, y=None):
96 def predict(self, X, y=None):
98 Assumes a numpy array of [[mime_type, binary_string] ... ]
99 where mime_type is an image-specifying mime type and binary_string is the raw image bytes
101 # if no model exists yet, create it
102 if self.cascade_obj is None:
103 if self.cascade_path is not None:
104 self.cascade_obj = cv2.CascadeClassifier(self.cascade_path)
105 else: # none provided, load what came with the package
106 pathRoot = os.path.dirname(os.path.abspath(__file__))
107 pathFile = os.path.join(pathRoot, FaceDetectTransform.CASCADE_DEFAULT_FILE)
108 self.cascade_obj = cv2.CascadeClassifier(pathFile)
111 for image_idx in range(len(X)):
112 image_byte = X[FaceDetectTransform.COL_IMAGE_DATA][image_idx]
113 if type(image_byte)==str:
114 image_byte = image_byte.encode()
115 image_byte = bytearray(base64.b64decode(image_byte))
116 file_bytes = np.asarray(image_byte, dtype=np.uint8)
117 img = cv2.imdecode(file_bytes, cv2.IMREAD_COLOR)
118 # img = cv2.imread(image_set[1])
119 faces = self.detect_faces(img)
121 df = pd.DataFrame() # start with empty DF for this image
122 if self.include_image: # create and append the image if that's requested
123 dict_image = FaceDetectTransform.generate_out_dict(w=img.shape[1], h=img.shape[0], image=image_idx)
124 dict_image[FaceDetectTransform.COL_IMAGE_MIME] = X[FaceDetectTransform.COL_IMAGE_MIME][image_idx]
125 dict_image[FaceDetectTransform.COL_IMAGE_DATA] = X[FaceDetectTransform.COL_IMAGE_DATA][image_idx]
126 df = pd.DataFrame([dict_image])
127 for idxF in range(len(faces)): # walk through detected faces
128 face_rect = faces[idxF]
129 df = df.append(pd.DataFrame([FaceDetectTransform.generate_out_dict(idxF, face_rect[0], face_rect[1],
130 face_rect[2], face_rect[3], image=image_idx)]),
132 if dfReturn is None: # create an NP container for all image samples + features
133 dfReturn = df.reindex_axis(self.output_names_, axis=1)
135 dfReturn = dfReturn.append(df, ignore_index=True)
136 #print("IMAGE {:} found {:} total rows".format(image_idx, len(df)))
140 def detect_faces(self, img):
141 if self.cascade_obj is None: return []
142 gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
144 faces = self.cascade_obj.detectMultiScale(
149 flags=cv2.CASCADE_SCALE_IMAGE
152 # Draw a rectangle around the faces
153 #for (x, y, w, h) in faces:
154 # cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
157 # FaceDetectTransform.__module__ = '__main__'