#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Laurent El Shafey <Laurent.El-Shafey@idiap.ch>
import bob.core
import bob.io.base
import bob.learn.linear
import bob.learn.em
import numpy
from .Tool import Tool
from .. import utils
[docs]class PLDA (Tool):
"""Tool chain for computing PLDA (over PCA-dimensionality reduced) features"""
def __init__(
self,
subspace_dimension_of_f, # Size of subspace F
subspace_dimension_of_g, # Size of subspace G
subspace_dimension_pca = None, # if given, perform PCA on data and reduce the PCA subspace to the given dimension
plda_training_iterations = 200, # Maximum number of iterations for the EM loop
# TODO: refactor the remaining parameters!
INIT_SEED = 5489, # seed for initializing
INIT_F_METHOD = 'BETWEEN_SCATTER',
INIT_G_METHOD = 'WITHIN_SCATTER',
INIT_S_METHOD = 'VARIANCE_DATA',
multiple_probe_scoring = 'joint_likelihood'
):
"""Initializes the local (PCA-)PLDA tool chain with the given file selector object"""
# call base class constructor and register that this class requires training for enrollment
Tool.__init__(
self,
requires_enroller_training = True,
subspace_dimension_of_f = subspace_dimension_of_f, # Size of subspace F
subspace_dimension_of_g = subspace_dimension_of_g, # Size of subspace G
subspace_dimension_pca = subspace_dimension_pca, # if given, perform PCA on data and reduce the PCA subspace to the given dimension
plda_training_iterations = plda_training_iterations, # Maximum number of iterations for the EM loop
# TODO: refactor the remaining parameters!
INIT_SEED = INIT_SEED, # seed for initializing
INIT_F_METHOD = str(INIT_F_METHOD),
INIT_G_METHOD = str(INIT_G_METHOD),
INIT_S_METHOD =str(INIT_S_METHOD),
multiple_probe_scoring = multiple_probe_scoring,
multiple_model_scoring = None
)
self.m_subspace_dimension_of_f = subspace_dimension_of_f
self.m_subspace_dimension_of_g = subspace_dimension_of_g
self.m_subspace_dimension_pca = subspace_dimension_pca
self.m_plda_training_iterations = plda_training_iterations
self.m_score_set = {'joint_likelihood': 'joint_likelihood', 'average':numpy.average, 'min':min, 'max':max}[multiple_probe_scoring]
# TODO: refactor
self.m_init = (INIT_SEED, INIT_F_METHOD, INIT_G_METHOD, INIT_S_METHOD)
def __train_pca__(self, training_set):
"""Trains and returns a LinearMachine that is trained using PCA"""
data_list = []
for client in training_set:
for feature in client:
# Appends in the array
data_list.append(feature)
data = numpy.vstack(data_list)
utils.info(" -> Training LinearMachine using PCA ")
t = bob.learn.linear.PCATrainer()
machine, __eig_vals = t.train(data)
# limit number of pcs
machine.resize(machine.shape[0], self.m_subspace_dimension_pca)
return machine
def __perform_pca_client__(self, machine, client):
"""Perform PCA on an array"""
client_data_list = []
for feature in client:
# project data
projected_feature = numpy.ndarray(machine.shape[1], numpy.float64)
machine(feature, projected_feature)
# add data in new array
client_data_list.append(projected_feature)
client_data = numpy.vstack(client_data_list)
return client_data
def __perform_pca__(self, machine, training_set):
"""Perform PCA on data"""
data = []
for client in training_set:
client_data = self.__perform_pca_client__(machine, client)
data.append(client_data)
return data
[docs] def train_enroller(self, training_features, projector_file):
"""Generates the PLDA base model from a list of arrays (one per identity),
and a set of training parameters. If PCA is requested, it is trained on the same data.
Both the trained PLDABase and the PCA machine are written."""
# train PCA and perform PCA on training data
if self.m_subspace_dimension_pca is not None:
self.m_pca_machine = self.__train_pca__(training_features)
training_features = self.__perform_pca__(self.m_pca_machine, training_features)
input_dimension = training_features[0].shape[1]
utils.info(" -> Training PLDA base machine")
# create trainer
trainer = bob.learn.em.PLDATrainer()
trainer.init_f_method = self.m_init[1]
trainer.init_g_method = self.m_init[2]
trainer.init_sigma_method = self.m_init[3]
# train machine
self.m_plda_base = bob.learn.em.PLDABase(input_dimension, self.m_subspace_dimension_of_f, self.m_subspace_dimension_of_g)
bob.learn.em.train(trainer, self.m_plda_base, training_features, self.m_plda_training_iterations, rng=bob.core.random.mt19937(self.m_init[0]))
# write machines to file
proj_hdf5file = bob.io.base.HDF5File(str(projector_file), "w")
if self.m_subspace_dimension_pca is not None:
proj_hdf5file.create_group('/pca')
proj_hdf5file.cd('/pca')
self.m_pca_machine.save(proj_hdf5file)
proj_hdf5file.create_group('/plda')
proj_hdf5file.cd('/plda')
self.m_plda_base.save(proj_hdf5file)
[docs] def load_enroller(self, projector_file):
"""Reads the PCA projection matrix and the PLDA model from file"""
# read UBM
proj_hdf5file = bob.io.base.HDF5File(projector_file)
if self.m_subspace_dimension_pca is not None:
proj_hdf5file.cd('/pca')
self.m_pca_machine = bob.learn.linear.Machine(proj_hdf5file)
proj_hdf5file.cd('/plda')
self.m_plda_base = bob.learn.em.PLDABase(proj_hdf5file)
#self.m_plda_base = bob.machine.PLDABase(bob.io.HDF5File(projector_file))
self.m_plda_machine = bob.learn.em.PLDAMachine(self.m_plda_base)
self.m_plda_trainer = bob.learn.em.PLDATrainer()
[docs] def enroll(self, enroll_features):
"""Enrolls the model by computing an average of the given input vectors"""
if self.m_subspace_dimension_pca is not None:
enroll_features_projected = self.__perform_pca_client__(self.m_pca_machine, enroll_features)
self.m_plda_trainer.enroll(self.m_plda_machine,enroll_features_projected)
else:
self.m_plda_trainer.enroll(self.m_plda_machine,enroll_features)
return self.m_plda_machine
[docs] def read_model(self, model_file):
"""Reads the model, which in this case is a PLDA-Machine"""
# read machine and attach base machine
plda_machine = bob.learn.em.PLDAMachine(bob.io.base.HDF5File(model_file), self.m_plda_base)
return plda_machine
[docs] def score(self, model, probe):
"""Computes the PLDA score for the given model and probe"""
return self.score_for_multiple_probes(model, [probe])
[docs] def score_for_multiple_probes(self, model, probes):
"""This function computes the score between the given model and several given probe files.
In this base class implementation, it computes the scores for each probe file using the 'score' method,
and fuses the scores using the fusion method specified in the constructor of this class."""
n_probes = len(probes)
if self.m_subspace_dimension_pca is not None:
# project probe
probe_ = numpy.ndarray((n_probes, self.m_pca_machine.shape[1]), numpy.float64)
projected_probe = numpy.ndarray(self.m_pca_machine.shape[1], numpy.float64)
for i in range(n_probes):
self.m_pca_machine(probes[i], projected_probe)
probe_[i,:] = projected_probe
# forward
if self.m_score_set == 'joint_likelihood':
return model.log_likelihood_ratio(probe_)
else:
scores = [model.log_likelihood_ratio(probe_[i,:]) for i in range(n_probes)]
return self.m_score_set(scores)
else:
# just forward
if self.m_score_set == 'joint_likelihood':
probe_ = numpy.ndarray((n_probes, probe[0].shape[0]), numpy.float64)
for i in range(n_probes):
probe_[i,:] = probes[i]
return model.forward(probe_)
# forward
else:
scores = [model.forward(probes[i]) for i in range(n_probes)]
return self.m_score_set(scores)