2 # -*- coding: utf-8 -*-
3 # ===============LICENSE_START=======================================================
5 # ===================================================================================
6 # Copyright (C) 2017-2018 AT&T Intellectual Property & Tech Mahindra. All rights reserved.
7 # ===================================================================================
8 # This Acumos software file is distributed by AT&T and Tech Mahindra
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this file except in compliance with the License.
11 # You may obtain a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # This file is distributed on an "AS IS" BASIS,
16 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 # See the License for the specific language governing permissions and
18 # limitations under the License.
19 # ===============LICENSE_END=========================================================
21 Wrapper for face detection task; wrapped in classifier for pipeline terminus
27 from sklearn.base import BaseEstimator, ClassifierMixin
32 if sys.version_info[0] < 3:
33 from cStringIO import StringIO as BytesIO
35 from io import BytesIO as BytesIO
38 class FaceDetectTransform(BaseEstimator, ClassifierMixin):
40 A sklearn transformer mixin that detects faces and optionally outputs the original detected image
42 CASCADE_DEFAULT_FILE = "data/haarcascade_frontalface_alt.xml.gz"
47 COL_REGION_IDX = 'region'
48 COL_IMAGE_IDX = 'image'
49 COL_IMAGE_MIME = 'mime_type'
50 COL_IMAGE_DATA = 'image_binary'
51 VAL_REGION_IMAGE_ID = -1
def __init__(self, cascade_path=None, cascade_stream=None, include_image=True):
    """Configure the detector and capture the cascade definition as a compressed stream.

    Args:
        cascade_path: Path of a cascade file to read when ``cascade_stream`` is
            not supplied. When both are ``None``, ``CASCADE_DEFAULT_FILE`` is
            resolved relative to this module's directory.
        cascade_stream: Optional dict with keys ``'name'`` (file name) and
            ``'data'`` (bytes produced by ``string_compress``). When given,
            no file is read.
        include_image: Whether ``predict`` should also emit a row carrying the
            original image.
    """
    self.include_image = include_image  # should output transform include image?
    self.cascade_obj = None  # late-load this component (see load_cascade)
    self.cascade_stream = cascade_stream  # compressed binary file contents for cascade data
    if self.cascade_stream is None:
        if cascade_path is None:  # default/included data?
            path_root = os.path.dirname(os.path.abspath(__file__))
            cascade_path = os.path.join(path_root, FaceDetectTransform.CASCADE_DEFAULT_FILE)
        # Read the raw file bytes first; they are compressed below so that the
        # stored stream always matches what string_decompress expects.
        with open(cascade_path, 'rb') as f:
            raw_stream = f.read()
        self.cascade_stream = {'name': os.path.basename(cascade_path),
                               'data': FaceDetectTransform.string_compress(raw_stream)}
def string_compress(string_data):
    """Gzip-compress a byte string entirely in memory.

    Args:
        string_data: Raw bytes to compress.

    Returns:
        The gzip-encoded bytes.
    """
    out_data = BytesIO()
    with gzip.GzipFile(fileobj=out_data, mode="wb") as f:
        f.write(string_data)
    # The gzip trailer is only written when the GzipFile is closed, so the
    # buffer must be read after the ``with`` block has exited.
    return out_data.getvalue()
def string_decompress(compressed_data):
    """Inflate gzip-encoded bytes entirely in memory.

    Args:
        compressed_data: Bytes in gzip format, e.g. the output of
            ``string_compress``.

    Returns:
        The decompressed bytes.
    """
    in_data = BytesIO(compressed_data)
    with gzip.GzipFile(fileobj=in_data, mode="rb") as f:
        return f.read()
def get_params(self, deep=False):
    """Return the constructor parameters that describe this estimator.

    Args:
        deep: Accepted for signature compatibility; it is not consulted.

    Returns:
        Dict with the keys ``'include_image'`` and ``'cascade_stream'``.
    """
    params = dict(include_image=self.include_image)
    params['cascade_stream'] = self.cascade_stream
    return params
