```
usage: run_face-privacy-filter_reference.py [-h] [-p PREDICT_PATH] [-i INPUT]
- [-s] [-a PUSH_ADDRESS]
- [-d DUMP_MODEL]
+ [-c] [-s] [-f {detect,pixelate}]
+ [-a PUSH_ADDRESS] [-d DUMP_MODEL]
optional arguments:
-h, --help show this help message and exit
save detections from model (model must be provided via
'dump_model')
-i INPUT, --input INPUT
- absolute path to input image (only during prediction /
- dump)
+ absolute path to input data (image or csv, only during
+ prediction / dump)
+ -c, --csv_input input as CSV format not an image
-s, --suppress_image do not create an extra row for a returned image
+ -f {detect,pixelate}, --function {detect,pixelate}
+ which type of model to generate
-a PUSH_ADDRESS, --push_address PUSH_ADDRESS
server address to push the model (e.g.
http://localhost:8887/v2/models)
### Examples
-Example for dumping the `detect` model to disk.
+This single repo has a number of different models that can be
+composed together for operation.
+
+* Dump the `detect` model to disk.
+```
+./bin/run_local.sh -d model_detect -f detect
+```
+* Dump the `pixelate` model to disk.
+```
+./bin/run_local.sh -d model_pix -f pixelate
```
-./bin/run_local.sh -d model
+* Evaluate the `detect` model from disk and a previously produced detect object
```
+./bin/run_local.sh -d model_detect -f detect -p output.csv -i web_demo/images/face_DiCaprio.jpg
+```
+* Evaluate the `pixelate` model from disk and a previously produced detect object
+```
+./bin/run_local.sh -d model_pix -f pixelate -i detect.csv -p output.jpg --csv_input
+```
+
## Face-based Use Cases
# -*- coding: utf-8 -*-
-__version__ = "0.0.1"
+__version__ = "0.1.0"
MODEL_NAME = 'face_privacy_filter'
import pandas as pd
from face_privacy_filter.transform_detect import FaceDetectTransform
+from face_privacy_filter.transform_region import RegionTransform
from face_privacy_filter._version import MODEL_NAME
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('-p', '--predict_path', type=str, default='', help="save detections from model (model must be provided via 'dump_model')")
- parser.add_argument('-i', '--input', type=str, default='',help='absolute path to input image (only during prediction / dump)')
+ parser.add_argument('-i', '--input', type=str, default='',help='absolute path to input data (image or csv, only during prediction / dump)')
+ parser.add_argument('-c', '--csv_input', dest='csv_input', action='store_true', default=False, help='input as CSV format not an image')
parser.add_argument('-s', '--suppress_image', dest='suppress_image', action='store_true', default=False, help='do not create an extra row for a returned image')
+ parser.add_argument('-f', '--function', type=str, default='detect',help='which type of model to generate', choices=['detect', 'pixelate'])
parser.add_argument('-a', '--push_address', help='server address to push the model (e.g. http://localhost:8887/v2/models)', default='')
parser.add_argument('-d', '--dump_model', help='dump model to a pickle directory for local running', default='')
config.update(vars(parser.parse_args())) #pargs, unparsed = parser.parse_known_args()
print("Attempting to create new model for dump or push...")
# refactor the raw samples from upstream image classifier
- transform = FaceDetectTransform(include_image=not config['suppress_image'])
+ if config['function'] == "detect":
+ transform = FaceDetectTransform(include_image=not config['suppress_image'])
+ elif config['function'] == "pixelate":
+ transform = RegionTransform()
+ else:
+ print("Error: Functional mode '{:}' unknown, aborting create".format(config['function']))
inputDf = transform.generate_in_df()
pipeline, EXTRA_DEPS = model_create_pipeline(transform, "detect")
print("Attempting predict/transform on input sample...")
from cognita_client.wrap.load import load_model
model = load_model(config['dump_model'])
- inputDf = FaceDetectTransform.generate_in_df(config['input'])
+ if not config['csv_input']:
+ inputDf = FaceDetectTransform.generate_in_df(config['input'])
+ else:
+ inputDf = pd.read_csv(config['input'], converters={FaceDetectTransform.COL_IMAGE_DATA:FaceDetectTransform.read_byte_arrays})
dfPred = model.transform.from_native(inputDf).as_native()
- dfPred = FaceDetectTransform.suppress_image(dfPred)
if config['predict_path']:
print("Writing prediction to file '{:}'...".format(config['predict_path']))
- dfPred.to_csv(config['predict_path'], sep=",", index=False)
+ if not config['csv_input']:
+ dfPred.to_csv(config['predict_path'], sep=",", index=False)
+ else:
+ FaceDetectTransform.generate_out_image(dfPred, config['predict_path'])
+ if not config['csv_input']:
+ dfPred = FaceDetectTransform.suppress_image(dfPred)
if dfPred is not None:
print("Predictions:\n{:}".format(dfPred))
COL_FACE_Y = 'y'
COL_FACE_W = 'w'
COL_FACE_H = 'h'
- COL_FACE_IDX = 'region'
+ COL_REGION_IDX = 'region'
COL_IMAGE_IDX = 'image'
COL_IMAGE_MIME = 'mime_type'
COL_IMAGE_DATA = 'binary_stream'
+ VAL_REGION_IMAGE_ID = -1
def __init__(self, cascade_path=None, include_image=True):
self.include_image = include_image # should output transform include image?
bin_stream = open(path_image, 'rb').read()
return pd.DataFrame([['image/jpeg', bin_stream]], columns=[FaceDetectTransform.COL_IMAGE_MIME, FaceDetectTransform.COL_IMAGE_DATA])
- def generate_out_dict(self, idx=-1, x=0, y=0, w=0, h=0, image=0):
- return {FaceDetectTransform.COL_FACE_IDX: idx, FaceDetectTransform.COL_FACE_X: x,
+ @staticmethod
+ def generate_out_image(row, path_image):
+ # take image row and output to disk
+ with open(path_image, 'wb') as f:
+ f.write(row[FaceDetectTransform.COL_IMAGE_DATA][0])
+
+ @staticmethod
+ def generate_out_dict(idx=VAL_REGION_IMAGE_ID, x=0, y=0, w=0, h=0, image=0):
+ return {FaceDetectTransform.COL_REGION_IDX: idx, FaceDetectTransform.COL_FACE_X: x,
FaceDetectTransform.COL_FACE_Y: y, FaceDetectTransform.COL_FACE_W: w, FaceDetectTransform.COL_FACE_H: h,
FaceDetectTransform.COL_IMAGE_IDX: image,
FaceDetectTransform.COL_IMAGE_MIME: '', FaceDetectTransform.COL_IMAGE_DATA: ''}
def suppress_image(df):
keep_col = [FaceDetectTransform.COL_FACE_X, FaceDetectTransform.COL_FACE_Y,
FaceDetectTransform.COL_FACE_W, FaceDetectTransform.COL_FACE_H,
- FaceDetectTransform.COL_FACE_IDX, FaceDetectTransform.COL_IMAGE_IDX]
+ FaceDetectTransform.COL_REGION_IDX, FaceDetectTransform.COL_IMAGE_IDX]
blank_cols = [col for col in df.columns if col not in keep_col]
# set columns that aren't in our known column list to empty strings; search where face index==-1 (no face)
- df.loc[df[FaceDetectTransform.COL_FACE_IDX]==-1,blank_cols] = ""
+ df.loc[df[FaceDetectTransform.COL_REGION_IDX]==FaceDetectTransform.VAL_REGION_IMAGE_ID,blank_cols] = ""
return df
@property
def output_names_(self):
- return [FaceDetectTransform.COL_FACE_IDX, FaceDetectTransform.COL_FACE_X, FaceDetectTransform.COL_FACE_Y,
+ return [FaceDetectTransform.COL_REGION_IDX, FaceDetectTransform.COL_FACE_X, FaceDetectTransform.COL_FACE_Y,
FaceDetectTransform.COL_FACE_W, FaceDetectTransform.COL_FACE_H,
FaceDetectTransform.COL_IMAGE_IDX, FaceDetectTransform.COL_IMAGE_MIME, FaceDetectTransform.COL_IMAGE_DATA]
dfReturn = None
for image_idx in range(len(X)):
- # image_set = X[:, image_idx]
file_bytes = np.asarray(bytearray(X[FaceDetectTransform.COL_IMAGE_DATA][image_idx]), dtype=np.uint8)
img = cv2.imdecode(file_bytes, cv2.IMREAD_COLOR)
# img = cv2.imread(image_set[1])
df = pd.DataFrame() # start with empty DF for this image
if self.include_image: # create and append the image if that's requested
- dict_image = self.generate_out_dict(w=img.shape[0], h=img.shape[1], image=image_idx)
- dict_image[self.mime_col] = image_set[0]
- dict_image[self.data_col] = image_set[1]
+ dict_image = FaceDetectTransform.generate_out_dict(w=img.shape[0], h=img.shape[1], image=image_idx)
+ dict_image[FaceDetectTransform.COL_IMAGE_MIME] = X[FaceDetectTransform.COL_IMAGE_MIME][image_idx]
+ dict_image[FaceDetectTransform.COL_IMAGE_DATA] = X[FaceDetectTransform.COL_IMAGE_DATA][image_idx]
df = pd.DataFrame([dict_image])
for idxF in range(len(faces)): # walk through detected faces
face_rect = faces[idxF]
- df = df.append(pd.DataFrame([self.generate_out_dict(idxF, face_rect[0], face_rect[1],
+ df = df.append(pd.DataFrame([FaceDetectTransform.generate_out_dict(idxF, face_rect[0], face_rect[1],
face_rect[2], face_rect[3], image=image_idx)]),
ignore_index=True)
if dfReturn is None: # create an NP container for all image samples + features
dfReturn = df.reindex_axis(self.output_names_, axis=1)
else:
dfReturn = dfReturn.append(df, ignore_index=True)
- print("IMAGE {:} found {:} total rows".format(image_idx, len(df)))
+ #print("IMAGE {:} found {:} total rows".format(image_idx, len(df)))
return dfReturn
#for (x, y, w, h) in faces:
# cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
return faces
+
+ #############################################
+ ## helper for i/o
+ @staticmethod
+ def read_byte_arrays(bytearray_string):
+ """Method to recover bytes from pandas read/cast function:
+ inputDf = pd.read_csv(config['input'], converters:{FaceDetectTransform.COL_IMAGE_DATA:FaceDetectTransform.read_byte_arrays})
+ https://stackoverflow.com/a/43024993
+ """
+ from ast import literal_eval
+ if bytearray_string.startswith("b'"):
+ return bytearray(literal_eval(bytearray_string))
+ return bytearray_string
--- /dev/null
+#! python
+# -*- coding: utf-8 -*-
+"""
+Wrapper for region processing task; wrapped in classifier for pipeline terminus
+"""
+import cv2
+import os
+import pandas as pd
+import numpy as np
+from sklearn.base import BaseEstimator, ClassifierMixin
+
+# NOTE: If this class were built in another model (e.g. another vendor, class, etc), we would need to
+# *exactly match* the i/o for the upstream (detection) and downstream (this processing)
+from face_privacy_filter.transform_detect import FaceDetectTransform
+
+class RegionTransform(BaseEstimator, ClassifierMixin):
+ '''
+    A sklearn classifier mixin that manipulates image content based on input
+ '''
+
+ def __init__(self, transform_mode="pixelate"):
+ self.transform_mode = transform_mode # specific image processing mode to utilize
+
+ def get_params(self, deep=False):
+ return {'transform_mode': self.transform_mode}
+
+ @staticmethod
+ def generate_out_df(media_type="", bin_stream=b""):
+ # munge stream and mimetype into input sample
+ return pd.DataFrame([[media_type, bin_stream]], columns=[FaceDetectTransform.COL_IMAGE_MIME, FaceDetectTransform.COL_IMAGE_DATA])
+
+ @staticmethod
+ def generate_in_df(idx=FaceDetectTransform.VAL_REGION_IMAGE_ID, x=0, y=0, w=0, h=0, image=0, bin_stream=b"", media=""):
+ return pd.DataFrame([[idx,x,y,w,h,image,media,bin_stream]],
+ columns=[FaceDetectTransform.COL_REGION_IDX, FaceDetectTransform.COL_FACE_X, FaceDetectTransform.COL_FACE_Y,
+ FaceDetectTransform.COL_FACE_W, FaceDetectTransform.COL_FACE_H,
+ FaceDetectTransform.COL_IMAGE_IDX, FaceDetectTransform.COL_IMAGE_MIME,
+ FaceDetectTransform.COL_IMAGE_DATA])
+
+ @property
+ def output_names_(self):
+ return [FaceDetectTransform.COL_IMAGE_MIME, FaceDetectTransform.COL_IMAGE_DATA]
+
+ @property
+ def output_types_(self):
+ list_name = self.output_names_
+ list_type = self.classes_
+ return [{list_name[i]:list_type[i]} for i in range(len(list_name))]
+
+ @property
+ def n_outputs_(self):
+ return 8
+
+ @property
+ def classes_(self):
+ return [str, str]
+
+ def score(self, X, y=None):
+ return 0
+
+ def fit(self, X, y=None):
+ return self
+
+ def predict(self, X, y=None):
+ """
+ Assumes a numpy array of [[mime_type, binary_string] ... ]
+ where mime_type is an image-specifying mime type and binary_string is the raw image bytes
+ """
+
+ # group by image index first
+ # decode image at region -1
+ # collect all remaining regions, operate with each on input image
+ # generate output image, send to output
+
+ dfReturn = None
+ image_region_list = RegionTransform.transform_raw_sample(X)
+ for image_data in image_region_list:
+ #print(image_data)
+ img = image_data['data']
+ for r in image_data['regions']: # loop through regions
+ x_max = min(r[0]+r[2], img.shape[0])
+ y_max = min(r[1]+r[3], img.shape[1])
+ if self.transform_mode=="pixelate":
+ block_size = round(max(img.shape[0], img.shape[2])/15)
+ img[r[0]:x_max, r[1]:y_max] = \
+ RegionTransform.pixelate_image(img[r[0]:x_max, r[1]:y_max], block_size)
+
+ # for now, we hard code to jpg output; TODO: add more encoding output (or try to match source?)
+ img_binary = cv2.imencode(".jpg", img)[1].tostring()
+ img_mime = 'image/jpeg' # image_data['mime']
+
+ df = RegionTransform.generate_out_df(media_type=img_mime, bin_stream=img_binary)
+ if dfReturn is None: # create an NP container for all images
+ dfReturn = df.reindex_axis(self.output_names_, axis=1)
+ else:
+ dfReturn = dfReturn.append(df, ignore_index=True)
+ print("IMAGE {:} found {:} total rows".format(image_data['image'], len(df)))
+ return dfReturn
+
+ @staticmethod
+ def transform_raw_sample(raw_sample):
+ """Method to transform raw samples into dict of image and regions"""
+ raw_sample.sort_values([FaceDetectTransform.COL_IMAGE_IDX], ascending=True, inplace=True)
+ groupImage = raw_sample.groupby(FaceDetectTransform.COL_IMAGE_IDX)
+ return_set = []
+
+ for nameG, rowsG in groupImage:
+ local_image = {'image': -1, 'data': b"", 'regions': [], 'mime': ''}
+ image_row = rowsG[rowsG[FaceDetectTransform.COL_REGION_IDX]==FaceDetectTransform.VAL_REGION_IMAGE_ID]
+ if len(image_row) < 1: # must have at least one image set
+ print("Error: RegionTransform could not find a valid image reference for image set {:}".format(nameG))
+ continue
+ if not len(image_row[FaceDetectTransform.COL_IMAGE_DATA]): # must have valid image data
+ print("Error: RegionTransform expected image data, but found empty binary string {:}".format(nameG))
+ continue
+ file_bytes = np.asarray(image_row[FaceDetectTransform.COL_IMAGE_DATA][0], dtype=np.uint8)
+ local_image['data'] = cv2.imdecode(file_bytes, cv2.IMREAD_COLOR)
+ local_image['image'] = nameG
+ local_image['mime'] = image_row[FaceDetectTransform.COL_IMAGE_MIME]
+
+ # now proceed to loop around regions detected
+ for index, row in rowsG.iterrows():
+ if row[FaceDetectTransform.COL_REGION_IDX]!=FaceDetectTransform.VAL_REGION_IMAGE_ID: # skip bad regions
+ local_image['regions'].append([row[FaceDetectTransform.COL_FACE_X], row[FaceDetectTransform.COL_FACE_Y],
+ row[FaceDetectTransform.COL_FACE_W], row[FaceDetectTransform.COL_FACE_H]])
+ return_set.append(local_image)
+ return return_set
+
+ ################################################################
+ # image processing routines (using opencv)
+
+ # http://www.jeffreythompson.org/blog/2012/02/18/pixelate-and-posterize-in-processing/
+ @staticmethod
+ def pixelate_image(img, blockSize):
+ ratio = (img.shape[1] / img.shape[0]) if img.shape[0] < img.shape[1] else (img.shape[0] / img.shape[1])
+ blockHeight = round(blockSize * ratio) # so that we cover all image
+ for x in range(0, img.shape[0], blockSize):
+ for y in range(0, img.shape[1], blockHeight):
+ max_x = min(x+blockSize, img.shape[0])
+ max_y = min(y+blockSize, img.shape[1])
+ fill_color = img[x,y] # img[x:max_x, y:max_y].mean()
+ img[x:max_x, y:max_y] = fill_color
+ return img
+