MS-DAS 0.9.0 documentation

Source code for msdas.clustering

# -*- python -*-
#
#  This file is part of MS-DAS software
#
#  Copyright (c) 2014 - EBI-EMBL
#
#  File author(s): Thomas Cokelaer <cokelaer@ebi.ac.uk>
#
#  Distributed under the GPLv3 License.
#  See accompanying file LICENSE.txt or copy at
#      http://www.gnu.org/licenses/gpl-3.0.html
#
#
##############################################################################
"""Clustering utilities"""
import pylab
import numpy as np
import scipy.cluster
import scipy

from msdas import readers

__all__ = ["MSClustering", "Affinity"]



[docs]class MSClustering(readers.MassSpecReader): """Clustering utilities This class is used to perform clustering analysis on measurements to be found in a :class:`msdas.readers.MassSpecReader` instance. The data extraction consists in selecting a subset of the initial dataframe provided by the Reader classes. The selection is made on (1) the protein name that can be given by the user or not, and (2) the time measurements are kept only. This is achieved via the :meth:`get_group`. Currently, only the Affinity propagation algorithm is provided. It can be used via the :meth:`run_affinity_propagation_clustering` method or directly with the :class:`Affinity` class. Here is an example that reads data, creates a MSClustering instance and finally performs a clustering with the affinity propagation algorithm. :: from msdas import * y = MassSpecReader(get_yeast_small_data(), mode="YEAST") c = MSClustering(y) c.run_affinity_propagation_clustering(preference=-120) """ def __init__(self, data, mode="default", fillna=True, verbose=True, cleanup=False): """.. rubric:: constructor :param data: can be either a dataframe obtained from :class:`~msdas.readers.MassSpecReader` or an instance of :class:`~msdas.readers.MassSpecReader` itself. :param str mode: must be **yeast**. """ super(MSClustering,self).__init__(data, verbose=verbose, mode=mode, cleanup=cleanup) self._metadata_names.append("cluster") if fillna: self.logging.info("Filling Nas with zeros if any") self.df.fillna(0, inplace=True) def _get_groups(self): return self.df.groupby("Protein").groups groups = property(_get_groups, doc="Returns all group for each protein. Alias to dataframe.groupby('Protein')")
[docs] def get_group(self, name=None, tag="onlyCD3"): """Returns a subset of the original dataframe (stored in :attr:`df`) If name is provided, only the protein corresponding to that name is selected, otherwise all proteins are kept. Then, only columns related to time measurements are kept (ie. protein columns and non numeric columns are ignored). In the **yeast** case, the criteria to select time measurements is to keep columns that starts with the letter **a**. In the tcell case, "Nor_Unstimulated" column is selected as as as columns starting with "Nor" and containin the tag provided(e.g., onlyCD3) indices on time are renamed as 00, 05, 10, 30. """ if name in self.groups.keys(): indices = self.groups[name] subdf = self.df.ix[indices] subdf = subdf[[x for x in self.df.columns if x in self.measurements.columns]] subdf = subdf.transpose() subdf.drop_duplicates(inplace=True) subdf.columns = self.df.Identifier[indices] return subdf elif name==None: subdf = self.df[[x for x in self.df.columns if x in self.measurements.columns]] subdf = subdf.transpose() subdf.drop_duplicates(inplace=True) subdf.columns = self.df.Identifier return subdf else: raise KeyError("name not found")
[docs] def scale(self, df): """scale a df and returns the scaled version. This can be used on the numeric dataframe returned by :meth:`get_group`. :: from msdas import * df = MassSpecReader(get_yeast_small_data()) c = MSClustering(df) subdf = c.get_group() scaled_subdf = c.scale(subdf) .. math:: \hat{X} = {X-\mu}{\sigma (X)} where the standard deviation is the biased one computed with numpy as X.std(ddof=1). """ # the scale function uses biased variance #from sklearn.preprocessing import scale #newdf = pd.DataFrame(scale(df, axis=axis, with_mean=with_mean, # with_std=with_std), index=df.index, columns=df.columns) print("rescaling over %s columns" % len(df.columns)) newdf = df.copy() for i in newdf.columns: X = newdf.ix[:,i] newdf.ix[:,i] = (X-X.mean())/X.std(ddof=1) return newdf
def _normalise(self, subdf): """in place normalisation""" for this in subdf.columns: subdf.ix[:,this] /= pylab.sqrt(subdf.ix[:,this].dot(subdf.ix[:,this])) return subdf def _get_euclidean_distance_matrix(self, name=None): """ .. plot:: :include-source: :width: 50% clf() pcolor(c.get_dot_matrix("DIG1")) colorbar() .. seealso:: used by :meth:`dendogram` .. note:: The data is normalised before calling scipy.spatial.distance.eucllidean. may not be required actually. """ import scipy.spatial.distance subdf = self.get_group(name) self._normalise(subdf) N = len(subdf.columns) m = np.zeros((N, N)) for i, ix in enumerate(subdf.columns): for j, jx in enumerate(subdf.columns): m[i,j] = scipy.spatial.distance.euclidean(subdf[ix], subdf[jx]) return m def _get_dot_matrix(self, name=None): """ :: clf() pcolor(c.get_dot_matrix("DIG1")) colorbar() .. seealso:: used by :meth:`dendogram` .. note:: The data is normalised before calling scipy.spatial.distance.eucllidean. may not be required actually. """ subdf = self.get_group(name) self._normalise(subdf) N = len(subdf.columns) m = np.zeros((N,N)) for i,ix in enumerate(subdf.columns): for j,jx in enumerate(subdf.columns): m[i,j] = subdf.ix[:,ix].dot(subdf.ix[:,jx]) return m
[docs] def plot_vectors(self, name, normalise=False, scale=True, legend=True, subplots=False, fontsize=8, **kargs): """Plot time series or data of the time measurements for a group of protein (i.e. individual peptides). :param name: must be a valid protein name (see :attr:`df.Protein`) or None to select all proteins. :param normalise, scale: mutually exclusive pre processing option to normalise or scale the data before plotting. :param bool legend: add the legend or not :param fontsize: used for the legend :param kargs: any option accepted y the plot method used by the pandas dataframe. location and fontsize are used for the legend .. plot:: :include-source: :width: 50% >>> from msdas import * >>> y = MassSpecReader(get_yeast_small_data()) >>> c = MSClustering(y) >>> # c.run_affinity_propagation_clustering(-90) >>> c.plot_vectors("DIG1", normalise=False, scale=True, legend=False) """ subdf = self.get_group(name) assert (normalise == True and scale == False) or (normalise == False and scale == True) or (normalise == False and scale == False) if normalise: self._normalise(subdf) if scale: subdf = self.scale(subdf) if self.mode == "YEAST": kargs['xticks'] = kargs.get("xticks", range(0, self.N)) kargs['rot'] = kargs.get("rot", 90) for this in subdf.columns: subdf.ix[:, this].plot(legend=False, subplots=subplots, **kargs) if legend: ax = pylab.gca() patches, labels = ax.get_legend_handles_labels() ax.legend(patches,labels, fontsize=kargs.get("fontsize",fontsize), loc=kargs.get('location',"best"))
[docs] def dendogram(self, name=None, method="euclidean", metric="euclidean",p=2, pdist=False, rotation=90, **kargs): """Dendogram. In progress, please do not use or with care. :param name: name of the protein to perform dendogram on. IF none, all proteins are selected :param method: euclidean or dot :param pdist: pdist computes the Euclidean distance between pairs of objects in m-by-n data matrix X. Rows of X correspond to observations, and columns correspond to variables. :param rotation: rotation of xlabels (protein name + psite) :param kargs: any valid parameter of the scipy dendogram function .. plot:: :include-source: :width: 80% import pylab from msdas import * y = MassSpecReader(get_yeast_small_data()) c = MSClustering(y, mode="YEAST") c.dendogram("DIG2", pdist="euclidean") # equivalent to c.dendogram("DIG2", method="euclidean") pylab.figure(2) c.plot_vectors("DIG2") .. note:: pdist has many metrics and we recommand to use the pdist argument in place of method parameter, which maybe deprecated in the future. """ pylab.clf() if method == "euclidean": X = self._get_euclidean_distance_matrix(name) names = list(self.get_group(name).columns) linkage = scipy.cluster.hierarchy.linkage(X) elif method == "dot": X = self._get_dot_matrix(name) names = list(self.get_group(name).columns) linkage = scipy.cluster.hierarchy.linkage(X) elif pdist == True: X = scipy.spatial.distance.pdist(self.get_group(name).transpose(), metric=metric,p=p).transpose() subdf = list(self.get_group(name).columns) names = list(subdf.columns) linkage = scipy.cluster.hierarchy.linkage(pdist) else: raise ValueError("method must be euclidean or dot or pdist must be provided") # note that # linkage(pdist(c.get_group("DIG2").transpose()).transpose()) # is equivalent to # linkage(X) scipy.cluster.hierarchy.dendrogram(linkage, labels=names, leaf_rotation=rotation, **kargs) pylab.title(name) pylab.tight_layout() self._linkage = linkage
[docs] def run_affinity_propagation_clustering(self, protein=None, preference=-30): """Run the affinity propagation algorithm :param str name: a protein name. If None, consider all proteins. (Defaults to None) :param float preference: the affinity propagation preference parameter. should be negative. Large negative values gives less clusters. :return: an affinity instance that can be manipulated to extract labels, cluster, or plot the clusters. See :class:`Affinity` for more details The original data stored in :attr:`df` will contain anew column called `cluster` will the label of each cluster. .. warning:: NA are filled with zero for now. """ data = self.get_group(protein) data.fillna(0, inplace=True) affinity = Affinity(data, method="euclidean", preference=preference, transpose=True) # clustering import warnings with warnings.catch_warnings(): warnings.simplefilter("ignore") # this raises annoying warnings res = self.df.query("Identifier in @data.columns", engine="python") indices = res.index if "cluster" not in self.df.columns: self.df['cluster'] = self.df.shape[0] * [None] self.affinity_results = affinity self.df['cluster'].ix[indices] = affinity.labels return affinity
[docs] def plotClusters_and_protein(self, name, **kargs): """Alias to :meth:`Affinity.plotClusters_and_protein`""" self.affinity_results.plotClusters_psites(name, **kargs)
[docs] def plotClusters(self, **kargs): """Alias to :meth:`Affinity.plotClusters`""" self.affinity_results.plotClusters(**kargs)
[docs] def plotClusteredProtein(self, name, scale=True, fontsize=8, **kargs): """For a given protein, plot time series by group/label If a group does not have the protein in it, then nothing is plotted. .. note:: the affinity propagation algorithm must have been run before. """ groups = self.df.groupby(["Protein", "cluster"]).groups #groups = [g for g in groups if g[0]==name] #columns = [c for c in self.df.columns if c.startswith("a")] for group in groups.iteritems(): if group[0][0] == name: index = self.metadata.ix[group[1]].Identifier data = self.measurements.ix[group[1]].transpose() data.columns = list(index) if scale: data = self.scale(data) data.plot() pylab.title("Group %s" % group[0][1]) #self.df.ix[group[1], columns].transpose().plot() pylab.legend(fontsize=fontsize)
[docs]class Affinity(object): """Apply affinity propagation clustering on MS data set. Get some data first. It must be a dataframe that contains the measurements as columns and the Psites as indices. It can be the output of :meth:`MSClustering.get_group` method. .. plot:: :include-source: :width: 30% >>> from msdas import * >>> y = MassSpecReader(get_yeast_small_data(), mode="YEAST") >>> c = clustering.MSClustering(y) >>> af = c.run_affinity_propagation_clustering(preference=-90) >>> # OR >>> data = c.get_group() >>> af = Affinity(data, preference=-90, method="euclidean") >>> af.plotClusters() >>> af.nclusters 7 >>> labels = af.labels >>> center_indices = af.cluster_centers_indices The data will be scaled using the :meth:`~msdas.clustering.MSClustering.scale` method. Similarly for the cell except that the method should be correlation instead of euclidean. In which case, the correlation is performed internally. """ def __init__(self, data, method, affinity_matrix=None, preference=-30, damping=0.9, transpose=False, verbose=True): """.. rubric:: contructor :param DataFrame data: :param method: either euclidean or correlation. :param affinity_matrix: :param preference: the affinity propagation parameter that affects the number of clusters. (default is -30) :param float damping: values between 0.5 and 1 (stricly). (default is 0.9 to be similar to what is used in R affinity propagation routine) :param bool transpose: transpose the affinity matrix before calling the affinity propagation algorith. Should be used if euclidean distance method is selected. .. note:: the dataframe is scaled internally using :math:`\hat{X} = (X-\mu)/\sigma` """ self.verbose = verbose self.df = data.copy() # TODO what about another scaling methods ? self.df = self._scale(self.df) #self.df = self._normalise(self.df) self.method = method if affinity_matrix!=None: self.method = "other" self.affinity_matrix = affinity_matrix elif self.method == "correlation": self.affinity_matrix = self.df.corr() elif self.method=="euclidean": self.affinity_matrix = self.df.copy() else: raise ValueError("method incorrect or affinity_matrix not provided") self.affinity_results = None self._preference = preference self._damping = damping ## TODO this has to be done in yeast case if transpose==True: self.affinity_matrix = self.affinity_matrix.transpose() self.AffinityPropagation() def _scale(self, df): if self.verbose: print("rescaling over %s columns" % len(df.columns)) newdf = df.copy() for i in newdf.columns: X = newdf.ix[:,i] newdf.ix[:,i] = (X-X.mean())/X.std(ddof=1) return newdf def _normalise(self, df): """in place normalisation""" newdf = df.copy() for this in df.columns: newdf.ix[:,this] /= np.sqrt(newdf.ix[:,this].dot(newdf.ix[:,this])) return newdf
[docs] def AffinityPropagation(self, preference=None, damping=None, max_iter=200): """The core of the Affinity Propagation algorithm. This method uses scikit-learn :param preference: the affinity propagation parameter that affects the number of clusters. (default is -30) :param float damping: values between 0.5 and 1 (stricly). (default is 0.9 to be similar to what is used in R affinity propagation routine) :param int max_iter: 200 by default Populates :attr:`affinity_results` with the results of the clustering. """ if preference == None: preference = self._preference if damping == None: damping = self._damping if self.verbose: print("Running affinity propagation using preference=%s" % preference) from sklearn.cluster import AffinityPropagation if self.method != "euclidean": self.affinity_results = AffinityPropagation(preference=preference, max_iter=max_iter, damping=damping, affinity="precomputed").fit(self.affinity_matrix) else: self.affinity_results = AffinityPropagation(preference=preference, max_iter=max_iter, damping=damping).fit(self.affinity_matrix) if self.verbose: print("Found %s clusters" % self.nclusters) return self.affinity_results
[docs] def plotClusters(self, legend=True, fontsize=8, **kargs): """Plots N figures related to the N clusters found with all time series See class documentation for an example .. seealso:: :class:`~msdas.clustering.MSClustering` All timeseries for a given cluster are plotted in blue. The exampler in red. """ if self.affinity_results == None: raise ValueError("Call AffinityPropagation first.") else: for icluster, cluster in enumerate(self.affinity_results.cluster_centers_indices_): # plot all time series from this cluster self.df.ix[:,self.affinity_results.labels_==icluster].plot(legend=legend, color="b") # and the exampler: self.df.ix[:,cluster].plot(legend=legend, color="r", lw=4) if legend: pylab.legend(fontsize=fontsize)
[docs] def plotClusters_and_protein(self, name, fontsize=8, legend=True): """Same as `plotClusters` but superimposed a specific protein if present All timeseries for a given cluster are plotted in blue. The exampler is in red and the time series matching the name provided are in green. .. plot:: :include-source: :width: 30% >>> from msdas import * >>> y = MassSpecReader(get_yeast_small_data(), mode="yeast") >>> c = clustering.MSClustering(y) >>> af = c.run_affinity_propagation_clustering(preference=-90) >>> af.plotClusters_and_protein("DIG1", legend=True) """ if self.affinity_results == None: raise ValueError("Call AffinityPropagation first.") for label in set(self.labels): psites = self.df.columns[np.logical_and(self.labels==label, [x.startswith(name) for x in self.df.columns])] cluster_index = self.cluster_centers_indices[label] # all timeseries self.df.ix[:,self.labels==label].plot(legend=legend, color="b") # the exampler self.df.ix[:,cluster_index].plot(legend=legend, color="r", lw=4) # the psites for psite in psites: self.df.ix[:,psite].plot(legend=legend, color="g", lw=2) if legend: pylab.legend(fontsize=fontsize)
def _get_nclusters(self): return len(self.affinity_results.cluster_centers_indices_) nclusters = property(_get_nclusters) def _get_preference(self): return self.affinity_results.preference preference = property(_get_preference) def _get_cluster_center_indices(self): return self.affinity_results.cluster_centers_indices_ cluster_centers_indices = property(_get_cluster_center_indices, doc="Returns indices of each cluster center") def _get_labels(self): return self.affinity_results.labels_ labels = property(_get_labels, doc="Returns labels of the clustering") def __str__(self): str_ = "Number of clusters found %s" % self.nclusters str_+="\n" for i in range(0,self.nclusters): str_+="Cluster %s is made of %s items" % (i+1, sum(self.affinity_results.labels_==i)) str_+="\n" return str_