def generate_in_df(path_image="", bin_stream=b""):
    """Build a one-row input frame from an image file or an in-memory byte string.

    Args:
        path_image: Optional path to an image file. When it names an existing
            file, that file's bytes replace ``bin_stream``.
        bin_stream: Image bytes used when ``path_image`` is empty or does not
            exist.

    Returns:
        DataFrame with one row and the columns ``COL_IMAGE_MIME`` and
        ``COL_IMAGE_DATA``. The mime type is always recorded as
        ``'image/jpeg'``, whatever the actual content is.
    """
    # munge stream and mimetype into input sample
    if path_image and os.path.exists(path_image):
        # Context manager so the handle is closed deterministically.
        with open(path_image, 'rb') as f:
            bin_stream = f.read()
    return pd.DataFrame(
        [['image/jpeg', bin_stream]],
        columns=[FaceDetectTransform.COL_IMAGE_MIME, FaceDetectTransform.COL_IMAGE_DATA])
def generate_out_image(row, path_image):
    """Write the image bytes held in an output row to a file on disk.

    Args:
        row: Row-like object; element 0 of its ``COL_IMAGE_DATA`` entry is
            written.
        path_image: Destination path, opened for binary writing (truncates
            any existing file).
    """
    with open(path_image, 'wb') as out_file:
        # Lookup stays inside the ``with`` so the destination is opened first.
        payload = row[FaceDetectTransform.COL_IMAGE_DATA][0]
        out_file.write(payload)
100 return [FaceDetectTransform.COL_IMAGE_IDX, FaceDetectTransform.COL_REGION_IDX,
101 FaceDetectTransform.COL_FACE_X, FaceDetectTransform.COL_FACE_Y,
102 FaceDetectTransform.COL_FACE_W, FaceDetectTransform.COL_FACE_H,
103 FaceDetectTransform.COL_IMAGE_MIME, FaceDetectTransform.COL_IMAGE_DATA]
def generate_out_dict(idx=VAL_REGION_IMAGE_ID, x=0, y=0, w=0, h=0, image=0, bin_stream=b"", media=""):
    """Assemble one output record keyed by the names from ``output_names_()``.

    Args:
        idx: Region index; defaults to ``VAL_REGION_IMAGE_ID``.
        x, y, w, h: Region position and size.
        image: Index of the source image.
        bin_stream: Image bytes to attach to the record.
        media: Mime type to attach to the record.

    Returns:
        Dict mapping each output column name to the matching argument.
    """
    # Values are listed in the same order as the column names they pair with.
    values = (image, idx, x, y, w, h, media, bin_stream)
    columns = FaceDetectTransform.output_names_()
    return {name: value for name, value in zip(columns, values)}
def suppress_image(df):
    """Blank the mime-type and image-data columns of a result frame.

    The frame is modified in place: both columns are set to ``None`` on every
    row.

    Args:
        df: Frame whose ``COL_IMAGE_MIME`` and ``COL_IMAGE_DATA`` columns
            should be cleared.

    Returns:
        The same frame object, so the call can be chained or reassigned.
    """
    blank_cols = [FaceDetectTransform.COL_IMAGE_MIME, FaceDetectTransform.COL_IMAGE_DATA]
    # All rows are cleared here, not only the whole-image rows
    # (region == VAL_REGION_IMAGE_ID).
    df[blank_cols] = None
    return df
118 """Custom input type for this processing transformer"""
119 return [(FaceDetectTransform.COL_IMAGE_MIME, str), (FaceDetectTransform.COL_IMAGE_DATA, bytes)], "Image"
123 """Custom output type for this processing transformer"""
124 output_dict = FaceDetectTransform.generate_out_dict()
125 return [(k, type(output_dict[k])) for k in output_dict], "RegionDetection"
def score(self, X, y=None):
    """Return a constant score for this estimator.

    Args:
        X: Ignored.
        y: Ignored.

    Returns:
        Always ``0``.
    """
    # NOTE(review): no body is present for this method in the file as seen;
    # 0 is a neutral placeholder - confirm the intended value.
    return 0
