Source code for bob.pad.voice.algorithm.logregr_algorithm

#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# @author: Pavel Korshunov <pavel.korshunov@idiap.ch>
# @date: Wed 19 Oct 23:43:22 2016

import bob.io.base

import numpy

import bob.learn.linear

from bob.pad.base.algorithm import Algorithm

import logging

# Module-level logger registered under the "bob.pad.voice" name.
logger = logging.getLogger("bob.pad.voice")


class LogRegrAlgorithm(Algorithm):
    """Trains a Logistic Regression classifier and projects test data on it.

    Optionally the features are normalized (zero mean, unit variance) and/or
    reduced with PCA before the Logistic Regression machine is trained.

    **Parameters:**

    use_PCA_training : bool
      If ``True``, a PCA machine is trained on the (normalized) training
      features and applied before the Logistic Regression machine.

    normalize_features : bool
      If ``True``, mean/std normalization is applied to the features.
    """

    def __init__(self, use_PCA_training=False, normalize_features=False, **kwargs):
        # NOTE(review): ``kwargs`` is accepted but not forwarded to the base
        # class constructor -- confirm whether that is intentional.

        # call base class constructor registering that this tool performs everything.
        Algorithm.__init__(
            self,
            performs_projection=True,
            requires_projector_training=True,
            use_projected_features_for_enrollment=True,
        )
        self.machine = None
        self.pca_machine = None
        self.use_PCA_training = use_PCA_training
        self.normalize_features = normalize_features

    def _check_feature(self, feature, machine=None, projected=False):
        """Checks that the features are appropriate.

        **Parameters:**

        feature : object
          The candidate feature; must be a 1D ``numpy.float64`` array.

        machine : object or ``None``
          If given, ``feature.shape[0]`` is compared with
          ``machine.shape[1]`` (when ``projected``) or ``machine.shape[0]``.

        projected : bool
          Selects which dimension of ``machine.shape`` is compared.

        **Returns:**

        ``True`` if the feature is usable, ``False`` (after logging a warning)
        if its length does not match the machine.

        **Raises:**

        ValueError : if ``feature`` is not a 1D ``numpy.float64`` array.
        """
        if not isinstance(feature, numpy.ndarray) or feature.ndim != 1 or feature.dtype != numpy.float64:
            raise ValueError("The given feature is not appropriate", feature)
        index = 1 if projected else 0
        if machine is not None and feature.shape[0] != machine.shape[index]:
            # ``Logger.warn`` is a deprecated alias; pass the values lazily.
            logger.warning(
                "The given feature is expected to have %d elements, but it has %d",
                machine.shape[index], feature.shape[0])
            return False
        return True

    def _gather_features(self, features, nested):
        """Validates training features and stacks them into one float64 array.

        ``features`` is a list of 1D arrays, or -- when ``nested`` is true -- a
        list of sequences of 1D arrays, which is flattened one level first.
        """
        if nested:
            rows = [row for feat in features for row in feat]
        else:
            rows = list(features)
        for row in rows:
            # without a machine this either passes or raises ValueError
            self._check_feature(row)
        return numpy.array(rows, dtype=numpy.float64)

    def train_projector(self, training_features, projector_file):
        """Trains the (optional) PCA and the Logistic Regression machines.

        Both machines are stored in ``self`` and written to ``projector_file``
        under the ``PCAProjector`` and ``LogRegProjector`` HDF5 groups.

        **Parameters:**

        training_features : [[object], [object]]
          Two lists: real features first, attack features second. Each feature
          is either a 1D float64 array or a sequence of such arrays.

        projector_file : str
          Name of the HDF5 file the trained machines are written to.

        **Raises:**

        ValueError : if fewer than two lists are given, if one of them is
        empty, or if a feature is not a 1D ``numpy.float64`` array.
        """
        if len(training_features) < 2:
            raise ValueError("Training projector: features should contain two lists: real and attack!")
        if len(training_features[0]) == 0 or len(training_features[1]) == 0:
            raise ValueError("Training projector: both real and attack feature lists must be non-empty!")

        # the format is specified in FileSelector.py:training_list() of bob.spoof.base
        logger.info(" - Training: number of real features %d", len(training_features[0]))

        nested = isinstance(training_features[0][0][0], numpy.ndarray)
        if nested:
            logger.info(" - Training: each feature is a set of arrays")
        else:
            logger.info(" - Training: each feature is a single array")
        real_features = self._gather_features(training_features[0], nested)
        attack_features = self._gather_features(training_features[1], nested)

        # save the trained model to file for future use
        hdf5file = bob.io.base.HDF5File(projector_file, "w")

        from bob.pad.voice.utils import extraction

        mean = None
        std = None
        # PCA training also needs normalized features
        if self.use_PCA_training or self.normalize_features:
            mean, std = extraction.calc_mean_std(real_features, attack_features, nonStdZero=True)
            real_features = extraction.zeromean_unitvar_norm(real_features, mean, std)
            attack_features = extraction.zeromean_unitvar_norm(attack_features, mean, std)

        # reduce the feature space using PCA
        if self.use_PCA_training:
            pca_trainer = bob.learn.linear.PCATrainer()
            self.pca_machine, eigenvalues = pca_trainer.train(numpy.vstack((real_features, attack_features)))

            # select only meaningful weights: index of the first component at
            # which the cumulative variance ratio exceeds 99%
            cumulated = numpy.cumsum(eigenvalues) / numpy.sum(eigenvalues)
            above = numpy.flatnonzero(cumulated > 0.99)
            index = int(above[0]) if above.size else len(cumulated) - 1
            # never collapse to an empty subspace when the very first
            # component already explains more than 99% of the variance
            subspace_dimension = max(index, 1)

            # save the PCA matrix
            self.pca_machine.resize(self.pca_machine.shape[0], subspace_dimension)
            if mean is not None and std is not None:
                self.pca_machine.input_subtract = mean
                self.pca_machine.input_divide = std

            hdf5file.create_group('PCAProjector')
            hdf5file.cd('PCAProjector')
            self.pca_machine.save(hdf5file)

            # project all current features on PCA
            # NOTE(review): the features were already normalized above and the
            # PCA machine now carries the same mean/std, so training data look
            # normalized twice while test data are normalized once -- confirm.
            real_features = numpy.asarray(
                [self.pca_machine(feature) for feature in real_features], dtype=numpy.float64)
            attack_features = numpy.asarray(
                [self.pca_machine(feature) for feature in attack_features], dtype=numpy.float64)

        # create Logistic Regression Machine
        trainer = bob.learn.linear.CGLogRegTrainer()

        # train the machine using the provided training data
        # negative features go first, positive - second
        self.machine = trainer.train(attack_features, real_features)

        # if we use PCA, PCA machine is normalizing features already
        if self.normalize_features and not self.use_PCA_training:
            if mean is not None and std is not None:
                self.machine.input_subtract = mean
                self.machine.input_divide = std

        hdf5file.cd('/')
        hdf5file.create_group('LogRegProjector')
        hdf5file.cd('LogRegProjector')
        self.machine.save(hdf5file)

    def load_projector(self, projector_file):
        """Loads the machines written by :py:meth:`train_projector`.

        The PCA machine is read from ``/PCAProjector`` only when
        ``use_PCA_training`` is set; the Logistic Regression machine is always
        read from ``/LogRegProjector``.
        """
        hdf5file = bob.io.base.HDF5File(projector_file)

        if self.use_PCA_training:
            hdf5file.cd('/PCAProjector')
            self.pca_machine = bob.learn.linear.Machine(hdf5file)

        # read LogRegr Machine model
        hdf5file.cd('/LogRegProjector')
        self.machine = bob.learn.linear.Machine(hdf5file)

    def project_feature(self, feature):
        """Projects a single 1D feature on the trained machine(s).

        **Returns:**

        The output of the Logistic Regression machine, or a one-element zero
        array when the feature length does not match the machine.

        **Raises:**

        ValueError : if ``feature`` is not one-dimensional.
        """
        feature = numpy.asarray(feature, dtype=numpy.float64)

        # reduce dimension using PCA
        if self.use_PCA_training and self._check_feature(feature, machine=self.pca_machine):
            feature = self.pca_machine(feature)

        if self._check_feature(feature, machine=self.machine):
            # Projects the data on LogRegression classifier
            return self.machine(feature)
        return numpy.zeros(1, dtype=numpy.float64)

    def project(self, feature):
        """project(feature) -> projected

        Projects the given feature using the trained Logistic Regression
        machine (after the optional PCA projection).

        **Parameters:**

        feature : 1D :py:class:`numpy.ndarray` or a sequence of them
          The 1D feature to be projected, or a sequence of 1D features
          (arrays or lists) that are projected one by one.

        **Returns:**

        projected : 1D :py:class:`numpy.ndarray` or list of them
          The projection of ``feature``; a list with one projection per entry
          when a sequence of features was given; a one-element zero array for
          empty input.
        """
        # ``len`` rather than truthiness: ``feature`` may be a numpy array
        if len(feature) == 0:
            return numpy.zeros(1, dtype=numpy.float64)
        if isinstance(feature[0], (numpy.ndarray, list)):
            return [self.project_feature(feat) for feat in feature]
        return self.project_feature(feature)

    def enroll(self, enroll_features):
        """We do no enrollment here; the features are returned unchanged."""
        assert len(enroll_features)
        # we need no enrollment
        return enroll_features

    def score(self, toscore):
        """Returns the output of a classifier, i.e. ``toscore`` unchanged."""
        return toscore

    def score_for_multiple_projections(self, toscore):
        """Returns ``toscore`` unchanged."""
        return toscore
# Module-level instance built with the constructor defaults
# (use_PCA_training=False, normalize_features=False).
algorithm = LogRegrAlgorithm()