Source code for msaf.algorithms.cnmf.segmenter

# coding: utf-8
import numpy as np
from scipy.ndimage import filters

import msaf.utils as U
from msaf.algorithms.interface import SegmenterInterface
from msaf import pymf


def median_filter(X, M=8):
    """Median filter along the first axis of the feature matrix X."""
    for i in range(X.shape[1]):
        X[:, i] = filters.median_filter(X[:, i], size=M)
    return X


def cnmf(S, rank, niter=500, hull=False):
    """(Convex) Non-Negative Matrix Factorization.

    Parameters
    ----------
    S: np.array(p, N)
        Features matrix. p row features and N column observations.
    rank: int
        Rank of decomposition
    niter: int
        Number of iterations to be used

    Returns
    -------
    F: np.array
        Cluster matrix (decomposed matrix)
    G: np.array
        Activation matrix (decomposed matrix)
        (s.t. S ~= F * G)
    """
    if hull:
        nmf_mdl = pymf.CHNMF(S, num_bases=rank)
    else:
        nmf_mdl = pymf.CNMF(S, num_bases=rank)
    nmf_mdl.factorize(niter=niter)
    F = np.asarray(nmf_mdl.W)
    G = np.asarray(nmf_mdl.H)
    return F, G


def most_frequent(x):
    """Returns the most frequent value in x."""
    return np.argmax(np.bincount(x))


def compute_labels(X, rank, R, bound_idxs, niter=300):
    """Computes the labels using the bounds."""

    try:
        F, G = cnmf(X, rank, niter=niter, hull=False)
    except:
        return [1]

    label_frames = filter_activation_matrix(G.T, R)
    label_frames = np.asarray(label_frames, dtype=int)

    #labels = [label_frames[0]]
    labels = []
    bound_inters = zip(bound_idxs[:-1], bound_idxs[1:])
    for bound_inter in bound_inters:
        if bound_inter[1] - bound_inter[0] <= 0:
            labels.append(np.max(label_frames) + 1)
        else:
            labels.append(most_frequent(
                label_frames[bound_inter[0]: bound_inter[1]]))
        #print bound_inter, labels[-1]
    #labels.append(label_frames[-1])

    return labels


def filter_activation_matrix(G, R):
    """Filters the activation matrix G, and returns a flattened copy."""

    #import pylab as plt
    #plt.imshow(G, interpolation="nearest", aspect="auto")
    #plt.show()

    idx = np.argmax(G, axis=1)
    max_idx = np.arange(G.shape[0])
    max_idx = (max_idx, idx.flatten())
    G[:, :] = 0
    G[max_idx] = idx + 1

    # TODO: Order matters?
    G = np.sum(G, axis=1)
    G = median_filter(G[:, np.newaxis], R)

    return G.flatten()


def get_segmentation(X, rank, R, rank_labels, R_labels, niter=300,
                     bound_idxs=None, in_labels=None):
    """
    Gets the segmentation (boundaries and labels) from the factorization
    matrices.

    Parameters
    ----------
    X: np.array()
        Features matrix (e.g. chromagram)
    rank: int
        Rank of decomposition
    R: int
        Size of the median filter for activation matrix
    niter: int
        Number of iterations for k-means
    bound_idxs : list
        Use previously found boundaries (None to detect them)
    in_labels : np.array()
        List of input labels (None to compute them)

    Returns
    -------
    bounds_idx: np.array
        Bound indeces found
    labels: np.array
        Indeces of the labels representing the similarity between segments.
    """

    #import pylab as plt
    #plt.imshow(X, interpolation="nearest", aspect="auto")
    #plt.show()

    # Find non filtered boundaries
    compute_bounds = True if bound_idxs is None else False
    while True:
        if bound_idxs is None:
            try:
                F, G = cnmf(X, rank, niter=niter, hull=False)
            except:
                return np.empty(0), [1]

            # Filter G
            G = filter_activation_matrix(G.T, R)
            if bound_idxs is None:
                bound_idxs = np.where(np.diff(G) != 0)[0] + 1

        # Increase rank if we found too few boundaries
        if compute_bounds and len(np.unique(bound_idxs)) <= 2:
            rank += 1
            bound_idxs = None
        else:
            break

    # Add first and last boundary
    bound_idxs = np.concatenate(([0], bound_idxs, [X.shape[1] - 1]))
    bound_idxs = np.asarray(bound_idxs, dtype=int)
    if in_labels is None:
        labels = compute_labels(X, rank_labels, R_labels, bound_idxs,
                                niter=niter)
    else:
        labels = np.ones(len(bound_idxs) - 1)

    #plt.imshow(G[:, np.newaxis], interpolation="nearest", aspect="auto")
    #for b in bound_idxs:
        #plt.axvline(b, linewidth=2.0, color="k")
    #plt.show()

    return bound_idxs, labels


[docs]class Segmenter(SegmenterInterface): """ This script identifies the structure of a given track using a modified version of the C-NMF method described here: Nieto, O., Jehan, T., Convex Non-negative Matrix Factorization For Automatic Music Structure Identification. Proc. of the 38th IEEE International Conference on Acoustics, Speech, and Signal Processing (ICASSP). Vancouver, Canada, 2013 (`PDF`_). The modified version can be found in Oriol Nieto's `PhD Thesis`_. .. _PDF: http://marl.smusic.nyu.edu/nieto/publications/Nieto-ICASSP13.pdf .. _PhD Thesis: http://marl.smusic.nyu.edu/nieto/publications/Nieto-Dissertation.pdf """ def processFlat(self): """Main process. Returns ------- est_idxs : np.array(N) Estimated indeces for the segment boundaries in frames. est_labels : np.array(N-1) Estimated labels for the segments. """ # C-NMF params niter = self.config["niters"] # Iterations for the MF and clustering # Preprocess to obtain features, times, and input boundary indeces F = self._preprocess() # Normalize F = U.normalize(F, norm_type=self.config["norm_feats"]) if F.shape[0] >= self.config["h"]: # Median filter F = median_filter(F, M=self.config["h"]) #plt.imshow(F.T, interpolation="nearest", aspect="auto"); plt.show() # Find the boundary indices and labels using matrix factorization est_idxs, est_labels = get_segmentation( F.T, self.config["rank"], self.config["R"], self.config["rank_labels"], self.config["R_labels"], niter=niter, bound_idxs=self.in_bound_idxs, in_labels=None) # Remove empty segments if needed est_idxs, est_labels = U.remove_empty_segments(est_idxs, est_labels) else: # The track is too short. We will only output the first and last # time stamps if self.in_bound_idxs is None: est_idxs = np.array([0, F.shape[0] - 1]) est_labels = [1] else: est_idxs = self.in_bound_idxs est_labels = [1] * (len(est_idxs) + 1) # Make sure that the first and last boundaries are included assert est_idxs[0] == 0 and est_idxs[-1] == F.shape[0] - 1 # Post process estimations est_idxs, est_labels = self._postprocess(est_idxs, est_labels) return est_idxs, est_labels