# Source code for msaf.base

"""
Base module containing parent classes for the Features.
In following versions, base classes for algorithms should also be included
here.
"""

import collections
import datetime
from enum import Enum
import librosa
import logging
import jams
import json
import numpy as np
import os
import six

# Local stuff
import msaf
from msaf.exceptions import WrongFeaturesFormatError, NoFeaturesFileError,\
    FeaturesNotFound, FeatureTypeNotFound, FeatureParamsError, NoAudioFileError

# Feature types currently supported:
#   - framesync: Frame-wise synchronous.
#   - est_beatsync: Beat-synchronous using estimated beats with librosa
#   - ann_beatsync: Beat-synchronous using annotated beats from ground-truth
FeatureTypes = Enum('FeatureTypes',
                    ['framesync', 'est_beatsync', 'ann_beatsync'])

# Registry of all available features, filled in by the `MetaFeatures`
# meta-class and keyed by the identifier each class returns from `get_id()`
features_registry = dict()


class MetaFeatures(type):
    """Meta-class that records every direct subclass of `Features`.

    Each class created with this meta-class whose direct bases include a
    class named ``Features`` is stored in `features_registry` under the
    identifier returned by its `get_id()` class method.
    """
    def __new__(meta, name, bases, class_dict):
        new_cls = super(MetaFeatures, meta).__new__(
            meta, name, bases, class_dict)
        # Only direct children of the base class Features get registered
        inherits_from_features = any(
            base.__name__ == "Features" for base in bases)
        if inherits_from_features:
            features_registry[new_cls.get_id()] = new_cls
        return new_cls


