# Source code for bob.learn.linear.GFK

#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Tiago de Freitas Pereira <>


"""
Implementing the algorithm Geodesic Flow Kernel to do transfer learning from the modality A to modality B from the paper

Gong, Boqing, et al. "Geodesic flow kernel for unsupervised domain adaptation." Computer Vision and Pattern Recognition (CVPR), 2012 IEEE Conference on. IEEE, 2012.

A very good explanation can be found here.
"""


import logging

import numpy
import numpy.matlib
import scipy.linalg

import bob.io.base
import bob.learn.linear
import bob.math

logger = logging.getLogger("bob.learn.linear")

def null_space(A, eps=1e-20):
    Computes the left null space of `A`.
    The left null space of A is the orthogonal complement to the column space of A.

    U, S, V = scipy.linalg.svd(A)

    padding = max(0, numpy.shape(A)[1] - numpy.shape(S)[0])
    null_mask = numpy.concatenate(((S <= eps), numpy.ones((padding,), dtype=bool)), axis=0)
    null_s = scipy.compress(null_mask, V, axis=0)
    return scipy.transpose(null_s)

[docs]class GFKMachine(object): """ Geodesic flow Kernel (GFK) Machine. This is output of the :py:class:`bob.learn.linear.GFKTrainer` """ def __init__(self, hdf5=None): """ Constructor **Parameters** hdf5: :py:class:`` If is not None, will read all the content of the HDF5File """ self.source_machine = None self.target_machine = None self.G = None if isinstance(hdf5, self.load(hdf5)
[docs] def load(self, hdf5): """ Loads the machine from the given HDF5 file **Parameters** hdf5: :py:class:`` An HDF5 file opened for reading """ assert isinstance(hdf5, # read PCA projector"source_machine") self.source_machine = bob.learn.linear.Machine(hdf5)"..")"target_machine") self.target_machine = bob.learn.linear.Machine(hdf5)"..") self.G = hdf5.get("G")
[docs] def save(self, hdf5): """ Saves the machine to the given HDF5 file **Parameters** hdf5: :py:class:`` An HDF5 file opened for writing """ hdf5.create_group("source_machine")"source_machine")"..") hdf5.create_group("target_machine")"target_machine")"..") hdf5.set("G", self.G)
[docs] def shape(self): """ A tuple that represents the shape of the kernel matrix **Returns** (int, int) <– The size of the weights matrix """ return self.G.shape
[docs] def compute_principal_angles(self): """ Compute the principal angles between source (:math:`P_s`) and target (:math:`P_t`) subspaces in a Grassman which is defined as the following: :math:`d^{2}(P_s, P_t) = \sum_{i}( \theta_i^{2} )`, """ Ps = self.source_machine.weights Pt = self.target_machine.weights # S = cos(theta_1, theta_2, ..., theta_n) _, S, _ = numpy.linalg.svd(, Pt)) thetas_squared = numpy.arccos(S) ** 2 return numpy.sum(thetas_squared)
[docs] def compute_binetcouchy_distance(self): """ Compute the Binet-Couchy distance between source (:math:`P_s`) and target (:math:`P_t`) subspaces in a Grassman which is defined as the following: :math:`d(P_s, P_t) = 1 - (det(P_{s}^{T} * P_{t}^{T}))^{2}` """ # Preparing the source Ps = self.source_machine.weights Rs = null_space(Ps.T) Y1 = numpy.hstack((Ps, Rs)) # Preraring the target Pt = self.target_machine.weights Rt = null_space(Pt.T) Y2 = numpy.hstack((Pt, Rt)) return 1 - numpy.linalg.det(, Y2)) ** 2
def __call__(self, source_domain_data, target_domain_data): """ Compute dot product in the infinity space using the trainer Kernel (G) **Parameters** source_domain_data: :py:func:`numpy.array` Data from the source domain target_domain_data: :py:func:`numpy.array` Data from the target domain """ source_domain_data = ( source_domain_data - self.source_machine.input_subtract) / self.source_machine.input_divide target_domain_data = ( target_domain_data - self.target_machine.input_subtract) / self.target_machine.input_divide return, self.G), target_domain_data.T)[0]
[docs]class GFKTrainer(object): """ Trains the Geodesic Flow Kernel (GFK) that models the domain shift from a certain source linear subspace :math:`P_S` to a certain target linear subspaces :math:`P_T`. GFK models the source domain and the target domain with d-dimensional linear subspaces and embeds them onto a Grassmann manifold. Specifically, let denote the basis of the PCA subspaces for each of the two domains, respectively. The Grassmann manifold :math:`G(d,D)` is the collection of all d-dimensional subspaces of the feature vector space :math:`\mathbb{R}^D`. The geodesic flow :math:`\phi(t)` between :math:`P_S, P_T` on the manifold parameterizes a path connecting the two subspaces. In the beginning of the flow, the subspace is similar to that of the source domain and in the end of the flow, the subspace is similar to that of the target. The original feature :math:`x` is projected into these subspaces and forms a feature vector of infinite dimensions: :math:`z^{\infty} = \phi(t)^T x: t \in [0, 1]`. Using the new feature representation for learning, will force the classifiers to NOT lean towards either the source domain or the target domain, or in other words, will force the classifier to use domain-invariant features. The infinite-dimensional feature vector is handled conveniently by their inner product that gives rise to a positive semidefinite kernel defined on the original features, :math:`G(x_i, x_j) = x_{i}^T \int_0^1 \! \phi(t)\phi(t)^T \, \mathrm{d}t x_{j} = x_i^T G x_j`. The matrix G can be computed efficiently using singular value decomposition. Moreover, computing the kernel does not require any labeled data. More details can be found in: Gong, Boqing, et al. "Geodesic flow kernel for unsupervised domain adaptation." Computer Vision and Pattern Recognition (CVPR), 2012 IEEE Conference on. IEEE, 2012. 
A very good intuition can be found in: **Constructor Documentation:** - **bob.learn.linear.GFKTrainer** (number_of_subspaces, subspace_dim_source, subspace_dim_target, eps) **Parameters** number_of_subspaces: `int` Number of subspaces for the transfer learning. If set to -1, this value will be estimated automatically. For more information check, Section 3.4. subspace_dim_source: `float` Energy kept in the source linear subspace subspace_dim_target: `float` Energy kept in the target linear subspace eps: `float` Floor value """ def __init__(self, number_of_subspaces=-1, subspace_dim_source=0.99, subspace_dim_target=0.99, eps=1e-20): """ Constructor **Parameters** number_of_subspaces: `int` Number of subspaces for the transfer learning subspace_dim_source: `float` Energy kept in the source linear subspace subspace_dim_target: `float` Energy kept in the target linear subspace eps: `float` Floor value """ self.m_number_of_subspaces = number_of_subspaces self.m_subspace_dim_source = subspace_dim_source self.m_subspace_dim_target = subspace_dim_target self.eps = eps
[docs] def get_best_d(self, Ps, Pt, Pst): """ Get the best value for the number of subspaces For more details, read section 3.4 of the paper. **Parameters** Ps: Source subspace Pt: Target subspace Pst: Source + Target subspace """ def compute_angles(A, B): _, S, _ = numpy.linalg.svd(, B)) S[numpy.where(numpy.isclose(S ,1, atol=self.eps)==True)[0]] = 1 return numpy.arccos(S) max_d = min(Ps.shape[1], Pt.shape[1], Pst.shape[1] ) alpha_d = compute_angles(Ps, Pst) beta_d = compute_angles(Pt, Pst) d = 0.5 * ( numpy.sin(alpha_d) + numpy.sin(beta_d)) return numpy.argmax(d)
[docs] def train(self, source_data, target_data, norm_inputs=True): """ Trains the GFK (:py:class:`bob.learn.linear.GFKMachine`) **Parameters** source_data: :py:func:`numpy.array` Data from the source domain target_data: :py:func:`numpy.array` Data from the target domain **Returns** machine: :py:class:`bob.learn.linear.GFKMachine` """ source_data = source_data.astype("float64") target_data = target_data.astype("float64") if(self.m_number_of_subspaces == -1): source_target = numpy.vstack((source_data, target_data)) norm_inputs = True" -> Automatic search for d. We set norm_inputs=True")" -> Normalizing data per modality") if norm_inputs: source, mu_source, std_source = self._znorm(source_data) target, mu_target, std_target = self._znorm(target_data) else: mu_source = numpy.zeros(shape=(source_data.shape[1])) std_source = numpy.ones(shape=(source_data.shape[1])) mu_target = numpy.zeros(shape=(target_data.shape[1])) std_target = numpy.ones(shape=(target_data.shape[1])) source = source_data target = target_data" -> Computing PCA for the source modality") Ps = self._train_pca(source, mu_source, std_source, self.m_subspace_dim_source)" -> Computing PCA for the target modality") Pt = self._train_pca(target, mu_target, std_target, self.m_subspace_dim_target) # self.m_machine ="/idiap/user/tpereira/gitlab/workspace_HTFace/GFK.hdf5") # If -1, let's compute the optimal value for d if(self.m_number_of_subspaces == -1):" -> Computing the best value for m_number_of_subspaces") source_target, mu_source_target, std_source_target = self._znorm(source_target) Pst = self._train_pca(source_target, mu_source_target, std_source_target, min(self.m_subspace_dim_target, self.m_subspace_dim_source)) self.m_number_of_subspaces = self.get_best_d(Pst.weights, Ps.weights, Pt.weights)" -> Best m_number_of_subspaces is {0}".format(self.m_number_of_subspaces)) G = self._train_gfk(numpy.hstack((Ps.weights, null_space(Ps.weights.T))), Pt.weights[:, 0:self.m_number_of_subspaces]) machine = 
GFKMachine() machine.source_machine = Ps machine.target_machine = Pt machine.G = G return machine
def _train_gfk(self, Ps, Pt): """ """ N = Ps.shape[1] dim = Pt.shape[1] # Principal angles between subspaces QPt =, Pt) # [V1,V2,V,Gam,Sig] = gsvd(QPt(1:dim,:), QPt(dim+1:end,:)); A = QPt[0:dim, :].copy() B = QPt[dim:, :].copy() # Equation (2) [V1, V2, V, Gam, Sig] = bob.math.gsvd(A, B) V2 = -V2 # Some sanity checks with the GSVD I = numpy.eye(V1.shape[1]) I_check =, Gam) +, Sig) assert numpy.sum(abs(I - I_check)) < 1e-10 theta = numpy.arccos(numpy.diagonal(Gam)) # Equation (6) B1 = numpy.diag(0.5 * (1 + (numpy.sin(2 * theta) / (2. * numpy.maximum (theta, self.eps))))) B2 = numpy.diag(0.5 * ((numpy.cos(2 * theta) - 1) / (2 * numpy.maximum( theta, self.eps)))) B3 = B2 B4 = numpy.diag(0.5 * (1 - (numpy.sin(2 * theta) / (2. * numpy.maximum (theta, self.eps))))) # Equation (9) of the suplementary matetial delta1_1 = numpy.hstack((V1, numpy.zeros(shape=(dim, N - dim)))) delta1_2 = numpy.hstack((numpy.zeros(shape=(N - dim, dim)), V2)) delta1 = numpy.vstack((delta1_1, delta1_2)) delta2_1 = numpy.hstack((B1, B2, numpy.zeros(shape=(dim, N - 2 * dim)))) delta2_2 = numpy.hstack((B3, B4, numpy.zeros(shape=(dim, N - 2 * dim)))) delta2_3 = numpy.zeros(shape=(N - 2 * dim, N)) delta2 = numpy.vstack((delta2_1, delta2_2, delta2_3)) delta3_1 = numpy.hstack((V1, numpy.zeros(shape=(dim, N - dim)))) delta3_2 = numpy.hstack((numpy.zeros(shape=(N - dim, dim)), V2)) delta3 = numpy.vstack((delta3_1, delta3_2)).T delta =, delta2), delta3) G =, delta), Ps.T) return G def _train_pca(self, data, mu_data, std_data, subspace_dim): t = bob.learn.linear.PCATrainer() machine, variances = t.train(data) # For re-shaping, we need to copy... variances = variances.copy() # compute variance percentage, if desired if isinstance(subspace_dim, float): cummulated = numpy.cumsum(variances) / numpy.sum(variances) for index in range(len(cummulated)): if cummulated[index] > subspace_dim: subspace_dim = index break subspace_dim = index" ... 
Keeping %d PCA dimensions", subspace_dim) machine.resize(machine.shape[0], subspace_dim) machine.input_subtract = mu_data machine.input_divide = std_data return machine def _znorm(self, data): """ Z-Normaliza """ mu = numpy.average(data, axis=0) std = numpy.std(data, axis=0) data = (data - mu) / std return data, mu, std