#
#-*- python -*-
#
# This file is part of the cellnopt package
#
# Copyright (c) 2014 - EMBL-EBI
#
# File author(s): Thomas Cokelaer (cokelaer@ebi.ac.uk)
#
# Distributed under the GLPv3 License.
# See accompanying file LICENSE.txt or copy at
# http://www.gnu.org/licenses/gpl-3.0.html
#
# website: www.cellnopt.org
#
##############################################################################
from __future__ import print_function
import types
import pylab
import numpy as np
import pandas as pd
from easydev.logging_tools import Logging
import colormap
from cno.io.measurements import Measurement
from cno.io import midas_normalisation as normalisation
from easydev import DevTools
from cno.misc import CNOError
__all__ = ["XMIDAS", "Trend", 'MIDASReader']
class MIDAS(object):
"""This is s a description of the MIDAS format (let us call it 1.0)
column with those name are ignored:
['NOINHIB', 'NOCYTO', 'NOLIG', 'NO-CYTO', 'NO-INHIB', 'NO-LIG']
"""
_valid_codes = {
'ID': 'identifier',
'TR': 'stimuli/inhibitors',
'DA': 'times',
'DV': 'measurements'}
_ignore_codes = ['NOINHIB', 'NOCYTO', 'NOLIG', 'NO-CYTO', 'NO-INHIB', 'NO-LIG']
def __init__(self, filename=None, verbose=False):
""".. rubric:: Constructor
:param str filename: the filename (including correct path)
"""
self._header = []
self._names = []
self.verbose = verbose
#: filename of the original data
self.filename = filename
self.cmap_scale = 1 # 1 is same as CellNOptR
self.fontsize = 16
self.logging = Logging(verbose)
self._colormap = colormap.Colormap()
self._dev = DevTools()
[docs]class MIDASReader(MIDAS):
"""A MIDAS reader that converts the experiments and data into dataframes
Used by XMIDAS to read and convert the data.
CellLine must be encoded in the header as follows::
TR:name:CellLine
If name is not provided, it is replaced internally with "undefined".
Those columns are ignored in MIDASReader:
['NOINHIB', 'NOCYTO', 'NOLIG', 'NO-CYTO', 'NO-INHIB', 'NO-LIG']
User should not need to use this class. Use :class:`XMIDAS` instead.
"""
def __init__(self, filename, verbose='ERROR', exclude_rows={}):
super(MIDASReader, self).__init__(filename, verbose)
self.exclude_rows = exclude_rows
# names of the multi index level (rows)
self._levels = ["cell", "experiment", "time"]
[docs] def read(self):
# If the provided filename is not defined, nothing to do
if self.filename == None:
return
self.logging.debug("Reading the data")
# skip spaces after delimiter
self._data = pd.read_csv(self.filename, skipinitialspace=True,
sep=",")
self.logging.debug(" - Processing cell lines")
self._preprocess_cellines()
self.logging.debug(" - Filtering")
self._filtering_raw_data()
# remove columns that are invalid and check MIDAS validity
self._midas_validity()
self.logging.debug(" - Checking format")
# some cleanup to remove columns that have to be ignored
labels = ["TR:"+x for x in self._ignore_codes
if "TR:"+x in self._data.columns]
self._data = self._data.drop(labels, axis=1)
# just clean up the labels (strip)
self._data.columns = [x.strip() for x in self._data.columns]
# from the data, build up the experiment and data dataframes
self.logging.debug(" - Initialising")
self._init()
self.logging.debug(" - Data loaded")
def _filtering_raw_data(self):
# problably not the most efficient but does the job:
for key, value in self.exclude_rows.items():
self._data = self._data[self._data[key] != value]
def _preprocess_cellines(self):
#CellLine are tricky to handle with the MIDAS format because they use the
#same prefix TR: as the treatments. You must be sure that (1) there are
# 2 : signs (2) the suffix is called CellLine and (3) the middle name is
# provided
# Those could be recognised but only CellLine is correct. So, let us
# enforce the correct one instead of dealing with all kind of user
# choices.
cellLines = [col for col in self._data.columns if "CellLine" in col]
if len(cellLines) == 0:
self.logging.warning("Could not find any column with the required keyword 'CellLine'"
+ "capitalisation matters !. Adding a column named TR:undefined:CellLine")
self._data['TR:undefined:CellLine'] = [1] * self._data.shape[0]
for name in cellLines:
if name.count(":") != 2:
txt = "Column name related to CellLine %s must have 2 : characters".format(name)
txt += "{} has less or more than 2"
raise CNOError(txt)
# rename the cellLines if there are undefined
# let us give thme the same name ('undefined')
# this should be done at the level of the data
count = 0
for i, name in enumerate(cellLines):
if name.split(":")[1] == "":
self.logging.warning("Found a CellLine column without name. Renamed to undefined.")
columns = list(self._data.columns)
columns[i] = "TR:undefined:CellLine"
self._data.columns = columns
count += 1
if count > 1:
raise CNOError("More than 1 undefined cell line. " +
"Please name the cellLine columns with TR:name:CellLine")
# if there is only one undefined, no need for an error.
# otherwise they will be an ambiguity.
celltype_names = self._get_cellLines()
if len(cellLines) >= 2:
if self.cellLine == None or self.cellLine not in celltype_names:
txt = "More than 1 celline was found.\n"
txt += "You must select one amongst: {}".format([this.split(":")[1] for this in cellLines])
txt += "\n Use the keyword cellLine (note the capital L). e.g. cellLine='HepG2'"
self.logging.warning(txt)
self.cellLine = celltype_names[0]
#raise CNOError(txt)
else:
# we could remove columns and rows where cell type is not correct.
# but we are going to do it in the _init so that a user can
# change his mind and reset the cell line to be looked at.
pass
# check that each row belongs to one and only one cell line
N = len(self._data)
if all(self._data[[x for x in self._data.columns
if 'CellLine' in x]].sum(axis=1) == [1] * N) is not True:
row_indices = (self._data[[x for x in self._data.columns
if 'CellLine' in x]].sum(axis=1) != [1] * N)
indices = self._data.index[row_indices]
raise CNOError("cell line columns are not compatible in rows %s . " % indices +
"Check that sum over cell line on each row is unity")
if len(celltype_names) == 1:
self._cellLine = celltype_names[0]
def _midas_validity(self):
# check validity of TR:
for this in self._data.columns:
if ":" not in this:
txt = "Error in header of the input MIDAS file\n"
txt += " Column's name must contain the special character ':'\n"
txt += " Found {0}".format(this)
raise CNOError(txt)
if this.split(":")[0] not in self._valid_codes.keys():
txt = "Error in header of the input MIDAS file\n"
txt += " Column's name must start with one of the valid code {}\n".format(self._valid_codes)
raise CNOError(txt)
# should have at least one DV
if len([x for x in self._data.columns if x.startswith("DV")]) == 0:
raise CNOError("Header of MIDAS file has no columns starting with DV. expects at least one")
def _set_missing_time_zero(self):
# check if zero time data is available for all experiments
unique_times = sorted(list(self.df.index.levels[2]))
self._missing_time_zero = False
if len(unique_times) < 2 or 0 not in unique_times:
self._missing_time_zero = True
labels = list(self.df.index.labels[2])
counter = {}
for label in set(labels):
counter[label] = labels.count(label)
for label in set(labels):
if counter[0] != counter[label]:
self._missing_time_zero = True
def _init(self):
"""Builds the dataframe"""
# select only data that matches the cell line choice made by the user.
cellLine = 'TR:%s:CellLine' % self.cellLine
if len(self._data) == 0:
return
# select data for the cell line provided by the user.
_data = self._data[self._data[cellLine] == 1]
# and remove all column with the CellLine keyword now
_data = _data[ [col for col in _data.columns if "CellLine" not in col]]
# drop ID columns if any
self._data_raw = _data.copy()
_data = _data[ [col for col in _data.columns if col.startswith("ID")==False]]
# figure out the treatments, times and measurements
df_tr = _data[[this for this in _data.columns if this.startswith("TR")]]
df_da = _data[[this for this in _data.columns if this.startswith("DA")]]
df_dv = _data[[this for this in _data.columns if this.startswith("DV")]]
# let us gather experiments and replace empty fields with zeros ??
# FIXME filling NA with zero is it correct ? seems so
# DATA should not be changed to zero though
value_experiments = _data[df_tr.columns].copy()
# This takes 40% of the time!
value_experiments = value_experiments.fillna(0)
value_signals = _data[df_dv.columns].as_matrix()
value_times = _data[df_da.columns]
names = [this for this in df_tr[:] if "CellLine" not in this]
self._experiments = _data[names].drop_duplicates()
self._experiments.index = range(0, self._experiments.shape[0])
self._experiments.index = ["experiment_{}".format(this)
for this in self._experiments.index]
# FIXME: already done above ...seemed required for time0_to_duplicate test
# takes 35% of the time
self._experiments = self._experiments.fillna(0)
# FIXME this part is slow (for loop)
# build the tuples that will be used by the MultiIndex dataframe
tuples = []
self._values_exps = value_experiments
N = len(value_experiments)
map_index = dict(zip(value_experiments.index, range(0,N)))
for ix in _data.index:
# 1. find name of the experiment
this_exp = value_experiments.values[map_index[ix]]
# scan unique experiments and figure out which one is this_exp
exp_name = None
count = 0
for this_unique_exp in iter(self._experiments.values):
if all(this_unique_exp == this_exp):
# found it
exp_name = self._experiments.index[count]
count += 1
# if so, not need to look for others
#temp_df.drop(exp_name, inplace=True)
break
else:
count+=1
assert exp_name is not None
# 2. times
time = set(value_times.values[map_index[ix]])
assert len(time) == 1
time = list(time)[0]
tuples.append((self.cellLine, exp_name, time))
# replace empty strings with 0
# note that ,,, is interpreted as ,NaN,NaN,NaN
# but , , , is interpreted as ," "," "," ",
self._experiments = self._experiments.applymap(lambda x: 0
if isinstance(x, str) and x.isspace() else x)
# must convert read data into numeric value.
self._experiments = self._experiments.convert_objects(convert_numeric=True,
copy=True)
index = pd.MultiIndex.from_tuples(tuples, names=self._levels)
#keep = [this for this in self.df.columns if this not in ["experiments", "times"]]
names_species = [x for x in _data.columns if x.startswith('DV:')]
names_species = [x[3:] for x in names_species]
self.df = pd.DataFrame(value_signals, index=index, columns=names_species)
self.df = self.df.sortlevel(["experiment"])
self.df = self.df.sort_index(axis=1) # sort the species
# Get rid of TR in experiments
self._experiments.columns = [this.replace("TR:", "") for this in self._experiments.columns]
cues = []
for c in self._experiments.columns:
if c.count(":") >=2:
raise CNOError("Invalid header. Found more than 2 : sign in a column")
if c.endswith(":Stimuli"):
cues.append(c.split(":")[0])
elif c.endswith(":Inhibitors"):
cues.append(c.split(":")[0] + ":i")
elif c.endswith("i"):
cues.append(c[0:-1] + ":i")
else:
cues.append(c)
self._experiments.columns = cues
inh = [x for x in self._experiments.columns if x.endswith(":i") is True]
stim = [x for x in self._experiments.columns if x.endswith(":i") is False]
# Finally, build up the dataframe for experiments
# which one are inhibitors / stimuli ?
columns_is = []
for this in self._experiments.columns:
if this.endswith(':i'):
columns_is.append('Inhibitors')
else:
columns_is.append('Stimuli')
columns = [x.replace(":i","") for x in self._experiments.columns]
# add dummy columns that we will delete afterwards just to create the structure of themultiindex
data = self._experiments.values
if 'Inhibitors' not in columns_is:
# let us add an inhibitors
columns_is += ['Inhibitors']
columns += ['__dummy__inhibitor']
N = len(self._experiments.values)
data = np.concatenate([self._experiments.values, np.array([[None]] * N)], axis=1)
if 'Stimuli' not in columns_is:
columns_is = ['Stimuli'] + columns_is
columns = ['__dummy__stimulus'] + columns
N = len(self._experiments.values)
data = np.concatenate([self._experiments.values, np.array([[None]] * N)], axis=1)
self._experiments = pd.DataFrame(data, columns=[columns_is, columns],
index= self._experiments.index)
try:
self._experiments.drop(('Inhibitors', '__dummy__inhibitor'), axis=1, inplace=True)
except:
pass
try:
self._experiments.drop(('Stimuli', '__dummy__stimulus'), axis=1, inplace=True)
except:
pass
self._experiments.sortlevel(axis=1, inplace=True)
#self.df = self.df.sort_index(axis=1)
self._set_missing_time_zero()
if self._missing_time_zero == True:
# 3 cases:
# - no time zero provided
# - one control provided all inh and all stim off so sum==0
# - one control provided for each different set of inhibitors (could include previous case)
df = self.df.query('time==0')
if len(df) == 0:
# no time 0 provided --> case 1
S = -1
else:
# if S == 0, we may still be in case where there are
# other control for different inhibitors, so we need t check that
if len(set(df.reset_index()['experiment']))>1:
S = 1
else:
S = 0
if S > 0:
self.logging.debug("Duplicating time zero based on inhibitors")
self._duplicate_time_zero_using_inhibitors_only()
elif S == 0:
self.logging.debug("Duplicating time zero")
self._duplicate_time_zero()
elif S == -1:
self.logging.debug("Adding time zero")
self._add_time_zero()
# drop the time 0, which has no data in principle
# self._set_missing_time_zero()
# if self._missing_time_zero is True:
# raise CNOError("Issue with missing time zero ")
if self.df.shape[0] > len(self.times) * self.experiments.shape[0]:
self.logging.warning("You may have duplicated experiments. " +
"Please average the replicates using self.average_replicates(inplace=True)")
if self.df.max(skipna=True).max(skipna=True) > 1:
self.logging.warning("values larger than 1. " +
"You may want to normalise/scale the data")
self.sim = self.df.copy() * 0
self.errors = self.df.copy() * 0
def _add_time_zero(self):
# copy the dataframe with data assuming time0 does not exsits
# actually, we should have a row only for each unique experiment
# excluding time since we could have tim1, time2, ...
times = self.df.query("time!=0").index.levels[2]
time = times[0]
tobeadded = self.df.query("time==@time") * 0
tobeadded.reset_index(inplace=True)
tobeadded['time'] = [0] * len(tobeadded)
tobeadded.set_index(self._levels, inplace=True)
df = pd.concat([self.df, tobeadded])
#df = df.set_index(self._levels)
df = df.sortlevel([self._levels[1], self._levels[2]]) # experiment
#df = df.sort_index(axis=1)
self.df = df.copy()
def _duplicate_time_zero(self):
tobeadded = pd.DataFrame()
# Let us identify the data for the time0
df = self.df.query('time==0')
# true if only one cell line
assert len(set(df.reset_index()['experiment'])) == 1
exp0 = df.reset_index()['experiment'][0]
df = self.df.query("time==0 and experiment==@exp0")
# now, let us get rid of all time 0
self.df = self.df.query('time!=0')
#
experiments = list(self.experiments.index)
for i, this_exp in enumerate(experiments):
try:
# first experiment (control) was dropped.
# If there is no time1, this should fail
times = list(self.df.xs((self.cellLine, this_exp)).index)
except:
# in which case, the exp shoudl be dropped as well
self.experiments.drop(this_exp, inplace=True)
continue
if 0 in times:
times.remove(0)
if len(times) == 0:
self.experiments.drop(this_exp, inplace=True)
continue
newrow = df.copy()
# let us add some index information that is now missing
newrow[self._levels[0]] = self.cellLine
newrow[self._levels[1]] = this_exp
newrow[self._levels[2]] = 0
# we can now merge with the full data set
tobeadded = pd.concat([tobeadded, newrow])
# finally concatenate all the rows with the dataframe df
df = pd.concat([self.df.reset_index(), tobeadded], ignore_index=True)
df = df.set_index(self._levels)
df = df.sortlevel([self._levels[1]]) # experiment
self.df = df.copy()
self._reset_experiment_names()
def _reset_experiment_names(self):
names = list(self.experiments.index)
N = len(self.experiments)
newnames = ['experiment_' + str(i) for i in range(0,N)]
mapping = dict(zip(names,newnames))
self.experiments.rename(index=mapping, inplace=True)
# also need to reset the experiment in df:
self.df.reset_index(inplace=True)
self.df['experiment'] = [mapping[x] for x in self.df['experiment']]
self.df.set_index(self._levels, inplace=True)
self.df = self.df.sort_index()
def _duplicate_time_zero_using_inhibitors_only(self):
"""
Sometimes the time zero data sets are not explicitly written in MIDAS
files. One example is MD-ExtLiverHepG2-MCP2010-mod4.csv
A set of perturbations with same inhibitors but different stimuli
have the same control, which is the perturbation with same inhibitor
and no stimuli. That control can therefore be duplicated.
"""
# find experiment that have missing time zero data
experiments = list(self.experiments.index)
# FIXME: this is slow
tobeadded = pd.DataFrame()
for i, this_exp in enumerate(experiments):
times = self.df.ix[self.cellLine].ix[experiments[i]].index
if 0 not in times:
# need to find the experiment with same inhibitors and time 0
# there should be only one ? maybe not if replicates.
these_inhibitors = self.experiments.ix[this_exp].Inhibitors
for this_exp_intern in experiments:
times = self.df.ix[self.cellLine].ix[this_exp_intern].index
if 0 in times:
if all(self.experiments.ix[this_exp_intern].Inhibitors ==
these_inhibitors):
break # so that if there are several replicates found,
# we pick up the first one only
# get the times for this experimenti. it must contain the time zero
newdata = self.df.xs((self.cellLine, this_exp_intern))
# we only need the time 0
newrow = newdata[newdata.index == 0].copy()
# let us add some index information that is now missing
newrow[self._levels[0]] = self.cellLine
newrow[self._levels[1]] = this_exp
newrow[self._levels[2]] = 0
tobeadded = pd.concat([tobeadded, newrow])
# finally concatenate all the rows with the dataframe df
df = pd.concat([self.df.reset_index(), tobeadded], ignore_index=True)
df = df.set_index(self._levels)
df = df.sortlevel([self._levels[1]]) # experiment
self.df = df.copy()
[docs]class XMIDAS(MIDASReader):
"""The extended MIDAS data structure.
.. plot::
:include-source:
:width: 80%
from cno import XMIDAS, cnodata
m = XMIDAS(cnodata("MD-ToyPB.csv"))
m.df # access to the data frame
m.experiments # access to experiments
m.sim # access to simulation
m.plot()
Please see main documentation for extended explanation and documentation
of the methods here below.
.. warning:: if there are replicates, call average_replicates before creating a
simulation or calling plot"mode="mse")
When reading a MIDAS file with several cell lines, all data is stored but
only one cell line can be visualised and manipulated at a time::
# when reading, you must give the cell line name you want to activate
multiple_cell = XMIDAS('somedata', 'cell1')
# valid cell line names are stored in this attribute
multiple_cell.cellLines
# activating another cell line needs to set this attribute:
multiple_cell.cellLine = 'cell2'
"""
def __init__(self, filename=None, cellLine=None, verbose=False, exclude_rows={}):
"""**Constructor**
:param str filename: filename of a MIDAS file or a XMIDAS instance
:param str cellLine: name of a cell Line (compulsory if several
cell lines are present)
"""
super(XMIDAS, self).__init__(filename, verbose=verbose, exclude_rows=exclude_rows)
self._data = pd.DataFrame()
self._missing_time_zero = None
self._cellLine = cellLine
# multi index related (position of the columns)
self._celltype_index = 0
self._experiment_index = 1
self._time_index = 2
self.verbose = verbose
#self._ignore_invalid_columns = True
self._experiments = pd.DataFrame()
self.df = pd.DataFrame()
self.sim = self.df.copy() * 0
self.errors = self.df.copy() * 0
if filename is None:
pass
elif isinstance(filename, str):
self.read()
self.create_empty_simulation()
self.errors = self.sim.copy()
else:
try:
md = filename
self.filename = md.filename
self.df = md.df.copy()
self._ = md._data.copy()
self._missing_time_zero = md._missing_time_zero
self.df = self.df.copy()
self._experiments = md.experiments.copy()
self._cellLine = md.cellLine
self.sim = md.sim.copy()
self.errors = md.errors.copy()
except Exception as e:
raise CNOError(e.message +
"Invalid input file. Expecting a XMIDAS instance or valid filename")
self._params = {
'plot_layout_space': 0.2, # spaces between axes
'plot_fontsize':16,
'plot_fontsize_times':16,
'plot_fontsize_species':16,
'plot_fontsize_perturbations':14,
'plot_fontsize_titles':16,
'plot_layout_shifttop': 1,
'plot_colorbar_N': 20,
'plot_orientation_perturbation_label': 90,
'plot_data_markersize': 5,
'plot_sim_lw': 4,
'plot_sim_color':'blue',
}
from easydev import AttrDict
self._params = AttrDict(**self._params)
def _manage_replicates(self):
"""
"""
groups = self.df.groupby(level=self._levels).groups
if any([len(this)>1 for this in groups.values()])==False:
self.logging.info("No replicates found")
else:
newdf = self.df.groupby(level=self._levels).agg([np.mean, np.std])
return newdf
[docs] def average_replicates(self, inplace=False):
"""Average replicates if any
:param bool inplace: default to False
If inplace, a new dataframe :attr:`errors` is created
and contains the errors (standard deviation)
"""
df = self._manage_replicates()
if isinstance(df, pd.DataFrame):
dfstd = df[[this for this in df.columns if "std" in this]]
df = df[[this for this in df.columns if "mean" in this]]
df.columns = [this[0] for this in df.columns]
if inplace:
self.df = df.copy()
self.sim = self.df.copy()
self.errors = dfstd.copy()
else:
return df
def __div__(self, this):
m = self.copy()
m.df /= this
return m
def __mul__(self, this):
m = self.copy()
m.df *= this
return m
def __sub__(self, this):
# m1 + m2 or m1+=m2
m = self.copy()
if isinstance(this, XMIDAS):
m.df -= this.df
else:
m.df -= this
return m
def __add__(self, this):
# m1 + m2 or m1+=m2
m = self.copy()
if isinstance(this, XMIDAS):
m.df += this.df
else:
m.df += this
return m
def __getitem__(self, item):
"""Simple alias to the dataframe's columns
.. doctest::
>>> from cno import XMIDAS, cnodata
>>> m = XMIDAS(cnodata("MD-ToyPB.csv"))
>>> m['p38'].ix[0]
0.1076
>>> # equivalent to m.df['p38'].ix[0]
.. note:: this is a read-only access
"""
if len(item)==3 and isinstance(item, str) != True:
i1, i2, i3 = item
return self.df.ix[i1].ix[i2].ix[i3]
elif isinstance(item,str)==True:
if item in self.df.columns:
return self.df[item].copy()
def _get_experiments(self):
return self._experiments
experiments = property(_get_experiments,
doc="Return dataframe with experiments")
def _get_names_species(self):
return self.df.columns.values
names_species = property(_get_names_species,
doc="list of species")
names_signals = property(_get_names_species,
doc="same as :attr:`names_species`")
species = property(_get_names_species,
doc="Getter for the columns of the dataframe that represents the species/signals")
signals = property(_get_names_species,
doc="Getter for the columns of the dataframe that represents the species/signals")
def _get_names_cues(self):
cues = self.names_stimuli + self.names_inhibitors
return cues
names_cues = property(_get_names_cues,
doc="Return list of stimuli and inhibitors together")
def _get_cellLines(self):
# XMIDAS created from an input file
if len(self._data):
names = [this for this in self._data.columns if "CellLine" in this]
names = [this.split(":")[1] for this in names]
return names
# XMIDAS created from another builder e.g. MIDASBuilder
else:
names = self.df.index.levels[0]
return names
cellLines = property(_get_cellLines, doc="Return available cell lines")
def _get_cellLine(self):
return self._cellLine
def _set_cellLine(self, name):
if name not in self.cellLines:
raise CNOError("Invalid cellLine name {}. Valid ones are {}".format(name,
self.cellLines))
self._cellLine = name
# TODO: do we need to call _init again ?
self._init()
cellLine = property(_get_cellLine, _set_cellLine,
doc="Getter/Setter of the active cell line")
def _get_times(self):
times = self.df.index.levels[self._time_index]
return sorted(list(times))
times = property(_get_times, doc="Getter to the different times")
def _get_names_inhibitors(self):
return list(self.experiments.Inhibitors.columns)
names_inhibitors = property(_get_names_inhibitors,
doc="return list of inhibitors")
def _get_names_stimuli(self):
return list(self.experiments.Stimuli.columns)
names_stimuli = property(_get_names_stimuli,
doc="returns list of stimuli")
[docs] def reset(self):
"""Reset the dataframe to original data set"""
self._init()
def _check_consistency_data(self):
# times consistency all times must have same length
pass
[docs] def reset_index(self):
"""Remove all indices (cellLine, time, experiment)
Done in the 3 dataframes :attr:`df`, :attr:`sim` and :attr:`errors`
"""
self.df.reset_index(inplace=True)
self.sim.reset_index(inplace=True)
self.errors.reset_index(inplace=True)
[docs] def set_index(self):
"""Reset all indices (cellLine, time, experiment)
Done in the 3 dataframes :attr:`df`, :attr:`sim` and :attr:`errors`
"""
if len(self.df):
self.df.set_index(self._levels, inplace=True)
self.sim.set_index(self._levels, inplace=True)
self.errors.set_index(self._levels, inplace=True)
[docs] def remove_species(self, labels):
"""Remove a set of species
:param labels: list of Species (list of strings) or just one
species(single string or as a list.
::
m.remove_species("p38")
m.remove_species(["p38"])
"""
labels = self._dev.to_list(labels)
columns = self.df.columns[:]
for label in labels:
if label not in columns:
self.logging.warning("{} not =in the species. skipped".format(label))
else:
self.df.drop(label, axis=1, inplace=True)
self.sim.drop(label, axis=1, inplace=True)
self.errors.drop(label, axis=1, inplace=True)
[docs] def remove_cellLine(self, labels):
if labels in self.cellLines and len(self.cellLines) == 1:
raise ValueError("Irrelevant since there is only one cell line in the dataframe")
else:
raise NotImplementedError
# self._remove_labels_from_level(labels, self._levels[0])
# need to handle _data as well removing rows that correspond to the cell line
# set cell line to the remaining one if only one
[docs] def remove_times(self, labels):
"""Remove time values from the data
:param list labels: one time point or a list of time points.
Valid time points are in the :attr:`times` attribute.
"""
self._remove_labels_from_level(labels, "time")
[docs] def remove_experiments(self, labels):
"""Remove experiment(s) from the dataframe
:param list labels: one experiment or a list of experiments.
Valid experiments are in the :attr:`experiments.index` dataframe.
Experiments are of the form "experiment_12". You can refer
to an experiment by its number (e.g., here 12).
"""
self._remove_labels_from_level(labels, "experiment")
def _remove_labels_from_level(self, labels, level):
"""For a given level and a list of labels, this function
figures out the rows to remove in the sim/errors/df data frames.
This is a bit complicated but looks like the proper way
"""
labels = self._dev.to_list(labels)
if level == "experiment":
labels = [x if "experiment" in str(x) else "experiment_"+str(x)
for x in labels]
# FIXME try if it fails must call set_index
self.reset_index()
for label in labels:
if label not in set(self.df[level]):
self.logging.warning("{} not found. Skipped".format(label))
else:
self.df.drop(self.df[self.df[level] == label].index, inplace=True)
self.sim.drop(self.sim[self.sim[level] == label].index, inplace=True)
self.errors.drop(self.errors[self.errors[level] == label].index, inplace=True)
if level == "experiment":
self.experiments.drop(label, inplace=True)
# and now set the level back as before
self.set_index()
[docs] def remove_stimuli(self, labels):
"""Remove a stimuli from the :attr:`experiment` dataframe
:param labels: a string or list of string representing the stimuli
"""
self._remove_column_experiment(labels, level='Stimuli')
[docs] def remove_inhibitors(self, labels):
"""Remove inhibitor(s) from the :attr:`experiment` dataframe
:param labels: a string or list of string representing the inhibitor(s)
"""
self._remove_column_experiment(labels, level='Inhibitors')
def _remove_column_experiment(self, labels, level):
if level == 'Inhibitors':
valid_names = self.names_inhibitors
elif level == 'Stimuli':
valid_names = self.names_stimuli
labels = self._dev.to_list(labels)
for label in labels:
# m.experiments.columns.levels[1] contains all names even those that
# are removed so need to use names_inhibitors instead
if label not in set(valid_names):
self.logging.warning("{} not found. Skipped".format(label))
else:
self.experiments.drop((level,label), axis=1, inplace=True)
[docs] def rename_stimuli(self, names_dict):
"""Rename stimuli in the :attr:`experiment` dataframe
:param names_dict: a dictionary with names (keys) to be replaced (values)
::
from cno import XMIDAS, cnodata
m = XMIDAS(cnodata("MD-ToyPB.csv"))
m.rename_species({"erk":"ERK", "akt":"AKT"})
.. seealso:: :meth:`rename_stimuli`, :meth:`rename_inhibitors`
"""
self._dev.check_param_in_list(names_dict.keys(), self.names_stimuli)
self.experiments.rename(columns=names_dict, inplace=True)
[docs] def rename_inhibitors(self, names_dict):
"""Rename inhibitors
:param names_dict: a dictionary with names (keys) to be replaced (values)
::
from cno import XMIDAS, cnodata
m = XMIDAS(cnodata("MD-ToyPB.csv"))
m.rename_species({"raf":"RAF"})
.. seealso:: :meth:`rename_stimuli`, :meth:`rename_species`
"""
self._dev.check_param_in_list(names_dict.keys(), self.names_inhibitors)
self.experiments.rename(columns=names_dict, inplace=True)
[docs] def rename_species(self, names_dict):
"""Rename species in the main :attr:`df` dataframe
:param names_dict: a dictionary with names (keys) to be replaced (values)
::
from cno import cnodata, XMIDAS
m = XMIDAS(cnodata("MD-ToyPB.csv"))
m.rename_species({"erk":"ERK", "akt":"AKT"})
.. seealso:: :meth:`rename_stimuli`, :meth:`rename_inhibitors`
"""
self._dev.check_param_in_list(names_dict.keys(), self.df.columns)
columns = list(self.df.columns)
columns = [c if c not in names_dict.keys() else names_dict[c] for c in columns]
self.df.columns = columns
self.sim.columns = columns
[docs] def rename_cellLine(self, to_replace):
"""Rename cellLine indices
:param dict to_replace: dictionary with mapping of values to be replaced.
For example; to rename a valid cell line use::
m.rename_cellLine({"undefined": "PriHu"})
"""
self.reset_index()
self.df.replace({self._levels[0]: to_replace}, inplace=True)
self.set_index()
for k, v in to_replace.items():
if v in self.cellLines:
raise ValueError('CellLine %s already a cell line name' % v)
old = 'TR:' + k + ':CellLine'
if old in self._data.columns:
new = 'TR:' + v + ':CellLine'
self._data.columns = [x.replace(old, new) for x in self._data.columns]
if self.cellLine in to_replace.keys():
self._cellLine = to_replace[self.cellLine]
[docs] def rename_time(self, to_replace):
"""Rename time indices
:param dict to_replace: dictionary with mapping of values to be replaced.
For example; to convert time in minutes to time in seconds, use
something like::
m.rename_time({0:0,1:1*60,5:5*60})
"""
self.reset_index()
# key may be same as value, in which case replace method fails. So, we get rid of them
to_replace = dict([(k,v) for k,v in to_replace.items() if k!=v])
self.df.replace({"time": to_replace}, inplace=True)
self.set_index()
[docs] def reset_experiments(self):
"""remove duplicated experiments and rename them from 0 to N
If experiments are duplicated for some reasons (e.g., you removed
an entire column with a given inibitor), then experiments may be duplicated.
In which case, you may want to (1) remove the duplicated experiments (2)
rename the name so that it goes from 0 to the new number of unique experiments
and (3) reflect those changes into the **df** dataframe.
There could be replicares in the resulting **df** attribute, which
should be averaged by the user using :meth:`average_replicates` method.
"""
# figure out the list of experiments in terms of column labels.
levels = self.experiments.columns.levels
labels = self.experiments.columns.labels
to_replace = {}
list_exps = [(levels[0][l1], levels[1][l2]) for l1,l2 in zip(labels[0], labels[1])]
groups = self.experiments.groupby(list_exps).groups
for k,v in groups.items():
# if we have duplicated experiments,
if len(v)>1:
# let us rename them:
for this in v:
to_replace[this] = v[0]
self.rename_experiments(to_replace) # in df and experiments dataframes
#self.experiments.drop_duplicated()
self.df.sortlevel(inplace=True)
self.experiments.drop_duplicates(inplace=True)
# finally rename of experiment names to make sure it goes from 0 to N
to_replace = [(k,'experiment_%s' %i) for i,k in enumerate(self.experiments.index)]
self.rename_experiments(dict(to_replace))
[docs] def rename_experiments(self, to_replace):
"""Rename experiments in the **df** and **experiments** dataframes
:param dict to_replace: a dictionary mapping old values (key) to new values (value)
"""
# the **df** attribute
self.reset_index()
to_replace = dict([(k,v) for k,v in to_replace.items() if k!=v])
self.df.replace({"experiment": to_replace}, inplace=True)
self.set_index()
# the experiments attribute:
self.experiments.index = [to_replace[x] if x in to_replace.keys() else x
for x in self.experiments.index]
[docs] def merge_times(self, how="mean"):
"""Not implemented yet"""
raise NotImplementedError
[docs] def add_experiment(self, e):
"""Not implemented yet"""
raise NotImplementedError
[docs] def scale_max(self, inplace=True):
r"""Divide all data by the maximum over the entire data set
.. math::
X = \frac{X}{M}
where :math:`M = \max_{e,s,t} X` (with :math:`e` the experiment,
:math:`s` the species, and :math:`t` the time).
"""
M = self.df.max().max()
if inplace:
self.df /= M
self.errors /= M
else:
return self.df / M
[docs] def scale_max_across_experiments(self, inplace=True):
r"""Rescale each species column across all experiments
.. math::
X_s = \frac{X}{M_s}
In the MIDAS plot, this is equivalent to dividing each column by
the max over that column. So, on each column, you should get 1 max values
set to 1 (if the max is unique). The minimum values may not be set to 0.
"""
M = self.df.max()
newdf = self.df.divide(M, level="experiment")
if inplace:
self.df = newdf.copy()
for col in M.index:
self.errors[col] /= M.ix[col]
else:
return newdf
[docs] def scale_min_max_across_experiments(self, inplace=True):
r"""Rescale each species column across all experiments
.. math::
X_s = \frac{X-m_s}{M-m_s}
where :math:`m = min_{e,s,t} X` and :math:`M = max_{e,s,t} X`,
with :math:`e` the experiment, with :math:`s` the species,
with :math:`t` the time.
"""
m = self.df.min()
M = self.df.max()
data = (self.df - m)/(M-m.astype(np.float64))
if inplace:
self.df = data.copy()
for col in M.index:
self.errors[col] /= M.ix[col] - m.ix[col]
else:
return data
[docs] def scale_min_max(self, inplace=True):
r"""Divide all data by the maximum over entire data set
.. math::
X = \frac{X-m}{M-m}
where :math:`m = min_{e,s,t} X` and :math:`M = max_{e,s,t} X`,
with :math:`e` the experiment, with :math:`s` the species,
with :math:`t` the time.
This is an easy (but naive way) to set all data points between 0 and 1.
"""
m = self.df.min().min()
M = self.df.max().max()
# if X is normal distributed,
# new errors of X-a is unchanged
# new errors of (X-a)/(b-a) is to be divided by b-a
if inplace:
self.df -= m
self.df /= (M-m)
self.errors /= (M-m)
else:
newdf = self.df - m
newdf /= (M-m)
return newdf
[docs] def create_empty_simulation(self):
"""Populate the simulation dataframe with zeros.
The simulation has the same layout as the experiment.
The dataframe is stored in :attr:`sim`.
"""
self.sim = self.df * 0
[docs] def create_random_simulation(self):
"""Populate the simulation dataframe with uniformly random values.
The simulation has the same layout as the experiment.
The dataframe is stored in :attr:`sim`.
"""
self.sim = self.df * 0 + np.random.uniform(size=self.df.shape)
[docs] def get_diff(self, sim=None, squared=True, normed=True):
r"""Return dataframe with differences between the data and simulation
The dataframe returned contains the MSE (mean square error) by default.
:param sim: if not provided, uses the :attr:`sim` attribute.
:param normed:
:param bool square: set to True to get MSE otherwise, returns the absolute
values without squaring
if square is True and normed is True:
.. math::
\epsilon = \frac{1}{N_t} \left(X - X^s\right)^2
We then sum over time and if *normed* is False, :math:`N_t` is set to 1.
"""
# dataframe cannot be compared to None so, we need this trick:
if isinstance(sim, types.NoneType):
sim = self.sim
if squared is True:
diff = (sim - self.df).abs()**2
else:
diff = (sim - self.df).abs()
diff = diff.sum(level="experiment")
if normed:
N = len(self.times)
diff = diff/float(N)
return diff
# PLOTTING
[docs] def corr(self, names=None, cmap='gist_heat_r', show=True):
"""plot correlation between the measured species
:param list names: restriction to some species if provided.
:param string cmap: a valid colormap (e.g. jet). Can also use "green" or "heat".
.. plot::
:include-source:
:width: 80%
>>> from cno import XMIDAS, cnodata
>>> m = XMIDAS(cnodata("MD-ToyPB.csv"))
>>> m.corr(cmap="green")
"""
cmap = self._get_cmap(cmap)
corr = self.df.corr()
if show is True:
N = corr.shape[0]
names = self.df.columns[:]
pylab.clf()
pylab.pcolor(corr, edgecolors="k", cmap=cmap);
pylab.xticks([x+0.5 for x in range(0,N)], names, rotation=90)
pylab.yticks([x+0.5 for x in range(0,N)], names)
pylab.tight_layout()
pylab.colorbar()
return corr
[docs] def plot_sim_data(self, markersize=3, logx=False, linestyle="--",
marker="x", **kargs):
"""plot experimental curves
.. plot::
:width: 80%
:include-source:
>>> from cno import XMIDAS, cnodata
>>> m = XMIDAS(cnodata("MD-ToyPB.csv"))
>>> m.plot_layout()
>>> m.plot_data()
>>> m.plot_sim_data()
"""
color = self._params['plot_sim_color']
lw = self._params['plot_sim_lw']
times = np.array(self.times)
# if simulation do not have the same number of points as data
simtimes = np.array(self.sim.index.levels[2])
vmin = float(self.sim.min(skipna=True).min(skipna=True))
vmin_data = float(self.df.min(skipna=True).min(skipna=True))
if logx == False:
# a tick at x = 0, 0.5 in each box (size of 1) + last x=1 in last box
xt = pylab.linspace(0, self.nSignals, self.nSignals*2+1)
M = max(max(times), max(simtimes))
times = times/float(max(times))
simtimes = simtimes/float(M)
xtlabels = self._get_xtlabels()
else:
M = float(max(times))
xtlin = pylab.linspace(0, self.nSignals, self.nSignals*2+1)
xt = [int(x)+pylab.log10(1+pylab.mod(x,1)*M)/pylab.log10(1+M)
for i,x in enumerate(xtlin)]
xtlabels = self._get_xtlabels()
M = max( max(pylab.log10(1+times)), max(pylab.log10(1+simtimes)))
times = pylab.log10(1+times)/max(pylab.log10(1+times))
simtimes = pylab.log10(1+simtimes)/float(M)
#for isim, sim in enumerate(self.sim):
for i in range(0, self.nExps):
for j in range(0, self.nSignals):
# divide data by 1.1 to see results close to 1.
signal = self.names_signals[j]
exp = self.experiments.index[i]
data = self.sim[signal][self.cellLine][exp]
if vmin <0 or vmin_data<0:
data/=2.
data+=0.5
# sometimes we may want to get rid of all NA and show the lines.
data = data.dropna()
times = np.array(list(data.index))
simtimes = times / float(M)
Xtimes = simtimes + j
pylab.plot(Xtimes, data/1.05+(self.nExps-i-1), marker=marker, color=color, alpha=0.5,
linestyle=linestyle,
markersize=markersize, lw=lw)
# plot(times+j, sim[i,j]/1.05+(self.nExps-i-1), 'b--o',
# markersize=markersize)
pylab.gca().set_xticklabels(xtlabels, fontsize=kargs.get("fontsize", 10))
pylab.gca().set_xticks(xt)
[docs] def plot2(self, logx=False, logy=False):
# works only if there is at most one stim per experiment
trend = Trend()
assert all(self.experiments.Stimuli.sum(axis=1) <= 1)
pylab.clf()
nSignals = len(self.df.columns)
nStimuli = len(self.experiments.Stimuli.columns)
nInhibitors = len(self.experiments.Inhibitors.columns)
nTimes = len(self.times)
# How many drug per stimuli at max ?
# Need to find unique combination of stimuli...
for i, signal in enumerate(self.df.columns):
# ... and loop here over the different combi
for j, stimuli in enumerate(self.experiments.Stimuli.columns):
pylab.subplot(nStimuli, nSignals, j*nSignals+1+i)
# add the signal name on top
if j == 0:
pylab.title(self.df.columns[i])
if i == 0:
pylab.ylabel(stimuli)
df1 = self.experiments.Stimuli.query('%s == 1' % stimuli)
for k,inhibitor in enumerate(self.experiments.Inhibitors.columns):
df2 = self.experiments.Inhibitors.query('%s == 1' % inhibitor)
experiment = df1.index.intersection(df2.index)[0]
data = self.df.ix[self.cellLine].ix[experiment][signal] # .values
trend.set(data)
color = trend.get_bestfit_color()
xt = np.array(self.times) / float(max(self.times)) + k
pylab.plot(xt, data.values, '-o', color=color)
pylab.fill_between(xt, data, 0,
color=color, alpha=trend.alpha)
pylab.axvline(k+1,color='k', alpha=0.2)
pylab.xticks([])
pylab.yticks([])
pylab.ylim([0, pylab.ylim()[1]])
pylab.grid(True)
[docs] def focus_exp(self, y1=0, y2=None):
# Once the plot() function called, you can easily focus on
# a subset of rows
if y2 is None:
y2 = len(self.experiments)
axes = pylab.gcf().get_axes()
x1, x2 = axes[0].set_ylim(y1,y2)
x1, x2 = axes[0].get_ylim()
print(x1, x2)
# now, set the 2 others ylimits
ax1 = axes[1].set_ylim([x1,x2])
ax2 = axes[3].set_ylim([x1,x2])
pylab.plot([],[])
[docs] def plot_mse(self, mode='mse', cmap='heat', vmax=1, vmin=0, hold=False):
"""Simpler and faster version of plot() function"""
if hold is False:
pylab.clf()
self.plot_layout()
# Get the cmap
if mode == "data":
#should be one with zero being white
cmap = self._colormap.get_cmap_heat_r()
else:
cmap = self._get_cmap(cmap)
data = self.get_diff().ix[self.experiments.index].values
pylab.pcolor(pylab.flipud(data), edgecolors='k', vmin=vmin, vmax=vmax,
cmap=cmap)
N = len(self.times) * len(self.species)
Ntimes = len(self.times)
df = self.df.ix[self.cellLine]
sim = self.sim.ix[self.cellLine]
xdata = [i+(j)/(Ntimes-1.) for i in range(0,len(self.species)) for j in range(0,Ntimes)]
for exp in self.experiments.index:
shift = len(self.experiments) - list(self.experiments.index).index(exp) - 1
pylab.plot(xdata, [x+shift for x in pylab.flatten(df.ix[exp].T.values)],
color='k', lw=1, marker='x')
pylab.plot(xdata, [x+shift for x in pylab.flatten(sim.ix[exp].T.values)],
color='b', lw=2, marker='o', linestyle='--', alpha=0.5)
# plots all the data and simulated data
#plot([0,1],[0,1], lw=2, color='blue', marker='o', alpha=0.5, linestyle='--');
pylab.xlim([0, len(self.species)])
pylab.ylim([0, len(self.experiments)])
pylab.xticks([])
pylab.yticks([])
#pylab.colorbar()
[docs] def plot_data(self, logx=False, color="black", **kargs):
"""plot experimental curves
.. plot::
:width: 80%
:include-source:
>>> from cno import XMIDAS, cnodata
>>> m = XMIDAS(cnodata("MD-ToyPB.csv"));
>>> m.plot_layout()
>>> m.plot_data()
.. note:: called by :meth:`plot`
.. seealso:: :meth:`plot`, :meth:`plot_layout`, :meth:`plot_sim_data`
"""
mode = kargs.get("mode", "data")
times = np.array(self.times)
max_time = float(max(self.times))
markersize = self._params['plot_data_markersize']
if self.df.max().max() > 1 or self.df.min().min() < 0:
if mode == 'data':
self.logging.info("Data is not normalised. Scaling by max across of species and perturbations ")
#else:
# raise ValueError('data must be between 0 and 1 for simulation')
if logx is False:
# a tick at x = 0, 0.5 in each box (size of 1) + last x=1 in last box
xt = pylab.linspace(0, self.nSignals, self.nSignals*2+1)
times = times / max_time
xtlabels = self._get_xtlabels(logx=True)
else:
M = max_time
xtlin = pylab.linspace(0, self.nSignals, self.nSignals*2+1)
xt = [int(x)+pylab.log10(1+pylab.mod(x,1)*M)/pylab.log10(1+M)
for i,x in enumerate(xtlin)]
xtlabels = self._get_xtlabels()
times = pylab.log10(1+times)/max(pylab.log10(1+times))
trend = Trend()
# do a copy to rescale the data locally
df = self.df.copy()
# we do not care about the cell line name. let us simplify the df
df = df.ix[self.cellLine]
vmax = float(df.max(skipna=True).max(skipna=True))
vmin = float(df.min(skipna=True).min(skipna=True))
if vmax > 1:
df /= vmax
errors = self.errors.copy()
errors /= vmax # V(X/vmax) = V(X)/vmax
errors.fillna(0, inplace=True)
if errors.sum().sum() > 0:
show_error = True
else:
show_error = False
errors = errors.ix[self.cellLine]
# start at zero
_names_exps = [x[0] for x in list(df.index)]
for j in range(0, self.nSignals):
signal = self.names_signals[j]
df2 = df[signal]
for i in range(0, self.nExps):
#for j in range(0, self.nSignals):
#signal = self.names_signals[j]
exp = self.experiments.index[i]
#ii = df.index.levels[0].get_loc(exp)
data = df2.ix[exp]
#self.df2 = df2
#data = df2[ii:ii+2,j]
if vmin <0:
data /=2.
data +=0.5
try:
yerr = errors[signal].ix['average'].ix[exp]
except:
yerr = errors[signal].ix[exp]
yerr = list(pylab.flatten(yerr.values))
trend.set(data)
# y-axis ratio
ratio = 0.95
Xtimes = trend.normed_times
Xtimes += j
if mode == "data":
color = trend.get_bestfit_color()
if color is None and bool(np.isnan(data.sum())) is True:
pylab.fill_between(Xtimes, ratio * np.array([1] * len(Xtimes)) + self.nExps-i-1,
self.nExps-i-1,
color='grey')
continue
pylab.plot(Xtimes,
ratio*data + self.nExps-i-1 ,
'k-o', markersize=markersize, color=color, mfc='gray')
if show_error is True:
pylab.errorbar(Xtimes,
ratio*data + self.nExps-i-1,
yerr=yerr, color='k', ecolor='k', elinewidth=3, linewidth=3)
# if data are NA, do not use
pylab.fill_between(Xtimes,
ratio*data + self.nExps-1-i ,
self.nExps-1-i, alpha=trend.alpha/1.1,
color=color)
else:
pylab.plot(Xtimes,
ratio*data + self.nExps-i-1 , 'k-o',
markersize=markersize, color="k", mfc='gray')
if show_error is True:
pylab.errorbar(Xtimes,
ratio*data + self.nExps-i-1,
yerr=yerr, ecolor='k', elinewidth=3, linewidth=3, color='k')
# plot(times+j, sim[i,j]/1.05+(self.nExps-i-1), 'b--o', markersize=markersize)
pylab.gca().set_xticklabels(xtlabels, fontsize=kargs.get("fontsize",10))
pylab.gca().set_xticks(xt)
def _get_cmap(self, cmap=None):
if cmap == "heat":
cmap = self._colormap.get_cmap_heat_r()
elif cmap == "green":
cmap = self._colormap.get_cmap_red_green()
return cmap
def _get_diff_data(self, mode, squared=True):
diffs = self.get_diff(self.sim, squared=squared)
# we re-order the dataframe with the indices of the experiments
# we will do the same with the data !
diffs = diffs.ix[self.experiments.index]
if mode == "mse":
diffs = np.ma.array (diffs, mask=np.isnan(diffs))
elif mode == "data":
pass
return diffs
[docs] def imshow(self, time, colorbar=True, cmap=None, vmin=None, vmax=None,
edgecolors='k'):
"""vmax and vmin are replaced so that we have a symmetric colorbar
centered around 0"""
if cmap is None:
import colormap
cmap = colormap.cmap_builder('red', 'black', 'green')
pylab.clf()
ax = self.plot_layout(cmap=cmap, colorbar=colorbar, mode='mse')
# now replaces the main images with the data at a given time
# can not select in a given order of experiments a multi index
# so, we first use xs to get a dtaframe. Then, we select the experiments
# in the order found in self.experiments
df = self.df.xs((self.cellLine, slice(None), time)).ix[self.experiments.index]
data = pylab.flipud(df.as_matrix())
data = np.ma.array(data, mask=np.isnan(data))
# should be
ax_cb = pylab.gcf().axes[5]
if vmax is None:
vmax = data.max().max()
if vmin is None:
vmin = data.min().min()
# we make the cb symmetric around 0
if abs(vmin) > vmax:
vmax = abs(vmin)
else:
vmin = -vmax
ax.pcolormesh(data, vmin=vmin, vmax=vmax, cmap=cmap, edgecolors=edgecolors)
self._set_colorbar(ax_cb, vmin, vmax, cmap=cmap)
def _set_colorbar(self, ax_cb, vmin, vmax, cmap='jet'):
N = self._params['plot_colorbar_N']
cbar = pylab.linspace(0, 1, N)
indices = [int(x) for x in cbar*(N-1)]
cbar = [cbar[i] for i in indices]
ax_cb.pcolor(np.array([cbar, cbar]).transpose(), cmap=cmap,
vmin=0, vmax=1)
ax_cb.yaxis.tick_right()
ticks = np.array(ax_cb.get_yticks())
M = max(ticks)
indices = [int(N*x) for x in ticks/(float(M))]
ax_cb.set_yticks(indices)
tic = vmin + np.array(indices)/float(N)*(vmax-vmin)
try:
tic = [int(x*100)/100. for x in tic]
ax_cb.set_yticklabels(tic)
ax_cb.set_xticks([],[])
except:
pass
[docs] def plot_layout(self, cmap="heat",
rotation=90, colorbar=True, vmax=None, vmin=0.,
mode="data", **kargs):
"""plot MSE errors and layout
:param cmap:
:param rotation:
:param colorbar:
:param vmax:
:param vmin:
:param mode:
:param int fignum: figure number
.. plot::
:width: 80%
:include-source:
>>> from cno import cnodata, XMIDAS
>>> m = XMIDAS(cnodata("MD-ToyPB.csv"));
>>> m.plot_layout()
.. note:: called by :meth:`plot`
.. seealso:: :meth:`plot`, :meth:`plot_layout`, :meth:`plot_sim_data`
"""
# get the number (default figure or otherwise one defined by the user)
fignum = kargs.get('fignum', pylab.gcf().number)
# Get the cmap
if mode == "data":
#should be one with zero being white
cmap = self._colormap.get_cmap_heat_r()
else:
cmap = self._get_cmap(cmap)
# The data to show in the main subplot
diffs = self._get_diff_data(mode)
# aliases
fct = self._params['plot_fontsize_times']
fcp = self._params['plot_fontsize_perturbations']
fcti = self._params['plot_fontsize_titles']
# The layout
gs = pylab.GridSpec(10, 10,
wspace=self._params['plot_layout_space'],
hspace=self._params['plot_layout_space'])
fig = pylab.figure(num=fignum, figsize=(10, 6))
shift_top = self._params['plot_layout_shifttop']
layout_width = 7
w1 = layout_width + 1
w2 = layout_width + 2
ax_main = fig.add_subplot(gs[shift_top:, 0:layout_width])
ax_main.set_yticks([], [])
ax_main.set_xticks([], [])
# stimuli
if len(self.names_stimuli) > 0:
ax_stim = fig.add_subplot(gs[shift_top:, layout_width:w1])
ax_stim.set_yticks([], [])
ax_stim.set_xticks([], [])
ax_stim_top = fig.add_subplot(gs[0:shift_top, layout_width:w1])
ax_stim_top.set_yticks([], [])
ax_stim_top.set_xticks([], [])
# inhibitors
if len(self.names_inhibitors) > 0:
ax_inh = fig.add_subplot(gs[shift_top:, w1:w2])
ax_inh.set_yticks([], [])
ax_inh.set_xticks([], [])
ax_inh_top = fig.add_subplot(gs[0:shift_top, w1:w2])
ax_inh_top.set_yticks([], [])
ax_inh_top.set_xticks([], [])
# colorbar
if colorbar and mode == 'mse':
ax_cb = fig.add_subplot(gs[shift_top:, w2:10])
#ax_cb.set_yticks([], [])
ax_cb.set_xticks([], [])
# MAIN subplot with signals
M = np.nanmax(diffs) # figure out the maximum individual MSE
m = np.nanmin(diffs) # figure out the minimum individual MSE
vmax_user = vmax
vmax = max(1, M) # if M below 1, set the max to 1 otherwise to M
if vmax_user:
vmax = vmax_user
pylab.sca(ax_main)
if mode == "mse":
try: cmap.set_bad("grey", 1.)
except: pass
pylab.pcolormesh(pylab.flipud(diffs)**self.cmap_scale, cmap=cmap,
vmin=vmin, vmax=vmax, edgecolors='k');
elif mode == "data":
try:cmap.set_bad("grey", 1.)
except: pass
pylab.pcolormesh(pylab.flipud(diffs*0), cmap=cmap, edgecolors='k');
# Some tuning of the main plot
pylab.sca(ax_main)
pylab.axis([0, diffs.shape[1], 0, diffs.shape[0]])
# the species name on the top
ax2 = ax_main.twiny()
ax2.set_xticks([i+.5 for i,x in enumerate(self.names_species)])
N = len(self.names_species)
ax2.set_xticks(pylab.linspace(0.5, N-1, N))
ax2.set_xticklabels(self.names_species,
rotation=self._params['plot_orientation_perturbation_label'],
fontsize=self._params['plot_fontsize_species'])
# the stimuli
if len(self.names_stimuli) > 0:
pylab.sca(ax_stim)
#stimuli = np.where(np.isnan(self.stimuli)==False, self.stimuli, 0.5)
stimuli = self.stimuli
pylab.pcolor(1-pylab.flipud(stimuli), edgecolors='gray', cmap='gray',
vmin=0,vmax=1);
#ax_stim.set_yticks([], [])
ax_stim.set_xticks([i+.5 for i,x in enumerate(self.names_stimuli)])
ax_stim.set_xticklabels(self.names_stimuli, rotation=rotation,
fontsize=fcp)
pylab.axis([0,self.stimuli.shape[1], 0, self.stimuli.shape[0]])
# the inhibitors
if len(self.names_inhibitors)>0:
pylab.sca(ax_inh)
#inhibitors = np.where(np.isnan(self.inhibitors)==False,
# self.inhibitors, 0.5)
inhibitors = self.inhibitors
pylab.pcolor(1-pylab.flipud(inhibitors), edgecolors='gray',
cmap='gray',
vmin=0,vmax=1);
#ax_inh.set_yticks([],[])
ax_inh.set_xticks([i+.5 for i,x in enumerate(self.names_inhibitors)])
ax_inh.set_xticklabels(self.names_inhibitors, rotation=rotation,
fontsize=fcp)
pylab.axis([0,self.inhibitors.shape[1], 0, self.inhibitors.shape[0]])
# the stimuli
if len(self.names_stimuli) > 0:
pylab.sca(ax_stim_top)
pylab.text(0.5,0.5, "Stimuli", color="blue", horizontalalignment="center",
verticalalignment="center", fontsize=int(fcti/1.5))
# the inhibitors
if len(self.names_inhibitors) > 0:
pylab.sca(ax_inh_top)
pylab.text(0.5,0.5, "Inhibitors", color="blue",
horizontalalignment="center", verticalalignment="center",
fontsize=int(fcti/1.5))
# colorbar. we build our own colorbar to place it on the RHS
if colorbar and mode == "mse":
pylab.sca(ax_cb)
N = self._params['plot_colorbar_N']
cbar = pylab.linspace(0, 1, N)
indices = [int(x) for x in cbar**self.cmap_scale*(N-1)]
cbar = [cbar[i] for i in indices]
pylab.pcolor(np.array([cbar, cbar]).transpose(), cmap=cmap,
vmin=0, vmax=1)
ax_cb.yaxis.tick_right()
ticks = np.array(ax_cb.get_yticks())
M = max(ticks)
indices = [int(N*x) for x in ticks**self.cmap_scale/(M**self.cmap_scale)]
ax_cb.set_yticks(indices)
if vmax == 1:
# set number of digits
tic = np.array(indices)/float(N)
tic = [int(x*100)/100. for x in tic]
ax_cb.set_yticklabels(tic)
else:
ax_cb.set_yticklabels([int(x*100)/100. for x in np.array(indices)/float(N)*vmax])
ax_cb.set_xticks([],[])
pylab.sca(ax_main)
return ax_main
def _get_nExps(self):
return len(self.experiments)
nExps = property(_get_nExps, doc="return number of experiments")
def _get_stimuli(self):
return self.experiments.Stimuli
stimuli = property(_get_stimuli, doc="return the stimuli dataframe")
def _get_inhibitors(self):
return self.experiments.Inhibitors
inhibitors = property(_get_inhibitors, doc="return the inhibitors dataframe")
def _get_nSignals(self):
return len(self.df.columns)
nSignals = property(_get_nSignals, doc="return the number of signals")
def _get_xtlabels(self, logx=False):
"""build the time labels vector
The vector is [t0,tmid,t0,tmid,...t0,tmid,tend]
"""
t0 = self.times[0]
t2 = self.times[-1]
xtlabels = [int(t0),int((t2-t0)/2)] * self.nSignals + [int(t2)]
return xtlabels
[docs] def xplot(self, bbox=False, *args, **kargs):
"""Same as :meth:`plot` using the xkcd layout for fun!"""
with pylab.xkcd():
self.plot(*args, **kargs)
mybbox = dict(boxstyle='round', facecolor='wheat', alpha=0.5)
if bbox:
pylab.text(3, 12, "XMIDAS", fontsize=14,
verticalalignment='top', bbox=mybbox)
tx = pylab.title("XMIDAS", bbox=dict(boxstyle='round',
facecolor='wheat', alpha=0.5))
tx.set_position((0.6, 1.15))
[docs] def plot(self, mode='data', **kargs):
"""Plot data contained in :attr:`experiment` and :attr:`df` dataframes.
:param string mode: must be either "mse" or "data" (defaults to data)
:param fignum: figure number
if mode is 'mse', calls also plot_layout, plot_data and plot_sim_data
else calls plot_layout and plot_data
.. plot::
:include-source:
:width: 80%
from cno import XMIDAS, cnodata
m = XMIDAS(cnodata("MD-ToyPB.csv"))
m.plot(mode="mse")
"""
if mode == "mse":
if self.df.min().min() < 0:
self.logging.warning("values are expected to be positive")
if self.df.max().max() > 1:
self.logging.warning("values are expected to be normalised")
kargs['mode'] = 'mse'
self.plot_layout(**kargs)
self.plot_data(**kargs)
self.plot_sim_data(**kargs)
else:
kargs['mode'] = 'data'
self.plot_layout(**kargs)
self.plot_data(**kargs)
[docs] def boxplot(self, mode="time"):
"""Plot boxplot of the dataframe with the data
:param str mode: time or species
.. plot::
from cno import XMIDAS, cnodata
m = XMIDAS(cnodata("MD-ToyPB.csv"))
m.boxplot(mode="species")
m.boxplot(mode="time")
"""
if mode == "time":
self.df.reset_index().boxplot(by="time")
pylab.tight_layout()
else:
# return_type= axes is to remove warning in pandas
# The default value for 'return_type' will change to 'axes' in a future release.
self.df.boxplot(return_type='axes')
[docs] def radviz(self, species=None, fontsize=10):
"""
.. plot::
:include-source:
:width: 80%
from cno import XMIDAS, cnodata
m = XMIDAS(cnodata("MD-ToyPB.csv"))
m.radviz(["ap1", "gsk3", "p38"])
"""
if species == None:
species = list(self.df.columns)
from pandas.tools.plotting import radviz
df = self.df.reset_index()
del df['time']
del df['cell']
pylab.figure(1)
pylab.clf()
radviz(df[['experiment']+species], "experiment")
pylab.legend(fontsize=fontsize)
[docs] def to_measurements(self):
"""Returns a Measurements instance
Each datum in the dataframe :attr:`df` is converted into an
instance of :class:`~cno.io.measurement.Measurements`.
:return: list of experiments.
::
mb = MIDASbuilder(m.to_measurements)
mb.xmidas
"""
experiments = []
for row in self.df.iterrows():
cellLine, exp, time = row[0]
data = row[1]
inhibitors = self.experiments.ix[exp].Inhibitors
stimuli = self.experiments.ix[exp].Stimuli
for species in data.index:
e = Measurement(species, time=time, stimuli=dict(stimuli),
inhibitors=dict(inhibitors), value=data[species],
cellLine=cellLine)
experiments.append(e)
return experiments
[docs] def to_csv(self, filename, expand_time_column=False, export_only_current_cellLine=True):
return self.to_midas(filename, expand_time_column=False,
export_only_current_cellLine=export_only_current_cellLine)
[docs] def to_midas(self, filename, expand_time_column=False, export_only_current_cellLine=True):
"""Save XMIDAS into a MIDAS CSV file.
By default, only the current cell line is exported.
Multi cell line are not saved.
:param str filename:
"""
f = open(filename, "w")
# TODO: cellline
header = ["TR:%s:CellLine"%self.cellLine]
header += ['TR:'+this for this in self.experiments.Stimuli.columns]
header += ['TR:'+this+'i' for this in self.experiments.Inhibitors.columns]
if expand_time_column == False:
header += ["DA:ALL"]
else:
header += ["DA:" + x for x in self.df.columns]
try:
header += ["DV:" + x for x in self.df.columns]
except Exception:
print(self.df.columns)
raise Exception
header = ",".join(header)
f.write(header + "\n")
for time in self.times:
# experiments of levels is 1
# better to use experiments df so that order is same as in experiments
for exp in self.experiments.index:
if (self.cellLine, exp, time) not in self.df.index:
continue
measurements = self.df.xs((self.cellLine, exp, time))
measurements = measurements[self.df.columns] # maybe not needed
# keep it for now to be sure that order of measurements is same as in the header
experiment = self.experiments.ix[exp]
values = list(experiment.Stimuli.values) + list(experiment.Inhibitors.values)
# FIXME use instance
if type(measurements) == pd.core.series.Series:
if expand_time_column == False:
rowdata = [1] + values + [time] + list(measurements)
else:
rowdata = [1] + values + [time]*len(self.df.columns) + list(measurements)
f.write(",".join(["{}".format(x) for x in rowdata]) + "\n")
elif type(measurements) == pd.core.frame.DataFrame:
for measurement in measurements.values:
if expand_time_column == False:
rowdata = [1] + values + [time] + list(measurement)
else:
rowdata = [1] + values +\
[time]*len(self.df.columns) + list(measurement)
f.write(",".join(["{}".format(x) for x in rowdata]) + "\n")
else:
raise TypeError()
f.close()
[docs] def normalise(self, mode, inplace=True, changeThreshold=0, **kargs):
"""Normalise the data
:param mode: 'time' or 'control'
:param bool inplace: Defaults to True.
.. warning:: not fully tested. the mode "time" should work. The
control mode has been tested on 2 MIDAS files only. This is
a complex normalisation described in
:class:`~cno.io.midas_normalisation.XMIDASNormalise`
"""
if self.df.max().max() <=1 and self.df.min.min()>=0:
raise CNOError("data already between 0 and 1")
assert mode in ["fold_change", "time", "control"], "mode must be control,fold_change or time"
kargs['changeThreshold'] = changeThreshold
if mode == "time":
n = normalisation.XMIDASNormalise(self, **kargs)
normed_midas = n.time_normalisation()
elif mode == "control":
n = normalisation.XMIDASNormalise(self, **kargs)
normed_midas = n.control_normalisation()
elif mode == 'fold_change':
raise NotImplementedError
if inplace is False:
return normed_midas
else:
self.df = normed_midas.copy()
[docs] def add_gaussian_noise(self, mu=0, sigma=0.1, inplace=False):
r"""add gaussian noise to the data. Results may be negative or above 1
:param float beta: see equation
:param float sigma: see equation (default to 0.1)
:param bool inplace: Default to False
.. math::
\hat{X} = X + \mathcal{N}(\mu, \sigma)
"""
# random.normal accepts x and takes its shape to return random values.
# so, the addition of x and the ouptut of np.random.normal is as
# expected: points by point.
newdf = self.df.apply(lambda x:x + np.random.normal(mu, scale=sigma))
if inplace:
self.df = newdf
else:
return newdf
def _make_df_compatible(self):
"""if you use MakeBuilder, you can really add any combi of experiments
However, the resulting df built is not MIDAS compatible for sure. For example,
not all same time are available for each experiment.
"""
raise NotImplementedError
[docs] def get_residual_errors(self, level="time", normed=False):
r"""Return vector with residuals errors
The residual errors are interesting to look at in the context
of a boolean analysis. Indeed, residual errors is the minimum error
that is unavoidable with a boolean network and comes from the discrete
nature of such a model. In a boolean analysis, one would compare 0/1
values to continuous values between 0 and 1. Therefore, however good
is the optimisation, the value of the goodness of fit term cannot go
under this residual error.
:param: level to sum over
:return: a time series with summed residual errors
:math:`\sum (round(x)-x)^2` the summation is performed over species
and experiment by default.
.. doctest::
>>> from cno import cnodata, XMIDAS
>>> m = XMIDAS(cnodata("MD-ToyMMB_T2.csv"))
>>> m.get_residual_errors()
time
0 0.000000
10 2.768152
100 0.954000
dtype: float64
if normed is False, returns:
.. math::
\sum_{level} \left( X - \bf{round}( X) \right)^2
where level can be either over time or experiment.
If normed is True, divides the time series by number of experiments
and number of times
.. note:: if normed set to False, same results as in
CellNOptR for mode set to time.
"""
diff = (self.df - self.df.apply(lambda x: x.round(), axis=1))
diff_square = diff.apply(lambda x: x**2, axis=1)
S = diff_square.sum(level=level).sum(axis=1)
if normed :
S /= len(self.experiments) * len(self.signals)
return S
[docs] def get_max_errors(self, level='time', normed=False):
"""Returns max error possible
Should be below one once normalised but could be as low as 0.5
"""
diff = (self.df - (1-self.df.apply(lambda x: x.round(), axis=1)))
diff_square = diff.apply(lambda x: x**2, axis=1)
S = diff_square.sum(level=level).sum(axis=1)
if normed :
S /= len(self.experiments) * len(self.signals)
return S
[docs] def copy(self):
x = XMIDAS()
x._data = self._data.copy()
x._missing_time_zero = self._missing_time_zero
x.df = self.df.copy()
x._experiments = self.experiments.copy()
x._cellLine = self.cellLine
x.sim = self.sim.copy()
x.errors = self.errors.copy()
return x
def __str__(self):
txt = "Your data set contains {} cellLines\n".format(len(self.cellLines))
txt += " Current selected cell line is {}\n".format(self.cellLine)
txt += "\nThe data contains \n"
txt += "{} Species:\n\t {}\n".format(len(self.names_species),
self.names_species)
txt += "{} inhibitors:\n\t {}\n".format(len(self.names_inhibitors),
self.names_inhibitors)
txt += "{} stimuli:\n\t {}\n".format(len(self.names_stimuli),
self.names_stimuli)
return txt
[docs] def pca(self, pca_components=2, fontsize=16):
"""PCA analysis
.. plot::
:include-source:
:width: 80%
from cno import XMIDAS, cnodata
m = XMIDAS(cnodata("MD-ToyPB.csv"))
m.pca()
"""
from sklearn.decomposition import PCA
pca = PCA(n_components=pca_components)
#t = self.df[signal]
#pca.fit(t.unstack(1))
for signal in self.df.columns:
t = self.df[signal].fillna(0)
X = t.unstack()
X_r = pca.fit(X).transform(X)
pylab.plot(X_r[:,0], X_r[:,1], 'o', label=signal)
print(signal, pca.explained_variance_ratio_)
pylab.legend(fontsize=fontsize)
return pca
[docs] def discretize(self, **kargs):
return self.discretise(**kargs)
[docs] def discretise(self, inplace=True, N=2):
"""Discretise data by rounding up the values
:param int N: number of discrete values (defaults to 2). If set to
2, values will be either 0 or 1. If set to 5, values wil lbe in
[0,0.25,0.5,0.75,1]
:param inplace:
.. warning. data has to be normalised
"""
assert N >= 2, "N must be at least equal to 2"
N = N-1.
self.logging.info("Discretization between 0-1 assuming normalised data")
if inplace:
self.df = self.df.apply(lambda x: (x*N).round()/N)
else:
df = self.df.apply(lambda x: (x*N).round()/N)
return df
[docs] def round(self, inplace=True, decimals=0):
"""Round values to a given decimal"""
if inplace == True:
self.df = self.df.apply(lambda x : x.round(decimals=decimals))
else:
return self.df.values.apply(lambda x : x.round(decimals=decimals))
[docs] def hcluster(self, mode="experiment", metric='euclidean', leaf_rotation=90,
leaf_font_size=12, **kargs):
"""Plot the hiearchical cluster (simple approach)
.. plot::
:include-source:
:width: 80%
from cno import XMIDAS, cnodata
m = XMIDAS(cnodata("MD-ToyPB.csv"))
m.hcluster("species")
"""
kargs['leaf_font_size'] = leaf_font_size
kargs['leaf_rotation'] = leaf_rotation
assert mode in ["experiment", "time", "species"]
from scipy.spatial.distance import pdist, squareform
from scipy.cluster.hierarchy import linkage, dendrogram
pylab.clf()
if mode == "experiment":
distxy = squareform(pdist(self.df.unstack("time"), metric=metric))
elif mode == "time":
distxy = squareform(pdist(self.df.unstack("experiment"), metric=metric))
elif mode == "species":
distxy = squareform(pdist(self.df.transpose(), metric=metric))
try:
R = dendrogram(linkage(distxy), **kargs)
except:
R = dendrogram(linkage(distxy, method='complete'), **kargs)
self._debug = R
leaves = R['leaves']
if mode == "time":
pylab.xticks(pylab.xticks()[0], self.times[leaves])
pylab.title("Clustering by time")
elif mode == "experiment":
pylab.xticks(pylab.xticks()[0], self.experiments.index[leaves])
pylab.title("Clustering by experiments")
else:
pylab.xticks(pylab.xticks()[0], self.species[leaves])
pylab.title("Clustering by species")
ylim = pylab.ylim()
if ylim[0] == 0:
pylab.ylim([0-ylim[1]*.05, ylim[1]])
pylab.tight_layout()
[docs] def heatmap(self, cmap="heat", transpose=False):
"""Hierarchical clustering on species and one of experiment/time level
.. plot::
:include-source:
:width: 80%
from cno import XMIDAS, cnodata
m = XMIDAS(cnodata("MD-ToyPB.csv"))
m.heatmap()
.. note:: time zero is ignored. Indeed, data at time zero is mostly set
to zero, which biases clustering.
"""
cmap = self._get_cmap(cmap)
from biokit.viz.heatmap import Heatmap
data = self.df.query('time>0').unstack(2).ix[self.cellLine]
if transpose:
h = Heatmap(data.transpose())
else:
h = Heatmap(data)
h.plot(cmap=cmap)
return h
[docs] def shuffle(self, mode="experiment", inplace=True):
"""Shuffle data
This method does not alter the data but shuffles it
depending on the user choice.
:param str mode: type of shuffling (see below)
:param bool inplace: Defaults to True
The **mode** parameter can be
#. `timeseries` shuffles experiments and species; timeseries
are unchanged.
#. `all`: through times, experiments and species. No structure kept
sum of the data is constant.
#. `signal` (or `species` or `column`): within a column, timeseries
are shuffled. So, sum over signals is constant.
#. `experiment` (or `index`): with a row (experiment),
timeseries are shuffled. Sum of data over a row is constant.
Original data:
.. plot::
:width: 80%
from cno import XMIDAS, cnodata
m = XMIDAS(cnodata("MD-ToyPB.csv"))
m.plot()
Shuffling all timeseries (shuffling rows and columsn in the plot):
.. plot::
:include-source:
:width: 80%
from cno import XMIDAS, cnodata
m = XMIDAS(cnodata("MD-ToyPB.csv"))
m.shuffle(mode="timeseries")
m.plot()
.. warning:: shuffling is made inplace.
"""
if mode == "all":
# The random.shuffle function does not work!! somehow sum of
# data increases or decreases
# One must use numpy.random.shuffle instead
shape = self.df.shape
data = self.df.values.reshape(shape[0]*shape[1])
np.random.shuffle(data)
count = 0
# not very efficient but works for now
for i in range(0, shape[0]):
for j in range(0, shape[1]):
self.df.values[i][j] = data[count]
count += 1
elif mode in ["signal", "species", "column"]:
# sum on a species is constant
for c in self.df.columns:
np.random.shuffle(self.df[c].values)
elif mode in ["index", "experiment"]:
# m.df.sum(level="experiment").sum(axis=1) is constant
for this_index in self.df.index:
np.random.shuffle(self.df.ix[this_index].values)
elif mode == "timeseries":
# swap boxes
species = list(self.species)
exps = list(self.experiments.index)
pairs = [(s,e) for s in species for e in exps]
# permutation now
np.random.shuffle(pairs)
df = self.df.copy()
# FIXME: cell not implemented yet
count = 0
for s in species:
for e in exps:
rand_s = pairs[count][0]
rand_e = pairs[count][1]
data = list(df[rand_s].ix[self.cellLine].ix[rand_e])
self.df[s].ix[self.cellLine].ix[e] = data
count += 1
else:
raise CNOError("unknown mode {0} in shuffle()".format(mode))
[docs] def sort_experiments_by_stimuli(self):
"""Sort experiments by stimuli
Affects the experiment dataframe for th rendering but do not
change the dataframe that contains the data.
"""
N = len(self.experiments)
list_exps = [('Stimuli', stimulus) for stimulus in self.names_stimuli]
list_exps += [('Inhibitors', inhibitor) for inhibitor in self.names_inhibitors]
new_order = self.experiments.groupby(list_exps).groups
new_order = [new_order[x] for x in sorted(new_order.keys())]
new_order = list(pylab.flatten(new_order))
assert len(new_order )== N
self._experiments = self.experiments.reindex_axis(new_order, axis=0)
[docs] def sort_experiments_by_inhibitors(self):
"""Sort the experiments by inhibitors
Affects the experiment dataframe for th rendering but do not
change the dataframe that contains the data.
"""
N = len(self.experiments)
list_exps = [('Inhibitors', inhibitor) for inhibitor in self.names_inhibitors]
list_exps += [('Stimuli', stimulus) for stimulus in self.names_stimuli]
new_order = self.experiments.groupby(list_exps).groups
new_order = [new_order[x] for x in sorted(new_order.keys())]
new_order = list(pylab.flatten(new_order))
assert len(new_order )== N
self._experiments = self.experiments.reindex_axis(new_order, axis=0)
def __eq__(self, other):
if all(other.df == self.df) == False:
return False
if all(other.experiments == self.experiments) == False:
return False
return True
[docs]class Trend(object):
"""Utility that figures out the trend of a time series
.. plot::
:include-source:
:width: 80%
from cno import cnodata
from cno.io import midas
# get a time series
xm = midas.XMIDAS(cnodata("MD-ToyPB.csv"))
ts = xm.df['ap1']['Cell']['experiment_0']
# set a trend instance
trend = midas.Trend()
trend.set(ts)
trend.plot()
trend.get_bestfit_color()
"""
def __init__(self):
self.data = None
[docs] def set(self, ts):
"""Set the data with the parameter
:param ts: a Pandas TimeSeries
"""
self.data = ts
def _get_times(self):
return self.data.index.values
times = property(_get_times, doc="return time array")
def _get_values(self):
return self.data.values
values = property(_get_values, doc="return the value array")
def _get_normed_times(self):
# Here, time 0 is zero so we can divide by Max only
return self.times / float(self.times.max())
normed_times = property(_get_normed_times, doc="return normed time array")
def _get_normed_values(self):
# FIXME do we want to use the span ??
#span = float(self.values.max()) - float(self.values.min())
return self.values / float(self.values.max())
#return (self.values -self.values.min())/ span
normed_values = property(_get_normed_values, doc="return normed value array")
def _get_alpha(self):
try:
alpha = np.trapz(self.normed_values, self.normed_times)
except:
alpha = 'white'
return alpha
alpha = property(_get_alpha, doc="return strength of the signal")
[docs] def transient(self, x=None):
if x == None:
x = self.times
M = max(x)
v = np.array([(M-y)/(M/2.) if y>=M/2. else y/(M/2.) for y in x])
return self._normed(v)
[docs] def constant(self, x=0):
v = np.array([x] * len(self.times))
v = self._normed(v)
return v
def _normed(self, v):
sumsq = np.sqrt(sum([this**2 for this in v]))
return v/sumsq
[docs] def earlier(self, x=None, n=3., N=4.):
if x == None:
x = self.times
M = max(x)
v = np.array([(M-y)/(n*M/N) if y>=M/N else y/(M/N) for y in x])
return self._normed(v)
[docs] def sustained(self, x=None, L=0.5):
if x == None:
x = self.times
M = max(x)
m = L * M
v = np.array([y if y<m else m for y in x])
return self._normed(v)
[docs] def inverse_sustained(self, x=None, L=0.5):
if x == None:
x = self.times
M = max(x)
m = L * M
v = np.array([(M-y) if y < m else M-m for y in x])
return self._normed(v)
[docs] def later(self, x=None, L=0.5):
if x == None:
x = self.times
M = max(x)
m = L * M
v = np.array([0 if y<m else y-m for y in x])
return self._normed(v)
def _correlate(self, a, b):
a = self._normed(a)
b = self._normed(b)
return np.correlate(a,b)
def _get_correlation(self, a):
correlation = {}
correlation['later'] = self._correlate(a, self.later())
correlation['earlier'] = self._correlate(a, self.earlier())
correlation['earlier2'] = self._correlate(a, self.earlier(n=1,N=10))
correlation['transient'] = self._correlate(a, self.transient())
correlation['constant_half'] = self._correlate(a, self.constant(0.5))
correlation['constant_unity'] = self._correlate(a, self.constant(1))
correlation['sustained'] = self._correlate(a, self.sustained(L=.5))
correlation['inverse_sustained'] = self._correlate(a, self.inverse_sustained(L=.5))
return correlation
[docs] def plot(self):
"""Plots the data and possible choices of trends"""
data = self.values
corrs = self._get_correlation(data)
pylab.clf()
pylab.plot(self.times, self._normed(data),
label="data", lw=4, ls="--", color='k')
# transient
pylab.plot(self.times, self.transient(), 'o-',
label="transient " + str(corrs['transient']))
# earlier
pylab.plot(self.times, self.earlier(), 'o-',
label="earlier " + str(corrs['earlier']))
pylab.plot(self.times, self.earlier(n=1, N=10), 'o-',
label="earlier2 " + str(corrs['earlier2']))
# later
pylab.plot(self.times, self.later(), 'o-',
label="later " + str(corrs['later']))
# constant
pylab.plot(self.times, self.constant(.5), 'o-',
label="constant " + str(corrs['constant_half']))
# sustained
pylab.plot(self.times, self.sustained(L=.5), 'o-',
label="sustained" + str(corrs['sustained']))
pylab.plot(self.times, self.inverse_sustained(L=.5), 'o-',
label="inv sustained" + str(corrs['inverse_sustained']))
pylab.legend(fontsize=10)
[docs] def get_bestfit(self):
corrs = self._get_correlation(self.normed_values)
keys,values = (corrs.keys(), corrs.values())
# M = max(values)
return keys[np.argmax(values)]
[docs] def get_bestfit_color(self):
# somehow must cast to python bool...
if bool(np.isnan(self.data.sum())) is True:
return None
corrs = self._get_correlation(self.normed_values)
keys,values = (corrs.keys(), corrs.values())
# M = max(values)
values = [x[0] for x in values]
if np.isnan(np.nanmax(values)):
return 'purple'
else:
res = keys[np.nanargmax(values)]
#except:
# res = keys[np.argmax(values)]
if "constant" in res:
return "black"
elif "later" in res:
return "red"
elif "transient" in res:
return "yellow"
elif "earlier" in res:
return "purple"
elif "sustained" in res:
return "green"
else:
return "white"