class Features(six.with_metaclass(MetaFeatures)):
    """This is the base class for all the features in MSAF.

    It contains functions to automatically estimate beats, read annotated
    beats, compute beat-synchronous features, read and write features.

    It should be straightforward to add features in MSAF, simply by writing
    classes that inherit from this one. Subclasses must implement
    `compute_features` and `get_id`.

    The `features` getter does the main job, and it returns a matrix `(N, F)`,
    where `N` is the number of frames and `F` is the number of features per
    frame.
    """

    def __init__(self, file_struct, sr, hop_length, feat_type):
        """Init function for the base class to make sure all features have
        at least these parameters as attributes.

        Parameters
        ----------
        file_struct: `msaf.input_output.FileStruct`
            Object containing the paths to the files.
        sr: int > 0
            Sampling rate of the audio file.
        hop_length: int > 0
            Hop size of the features to be computed (forwarded to librosa
            as `hop_length`).
        feat_type: `FeatureTypes`
            Enum containing the type of feature.
        """
        # Set the global parameters
        self.file_struct = file_struct
        self.sr = sr
        self.hop_length = hop_length
        self.feat_type = feat_type

        # The following attributes will be populated, if needed,
        # once the `features` getter is called
        self.dur = None  # The duration of the audio file in seconds
        self._features = None  # The actual features
        self._framesync_features = None  # Frame-sync features
        self._est_beatsync_features = None  # Estimated Beat-sync features
        self._ann_beatsync_features = None  # Annotated Beat-sync features
        self._audio = None  # Actual audio signal
        self._audio_harmonic = None  # Harmonic audio signal
        self._audio_percussive = None  # Percussive audio signal
        self._framesync_times = None  # The times of the framesync features
        self._est_beatsync_times = None  # Estimated beat-sync times
        self._est_beats_times = None  # Estimated beat times
        self._est_beats_frames = None  # Estimated beats in frames
        self._ann_beatsync_times = None  # Annotated beat-sync times
        self._ann_beats_times = None  # Annotated beat times
        self._ann_beats_frames = None  # Annotated beats in frames

        # Differentiate global params from subclass attributes: every public
        # attribute NOT listed here is treated by `get_param_names` as a
        # feature-specific parameter.
        self._global_param_names = ["file_struct", "sr", "feat_type",
                                    "hop_length", "dur"]

    def compute_HPSS(self):
        """Computes harmonic-percussive source separation.

        Returns
        -------
        audio_harmonic: np.array
            The harmonic component of the audio signal
        audio_percussive: np.array
            The percussive component of the audio signal
        """
        return librosa.effects.hpss(self._audio)

    def estimate_beats(self):
        """Estimates the beats using librosa.

        Returns
        -------
        times: np.array
            Times of estimated beats in seconds.
        frames: np.array
            Frame indices of estimated beats.
        """
        # Compute harmonic-percussive source separation if needed
        if self._audio_percussive is None:
            self._audio_harmonic, self._audio_percussive = self.compute_HPSS()

        # Compute beats (the estimated tempo is not needed)
        _, frames = librosa.beat.beat_track(
            y=self._audio_percussive, sr=self.sr,
            hop_length=self.hop_length)

        # To times
        times = librosa.frames_to_time(frames, sr=self.sr,
                                       hop_length=self.hop_length)

        # Drop a beat located exactly at time 0
        # TODO: Is this really necessary?
        if len(times) > 0 and times[0] == 0:
            times = times[1:]
            frames = frames[1:]

        return times, frames

    def read_ann_beats(self):
        """Reads the annotated beats if available.

        Returns
        -------
        times: np.array
            Times of annotated beats in seconds. `None` if no annotation
            could be read.
        frames: np.array
            Frame indices of annotated beats. `None` if no annotation
            could be read.
        """
        times, frames = (None, None)

        # Read annotations if they exist in correct folder
        if os.path.isfile(self.file_struct.ref_file):
            try:
                jam = jams.load(self.file_struct.ref_file)
            except TypeError:
                logging.warning(
                    "Can't read JAMS file %s. Maybe it's not "
                    "compatible with current JAMS version?",
                    self.file_struct.ref_file)
                return times, frames
            beat_annot = jam.search(namespace="beat.*")

            # If beat annotations exist, get times and frames
            if len(beat_annot) > 0:
                beats_inters, _ = beat_annot[0].data.to_interval_values()
                times = beats_inters[:, 0]
                frames = librosa.time_to_frames(times, sr=self.sr,
                                                hop_length=self.hop_length)
        return times, frames

    def compute_beat_sync_features(self, beat_frames, beat_times, pad):
        """Make the features beat-synchronous.

        Parameters
        ----------
        beat_frames: np.array
            The frame indices of the beat positions.
        beat_times: np.array
            The time points of the beat positions (in seconds).
        pad: boolean
            If `True`, `beat_frames` is padded to span the full range.

        Returns
        -------
        beatsync_feats: np.array
            The beat-synchronized features.
            `None` if the beat_frames was `None`.
        beatsync_times: np.array
            The beat-synchronized times.
            `None` if the beat_frames was `None`.
        """
        if beat_frames is None:
            return None, None

        # Make beat synchronous
        beatsync_feats = librosa.util.utils.sync(self._framesync_features.T,
                                                 beat_frames, pad=pad).T

        # Assign times (and add last time if padded)
        beatsync_times = np.copy(beat_times)
        if beatsync_times.shape[0] != beatsync_feats.shape[0]:
            beatsync_times = np.concatenate((beatsync_times,
                                             [self._framesync_times[-1]]))
        return beatsync_feats, beatsync_times

    def read_features(self, tol=1e-3):
        """Reads the features from a file and stores them in the current
        object.

        Parameters
        ----------
        tol: float
            Tolerance level to detect duration of audio.

        Raises
        ------
        NoFeaturesFileError
            If the features file could not be opened.
        WrongFeaturesFormatError
            If the file is not valid JSON or an expected key is missing.
        FeaturesNotFound
            If the global parameters stored in the file (duration, sample
            rate, hop length, audio file name) differ from the current ones.
        FeatureParamsError
            If the file has no entry for these features or the stored
            feature-specific parameters differ from the current ones.
        """
        features_file = self.file_struct.features_file
        try:
            # Read JSON file
            with open(features_file) as f:
                try:
                    feats = json.load(f)
                except ValueError:
                    # Corrupt / non-JSON content is a formatting problem,
                    # not an unexpected crash
                    raise WrongFeaturesFormatError(
                        "The features file %s is not correctly formatted" %
                        features_file)

            # Store duration
            if self.dur is None:
                self.dur = float(feats["globals"]["dur"])

            # Check that we have the correct global parameters.
            # Explicit check instead of `assert`, which is removed when
            # Python runs with -O.
            file_globals = feats["globals"]
            same_globals = (
                np.isclose(self.dur, float(file_globals["dur"]), rtol=tol)
                and self.sr == int(file_globals["sample_rate"])
                and self.hop_length == int(file_globals["hop_length"])
                and os.path.basename(self.file_struct.audio_file) ==
                os.path.basename(file_globals["audio_file"]))
            if not same_globals:
                raise FeaturesNotFound(
                    "The features for the given parameters were not found in "
                    "features file %s" % features_file)

            # Check for specific features params
            feat_id = self.get_id()
            feat_params_err = FeatureParamsError(
                "Couldn't find features for %s id in file %s" %
                (feat_id, features_file))
            if feat_id not in feats:
                raise feat_params_err
            for param_name in self.get_param_names():
                value = getattr(self, param_name)
                # Functions are stored by name, anything else as a string
                current = value.__name__ if callable(value) else str(value)
                if current != feats[feat_id]["params"][param_name]:
                    raise feat_params_err

            # Store actual features
            self._est_beats_times = np.array(feats["est_beats"])
            self._est_beatsync_times = np.array(feats["est_beatsync_times"])
            self._est_beats_frames = librosa.core.time_to_frames(
                self._est_beats_times, sr=self.sr,
                hop_length=self.hop_length)
            self._framesync_features = \
                np.array(feats[feat_id]["framesync"])
            self._est_beatsync_features = \
                np.array(feats[feat_id]["est_beatsync"])

            # Read annotated beats if available
            if "ann_beats" in feats:
                self._ann_beats_times = np.array(feats["ann_beats"])
                self._ann_beatsync_times = np.array(
                    feats["ann_beatsync_times"])
                self._ann_beats_frames = librosa.core.time_to_frames(
                    self._ann_beats_times, sr=self.sr,
                    hop_length=self.hop_length)
                self._ann_beatsync_features = \
                    np.array(feats[feat_id]["ann_beatsync"])
        except KeyError:
            raise WrongFeaturesFormatError(
                "The features file %s is not correctly formatted" %
                features_file)
        except IOError:
            raise NoFeaturesFileError(
                "Could not find features file %s" % features_file)

    def write_features(self):
        """Saves features to file.

        The existing features file is first read: if it is missing,
        malformed, or was computed with different global parameters, a new
        file is created. Otherwise the existing content is kept and only the
        entry of the current features is added or replaced.
        """
        create_new_file = False
        try:
            # Only save the necessary information
            self.read_features()
        except (WrongFeaturesFormatError, FeaturesNotFound,
                NoFeaturesFileError):
            # We need to create the file or overwrite it
            create_new_file = True
        except FeatureParamsError:
            # The file is valid but holds other features (or these features
            # with other parameters): keep its content and add these ones
            pass

        if create_new_file:
            # Metadata
            out_json = collections.OrderedDict({"metadata": {
                "versions": {"librosa": librosa.__version__,
                             "msaf": msaf.__version__,
                             "numpy": np.__version__},
                "timestamp": datetime.datetime.today().strftime(
                    "%Y/%m/%d %H:%M:%S")}})

            # Global parameters
            out_json["globals"] = {
                "dur": self.dur,
                "sample_rate": self.sr,
                "hop_length": self.hop_length,
                "audio_file": self.file_struct.audio_file
            }

            # Beats
            out_json["est_beats"] = self._est_beats_times.tolist()
            out_json["est_beatsync_times"] = \
                self._est_beatsync_times.tolist()
            if self._ann_beats_times is not None:
                out_json["ann_beats"] = self._ann_beats_times.tolist()
                out_json["ann_beatsync_times"] = \
                    self._ann_beatsync_times.tolist()
        else:
            # Start from the current file content so that globals, beats and
            # the other features stored in it are not lost
            with open(self.file_struct.features_file) as f:
                out_json = json.load(f)

        # Specific parameters of the current features
        feat_id = self.get_id()
        params = {}
        for param_name in self.get_param_names():
            value = getattr(self, param_name)
            # Functions are stored by name, anything else as a string
            params[param_name] = \
                value.__name__ if callable(value) else str(value)
        out_json[feat_id] = {"params": params}

        # Actual features
        out_json[feat_id]["framesync"] = \
            self._framesync_features.tolist()
        out_json[feat_id]["est_beatsync"] = \
            self._est_beatsync_features.tolist()
        if self._ann_beatsync_features is not None:
            out_json[feat_id]["ann_beatsync"] = \
                self._ann_beatsync_features.tolist()

        # Save it
        with open(self.file_struct.features_file, "w") as f:
            json.dump(out_json, f, indent=2)

    def get_param_names(self):
        """Returns the parameter names for these features, avoiding
        the global parameters."""
        return [name for name in vars(self)
                if not name.startswith('_') and
                name not in self._global_param_names]

    def _compute_framesync_times(self):
        """Computes the framesync times based on the framesync features."""
        # `sr` and `hop_length` are passed by keyword, like the other
        # librosa calls in this class
        self._framesync_times = librosa.core.frames_to_time(
            np.arange(self._framesync_features.shape[0]), sr=self.sr,
            hop_length=self.hop_length)

    def _compute_all_features(self):
        """Computes all the features (beatsync, framesync) from the audio."""
        # Read actual audio waveform
        self._audio, _ = librosa.load(self.file_struct.audio_file,
                                      sr=self.sr)

        # Get duration of audio file
        self.dur = len(self._audio) / float(self.sr)

        # Compute actual features
        self._framesync_features = self.compute_features()

        # Compute framesync times
        self._compute_framesync_times()

        # Compute/Read beats
        self._est_beats_times, self._est_beats_frames = self.estimate_beats()
        self._ann_beats_times, self._ann_beats_frames = self.read_ann_beats()

        # Beat-Synchronize
        pad = True  # Always append to the end of the features
        self._est_beatsync_features, self._est_beatsync_times = \
            self.compute_beat_sync_features(self._est_beats_frames,
                                            self._est_beats_times, pad)
        self._ann_beatsync_features, self._ann_beatsync_times = \
            self.compute_beat_sync_features(self._ann_beats_frames,
                                            self._ann_beats_times, pad)

    @property
    def frame_times(self):
        """This getter returns the frame times, for the corresponding type of
        features."""
        frame_times = None
        # Make sure we have already computed the features
        self.features
        if self.feat_type is FeatureTypes.framesync:
            self._compute_framesync_times()
            frame_times = self._framesync_times
        elif self.feat_type is FeatureTypes.est_beatsync:
            frame_times = self._est_beatsync_times
        elif self.feat_type is FeatureTypes.ann_beatsync:
            frame_times = self._ann_beatsync_times

        return frame_times

    @property
    def features(self):
        """This getter will compute the actual features if they haven't
        been computed yet.

        The features are first looked up in the features file; if they can't
        be read from there they are computed from the audio and written to
        the features file.

        Returns
        -------
        features: np.array
            The actual features. Each row corresponds to a feature vector.

        Raises
        ------
        NoAudioFileError
            If the features must be computed and an `IOError` is raised
            while computing or writing them.
        FeatureTypeNotFound
            If the feature type is invalid, or annotated beat-sync features
            are requested but no annotated beats were found.
        """
        # Compute features if needed
        if self._features is None:
            try:
                self.read_features()
            except (NoFeaturesFileError, FeaturesNotFound,
                    WrongFeaturesFormatError, FeatureParamsError) as e:
                try:
                    self._compute_all_features()
                    self.write_features()
                except IOError:
                    if isinstance(e, (FeaturesNotFound, FeatureParamsError)):
                        msg = "Computation of the features is needed for " \
                            "current parameters but no audio file was " \
                            "found. Please, change your parameters or add " \
                            "the audio file in %s"
                    else:
                        msg = "Couldn't find audio file in %s"
                    raise NoAudioFileError(msg % self.file_struct.audio_file)

            # Choose features based on type
            if self.feat_type is FeatureTypes.framesync:
                self._features = self._framesync_features
            elif self.feat_type is FeatureTypes.est_beatsync:
                self._features = self._est_beatsync_features
            elif self.feat_type is FeatureTypes.ann_beatsync:
                if self._ann_beatsync_features is None:
                    raise FeatureTypeNotFound(
                        "Feature type %s is not valid because no annotated "
                        "beats were found" % self.feat_type)
                self._features = self._ann_beatsync_features
            else:
                raise FeatureTypeNotFound("Feature type %s is not valid." %
                                          self.feat_type)

        return self._features

    @classmethod
    def select_features(cls, features_id, file_struct, annot_beats,
                        framesync):
        """Selects the features from the given parameters.

        Parameters
        ----------
        features_id: str
            The identifier of the features (it must be a key inside the
            `features_registry`)
        file_struct: msaf.io.FileStruct
            The file struct containing the files to extract the features from
        annot_beats: boolean
            Whether to use annotated (`True`) or estimated (`False`) beats
        framesync: boolean
            Whether to use framesync (`True`) or beatsync (`False`) features

        Returns
        -------
        features: obj
            The actual features object that inherits from `msaf.Features`

        Raises
        ------
        FeatureTypeNotFound
            If both `annot_beats` and `framesync` are `True`.
        FeaturesNotFound
            If `features_id` is not in the `features_registry`.
        """
        if not annot_beats and framesync:
            feat_type = FeatureTypes.framesync
        elif annot_beats and not framesync:
            feat_type = FeatureTypes.ann_beatsync
        elif not annot_beats and not framesync:
            feat_type = FeatureTypes.est_beatsync
        else:
            raise FeatureTypeNotFound("Type of features not valid.")

        # Select features with default parameters
        if features_id not in features_registry:
            raise FeaturesNotFound(
                "The features '%s' are invalid (valid features are %s)"
                % (features_id, list(features_registry.keys())))

        return features_registry[features_id](file_struct, feat_type)

    def compute_features(self):
        """Computes the frame-synchronous features; to be implemented by
        each subclass."""
        raise NotImplementedError("This method must contain the actual "
                                  "implementation of the features")

    @classmethod
    def get_id(cls):
        """Returns the string identifier of the features; to be implemented
        by each subclass."""
        raise NotImplementedError("This method must return a string identifier"
                                  " of the features")