# -*- coding: utf-8 -*-
# ---------------------------- IMPORTS ---------------------------- #
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import
# multiprocessing
from builtins import zip
from builtins import object
import itertools as it
from multiprocessing.pool import ThreadPool as Pool
# third-party
import cv2
import numpy as np
# custom
from .config import MANAGER, FLAG_DEBUG
#from cache import memoize
from .arrayops import SimKeyPoint, normsigmoid
# ---------------------------- GLOBALS ---------------------------- #
NO_CPUs = cv2.getNumberOfCPUs()
if FLAG_DEBUG: print("configured to use {} CPUs".format(NO_CPUs))
pool = Pool(processes=NO_CPUs)  # WARNING: do not use this pool while the module is still being imported; that creates a deadlock
feature_name = 'sift-flann'
# ---------------------------- SPECIALIZED FUNCTIONS ---------------------------- #
class Feature(object):
    """
    Class to manage the detection and computation of features.

    :param pool: multiprocessing pool (a dummy pool; it uses multithreading)
    :param useASIFT: if True, add affine perspectives to the detector.
    :param debug: if True, print debug messages to stdout.
    """
    def __init__(self, pool=pool, useASIFT=True, debug=True):
self.pool=pool
self.detector = None
self.matcher = None
self.useASIFT = useASIFT
self.debug = debug
    def detectAndCompute(self, img, mask=None):
"""
detect keypoints and descriptors
:param img: image to find keypoints and its descriptors
:param mask: mask to detect keypoints (it uses default, mask[:] = 255)
:return: keypoints,descriptors
"""
        # building the tilt and rotation variation parameters
if self.useASIFT:
            params = [(1.0, 0.0)]  # identity view: no tilt, no rotation
# phi rotations for t tilts of the image
for t in 2**(0.5*np.arange(1,6)):
for phi in np.arange(0, 180, 72.0 / t):
params.append((t, phi))
def helper(param):
t, phi = param #tilt, phi (rotation)
# computing the affine transform
timg, tmask, Ai = affine_skew(t, phi, img, mask) # get tilted image, mask and transformation
# Find keypoints and descriptors with the detector
keypoints, descrs = self.detector.detectAndCompute(timg, tmask) # use detector
for kp in keypoints:
x, y = kp.pt # get actual keypoints
kp.pt = tuple( np.dot(Ai, (x, y, 1)) ) # transform keypoints to original img
if descrs is None: descrs = [] # faster than: descrs or []
return keypoints, descrs
            if self.pool is None:
                try:
                    ires = it.imap(helper, params)  # lazy map (Python 2 itertools.imap)
                except AttributeError:  # Python 3: itertools.imap was removed
                    ires = map(helper, params)
            else:
                ires = self.pool.imap(helper, params)  # map concurrently in the pool
keypoints, descrs = [], []
for i, (k, d) in enumerate(ires):
keypoints.extend(k)
descrs.extend(d)
if self.debug: print('affine sampling: %d / %d\r' % (i+1, len(params)), end=' ')
else:
keypoints, descrs = self.detector.detectAndCompute(img, mask) # use detector
        keypoints = [SimKeyPoint(obj).__dict__ for obj in keypoints]  # convert to plain dictionaries
        return keypoints, np.array(descrs)
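    # Note on the sampling above: the ASIFT branch uses tilts t = 2**(k/2) for
    # k = 1..5 and, for each tilt, rotations phi in steps of 72/t degrees over
    # [0, 180), plus the untransformed identity view.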
    def config(self, name, separator="-"):
        """
        Initialize a detector and matcher from a command-like name.

        :param name: "[a-]<sift|surf|orb|brisk>[-flann]" (str) Ex: "a-sift-flann"
        :param separator: separator character
        :return: detector, matcher
        """
        # A good explanation of the detector and matcher choices:
# http://docs.opencv.org/3.0-beta/doc/py_tutorials/py_feature2d/py_matcher/py_matcher.html
FLANN_INDEX_KDTREE = 1 # bug: flann enums are missing
FLANN_INDEX_LSH = 6
chunks = name.split(separator)
index = 0
if chunks[index].lower() == "a":
self.useASIFT = True
index+=1
if chunks[index].lower() == 'sift':
try: # opencv 2
detector = cv2.SIFT() # Scale-invariant feature transform
except AttributeError: # opencv 3
detector = cv2.xfeatures2d.SIFT_create()
norm = cv2.NORM_L2 # distance measurement to be used
elif chunks[index].lower() == 'surf':
try: # opencv 2
detector = cv2.SURF() # Hessian Threshold to 800, 500 # http://stackoverflow.com/a/18891668/5288758
except AttributeError: # opencv 3
detector = cv2.xfeatures2d.SURF_create()
# http://docs.opencv.org/2.4/modules/nonfree/doc/feature_detection.html
norm = cv2.NORM_L2 # distance measurement to be used
elif chunks[index].lower() == 'orb':
try: # opencv 2
detector = cv2.ORB() # around 400, binary string based descriptors
except AttributeError: # opencv 3
detector = cv2.ORB_create()
norm = cv2.NORM_HAMMING # Hamming distance
elif chunks[index].lower() == 'brisk':
try: # opencv 2
detector = cv2.BRISK()
except AttributeError: # opencv 3
detector = cv2.BRISK_create()
norm = cv2.NORM_HAMMING # Hamming distance
else:
            raise ValueError("name '{}' with detector '{}' is not valid".format(name, chunks[index]))
index +=1
if len(chunks)-1 >= index and chunks[index].lower() == 'flann':
# FLANN based Matcher, Fast Approximate Nearest Neighbor Search Library
            if norm == cv2.NORM_L2:  # for SIFT and SURF
flann_params = dict(algorithm = FLANN_INDEX_KDTREE, trees = 5)
else: # for ORB
flann_params= dict(algorithm = FLANN_INDEX_LSH,
table_number = 6, # 12
key_size = 12, # 20
multi_probe_level = 1) #2
matcher = cv2.FlannBasedMatcher(flann_params, {}) # bug : need to pass empty dict (#1329)
else: # brute force matcher
matcher = cv2.BFMatcher(norm) # difference in norm http://stackoverflow.com/a/32849908/5288758
self.detector, self.matcher = detector, matcher
return detector, matcher
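# A minimal usage sketch for Feature (assumes a hypothetical grayscale image
# `img`, e.g. loaded with cv2.imread(path, 0)):
#
#   f = Feature(useASIFT=True, debug=False)
#   f.config("a-sift-flann")              # "a-" enables ASIFT, "-flann" FLANN matching
#   kps, descs = f.detectAndCompute(img)  # keypoint dictionaries, descriptor array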
def init_feature(name, separator="-", features={}):  # the mutable default dictionary caches the features
    """
    Initialize a detector and matcher from a command-like name.

    :param name: "<sift|surf|orb|brisk>[-flann]" (str) Ex: "sift-flann"
    :param separator: separator character
    :param features: dictionary mapping each name to its initialized
        (detector, matcher) pair. If None a new dictionary is created.
        The cache avoids re-creating detectors and matchers between calls.
    :return: detector, matcher
    """
FLANN_INDEX_KDTREE = 1 # bug: flann enums are missing
FLANN_INDEX_LSH = 6
if features is None: features = {} # reset features
if name not in features: # if called with a different name
chunks = name.split(separator)
index = 0
if chunks[index].lower() == 'sift':
try: # opencv 2
detector = cv2.SIFT() # Scale-invariant feature transform
except AttributeError: # opencv 3
detector = cv2.xfeatures2d.SIFT_create()
norm = cv2.NORM_L2 # distance measurement to be used
elif chunks[index].lower() == 'surf':
try: # opencv 2
detector = cv2.SURF() # Hessian Threshold to 800, 500 # http://stackoverflow.com/a/18891668/5288758
except AttributeError: # opencv 3
detector = cv2.xfeatures2d.SURF_create()
# http://docs.opencv.org/2.4/modules/nonfree/doc/feature_detection.html
norm = cv2.NORM_L2 # distance measurement to be used
elif chunks[index].lower() == 'orb':
try: # opencv 2
detector = cv2.ORB() # around 400, binary string based descriptors
except AttributeError: # opencv 3
detector = cv2.ORB_create()
norm = cv2.NORM_HAMMING # Hamming distance
elif chunks[index].lower() == 'brisk':
try: # opencv 2
detector = cv2.BRISK()
except AttributeError: # opencv 3
detector = cv2.BRISK_create()
norm = cv2.NORM_HAMMING # Hamming distance
else:
            raise ValueError("name '{}' with detector '{}' is not valid".format(name, chunks[index]))
index +=1
if len(chunks)-1 >= index and chunks[index].lower() == 'flann':
# FLANN based Matcher, Fast Approximate Nearest Neighbor Search Library
            if norm == cv2.NORM_L2:  # for SIFT and SURF
flann_params = dict(algorithm = FLANN_INDEX_KDTREE, trees = 5)
else: # for ORB
flann_params= dict(algorithm = FLANN_INDEX_LSH,
table_number = 6, # 12
key_size = 12, # 20
multi_probe_level = 1) #2
matcher = cv2.FlannBasedMatcher(flann_params, {}) # bug : need to pass empty dict (#1329)
else: # brute force matcher
matcher = cv2.BFMatcher(norm) # difference in norm http://stackoverflow.com/a/32849908/5288758
features[name] = detector, matcher # cache detector and matcher
return features[name] # get buffered: detector, matcher
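# Usage sketch: init_feature caches the (detector, matcher) pair in its mutable
# default argument, so repeated calls with the same name reuse the same objects:
#
#   d1, m1 = init_feature("sift-flann")
#   d2, m2 = init_feature("sift-flann")
#   assert d1 is d2 and m1 is m2  # served from the cache, not re-created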
def affine_skew(tilt, phi, img, mask=None):
    """
    Compute an affine-skewed view of the image, simulating another perspective
    of the same scene to make the descriptors more robust.

    :param tilt: tilt applied to the image
    :param phi: rotation applied to the image (in degrees)
    :param img: image for which to compute the affine transform
    :param mask: mask restricting keypoint detection (defaults to the whole
        image, mask[:] = 255)
    :return: skew_img, skew_mask, Ai (inverse affine transform)
        Ai is the affine transform matrix from skew_img back to img.
    """
h, w = img.shape[:2] # get 2D shape
if mask is None:
mask = np.zeros((h, w), np.uint8)
mask[:] = 255
A = np.float32([[1, 0, 0], [0, 1, 0]]) # init Transformation matrix
if phi != 0.0: # simulate rotation
phi = np.deg2rad(phi) # convert degrees to radian
s, c = np.sin(phi), np.cos(phi) # get sine, cosine components
A = np.float32([[c,-s], [ s, c]]) # build partial Transformation matrix
corners = [[0, 0], [w, 0], [w, h], [0, h]] # use corners
tcorners = np.int32( np.dot(corners, A.T) ) # transform corners
x, y, w, h = cv2.boundingRect(tcorners.reshape(1,-1,2)) # get translations
A = np.hstack([A, [[-x], [-y]]]) # finish Transformation matrix build
img = cv2.warpAffine(img, A, (w, h), flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REPLICATE)
    if tilt != 1.0:
        s = 0.8*np.sqrt(tilt*tilt - 1)  # anti-aliasing sigma for the horizontal subsampling
        img = cv2.GaussianBlur(img, (0, 0), sigmaX=s, sigmaY=0.01)  # blur along x only
        img = cv2.resize(img, (0, 0), fx=1.0/tilt, fy=1.0, interpolation=cv2.INTER_NEAREST)  # subsample along x
        A[0] /= tilt
if phi != 0.0 or tilt != 1.0:
h, w = img.shape[:2] # get new 2D shape
mask = cv2.warpAffine(mask, A, (w, h), flags=cv2.INTER_NEAREST) # also get mask transformation
Ai = cv2.invertAffineTransform(A)
return img, mask, Ai
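# Consistency sketch for affine_skew (assumes a hypothetical image `img`):
# `Ai` maps skewed coordinates back to the original frame, which is how the
# ASIFT routines reproject keypoints:
#
#   timg, tmask, Ai = affine_skew(2.0, 30.0, img)
#   x, y = 10.0, 20.0                # a point detected in timg
#   x0, y0 = np.dot(Ai, (x, y, 1))   # the same point in img coordinates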
#@memoize(MANAGER["TEMPPATH"],ignore=["pool"])
def ASIFT(feature_name, img, mask=None, pool=pool):
    """
    ASIFT(feature_name, img, mask=None, pool=pool) -> keypoints, descrs

    Apply a set of affine transformations to the image, detect keypoints in
    each view and reproject them into the initial image coordinates.
    See http://www.ipol.im/pub/algo/my_affine_sift/ for the details.

    A ThreadPool object may be passed to speed up the computation.

    :param feature_name: feature name used to create the detector.
    :param img: image in which to find keypoints and their descriptors
    :param mask: mask restricting keypoint detection (defaults to the whole
        image, mask[:] = 255)
    :param pool: multiprocessing pool (a dummy pool; it uses multithreading)
    :return: keypoints, descriptors
    """
    # building the tilt and rotation variation parameters
    detector = init_feature(feature_name)[0]  # the cv2 detector object must be fetched here to avoid conflicts with memoizers
    params = [(1.0, 0.0)]  # identity view: no tilt, no rotation
# phi rotations for t tilts of the image
for t in 2**(0.5*np.arange(1,6)):
for phi in np.arange(0, 180, 72.0 / t):
params.append((t, phi))
def helper(param):
t, phi = param #tilt, phi (rotation)
# computing the affine transform
timg, tmask, Ai = affine_skew(t, phi, img, mask) # get tilted image, mask and transformation
# Find keypoints and descriptors with the detector
keypoints, descrs = detector.detectAndCompute(timg, tmask) # use detector
for kp in keypoints:
x, y = kp.pt # get actual keypoints
kp.pt = tuple( np.dot(Ai, (x, y, 1)) ) # transform keypoints to original img
if descrs is None: descrs = [] # faster than: descrs or []
return keypoints, descrs
    if pool is None:
        try:
            ires = it.imap(helper, params)  # lazy map (Python 2 itertools.imap)
        except AttributeError:  # Python 3: itertools.imap was removed
            ires = map(helper, params)
    else:
        ires = pool.imap(helper, params)  # map concurrently in the pool
keypoints, descrs = [], []
for i, (k, d) in enumerate(ires):
keypoints.extend(k)
descrs.extend(d)
if FLAG_DEBUG: print('affine sampling: %d / %d\r' % (i+1, len(params)), end=' ')
    keypoints = [SimKeyPoint(obj).__dict__ for obj in keypoints]  # convert to plain dictionaries
    return keypoints, np.array(descrs)
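# Usage sketch (assumes two hypothetical images img1 and img2):
#
#   kp1, desc1 = ASIFT(feature_name, img1)  # uses the module-level pool by default
#   kp2, desc2 = ASIFT(feature_name, img2)
#   H, status, kp_pairs = MATCH(feature_name, kp1, desc1, kp2, desc2)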
def ASIFT_iter(imgs, feature_name=feature_name):
    """
    Affine-SIFT for N images (generator version).

    :param imgs: images on which to apply ASIFT
    :param feature_name: e.g. "sift", "surf", "orb"
    :return: yields (kp1, desc1), ..., (kpN, descN)
    """
for img in imgs: yield ASIFT(feature_name, img, pool=pool)
def ASIFT_multiple(imgs, feature_name=feature_name):
    """
    Affine-SIFT for N images.

    :param imgs: images on which to apply ASIFT
    :param feature_name: e.g. "sift", "surf", "orb"
    :return: [(kp1, desc1), ..., (kpN, descN)]
    """
return [ASIFT(feature_name, img, pool=pool) for img in imgs]
def filter_matches(kp1, kp2, matches, ratio=0.75):
    """
    Apply Lowe's ratio test to the raw matches.

    :param kp1: raw keypoints 1
    :param kp2: raw keypoints 2
    :param matches: raw matches (k=2 nearest neighbours per query descriptor)
    :param ratio: maximum allowed ratio of best to second-best match distance
    :return: filtered points 1, filtered points 2, keypoint pairs
    """
mkp1, mkp2 = [], [] # initialize matched keypoints
for m in matches:
        if len(m) == 2 and m[0].distance < m[1].distance * ratio:  # ratio test on match distance
m = m[0]
mkp1.append( kp1[m.queryIdx] ) # keypoint with Index of the descriptor in query descriptors
mkp2.append( kp2[m.trainIdx] ) # keypoint with Index of the descriptor in train descriptors
p1 = np.float32([kp["pt"] for kp in mkp1])
p2 = np.float32([kp["pt"] for kp in mkp2])
return p1, p2, list(zip(mkp1, mkp2)) # p1, p2, kp_pairs
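# Sketch of the expected input: `matches` comes from knnMatch with k=2, so each
# element holds the two nearest neighbours of a query descriptor and the best
# one is kept only when it is clearly better than the runner-up:
#
#   raw = matcher.knnMatch(desc1, trainDescriptors=desc2, k=2)
#   p1, p2, kp_pairs = filter_matches(kp1, kp2, raw, ratio=0.75)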
#@memoize(MANAGER["TEMPPATH"])
def MATCH(feature_name, kp1, desc1, kp2, desc2):
    """
    Use the matcher on ASIFT output to obtain a transformation matrix (TM).

    :param feature_name: feature name used in init_feature to select the
        matcher; the detector part is ignored. E.g. "sift" uses BFMatcher,
        while "sift-flann" uses FlannBasedMatcher.
    :param kp1: keypoints of the source image
    :param desc1: descriptors of kp1
    :param kp2: keypoints of the destination image
    :param desc2: descriptors of kp2
    :return: TM (homography H), status (inlier mask), kp_pairs
    """
# http://docs.opencv.org/3.0-beta/doc/py_tutorials/py_feature2d/py_feature_homography/py_feature_homography.html
    matcher = init_feature(feature_name)[1]  # the cv2 matcher object must be fetched here to avoid conflicts with memoizers
# BFMatcher.knnMatch() returns k best matches where k is specified by the user
    raw_matches = matcher.knnMatch(desc1, trainDescriptors=desc2, k=2)  # 2 best matches per descriptor
# If k=2, it will draw two match-lines for each keypoint.
# So we have to pass a status if we want to selectively draw it.
p1, p2, kp_pairs = filter_matches(kp1, kp2, raw_matches) #ratio test of 0.75
if len(p1) >= 4:
H, status = cv2.findHomography(p1, p2, cv2.RANSAC, 5.0) # status specifies the inlier and outlier points
if FLAG_DEBUG: print('%d / %d inliers/matched' % (np.sum(status), len(status)))
# do not draw outliers (there will be a lot of them)
#kp_pairs = [kpp for kpp, flag in zip(kp_pairs, status) if flag] # uncomment to give only good kp_pairs
else:
H, status = None, None
if FLAG_DEBUG: print('%d matches found, not enough for homography estimation' % len(p1))
return H, status, kp_pairs
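# Usage sketch: when H is found it maps source points into the destination
# image, e.g. projecting the source corners (hypothetical h, w = img1.shape[:2]):
#
#   corners = np.float32([[0, 0], [w, 0], [w, h], [0, h]]).reshape(-1, 1, 2)
#   projected = cv2.perspectiveTransform(corners, H)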
def MATCH_multiple(pairlist, feature_name=feature_name):
    """
    Match the first (keypoints, descriptors) pair against all remaining pairs.

    :param pairlist: list of keypoint and descriptor pairs, e.g. [(kp1, desc1), ..., (kpN, descN)]
    :param feature_name: feature name used to select the matcher
    :return: [(H1, mask1, kp_pairs1), ..., (HN, maskN, kp_pairsN)]
    """
    kp1, desc1 = pairlist[0]  # reference pair
return [MATCH(feature_name,kp1,desc1,kpN,descN) for kpN,descN in pairlist[1:]]
def inlineRatio(inlines, lines, thresh=30):
    """
    Estimate the probability that a match was correct.

    :param inlines: number of inlier matches
    :param lines: total number of matches
    :param thresh: threshold on the number of matches (probability is very
        low at or below thresh and good above it)
    :return: probability-like score
    """
    return (inlines/lines)*normsigmoid(lines, 30, thresh)  # fewer than 30 matches score below 0.5 with the default thresh
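# Worked example (illustrative values only, assuming normsigmoid approaches 1
# well above thresh and 0.5 at thresh): the same inlier ratio scores higher
# when it is supported by more matches:
#
#   inlineRatio(10, 20)    # ratio 0.5, but only 20 matches -> score below 0.25
#   inlineRatio(50, 100)   # ratio 0.5 with 100 matches -> score close to 0.5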