CellNOpt homepage|cellnopt.core 1.0.0 documentation

Source code for cellnopt.core.normalisation

# -*- python -*-
#  This file is part of cellnopt.core software
#  Copyright (c) 2011-2013 - EBI-EMBL
#  File author(s): Thomas Cokelaer <cokelaer@ebi.ac.uk>
#  Distributed under the GPLv3 License.
#  See accompanying file LICENSE.txt or copy at
#      http://www.gnu.org/licenses/gpl-3.0.html
#  website: www.cellnopt.org

# cannot do from cellnopt.core import * ???
import midas
from oldmidas import MIDASReader
import easydev
#from cnograph import CNOGraph
import numpy as np

__all__ = ["NormaliseMIDAS", "XMIDASNormalise"]

class NormaliseMIDASBase(object):
    def __init__(self, mode="time", verbose=True, saturation=np.inf, 
                 detection=0., EC50noise=0., EC50data=0.5, HillCoeff=2.,
        # read-write attributes
        self.verbose = verbose
        self.mode = mode
        self.saturation = saturation
        self.EC50noise = EC50noise
        self.EC50data = EC50data
        self.HillCoeff = HillCoeff
        self.changeThreshold = changeThreshold
        self.detection = detection

    def _get_mode(self):
        return self._mode
    def _set_mode(self, mode):
        easydev.check_param_in_list(mode, ["time", "control"])
        self._mode = mode
    mode = property(_get_mode, _set_mode, doc="todo")

    def _get_saturation(self):
        return self._saturation
    def _set_saturation(self, saturation):
        if isinstance(saturation, (int, long, float)):
            self._saturation = saturation
            raise TypeError("saturation argument must be a number")
    saturation = property(_get_saturation, _set_saturation, 
        doc="saturation above which measurement are ignored")

    def _get_detection(self):
        return self._detection
    def _set_detection(self, detection):
        if isinstance(detection, (int, long, float)):
            self._detection = detection
            raise TypeError("detection argument must be a number")
    detection = property(_get_detection, _set_detection, 
        doc="detection above which measurement are ignored")

    def _get_EC50noise(self):
        return self._EC50noise
    def _set_EC50noise(self, EC50noise):
        if isinstance(EC50noise, (int, long, float)):
            self._EC50noise = EC50noise
            raise TypeError("EC50noise argument must be a number")
    EC50noise = property(_get_EC50noise, _set_EC50noise, doc="todo")

    def _get_EC50data(self):
        return self._EC50data
    def _set_EC50data(self, EC50data):
        if isinstance(EC50data, (int, long, float)):
            self._EC50data = EC50data
            raise TypeError("EC50data argument must be a number")
    EC50data = property(_get_EC50data, _set_EC50data, doc="todo")

    def _get_changeThreshold(self):
        return self._changeThreshold
    def _set_changeThreshold(self, changeThreshold):
        #if isinstance(changeThreshold, (int, long, float)):
        self._changeThreshold = changeThreshold
        #    raise TypeError("changeThreshold argument must be a number")
    changeThreshold = property(_get_changeThreshold, _set_changeThreshold, doc="todo")

    def _get_HillCoeff(self):
        return self._HillCoeff
    def _set_HillCoeff(self, HillCoeff):
        if isinstance(HillCoeff, (int, long, float)):
            self._HillCoeff = HillCoeff
            raise TypeError("HillCoeff argument must be a number")
    HillCoeff = property(_get_HillCoeff, _set_HillCoeff, doc="todo")

    def normalise(self, **kargs):
        """Performs the normalisation using the :attr:`mode` attribute"""
        if self.verbose:
            print("normalise using the mode '%s'" % self.mode)

        if self.mode == "time":
            res = self.time_normalisation(**kargs)
        elif self.mode == "control":
            raise NotImplementedError()
        return res 

    def time_normalisation(self):
        raise NotImplementedError

    def control_normalisation(self):
        raise NotImplementedError

[docs]class NormaliseMIDAS(NormaliseMIDASBase): r"""Class dedicated to the normalisation of MIDAS data Before normalisation, the measurements that are out of the dynamic range [:attr:`detection`; :attr:`saturation`] are tagged to be ignored. The fold change matrix is computed with a choice of algorithm. The following is the mode="time" case (see :meth:`normalise`): .. math:: \hat{x} = \frac{\left\lvert X(t) - X(t_0) \right\lvert }{ X(t_0)} Then, a penalty coefficient is computed as follows: .. math:: P(t) = \frac{X(t)}{ EC_{50}^{(noise)} + X(t)} and the a new matrix is computed using a Hill transformation: .. math:: H(t) = \frac{X^{k_H}}{ {EC_{50}^{(data)}}^{k_H} + X^{k_H} } rescale negative values. For each specy over all experiment and time, if a negative value is found and then the column is rescaled as follows: .. math:: X(t) = P(t) H(t) .. math:: X_{s}(t) = \frac{X_{s}(t) - m_s }{M_s - m_s} where m_s and M_s are the minimum and maximum value over time and experiment for the given specy :math:`s`. """ def __init__(self, data, mode="time", verbose=True, saturation=np.inf, detection=0., EC50noise=0., EC50data=0.5, HillCoeff=2., changeThreshold=0., rescale_negative=True): super(NormaliseMIDAS, self).__init__(mode=mode, verbose=verbose, saturation=saturation, detection=detection, EC50noise=EC50noise, EC50data=EC50data, HillCoeff=HillCoeff, changeThreshold=changeThreshold) """.. rubric:: constructor :param data: the MIDAS data. Could be a filename, an instance of MIDASReader or an instance of CNOGraph. :param mode: either "time" or "ctrl". :math:`X(t)` is a matrix with experiments as rows and species as columns. If mode is "time", the relative change is X(t) - X(t=t0). In other word, each measurement is compared to the exact same condition at time 0. If mode is "ctrl", the relative change is computed relative to the control at the same time, which is case without stimuli, but with the same inhibitors. :param float saturation: :param float detection: :param float EC50noise: :param float EC50data: :param float HillCoeff: :param float changeThres: :param bool rescale_negative: :param verbose: print informative messages """ print("Deprecated and not maintained. Please use the XMIDAS version") if isinstance(data, str): self.midas = MIDASReader(data) elif isinstance(data, MIDASReader): self.midas = data # a reference ? elif isinstance(data, CNOGraph): self.midas = data.copy() else: raise TypeError("data must be either a filename or an instance of MIDASReader or CNOGraph") self.data = np.array(self.midas.cnolist.valueSignals) # read-only attributes self.rescale_negative = rescale_negative
[docs] def dynamicRange(self): """Returns a mask to ignore values out of the dynamic range This function masks values out of the dynamic range defined by the :attr:`detection` and :attr:`saturation`. :return: a numpy mask on the data matrix """ # create a mask for the data dynamic range if self.verbose: N = len(self.data[self.data>self.saturation]) if N>0: print("found %s values above saturation (%s). " % (N, self.saturation)) # deal with saturation threshold data = np.where(self.data > self.saturation, np.inf, self.data) # here below, self.data must not be used if self.verbose: N = len(data[data<self.detection]) if N>0: print("found %s values below detection (%s). " % (N, self.detection)) # deal with detection threshold data = np.where(data < self.detection, np.inf, data) if self.verbose: N = np.sum(np.isnan(data)) if N>0: print("found %s NAN values. " % (N)) # set NA to infinite data[np.isnan(data)] = np.inf return np.isfinite(data)
def _get_masked_dynamic_range(self): y = np.ma.masked_where(self.dynamicRange()==False, self.data) return y def _get_masked_pos(self): return np.ma.masked_inside((self.data-self.data[0,:,:]), self.changeThreshold, np.inf) def _get_masked_neg(self): return np.ma.masked_inside((self.data-self.data[0,:,:]), -np.inf, -self.changeThreshold) def _get_masked_ignore(self): return np.ma.masked_inside((self.data-self.data[0,:,:]), -self.changeThreshold, self.changeThreshold)
[docs] def timeNormalisation(self): return self.time_normalisation()
[docs] def time_normalisation(self): print("WARNING:: use time_normalisation") print("""WARNING:: XMIDAS instead of MIDASREader. Maybe buggy and is deprecated. See MIDASReader doc for more deails.""") """Performs the normalisation over time. See the class docstring for details.""" # create a mask with values within the y = self._get_masked_dynamic_range() # todo: right now changeThr is absolute like in CellNOptR, but it would be better # to implement changeThreshold in a relative manner. # we could also uise the langmuir function like in the paper Saez MSB 2009 #1. Compute the max across all measurements for each signal, excluding # the values out of the dynamic range signalMax = np.max(y, axis=0) # for debugging self._signalMax = signalMax foldChange = abs(y-y[0,:,:])/y[0,:,:] # for debugging self._foldChange = foldChange # 2. get the saturation penalty using EC50noise # compute the penalty for being noisy, which is calculated for each # measurement as the measurement divided by the max measurement across all # conditions and time for that particular signal (excluding values out of the # dynamic range). data = y/signalMax satPenalty = data / (self.EC50noise + data) # for debugging self._satPenalty = satPenalty # Now I make the data go through the Hill function HillData = foldChange**self.HillCoeff/((self.EC50data**self.HillCoeff)+(foldChange**self.HillCoeff)) # for debugging self._HillData = HillData # multiply HillData and SatPenalty, matrix by matrix and element by element. NormData = HillData * satPenalty # use masked_dynamic_range to set to NAN values out of dynamic range NormData[y.mask] = np.nan # multiply negative values by -1 #NormData[self._get_masked_pos().mask] *= -1 NormData[self._get_masked_neg().mask] *= -1 # set non significant values to zero NormData[self._get_masked_ignore().mask] = 0 # rescale negative values if self.rescale_negative: # search for min and max over each colum ignoring time=t0 m = np.nanmin(NormData, axis=0).data[0] M = np.nanmax(NormData, axis=0).data[0] # loop over species for i, x in enumerate(m): if m[i] <0 and m[i]!=M[i]: NormData[:,:,i] = (NormData[:,:,i] - m[i] ) / (M[i] - m[i]) normdata = NormData.data # finally set to zero values that are not significant. return normdata
[docs]class XMIDASNormalise(NormaliseMIDASBase): """This is a version of normalisation. Note that it is 100 times slower than the version that uses numpy. However, it uses the XMIDAS datafrme as input and would be more convenient. Faster version could be implemented by providing a dataframe to numpy array. :param float EC50noise: EC50noise no effect if set to 0 (defaults to 0) :param floatEC50Data: parameter for the scaling of the data between 0 and 1, default=0.5 :param float HillCoef: Hill coefficient for the scaling of the data, default to 2 :param EC50Noise: parameter for the computation of a penalty for data comparatively smaller than other time points or conditions. No effect if set to zero (default). :param float detection: minimum detection level of the instrument, -everything smaller will be treated as noise (NA), default to 0 :param float saturation: saturation level of the instrument, everything over this will be treated as NA, default to Inf. :param float changeThrehold: threshold for relative change considered significant, default to 0 Once parameters are provided, you can still change them since there are all attributes. Thex next step is to normalise the data. this can be done using one of : * :meth:`time_normalisation` * :meth:`control_normalisation` """ def __init__(self, data, mode="time", verbose=True, saturation=np.inf, detection=0., EC50noise=0., EC50data=0.5, HillCoeff=2., changeThreshold=0.): super(XMIDASNormalise, self).__init__(mode=mode, verbose=verbose, saturation=saturation, detection=detection, EC50noise=EC50noise, EC50data=EC50data, HillCoeff=HillCoeff, changeThreshold=changeThreshold) # is the data a filename ? if isinstance(data, str): data = midas.XMIDAS(data) elif isinstance(data, midas.XMIDAS): pass else: raise TypeError("Input must be a filename string or a XMIDAS instance") # FIXME: do we need a copy or just a reference could be enough ? self.df = data.df.copy() self.cellLine = data.cellLine self._df_ref = data # do not touch df_reference #self.rescale_negative = rescale_negative self._level_exp_name = "experiment" def _get_masked_neg(self): # note the - sign before changeThreshold return self.df.groupby(level=[self._level_exp_name]).apply(lambda x: x-x.ix[0]) <= -self.changeThreshold def _get_masked_pos(self): return self.df.groupby(level=[self._level_exp_name]).apply( lambda x: x-x.ix[0]) >= self.changeThreshold def _get_masked_ignore(self): cond1 = self.df.groupby(level=[self._level_exp_name]).apply( lambda x: x-x.ix[0]) >= -self.changeThreshold cond2 = self.df.groupby(level=[self._level_exp_name]).apply( lambda x: x-x.ix[0]) <= self.changeThreshold return cond1 & cond2
[docs] def time_normalisation(self): r"""Class dedicated to the normalisation of MIDAS data Before normalisation, the measurements that are out of the dynamic range [:attr:`detection`; :attr:`saturation`] are tagged to be ignored. The fold change matrix is computed as follows: .. math:: F(t) = \frac{\left\lvert X(t) - X(t_0) \right\lvert }{ X(t_0)} Then, a penalty coefficient is computed as follows: .. math:: P(t) = \frac{\hat{X}(t)}{ EC_{50, noise} + \hat{X}(t)} where :math:`\hat{X} = X/X(t)_{max}`. A new matrix is computed using a Hill transformation: .. math:: H(t) = \frac{F(t)^{k_H}}{ EC_{50, data}^{k_H} + F(t)^{k_H} } The data is first rescaled to take into account the noise and data: .. math:: X_s(t) = P(t) H(t) Negative values are multiplied by -1 and values that are non-significant are set to zero. Finally, rescale for min and max over each colum ignoring time t0 if and only if rescale_scaling is On. .. math:: X_{s}(t) = \frac{X_s(t) - m_s }{M_s - m_s} where :math:`m_s` and :math:`M_s` are the minimum and maximum value over time and experiment for the given specy :math:`s`. .. note:: this normalisation works by computing a fold change relative to the same condition at time 0. If the value at time zero equals zero, , then the fold change calculation will fails. Note, however, that in X(t=0)=0 is not expected in many common biochemical techniques) """ # we will exclude the values out of the dynamic range y = self.df.mask((self.df>self.saturation)|(self.df<self.detection)) #1. Compute the max across all measurements for each signal, # will be used later on for the saturation signalMax = y.groupby(level=self._level_exp_name).max() # fold Change according to time zero # group by experiments to apply on each matrix the function # F(t) = abs(X(t)-X_{t=0})/X_{0} foldChange = y.groupby(level=[self._level_exp_name]).apply( lambda x: abs((x-x.ix[0]))/x.ix[0]) # for debugging self._y = y self._signalMax = signalMax self._foldChange = foldChange # 2. Get the saturation penalty using EC50noise # compute the penalty for being noisy, which is calculated for each # measurement as the measurement divided by the max measurement across all # conditions and time for that particular signal (excluding values out of the # dynamic range). This is P(t) data = y.divide(signalMax, level=self._level_exp_name) satPenalty = data / (self.EC50noise + data) # for debugging self._satPenalty = satPenalty # 3. Now we transform the data through a Hill function HillData = foldChange**self.HillCoeff/ \ ((self.EC50data**self.HillCoeff)+(foldChange**self.HillCoeff)) # for debugging self._HillData = HillData # multiply HillData and SatPenalty, matrix by matrix and element by element. NormData = HillData * satPenalty # use masked_dynamic_range to set to NAN values out of dynamic range # already done with the use of dataframe # NormData[y.mask] = np.nan # multiply negative values by -1 ?? why that this is from the original # paper/CellNOptR NormData[self._get_masked_neg()] *= -1 # set non significant values to zero NormData[self._get_masked_ignore()] = 0 # rescale negative values # search for min and max over each colum ignoring time=t0 m = NormData.groupby(level=self._level_exp_name).min() M = NormData.groupby(level=self._level_exp_name).max() # if minimum is negative and minimum != maximum # FIXME: check this code. cond = (m < 0) & (m!=M) m[cond==False] = 0 M[cond==False] = 1 NormData = NormData.sub(m, level=self._level_exp_name) NormData = NormData.divide(M-m, level=self._level_exp_name) return NormData
[docs] def get_experiments_with_same_control(self): controls = self.get_control_name() mapping = {} for control in controls: mapping[control] = [] for exp in self._df_ref.experiments.index: control = self._get_control(exp) if control != exp: mapping[control].append(exp) return mapping
[docs] def get_control_name(self): """Return experiment name that are control (i.e., stimuli are off)""" name = self._df_ref.experiments[self._df_ref.stimuli.sum(axis=1)==0].index return name
def _get_control(self, experiment_name): inhibitors = self._df_ref.inhibitors.ix[experiment_name] # here is a sub selction where experiment matches the experiment_name mask = (self._df_ref.inhibitors == inhibitors).all(axis=1) # indices contains all experiment that have the same inhibitors indices = self._df_ref.inhibitors[mask].index # now from those experiment, which one is the control (i.e., all stimuli are off) stimuli = self._df_ref.stimuli.ix[indices] mask = stimuli.sum(axis=1)==0 control = stimuli[mask].index # TODO assert control is unique assert len(control) == 1 return control[0]
[docs] def control_normalisation(self): """ In the control normalisation, the relative change is computed relative to the control experiment at the same time. The control being the experiment where all stimuli are zero but inhibitors re identical .. todo:: check that a data set has these experiments. .. note:: the time zero case is a special case. Indeed, even if provided, control is ignored. The t0 data is set to zero everywhere since only two measurements were made: with and without inhibitor(s) and these measurements have been copied across corresponding position; we assume that the inhibitors are already present at time 0 when we add the stimuli to find the right row to normalise. .. note:: for now, the control is chosen as the experiment where all stimuli are zero. changeTHresholdcan be a scalar or a time series (pandas) or a list """ """control = self.get_control_name() # we will exclude the values out of the dynamic range y = self.df.mask((self.df>self.saturation)|(self.df<self.detection)) #1. Compute the max across all measurements for each signal, # will be used later on for the saturation signalMax = y.groupby(level=self._level_exp_name).max() # fold Change according to time zero # group by experiments to apply on each matrix the function # F(t) = abs(X(t)-X_{t=0})/X_{0} foldChange = y.groupby(level=[self._level_exp_name]).apply( lambda x: abs((x-x.ix[0]))/x.ix[0]) #y.ix['undefined'].ix['experiment_0'] = 0 # FIXME is that correct ??? need a complicated example ctrl = y.ix[self.cellLine].ix[control] norm = abs(y.sub(ctrl, level="time")) foldChange = norm.divide(y.ix[self.cellLine].ix[control], level='time') # for debugging # signalMax should be on experiment_1 only not all of them. self._signalMax = signalMax.ix[control] self._foldChange = foldChange """ # we will exclude the values out of the dynamic range y = self.df.mask((self.df>self.saturation)|(self.df<self.detection)) # Note the double max to max over signals and time signalMax = y.groupby(level=['cellLine', 'experiment']).max().max() self._signalMax = signalMax #experiment = list(set(y.reset_index()['experiment'])) foldChange = y.copy() diff = y.copy() # FIXME there is probably a nice way to handle the following without for loop but # this is quite tricky and did not manage so for now, let us use for loops. # besides, this way looks easier for debugging purposes. for exp in self._df_ref.experiments.index: control = self._get_control(exp) #if control == exp: if 0==1: # this is a control, let us skip it for now continue else: # according to http://stackoverflow.com/questions/17552997/how-to-update-a-subset-of-a-multiindexed-pandas-dataframe?rq=1 # get_level_values is the fastest way df1 = y[y.index.get_level_values('experiment') == control] df2 = y[y.index.get_level_values('experiment') == exp] # !!! here, we use df2-df1.values not the inverse in order to # maje sure that the update is made on the experiment of the data #, not the control. The inverse would be buggy. foldChange.update( abs(df2 -df1.values)/df1.values ) diff.update(df2 -df1.values) self._foldChange = foldChange self._diff = diff # 2. Get the saturation penalty using EC50noise # compute the penalty for being noisy, which is calculated for each # measurement as the measurement divided by the max measurement across all # conditions and time for that particular signal (excluding values out of the # dynamic range). This is P(t) self._y = y.copy() data = y.divide(signalMax, level=self._level_exp_name) satPenalty = data / (self.EC50noise + data) # for debugging self._satPenalty = satPenalty # 3. Now we transform the data through a Hill function HillData = foldChange**self.HillCoeff/((self.EC50data**self.HillCoeff)+(foldChange**self.HillCoeff)) # for debugging self._HillData = HillData # multiply HillData and SatPenalty, matrix by matrix and element by element. NormData = HillData * satPenalty self._NormData = NormData.copy() # use masked_dynamic_range to set to NAN values out of dynamic range # already done with the use of dataframe # NormData[y.mask] = np.nan # multiply negative values by -1 ?? why that this is from the original # paper/CellNOptR masked_neg = diff < self.changeThreshold NormData[masked_neg] *= -1 self._NormData = NormData.copy() # set non significant values to zero FIXME to be checked cond1 = diff >= -self.changeThreshold cond2 = diff <= self.changeThreshold masked_ignore = cond1 & cond2 NormData[masked_ignore] = 0 # search for min and max over each colum ignoring time=t0 m = NormData.min(level="experiment") M = NormData.max(level="experiment") # if minimum is negative and minimum != maximum # FIXME: check this code. cond = (m < 0) & (m!=M) m[cond==False] = 0 M[cond==False] = 1 NormData = NormData.sub(m, level="experiment") NormData = NormData.divide(M-m, level="experiment") return NormData