def fit(self, X, y=None):
    """Fit hook required by the estimator interface; performs no training.

    Args:
        X: Ignored.
        y: Ignored.

    Returns:
        ``self``, as the scikit-learn ``fit`` contract expects.
    """
    # NOTE(review): no body is present for this method in the file as seen;
    # returning self follows the scikit-learn estimator convention.
    return self
def load_cascade(self):
    """Build the OpenCV classifier from the stored cascade stream on first use.

    The decompressed cascade bytes are written to a file inside a temporary
    directory, and ``cv2.CascadeClassifier`` is constructed from that path.

    Returns:
        ``True`` when a classifier object was already present; ``False`` when
        this call had to (attempt to) deserialize it.
        NOTE(review): the return values follow the original inline comment
        ("return False for deserialize required") - confirm against callers.
    """
    if self.cascade_obj is not None:
        return True
    if self.cascade_stream is not None:
        import tempfile  # function-scope import keeps this block self-contained

        with tempfile.TemporaryDirectory() as tdir:
            cascade_data = FaceDetectTransform.string_decompress(self.cascade_stream['data'])
            # Keep the original file name when writing the temporary copy.
            cascade_path = os.path.join(tdir, self.cascade_stream['name'])
            with open(cascade_path, 'wb') as f:
                f.write(cascade_data)
            # Construct while the temporary file still exists.
            self.cascade_obj = cv2.CascadeClassifier(cascade_path)
    return False
def predict(self, X, y=None):
    """Detect faces in every input image and return one row per detected region.

    Args:
        X: Column-indexable input (for example the frame built by
            ``generate_in_df``) holding ``COL_IMAGE_MIME`` and
            ``COL_IMAGE_DATA``; rows are addressed as ``X[column][i]`` for
            ``i`` in ``range(len(X))``. Image data given as ``str`` is
            treated as base64 text; any other value is used as raw image
            bytes.
        y: Ignored.

    Returns:
        DataFrame with the columns from ``output_names_()``. For each image
        there is, when ``include_image`` is set, one row describing the whole
        image (its width/height, mime type and original data), followed by
        one row per detected face rectangle.
    """
    self.load_cascade()  # JIT load model

    listData = []
    for image_idx in range(len(X)):
        image_byte = X[FaceDetectTransform.COL_IMAGE_DATA][image_idx]
        if isinstance(image_byte, str):
            # Text payloads are base64; turn them back into raw bytes.
            image_byte = base64.b64decode(image_byte.encode())
        file_bytes = np.asarray(bytearray(image_byte), dtype=np.uint8)
        img = cv2.imdecode(file_bytes, cv2.IMREAD_COLOR)
        faces = self.detect_faces(img)

        if self.include_image:  # create and append the image if that's requested
            listData.append(FaceDetectTransform.generate_out_dict(
                w=img.shape[1], h=img.shape[0], image=image_idx,
                media=X[FaceDetectTransform.COL_IMAGE_MIME][image_idx],
                bin_stream=X[FaceDetectTransform.COL_IMAGE_DATA][image_idx]))
        for idxF, face_rect in enumerate(faces):  # walk through detected faces
            listData.append(FaceDetectTransform.generate_out_dict(
                idxF, x=face_rect[0], y=face_rect[1],
                w=face_rect[2], h=face_rect[3], image=image_idx))

    # Explicit column list keeps the schema stable even when no rows were produced.
    return pd.DataFrame(listData, columns=FaceDetectTransform.output_names_())
177 def detect_faces(self, img):
178 if self.cascade_obj is None:
180 gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
182 faces = self.cascade_obj.detectMultiScale(
187 flags=cv2.CASCADE_SCALE_IMAGE
190 # Draw a rectangle around the faces
191 # for (x, y, w, h) in faces:
192 # cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)