CellNOpt homepage|cellnopt.core 1.0.0 documentation

Source code for cellnopt.core.cnograph

# -*- python -*-
#
#  This file is part of the cinapps.tcell package
#
#  Copyright (c) 2012-2013 - EMBL-EBI
#
#  File author(s): Thomas Cokelaer (cokelaer@ebi.ac.uk)
#
#  Distributed under the GLPv3 License.
#  See accompanying file LICENSE.txt or copy at
#      http://www.gnu.org/licenses/gpl-3.0.html
#
#  website: www.cellnopt.org
#
##############################################################################
from __future__ import print_function
import copy
import tempfile
import itertools
import subprocess
import shutil

import pylab
import networkx as nx
import numpy as np
import easydev
from easydev import Logging

# cellnopt modules
from cellnopt.core.sif import SIF
from cellnopt.core.midas import XMIDAS
from cellnopt.core.oldmidas import MIDASReader

from colormap import Colormap

__all__ = ["CNOGraph", "CNOGraphMultiEdges", "CNOGraphAttributes"]


class Attributes(dict):
    """Simple dictionary to handle attributes (nodes or eges)"""
    def __init__(self, color, **kargs):
        self['color'] = color
        for k,v in kargs.iteritems():
            self[k] = v

class EdgeAttributes(Attributes):
    def __init__(self, penwidth=1, color="black", arrowhead="normal", **kargs):
        super(EdgeAttributes, self).__init__(color=color, **kargs)
        self['penwidth'] = penwidth
        self['arrowhead'] = arrowhead

class NodeAttributes(dict):
    """A class to manage attributes of the nodes (graphviz attribute)

    Used by the :class:`CNOGraph`.

    ::

        >>> a = Attributes(color="red", fillcolor="white")
        >>> assert a['color'] == "red"
        True

    """
    def __init__(self, color="black", fillcolor="white", shape="rectangle",
                 style="filled,bold", **kargs):
        """

        :param color: color of the border
        :param fillcolor: color inside the shape
        :param shape color inside the shape
        :param style: color inside the shape
        """
        super(NodeAttributes, self).__init__(color=color, **kargs)

        #self['color'] = color
        self['fillcolor'] = fillcolor
        self['shape'] = shape
        self['style'] = style


[docs]class CNOGraphAttributes(object): """ define attributes of the cnograp when calling the plotting function. """ def __init__(self): self.attributes = { 'activation': EdgeAttributes(color='black', arrowhead="normal"), 'inhibition': EdgeAttributes(color='red', arrowhead="tee"), 'stimuli': NodeAttributes(fillcolor='#9ACD32'), 'inhibitors': NodeAttributes(fillcolor='orangered'), 'signals': NodeAttributes(fillcolor='lightblue'), 'nonc': NodeAttributes( fillcolor='gray', style='diagonals, filled'), 'compressed': NodeAttributes(fillcolor="white",style='dashed', penwidth=2), 'others': NodeAttributes(fillcolor="white",style='filled,bold',penwidth=2), 'and': NodeAttributes(shape='circle',style='filled',width=.1, height=.1, fixedsize=True, label='') }
[docs] def keys(self): return self.attributes.keys()
def __getitem__(self, key): return self.attributes[key]
[docs]class CNOGraph(nx.DiGraph): """Data structure (Digraph) used to manipulate networks The networks can represent for instance a protein interaction network. CNOGraph is a graph data structure dedicated to the analysis of phosphorylation data within protein-protein interaction networks but can be used in a more general context. Indeed no data is required. Note that CNOGraph inherits from the **directed graph** data structure of networkx. However, we impose links between nodes to be restricted to two types: * "+" for activation * "-" for inhibition. An instance can be created from an empty graph:: c = CNOGraph() and edge can be added as follows:: c.add_edge("A", "B", link="+") c.add_edge("A", "C", link="-") The methods :meth:`add_node` and :meth:`add_edge` methods can be used to populate the graph. However, it is also possible to read a network stored in a file in :class:`cellnopt.core.sif.SIF` format:: >>> from cellnopt.core import * >>> pknmodel = cnodata("PKN-ToyPB.sif") >>> c = CNOGraph(pknmodel) The SIF model can be a filename, or an instance of :class:`~cellnopt.core.sif.SIF`. Note for CellNOpt users that if **and** nodes are contained in the original SIF files, they are kept (see the SIF documentation for details). You can add or remove nodes/edges in the CNOGraph afterwards. As mentionned above, you can also populate data within the CNOGraph data structure. The input data is an instance of :class:`~cellnopt.core.midas.XMIDAS` or a MIDAS filename. MIDAS file contains measurements made on proteins in various experimental conditions (stimuli and inhibitors). The names of the simuli, inhibitors and signals are used to color the nodes in the plotting function. However, the data itself is not used. If you don't use any MIDAS file as input, you can set the stimuli/inhibitors/signals manually by filling the hidden attributes _stimuli, _signals and _inhibitors. .. rubric:: Node and Edge attributes The node and edge attributes can be accessed as follows (and changed):: >>> c.node['egf'] {'color': u'black', u'fillcolor': u'white', 'penwidth': 2, u'shape': u'rectangle', u'style': u'filled,bold'} >>> c.edge['egf']['egfr'] {u'arrowhead': u'normal', u'color': u'black', u'compressed': [], 'link': u'+', u'penwidth': 1} .. rubric:: OPERATORS CNOGraph is a data structure with useful operators (e.g. union). Note, however, that these operators are applied on the topology only (MIDAS information is ignored). For instance, you can add graphs with the **+** operator or check that there are identical :: c = a+b a += b a == b Let us illustrate the + operation with another example. Let us consider the following graphs: .. plot:: :include-source: :width: 30% from cellnopt.core import * c1 = CNOGraph() c1.add_edge("A","B", link="+") c1.add_edge("A","C", link="-") c1.plotdot() .. plot:: :include-source: :width: 30% from cellnopt.core import * c2 = CNOGraph() c2.add_edge("A","E", link="+") c2.add_edge("C","E", link="+") c2.plotdot() :: (c1+c2).plotdot() .. plot:: :width: 50% from cellnopt.core import * c1 = CNOGraph() c1.add_edge("A","B", link="+") c1.add_edge("A","C", link="-") c1.plotdot() c2 = CNOGraph() c2.add_edge("A","E", link="+") c2.add_edge("C","E", link="+") c2.plotdot() (c1+c2).plotdot() You can also substract a graph from another one:: c3 = c1 - c2 c3.nodes() The new graph should contains only one node (B). Additional functionalities such as :meth:`intersect`, :meth:`union` and :meth:`difference` can be used to see the difference between two graphs. .. rubric:: PLOTTING There are plotting functionalities to look at the graph, which are based on graphviz library. For instance, the :meth:`plotdot` is quite flexible but has a default behaviour following CellNOptR convention, where stimuli are colored in green, inhibitors in red and measurements in blue: .. plot:: :include-source: :width: 50% from cellnopt.core import * pknmodel = cnodata("PKN-ToyPB.sif") data = cnodata("MD-ToyPB.csv") c = CNOGraph(pknmodel, data) c.plotdot() If you did not use any MIDAS file as input parameter, you can still populate the hidden fields :attr:`_stimuli`, :attr:`_inhibitors`, :attr:`_signals`. You can also overwrite this behaviour by using the node_attribute parameter when calling :meth:`plotdot`. For instance, if you call :meth:`centrality_degree`, which computes and populate the node attribute **degree**. You can then call plotdot as follows to replace the default color: .. plot:: :include-source: :width: 50% from cellnopt.core import * pknmodel = cnodata("PKN-ToyPB.sif") data = cnodata("MD-ToyPB.csv") c = CNOGraph(pknmodel, data) c.centrality_degree() c.plotdot(node_attribute="degree") Similarly, you can tune the color of the edge attribute. See the :meth:`plotdot` for more details. .. seealso:: tutorial, user guide .. todo:: graph attribute seems to be reset somewhere .. todo:: penwidth should be a class attribute, overwritten if provided. .. todo:: call findnonc only once or when nodes are changed. .. todo:: reacID when a model is expanded, returns only original reactions """ def __init__(self, model=None, data=None, verbose=False, **kargs): """.. rubric:: Constructor :param str model: optional network in SIF format. Can be the filename or instance of :class:`~cellnopt.core.sif.SIF` :param data: optional data file in MIDAS format. Can be a filename or instance of :class:`~cellnopt.core.midas.XMIDAS` :param bool verbose: :param str celltype: if a MIDAS file contains more that 1 celltype, you must provide a celltype name .. todo:: check that the celltype option works """ super(CNOGraph, self).__init__(**kargs) self.kargs = kargs.copy() # This is a DIgraph attribute # self.graph is a DiGraph attribute that is overwritten sometinmes self.graph_options = { 'graph': { "dpi":200, 'rankdir':'TB', # 'nodesep': None, 'ranksep':1 }, 'node':{'fontname':'bold'} } #TB, LR, RL #self.graph_options['node']['fontsize'] = 26 #self.graph_options['node']['height'] #self.graph_options['node']['width'] #: the attributes for nodes and edges are stored within this attribute. See :class:`CNOGraphAttributes` self.attributes = CNOGraphAttributes() self._midas = None self.verbose = verbose self.logging = Logging("INFO") self._compress_ands = False #: stimuli self._stimuli = [] self._compressed = [] self._signals =[] #: inhibitors self._inhibitors = [] self._nonc = None if hasattr(model, "nodes") and hasattr(model, "edges"): for node in model.nodes(): self.add_node(str(node)) for edge in model.edges(data=True): if "link" in edge[2]: self.add_edge(str(edge[0]), str(edge[1]), link=edge[2]['link']) else: self.add_edge(str(edge[0]), str(edge[1]), link="+") self.set_default_node_attributes() # must be call if sif or midas modified. else: self.readSIF(model) self.midas = data self._set_dot_attributes() self._colormap = Colormap() def _set_dot_attributes(self): # other attributes self._dot_mode = "end_signals_bottom" self.dotattrs = {} self.dotattrs['graph'] = {"title": "CNOGraph output from cellnopt.core.cnograph.plotdot", 'fontname': 'helvetica', 'fontsize': 22, 'size': "25,25", 'ordering': "out", 'splines':True} self.dotattrs['edge'] = { 'minlen':1, 'color':'black'} # SOME PROPERTIES def _get_midas(self): return self._midas def _set_midas(self, data): if isinstance(data, str): self._midas = XMIDAS(data, cellLine=self.kargs.get("cellLine", None)) elif isinstance(data, XMIDAS): self._midas = copy.deepcopy(data) elif isinstance(data, MIDASReader): raise TypeError("MIDASReader class is deprecated. Use XMIDAS instead") elif data == None: self._midas = data else: raise ValueError("Incorrect data, Must a valid MIDAS file or instance of XMIDAS class {}".format(data)) #if self._midas != None: #self._dataModelCompatibility() self.set_default_node_attributes() midas = property(fget=_get_midas, fset=_set_midas, doc="MIDAS Read/Write attribute.") def _get_stimuli(self): stimuli = list(self._stimuli[:]) if self.midas: stimuli += self.midas.names_stimuli[:] return stimuli stimuli = property(_get_stimuli, doc="list of stimuli found in the :attr:`midas` and hidden attribute :meth:`_stimuli`") def _get_inhibitors(self): inhibitors = list(self._inhibitors) if self.midas: inhibitors += self.midas.names_inhibitors[:] return inhibitors inhibitors = property(_get_inhibitors, doc="list of inhibitors found in the :attr:`midas` and hidden attribute :attr:`_inhibitors`") def _get_signals(self): signals = list(self._signals) # do not reference if self.midas: signals += list(self.midas.names_signals) return signals signals = property(_get_signals, doc="list of signals found in the :attr:`midas` and hidden attribute :meth:`_signals`") # METHODS
[docs] def readSIF(self, model): """If the SIF file changes, we need to rebuild the graph.""" # takes the SIF input file and build up the CNOGraph. remove all nodes # before self.clear() if isinstance(model, str): sif = SIF(model) elif isinstance(model, SIF): sif = model elif hasattr(model, "reacID"): sif = model elif model==None: sif = SIF() else: raise ValueError("The sif input must be a filename to a SIF file or an instance of the SIF class") # add all reactions for reac in sif.reacID: self.add_reaction(reac) # now, we need to set the attributes, only if we have a cnolist, # otherwise color is the default (white) self.set_default_node_attributes() # must be call if sif or midas modified.
def _add_simple_reaction(self, reac, **edge_attributes): """A=B or !!A=B""" # not the second argument: we split only once so you can add a reaction # where the RHS is a AND gate coded with a "=" character in it (e.g., # A=A+B=C which means reaction from A to AND gate called A+B=C) lhs, rhs = reac.split("=", 1) if lhs=="": self.add_node(rhs) elif rhs == "": self.add_node(lhs) else: if lhs.startswith("!"): link = "-" lhs = lhs[1:] else: link = "+" if self.has_edge(lhs, rhs): if self[lhs][rhs]['link'] == link: self.logging.info("skip existing reactions %s %s %s" % (lhs, link, rhs)) else: self.add_edge(lhs,rhs, link=link, **edge_attributes) else: self.add_edge(lhs,rhs, link=link, **edge_attributes)
[docs] def add_reaction(self, reac, **edge_attributes): """Add nodes and edges given a reaction :param str reac: a valid reaction. See below for examples Here are some valid reactions that includes NOT, AND and OR gates. + is an OR and ^ character is an AND gate:: >>> s.add_reaction("A=B") >>> s.add_reaction("A+B=C") >>> s.add_reaction("A^C=E") >>> s.add_reaction("!F+G=H") .. plot:: :width: 50% :include-source: from cellnopt.core import * c = CNOGraph() c.add_reaction("a+b^c+e+d^h=Z") c.plotdot() """ # TODO: check if this is a valid reaction # ro rhs if there is an AND in LHS is wrong # mixing ^ and & and + is not implemented for now reac = reac.strip() reac = reac.replace("&", "^") lhs, rhs = reac.split("=") # if there is an OR gate, easy, just need to add simple reactions # A+!B=C is splitted into A=C and !B=C if "+" in lhs and "^" not in lhs: for this in lhs.split("+"): self._add_simple_reaction(this+"="+rhs, **edge_attributes) return # if no AND gates, even simpler it can only be reaction A=B or !A=B if "^" not in lhs and "+" not in lhs: self._add_simple_reaction(lhs+"="+rhs, **edge_attributes) return if "^" and "+" in lhs: # let us suppose that there is no mix such as a+b^c+d^h for this in lhs.split("+"): if "+" in this: self._add_simple_reaction(this +"=" + rhs, **edge_attributes) else: self.add_reaction(this +"=" + rhs, **edge_attributes) return # finally case with AND gates only if "^" in lhs and "+" not in rhs: # and gates need a little bit more work self.add_edge(reac, rhs, link="+", **edge_attributes) # the AND gate and its the unique output # now the inputs species = lhs.split("^") for this in species: self._add_simple_reaction(this + "=" + reac, **edge_attributes) return
[docs] def set_default_edge_attributes(self, **attr): if "compressed" not in attr.keys(): attr["compressed"] = [] link = attr.get("link") if link == "-": attrs = self.attributes['inhibition'] elif link == "+": attrs = self.attributes['activation'] else: raise ValueError("link must be '+' or '-'. Found %s" % link) for k in attrs.keys(): attr[k] = attrs[k] return attr
[docs] def reset_edge_attributes(self): """set all edge attributes to default attributes .. seealso:: :meth:`set_default_edge_attribute` if we set an edge label, which is an AND ^, then plot fails in this function c.edge["alpha^NaCl=HOG1"]['label'] = "?" """ for edge in self.edges(): attrs = self.edge[edge[0]][edge[1]] attrs = self.set_default_edge_attributes(**attrs) self.edge[edge[0]][edge[1]] = attrs
[docs] def add_edge(self, u, v, attr_dict=None, **attr): """adds an edge between node u and v. :param str u: source node :param str v: target node :param str link: compulsary keyword. must be "+" or "-" :param dict attr_dict: dictionary, optional (default= no attributes) Dictionary of edge attributes. Key/value pairs will update existing data associated with the edge. :param attr: keyword arguments, optional edge data (or labels or objects) can be assigned using keyword arguments. keywords provided will overwrite keys provided in the **attr_dict** parameter .. warning:: color, penwidth, arrowhead keywords are populated according to the value of the link. * If link="+", then edge is black and arrowhead is normal. * If link="-", then edge is red and arrowhead is a tee .. plot:: :include-source: :width: 50% from cellnopt.core import * c = CNOGraph() c.add_edge("A","B",link="+") c.add_edge("A","C",link="-") c.add_edge("C","D",link="+", mycolor="blue") c.add_edge("C","E",link="+", data=[1,2,3]) You can also add several edges at the same time for a single output but multiple entries:: c.add_edge("A+B+C", "D", link="+") equivalent to :: c.add_edge("A", "D", link="+") c.add_edge("B", "D", link="+") c.add_edge("C", "D", link="+") Attributes on the edges can be provided using the parameters **attr_dict** (a dictionary) and/or ****attr**, which is a list of key/value pairs. The latter will overwrite the key/value pairs contained in the dictionary. Consider this example:: c = CNOGraph() c.add_edge("a", "c", attr_dict={"k":1, "data":[0,1,2]}, link="+", k=3) c.edges(data=True) [('a', 'c', {'arrowhead': 'normal', 'color': 'black', 'compressed': [], 'data': [0, 1, 2], 'k':3 'link': '+', 'penwidth': 1})] The field "k" in the dictionary (attr_dict) is set to 1. However, it is also provided as an argument but with the value 3. The latter is the one used to populate the edge attributes, which can be checked by printing the data of the edge (c.edges(data=True()) .. seealso:: special attributes are automatically set by :meth:`set_default_edge_attributes`. the color of the edge is black if link is set to "+" and red otherwie. """ link = attr.get("link", "+") attr['link'] = link attr = self.set_default_edge_attributes(**attr) super(CNOGraph, self).add_edge(u, v, attr_dict, **attr) # cast u to str to search for + sign if "+" in unicode(u): lhs = u.split("+") for x in lhs: if x.startswith("-"): attr["link"] = "+" super(CNOGraph, self).add_edge(x[1:], u, attr_dict, **attr) else: attr["link"] = "+" super(CNOGraph, self).add_edge(x, u, attr_dict, **attr)
[docs] def clear(self): """Remove nodes and edges and MIDAS instance""" super(CNOGraph, self).clear() self.midas = None
[docs] def clean_orphan_ands(self): """Remove AND gates that are not AND gates anymore When removing an edge or a node, AND gates may not be valid anymore either because the output does not exists or there is a single input. This function is called when :meth:`remove_node` or :meth:`remove_edge` are called. However, if you manipulate the nodes/edges manually you may need to call this function afterwards. """ for node in self._find_and_nodes(): if len(self.successors(node))==0 or len(self.predecessors(node))<=1: self.remove_node(node) continue
def _dataModelCompatibility(self): """When setting a MIDAS file, need to check that it is compatible with the graph, i.e. species are found in the model.""" for x in self.midas.names_cues: if x not in self.nodes(): raise ValueError("""The cue %s was found in the MIDAS file but is not present in the model. Change your model or MIDAS file. """ % x) for x in self.midas.names_signals: if x not in self.nodes(): raise ValueError("""The signal %s was found in the MIDAS file but is not present in the model. Change your model or MIDAS file. """ % x)
[docs] def remove_and_gates(self): """Remove the AND nodes added by :meth:`expand_and_gates`""" for n in self._find_and_nodes(): self.remove_node(n)
def __eq__(self, other): # we must look at the data to figure out the link + or - but should ignore # all other keys try: edges1 = sorted(self.edges(data=True)) edges2 = sorted(other.edges(data=True)) edges1 = [(e[0], e[1], {'link':e[2]['link']}) for e in edges1] edges2 = [(e[0], e[1], {'link':e[2]['link']}) for e in edges2] res = edges1 == edges2 return res except Exception: print("found exception") return False def __add__(self, other): """allows a+b operation combines the _inhibitors, _signals, _stimuli but keep only the first midas file. """ print("calling __add__") G = self.copy() G.add_nodes_from(other.nodes(data=True)) edges = other.edges(data=True) for e1,e2,d in edges: G.add_edge(e1,e2,None, **d) G._inhibitors += other._inhibitors G._signals += other._signals G._stimuli += other._stimuli # TODO: merge the MIDAS files. ? return G def __radd__(self, other): print("calling __radd__") self.add_nodes_from(other.nodes(data=True)) edges = other.edges(data=True) for e1,e2,d in edges: self.add_edge(e1,e2,None, **d) #def __iadd__(self, other): # print("calling iadd") # self.add_nodes_from(other.nodes(data=True)) # edges = other.edges(data=True) # for e1,e2,d in edges: # self.add_edge(e1,e2,None, **d) # return self def __sub__(self, other): print("calling __sub__") G = self.copy() G.remove_nodes_from([n for n in G if n in other.nodes()]) return G #def __rsub__(self, other): # self.remove_nodes_from([n for n in self if n in other.nodes()])
[docs] def union(self, other): """Return graph with elements from this instance and the input graph. .. plot:: :include-source: :width: 50% from cellnopt.core import * from pylab import subplot, title c1 = CNOGraph() c1.add_edge("A", "B", link="+") c1.add_edge("A", "C", link="-") c1.add_edge("C", "E", link="+") subplot(1,3,1) title(r"graph $C_1$") c1.plotdot(hold=True) c2 = CNOGraph() c2.add_edge("A", "B", link="+") c2.add_edge("B", "D", link="+") c2.add_edge("B", "F", link="+") subplot(1,3,2) c2.plotdot(hold=True) title(r"graph $C_2$") c3 = c1.union(c2) subplot(1,3,3) c3.plotdot(hold=True) title(r"graph $C_3 = C_1 \cup C_2$") """ c = self + other return c
[docs] def difference(self, other): """Return a CNOGraph instance that is the difference with the input graph (i.e. all elements that are in this set but not the others.) .. plot:: :include-source: :width: 50% from cellnopt.core import * from pylab import subplot, title c1 = CNOGraph() c1.add_edge("A", "B", link="+") c1.add_edge("A", "C", link="-") c1.add_edge("C", "E", link="+") subplot(1,3,1) title("graph C1") c1.plotdot(hold=True) c2 = CNOGraph() c2.add_edge("A", "B", link="+") c2.add_edge("B", "D", link="+") c2.add_edge("B", "F", link="+") subplot(1,3,2) c2.plotdot(hold=True) title("graph C2") c3 = c1.difference(c2) subplot(1,3,3) c3.plotdot(hold=True) title("graph C3=C1-C2") .. note:: this method should be equivalent to the - operator. So c1-c2 == c1.difference(c2) """ G = self.copy() G.remove_nodes_from([n for n in G if n in other.nodes()]) return G
[docs] def intersect(self, other): """Return a graph with only nodes found in "other" graph. .. plot:: :include-source: :width: 50% from cellnopt.core import * from pylab import subplot, title c1 = CNOGraph() c1.add_edge("A", "B", link="+") c1.add_edge("A", "C", link="-") c1.add_edge("C", "E", link="+") subplot(1,3,1) title(r"graph $C_1$") c1.plotdot(hold=True) c2 = CNOGraph() c2.add_edge("A", "B", link="+") c2.add_edge("B", "D", link="+") c2.add_edge("B", "F", link="+") subplot(1,3,2) c2.plotdot(hold=True) title(r"graph $C_2$") c3 = c1.intersect(c2) subplot(1,3,3) c3.plotdot(hold=True) title(r"graph $C_3 = C_1 \cap C_2$") """ G = self.copy() G.remove_nodes_from([n for n in G if n not in other]) return G
def __str__(self): nodes = len([x for x in self.nodes() if '^' not in x]) andnodes = len([x for x in self.nodes() if '^' in x]) msg = "The model contains %s nodes (and %s AND node)\n" % (nodes, andnodes) self.logging.warning("Edge counting valid only if and node have only 2 inputs") edges = len([e for e in self.edges() if '^' not in e[0] and '^' not in e[1]]) andedges = len([e for e in self.edges() if '^' in e[0] or '^' in e[1]])/3 msg += "%s Hyperedges found (%s+%s) \n" % (edges+andedges, edges, andedges) return msg #def _get_link(self): # return [self.edge[x[0]][x[1]]['link'] for x in self.edges_iter()] #links = property(fget=_get_link, doc="Read only attribute.") #def _get_weight(self): # return [self.edge[x[0]][x[1]]['weight'] for x in self.edges_iter()] #weights = property(fget=_get_weight, doc="Read only attribute.")
[docs] def draw(self, prog="dot", hold=False, attribute="fillcolor", colorbar=True, **kargs): """Draw the network using matplotlib. Not exactly what we want but could be useful. :param str prog: one of the graphviz program (default dot) :param bool hold: hold previous plot (default is False) :param str attribute: attribute to use to color the nodes (default is "fillcolor"). :param node_size: default 1200 :param width: default 2 :param bool colorbar: add colorbar (default is True) Uses the fillcolor attribute of the nodes Uses the link attribute of the edges .. seealso:: :meth:`plotdot` that is dedicated to this kind of plot using graphviz """ self.logging.warning("Not for production. Use plotdot() instead") pos = nx.drawing.graphviz_layout(self, prog=prog) if hold==False: pylab.clf() node_size = kargs.get('node_size', 1200) kargs['node_size'] = node_size width = kargs.get('width', 2) kargs['width'] = width # node attributes nodes = sorted(self.nodes()) node_colors = [self.node[x][attribute] if attribute in self.node[x].keys() else "gray" for x in nodes] # edge attributes edges = self.edges(data=True) colors = {'-':'red', '+':'black'} edge_colors = [colors[x[2]['link']] for x in edges] nx.draw(self, prog=prog, hold=hold, nodelist=nodes, edge_color=edge_colors, node_color=node_colors, pos=pos, **kargs) if attribute in ["degree_cent", "betweeness_cent", "closeness_cent"]: if colorbar: pylab.colorbar(shrink=0.7, pad=0.01, fraction=0.10)
def _plot_legend(self): """used by plotdot""" txt = "Color legend:\n--------------------\n green:stimuli\nred:inhibitors\nblue:readouts\nred arrow: inhibits\n: black arrow:direct link" self._add_textbox(txt, 0.95, 0.95) def _add_textbox(self, text, x1=0.95,x2=0.95): a = pylab.gca() pylab.text(x1, x2, text, transform=a.transAxes, verticalalignment='top', bbox=dict(facecolor="wheat", boxstyle="round", alpha=0.5)) def _check_dot_prog(self, prog): easydev.check_param_in_list(prog, ["twopi", "gvcolor", "wc", "ccomps", "tred", "sccmap", "fdp", "circo", "neato", "acyclic", "nop", "gvpr", "dot", "sfdp"])
[docs] def plot(self, *args, **kargs): return self.plotdot(*args, **kargs)
def _get_cmap(self, cmap=None): if cmap == "heat": cmap = self._colormap.get_cmap_heat_r() elif cmap == "green": cmap = self._colormap.get_cmap_red_green() return cmap
[docs] def plotdot(self, prog="dot", viewer="pylab", hold=False, legend=False, show=True, filename=None, node_attribute=None, edge_attribute=None, cmap=None, colorbar=False, remove_dot=True, cluster_stimuli=False, normalise_cmap=True, edge_attribute_labels=True, aspect="equal", rank=False ): """plotting graph using dot program (graphviz) and networkx By default, a temporary file is created to hold the image created by graphviz, which is them shown using pylab. You can choose not to see the image (show=False) and to save it in a local file instead (set the filename). The output format is PNG. You can play with networkx.write_dot to save the dot and create the SVG yourself. :param str prog: the graphviz layout algorithm (default is dot) :param viewer: pylab :param bool legend: adds a simple legend (default is False) :param bool show: show the plot (True by default) :param bool remove_dot: if True, remove the temporary dot file. :param edge_attribute_labels: is True, if the label are available, show them. otherwise, if edge_attribute is provided, set lael as the edge_attribute :param aspect: auto or equal. Used to scale the the image (imshow argument) and affects the scaling on the x/y axis. Additional attributes on the graph itself can be set up by populating the graph attribute with a dictionary called "graph":: c.graph['graph'] = {"splines":True, "size":(20,20), "dpi":200} Useful other options are:: c.edge["tnfa"]["tnfr"]["penwidth"] = 3 c.edge["tnfa"]["tnfr"]["label"] = " 5" If you use edge_attribute and show_edge_labels, label are replaced by the content of edge_attribute. If you still want differnt labels, you must stet show_label_edge to False and set the label attribute manually :: c = cnograph.CNOGraph() c.add_reaction("A=C") c.add_reaction("B=C") c.edge['A']['C']['measure'] = 0.5 c.edge['B']['C']['measure'] = 0.1 c.expand_and_gates() c.edge['A']['C']['label'] = "0.5 seconds" # compare this that shows only one user-defined label c.plot() # with that show all labels c.plot(edge_attribute="whatever", edge_attribute_labels=False) See the graphviz homepage documentation for more options. .. note:: edge attribute in CNOGraph (Directed Graph) are not coded in the same way in CNOGraphMultiEdges (Multi Directed Graph). So, this function does not work for MultiGraph .. todo:: use same colorbar as in midas. rigtht now; the vmax is not correct. .. todo:: precision on edge_attribute to 2 digits. """ # graph is a DiGraph attribute # that is sometimes replaced by {} inside networkx so we need to overwrite it here # each time we want to plot the graph. self.graph = self.graph_options.copy() if cmap == "heat": _colormap = Colormap() cmap = _colormap.get_cmap_heat_r() # update the attributes if node_attribute == None: self.set_default_node_attributes() else: if len(self): node = self.nodes()[0] else: self.logging.info("Empty graph. Nothing to plot") return #if node_attribute not in self.node[node].keys(): # self.logging.info("attribute %s not found. We may need to call a specific method (e.g., centrality_closeness before") # return import matplotlib cmap = matplotlib.cm.get_cmap(cmap) sm = matplotlib.cm.ScalarMappable( norm=matplotlib.colors.Normalize(vmin=0,vmax=1), cmap=cmap) M = [node[1][node_attribute] for node in self.nodes(data=True) if node_attribute in node[1].keys()] if len(M): if normalise_cmap == True: M = max(M) else: M = 1 else: self.logging.error("attribute %s not found in any nodes" % node_attribute) for node in self.nodes(): try: value = self.node[node][node_attribute]/float(M) rgb = sm.to_rgba(value) colorHex = matplotlib.colors.rgb2hex(rgb) self.node[node]['fillcolor'] = colorHex except: self.node[node]['fillcolor'] = "#FFFFFF" #if edge_attribute == None: # self.set_edge_attributes() #else: # M = self._set_edge_attribute_color(edge_attribute, cmap) if edge_attribute: M = self._set_edge_attribute_color(edge_attribute, cmap) # ? self._check_dot_prog(prog) # create temp files infile = tempfile.NamedTemporaryFile(suffix=".dot", delete=False) if filename == None: outfile = tempfile.NamedTemporaryFile(suffix=".png", delete=False) filename = outfile.name # Some nodes may belong to 2 colors. Creating subgraph is one way to go # around. Having graphviz 2.30 we could use striped node. if self.midas and node_attribute==None: for node in self.nodes(): if node in self.midas.names_signals and node in self.midas.names_inhibitors: self.node[node]['style'] = "diagonals,filled" self.node[node]['color'] = "red" self.node[node]['fillcolor'] = "lightblue" if node in self.midas.names_stimuli and node in self.midas.names_inhibitors: self.node[node]['style'] = "diagonals,filled" self.node[node]['color'] = "red" self.node[node]['fillcolor'] = "#9ACD32" # to not change the current graph, let us copy it this = self.copy() if edge_attribute_labels and edge_attribute != None: self._set_edge_attribute_label(this, edge_attribute) # real plotting is now here below # -------------------------------------------------- if rank is True: H = self._get_ranked_agraph() else: H = nx.to_agraph(this) cluster = 1 # we want stimuli to be on top if (self.midas or self._stimuli) and cluster_stimuli: stimuli = [] if self.midas: stimuli = self.midas.names_stimuli[:] stimuli += self._stimuli[:] if rank: H.add_subgraph(self.stimuli, name="cluster_stimuli", rank="source", style="filled", fillcolor="white") H.write(infile.name) #nx.write_doit(self, infile.name) if filename.endswith(".png"): cmd = "%s -Tpng %s -o %s" % (prog, infile.name, filename) elif filename.endswith(".svg"): cmd = "%s -Tsvg %s -o %s" % (prog, infile.name, filename) else: self.logging.info("extension should be svg or png. File not saved") cmd = "" # nothing to be done # Below is the code to actually show the image, not to build it self.logging.info(cmd) ret = subprocess.call(cmd, shell=True) if viewer=="pylab" and show==True: if hold == False: pylab.clf() f = pylab.gcf() f.set_facecolor("white") a = f.add_axes([0.01,0.01,0.9,0.9]) a.imshow(pylab.imread(filename), aspect=aspect) a.axis('off') else: a = pylab.imshow(pylab.imread(filename), aspect=aspect) pylab.axis('off') if colorbar: cbar = pylab.linspace(0, 1, 10) e = pylab.axes([.8, .1, .05, .8]) pylab.pcolor(pylab.array([cbar, cbar]).transpose(), cmap=self._get_cmap(cmap), vmin=0, vmax=1); e.yaxis.tick_right() e.set_xticks([],[]) #ticks = numpy.array(e.get_yticks()) try: e.set_yticklabels([float(this.get_text())*M for this in e.get_yticklabels()]) except: pass f = pylab.gcf() f.set_facecolor("white") if legend: self._plot_legend() elif show==True: a = subprocess.call("%s %s &" % (viewer, filename), shell=True) else: a = None if remove_dot == True: infile.delete = True infile.close() else: print(filename) shutil.move(infile.name, "model.dot") # name = os.path.splitext(filename)[0] + ".dot" # shutil.move(infile.name, name) infile.close() if filename == False: outfile.delete = True outfile.close() if node_attribute != None: self.set_default_node_attributes() #if edge_attribute != None: # self.set_edge_attributes() if show == True: try: from biokit.dev.mpl_focus import ZoomPan ax = pylab.gca() zp = ZoomPan() figZoom = zp.zoom_factory(ax, base_scale = 1.2) figPan = zp.pan_factory(ax) except: pass return a
def _set_edge_attribute_label(self, this, edge_attribute): for e in this.edges(): this.edge[e[0]][e[1]]['label'] = this.edge[e[0]][e[1]][edge_attribute] def _set_edge_attribute_color(self, edge_attribute, cmap): import matplotlib cmap = matplotlib.cm.get_cmap(cmap) sm = matplotlib.cm.ScalarMappable( norm=matplotlib.colors.Normalize(vmin=0,vmax=1), cmap=cmap) M = max([self.edge[edge[0]][edge[1]][edge_attribute] for edge in self.edges()]) for edge in self.edges(): value = self.edge[edge[0]][edge[1]][edge_attribute]/float(M) rgb = sm.to_rgba(value) colorHex = matplotlib.colors.rgb2hex(rgb) self.edge[edge[0]][edge[1]]['color'] = colorHex return M def _get_hex_color_from_value(self, value, cmap): import matplotlib cmap = matplotlib.cm.get_cmap(cmap) sm = matplotlib.cm.ScalarMappable( norm=matplotlib.colors.Normalize(vmin=0,vmax=1), cmap=cmap) rgb = sm.to_rgba(value) colorHex = matplotlib.colors.rgb2hex(rgb) return colorHex def _get_ranked_agraph(self): H = nx.to_agraph(self) for k, v in self.dotattrs['graph'].iteritems(): H.graph_attr[k] = v for k, v in self.dotattrs['edge'].iteritems(): H.edge_attr[k] = v # order the graph for ranks allranks = self.get_same_rank() ranks = {} for k, v in allranks.iteritems(): ranks[k] = sorted([x for x in v if '=' not in x], cmp=lambda x,y:cmp(x.lower(), y.lower())) # add invisible edges so that the nodes that have the same rank are # ordered. if k!=0: for i, node1 in enumerate(ranks[k]): if i!=len(ranks[k])-1: node2 = ranks[k][i+1] H.add_edge(node1, node2, style="invis") M = max(ranks.keys()) for rank in sorted(ranks.keys()): name = str(rank) if rank == 0: name = "stimuli" #H.add_subgraph(ranks[rank], name="cluster_"+name, rank='min') # label="Stimuli" , rank='min') elif rank==M: name = "end_signals" H.add_subgraph(ranks[rank], name=name, rank='sink') else: pass #H.add_subgraph(ranks[rank], name=name, rank='same') return H def _get_nonc(self): if self._nonc == None: nonc = self.findnonc() self._nonc = nonc else: nonc = self._nonc return nonc nonc = property(fget=_get_nonc, doc="Returns list of Non observable and non controlable nodes (Read-only).") def _get_reacID(self): # FIXME. A=B, C=B, expand_and_gates; reacID does not contain the and gate... edges = [x for x in self.edges()] links = [str(self.edge[x[0]][x[1]]['link']) for x in edges] nodes1 = [x[0] for x in edges] nodes2 = [x[1] for x in edges] reacs = [] for n1,link,n2 in zip(nodes1, links, nodes2): if "=" in n1 or "=" in n2: continue if link == "+": reacs.append(n1 + "=" + n2) else: reacs.append("!" + n1 + "=" + n2) return reacs reacID = property(_get_reacID, doc="return the reactions (edges)") def _get_namesSpecies(self): nodes = self.nodes() nodes = [x for x in nodes if "+" not in x and "=" not in x] return sorted(nodes) namesSpecies = property(fget=_get_namesSpecies, doc="Return sorted list of species (ignoring and gates) Read-only attribute.")
[docs] def swap_edges(self, nswap=1): """Swap two edges in the graph while keeping the node degrees fixed. A double-edge swap removes two randomly chosen edges u-v and x-y and creates the new edges u-x and v-y:: u--v u v becomes | | x--y x y If either the edge u- x or v-y already exist no swap is performed and another attempt is made to find a suitable edge pair. :param int nswap: number of swaps to perform (Defaults to 1) :return: nothing .. warning:: the graph is modified in place. .. todo:: need to take into account the AND gates !! a proposal swap is ignored in 3 cases: #. if the summation of in_degree is changed #. if the summation of out_degree is changed #. if resulting graph is disconnected """ Ninh = [x[2]["link"] for x in self.edges(data=True)].count('-') I = sum(self.in_degree().values()) O = sum(self.out_degree().values()) # find 2 nodes that have at least one successor for i in range(0, nswap): #print(i) edges = self.edges() np.random.shuffle(edges) e1, e2 = edges[0:2] d1 = self.edge[e1[0]][e1[1]].copy() d2 = self.edge[e2[0]][e2[1]].copy() G = self.copy() G.add_edge(e1[0], e2[1], None, **d1) G.add_edge(e2[0], e1[1], None, **d2) G.remove_edge(e1[0], e1[1]) G.remove_edge(e2[0], e2[1]) if nx.is_connected(G.to_undirected()) == False: #print("G is disconnected.skipping------") continue if sum(G.in_degree().values()) != I: # the link already exists continue if sum(G.out_degree().values()) != O: continue self.add_edge(e1[0], e2[1], None, **d1) self.add_edge(e2[0], e1[1], None, **d2) self.remove_edge(e1[0], e1[1]) self.remove_edge(e2[0], e2[1]) Ninh2 = [x[2]["link"] for x in self.edges(data=True)].count('-') assert Ninh2 == Ninh assert nx.is_connected(self.to_undirected()) == True
[docs] def dependencyMatrix(self, fontsize=12): r"""Return dependency matrix * :math:`D_{i,j}` = green ; species i is an activator of species j (only positive path) * :math:`D_{i,j}` = red ; species i is an inhibitor of species j (only negative path) * :math:`D_{i,j}` = yellow; ambivalent (positive and negative paths connecting i and j) * :math:`D_{i,j}` = red ; species i has no influence on j .. plot:: :include-source: :width: 80% from cellnopt.core import * c = CNOGraph(cnodata("PKN-ToyPB.sif"), cnodata("MD-ToyPB.csv")) c.dependencyMatrix() """ import numpy nodes = sorted(self.nodes()) N = len(nodes) data = numpy.zeros((len(nodes), len(nodes))) for i,node1 in enumerate(nodes): paths = nx.shortest_path(self, node1) for j,node2 in enumerate(nodes): if node1 == node2: data[i][j] = 0 elif node2 not in paths.keys(): data[i][j] = 0 else: path = paths[node2] links = [self.edge[path[ii]][path[ii+1]]["link"] for ii in range(0,len(path)-1)] if len(numpy.unique(links)) == 2: data[i][j] = 1 # yellow elif "+" in links: data[i][j] = 2 #green elif "-" in links: if links.count("-") % 2 ==0: data[i][j] = 2 else: data[i][j] = 3 #red nodeNames = [node.replace("_", "\_") for node in nodes] nodeNamesY = [node.replace("_", "\_") for node in nodes] from matplotlib import colors norm = colors.Normalize(vmin=0, vmax=3) cmap = colors.ListedColormap([[0., 0, 0], [1,1,0],[.5, 1, 0.], [1., 0, 0.]]) indices = [i for i, node in enumerate(nodes) if "and" not in node or "+" in nodes] pylab.clf() pylab.pcolor(pylab.flipud(data[indices][:,indices]), edgecolors="w", cmap=cmap,norm=norm); N = len(indices) nodeNames = numpy.array(nodeNames)[indices] nodeNamesY = numpy.array(nodeNamesY)[indices[::-1]] pylab.axis([0, N, 0, N]) pylab.xticks([0.5+x for x in pylab.arange(N)], nodeNames, rotation=90, fontsize=fontsize) pylab.yticks([0.5+x for x in pylab.arange(N)], nodeNamesY, rotation=0, fontsize=fontsize) ax = pylab.colorbar() ax.set_ticks([0.4,1.1,1.9,2.6]) ax.set_ticklabels(["no effect","activation", "ambiguous", "inhibition"]) pylab.draw() return data
[docs] def adjacencyMatrix(self, nodelist=None, weight=None): """Return adjacency matrix. :param list nodelist: The rows and columns are ordered according to the nodes in nodelist. If nodelist is None, then the ordering is produced by :meth:`nodes` method. :param str weight: (default=None) The edge data key used to provide each value in the matrix. If None, then each edge has weight 1. Otherwise, you can set it to "weight" :returns: numpy matrix Adjacency matrix representation of CNOGraph. .. note:: alias to :meth:`networkx.adjacency_matrix` .. seealso:: :meth:`adjacency_iter` and :meth:`adjacency_list` """ from networkx import adjacency_matrix return adjacency_matrix(self, nodelist=nodelist).astype(int)
[docs] def remove_edge(self, u, v): """Remove the edge between u and v. :param str u: node u :param str u: node v Calls :meth:`clean_orphan_ands` afterwards """ super(CNOGraph, self).remove_edge(u,v) #if "+" not in n: self.clean_orphan_ands()
[docs] def remove_node(self, n): """Remove a node n :param str node: the node to be removed Edges linked to **n** are also removed. **AND** gate may now be irrelevant (only one input or no input). Orphan AND gates are removed. .. seealso:: :meth:`clean_orphan_ands` """ super(CNOGraph, self).remove_node(n) if "^" not in unicode(n): self.clean_orphan_ands()
[docs] def add_node(self, node, attr_dict=None, **attr): """Add a node :param str node: a node to add :param dict attr_dict: dictionary, optional (default= no attributes) Dictionary of edge attributes. Key/value pairs will update existing data associated with the edge. :param attr: keyword arguments, optional edge data (or labels or objects) can be assigned using keyword arguments. keywords provided will overwrite keys provided in the **attr_dict** parameter .. warning:: color, fillcolor, shape, style are automatically set. :: c = CNOGraph() c.add_node("A", data=[1,2,3,] .. warning:: ****attr** replaces any key found in attr_dict. See :meth:`add_edge` for details. .. todo:: currently nodes that contains a ^ sign are interpreted as AND gate and will appear as small circle. One way to go around is to use the label attribute. you first add the node with a differnt name and populate the label with the correct nale (the one that contain the ^ sign); When calling the plot function, they should all appear as expected. """ if attr_dict: print("Warning: attr_dict overwritten") if "fillcolor" not in attr.keys(): attr["fillcolor"] = "white" attr_dict = self.get_node_attributes(node) if "fillcolor" not in attr_dict.keys(): attr_dict["fillcolor"] = "white" attr["fillcolor"] = "white" super(CNOGraph, self).add_node(node, attr_dict, **attr)
[docs] def preprocessing(self, expansion=True, compression=True, cutnonc=True, maxInputsPerGate=2): """Performs the 3 preprocessing steps (cutnonc, expansion, compression) :param bool expansion: calls :meth:`expand_and_gates` method :param bool compression: calls :meth:`compress` method :param bool cutnon: calls :meth:`cutnonc` method :param int maxInputPerGates: parameter for the expansion step .. plot:: :width: 80% :include-source: from cellnopt.core import * c = CNOGraph(cnodata("PKN-ToyPB.sif"), cnodata("MD-ToyPB.csv")) c.preprocessing() c.plotdot() """ if cutnonc: self.cutnonc() if compression: self.compress() if expansion: self.expand_and_gates(maxInputsPerGate=maxInputsPerGate)
[docs] def cutnonc(self): """Finds non-observable and non-controllable nodes and removes them. .. plot:: :include-source: :width: 50% from cellnopt.core import * c = CNOGraph(cnodata("PKN-ToyPB.sif"), cnodata("MD-ToyPB.csv")) c.cutnonc() c.plotdot() """ nonc = self.nonc[:] for node in nonc: self.collapse_node(node)
[docs] def compress(self): """Finds compressable nodes and removes them from the graph A compressable node is a node that is not part of the special nodes (stimuli/inhibitors/readouts mentionned in the MIDAS file). Nodes that have multiple inputs and multiple outputs are not compressable either. .. plot:: :include-source: :width: 50% from cellnopt.core import * c = CNOGraph(cnodata("PKN-ToyPB.sif"), cnodata("MD-ToyPB.csv")) c.cutnonc() c.compress() c.plotdot() .. seealso:: :meth:`compressable_nodes`, :meth:`is_compressable` """ #Patched to proceed on sorted lists and provide always the same results. #for node in self.compressable_nodes: for node in sorted(self.compressable_nodes): if self.is_compressable(node) == True: # update the graph G as well and interMat/notMat self._compressed.append(node) self.collapse_node(node) #self.midas.cnolist.namesCompressed.append(node) #Patched to proceed on a sorted list and provide always the same results. #for node in self.nodes(): for node in sorted(self.nodes()): if self.degree(node) == 0 and node not in self.stimuli and \ node not in self.signals and node not in self.inhibitors: """Luca: TODO? Shouldn't one check that an eventual orphan is not an observed node? Removing a node that is observed generates an incompatibility between the compressed network and the MIDAS file. The incompatibility arises when saving the compressed network and trying to use it with the same MIDAS file used to compress. In cellnopt.asp.netrec I have a workaround for this. It consists in finding the list of nodes that belong to the network but are in no edges (if a node A is removed during compression, it appears in no edges, but you can still find it in CNOGraph.nodes()). For this set of nodes that do not belong to the network, I add fake edges like A + mock1 and so on. Doing like this I am able to save the compressed (or the repaired) network, while using the same MIDAS file. I was afraid to touch the code because I am not sure whether this is the intended behaviour.""" self.logging.info("Found an orphan, which has been removed (%s)" % node) self.remove_node(node)
def _decompress(self): """uncompress some nodes if possible. .. warning:: don't use ; works for simple cases only so far to be revisited. kept for book keeping """ for n1, n2 in self.edges(): compressedNodes = self.edge[n1][n2]['compressed'] if compressedNodes==[]: continue for this in compressedNodes: link1 = this[0] link2 = this[-1] node = this[1:len(this)-1] #print(link1, node, link2) # must remove from compress list before add_node that calls # set_attributes (that check for compress list) # TODO check if this is correct to use try/except try:self.midas.cnolist.namesCompressed.remove(node) except: print("could not remove %s" % node) self.add_node(node) self.add_edge(n1,node, link=link1) self.add_edge(node,n2, link=link2) self.remove_edge(n1, n2)
[docs] def collapse_nodes(self, nodes): """Collapse a list of nodes :param list nodes: a list of node to collapse .. seealso:: :meth:`collapse_node`. """ for node in nodes: self.collapse_node(node)
def _get_collapse_edge(self, inputAttrs, outputAttrs): attrs = inputAttrs.copy() if inputAttrs['link'] == outputAttrs['link']: attrs['link'] = "+" attrs['color'] = "black" attrs['arrowhead'] = "normal" else: attrs['link'] = "-" attrs['color'] = "red" attrs['arrowhead'] = "tee" return attrs
[docs] def collapse_node(self, node): """Collapses a node (removes a node but connects input nodes to output nodes) This is different from :meth:`remove_node`, which removes a node and its edges thus creating non-connected graph. :meth:`collapse_node`, instead remove the node but merge the input/output edges IF possible. If there are multiple inputs AND multiple outputs the node is not removed. :param str node: a node to collapse. * Nodes are collapsed if there is at least one input or output. * Node are not removed if there is several inputs and several ouputs. * if the input edge is -, and the next is + or viceversa then the final edge if - * if the input edge is - and output is - then final edge is + """ # Removing a node that has several entries and several outputs is not # implemented because there is no such situation in CNO. successors = self.successors(node) predecessors = self.predecessors(node) #newnodes1 = [] #newnodes2 = [] #newedges = [] # todo: may be simplified ? if len(successors) == 1 and len(predecessors)==1: self.logging.debug("Compressing %s 1,1 mode" % node) # compressed node attr1 = self.edge[node][successors[0]] # attr2 = self.edge[predecessors[0]][node] compressed = attr2['link'] + node + attr1['link'] attrs = self._get_collapse_edge(attr1, attr2) attrs['compressed'].append(compressed) if predecessors[0] != successors[0]: self.add_edge(predecessors[0], successors[0], None, **attrs) #newnodes1.append(predecessors[0]) #newnodes2.append(successors[0]) #newedges.append(attr['link']) elif len(successors) == 1: for predecessor in predecessors: attr = self.edge[predecessor][node] if predecessor != successors[0]: attr2 = self.edge[node][successors[0]] compressed = attr2['link'] + node + attr['link'] attrs = self._get_collapse_edge(attr, attr2) attrs['compressed'].append(compressed) self.add_edge(predecessor, successors[0], None, **attrs) #newnodes1.append(predecessor) #newnodes2.append(successors[0]) #newedges.append(attr['link']) elif len(predecessors) == 1: #print(node) #print(predecessors) for successor in successors: #print(successor) attr = self.edge[node][successor] if predecessors[0] != successor: attr2 = self.edge[predecessors[0]][node] compressed = attr2['link'] + node + attr['link'] attrs = self._get_collapse_edge(attr, attr2) attrs['compressed'].append(compressed) self.add_edge(predecessors[0], successor, None, **attrs) #newnodes1.append(predecessors[0]) ##newnodes2.append(successor) #newedges.append(attr['link']) else: if len(successors) > 1 and len(predecessors) > 1: self.logging.debug(node, successors, predecessors) self.logging.warning("N succ >1 and N pred >1. Node not removed. use remove_node() if you really want to remove it") return #raise ValueError("invalid node to remove several in/out") else: self.logging.debug("%s %s %s" % (node, successors, predecessors)) self.logging.warning("unknown case (no output or input ?). Node %s removed"% node) #raise ValueError("invalid node to remove several in/out") self.remove_node(node)
[docs] def get_node_attributes(self, node): """Returns attributes of a node using the MIDAS attribute Given a node, this function identifies the type of the input node and returns a dictionary with the relevant attributes found in :attr:`node_attributes.attributes`. For instance, if a midas file exists and if **node** belongs to the stimuli, then the dicitonary returned contains the color green. :param str node: :returns: dictionary of attributes. """ # default attr = self.attributes['others'].copy() # otherwisen if self.midas: if node in self.stimuli: attr = self.attributes['stimuli'].copy() elif node in self.signals: attr = self.attributes['signals'].copy() elif node in self.inhibitors: attr = self.attributes['inhibitors'].copy() elif node in self.nonc: # or node in self.findnonc(): attr = self.attributes['nonc'].copy() elif '^' in node: attr = self.attributes['and'].copy() elif node in self._compressed or node in self.compressable_nodes: attr = self.attributes['compressed'].copy() else: if '^' in unicode(node): attr = self.attributes['and'].copy() if node in self._stimuli: attr = self.attributes['stimuli'].copy() if node in self._signals: attr = self.attributes['signals'].copy() if node in self._inhibitors: attr = self.attributes['inhibitors'].copy() return attr
[docs] def set_default_node_attributes(self): """Set all node attributes to default attributes .. seealso:: :meth:`get_node_attributes` """ for node in self.nodes(): attrs = self.get_node_attributes(node) for k,v in attrs.iteritems(): self.node[node][k] = v
[docs] def get_same_rank(self): """Return ranks of the nodes. Used by plotdot/graphviz. Depends on attribute :attr:`dot_mode` """ # some aliases try: stimuli = self.midas.names_stimuli if len(stimuli) == 0: stimuli = self._get_inputs() except: stimuli = self._get_inputs() try: signals = self.midas.names_signals[:] if len(signals) == 0: signals = self._get_outputs() except: signals = self._get_outputs() func_path = nx.algorithms.floyd_warshall(self) maxrank = int(self.get_max_rank()) # start populating the ranks starting with the obvious one: stimuli and # signals ranks = {} ranks[0] = stimuli for i in range(1, maxrank+1): ranks[i] = [] from pylab import inf, nanmin, nanmax if self.dot_mode == 'free': """default layout but stimuli on top""" for node in self.nodes(): if node not in stimuli: distances = [func_path[s][node] for s in stimuli] distances = [abs(x) for x in distances if x != inf] if len(distances) != 0: M = nanmin([x for x in distances if x != inf]) try: ranks[M].append(node) except: ranks[M] = [node] else: self.logging.debug('warning, rank %s is empyt'% node) elif self.dot_mode == 'signals_bottom': # no need for max rank here for node in self.nodes(): if node not in stimuli and node not in signals: distances = [func_path[s][node] for s in stimuli] distances = [x for x in distances if x != inf] if len(distances) != 0: M = nanmax([abs(x) for x in distances if x != inf]) try: ranks[M].append(node) except: ranks[M] = [node] else: self.logging.debug('warning, rank %s is empyt'% node) M = max(ranks.keys()) ranks[M+2] = signals elif self.dot_mode == 'end_signals_bottom': maxrank = max(ranks.keys()) for node in sorted(self.nodes(),cmp=lambda x,y: cmp(x.lower(), y.lower())): # end signals #print(node,) if node in signals and len(self.successors(node))==0: continue elif node not in stimuli: distances = [func_path[s][node] for s in stimuli] distances = [x for x in distances if x != inf] #print(distances) if len(distances) != 0: M = nanmax([abs(x) for x in distances if x != inf]) try: ranks[M].append(node) except: ranks[M] = [node] else: self.logging.debug('warning, rank %s is empyt'% node) for node in sorted(self.nodes(), cmp=lambda x,y: cmp(x.lower(),y.lower())): if node in signals and len(self.successors(node))==0: try: ranks[maxrank].append(node) except: print("isssu") ranks[maxrank] = [node] return ranks
def _get_sources(self): if self.stimuli: return [x for x in self.nodes() if len(self.predecessors(x))==0 and x in self.stimuli] else: return [x for x in self.nodes() if len(self.predecessors(x))==0] def _get_sinks(self): if self.signals: return [x for x in self.nodes() if len(self.successors(x))==0 and x in self.signals] else: return [x for x in self.nodes() if len(self.successors(x))==0] def _get_inputs(self): return [x for x in self.nodes() if len(self.predecessors(x))==0] def _get_outputs(self): self.logging.warning('WARNING. no signals found, tryring to build a list (node with no successors)') return [x for x in self.nodes() if len(self.successors(x))==0]
[docs] def get_max_rank(self): """Get the maximum rank from the inputs using floyd warshall algorithm If a MIDAS file is provided, the inputs correspond to the stimuli. Otherwise, (or if there is no stimuli in the MIDAS file), use the nodes that have no predecessors as inputs (ie, rank=0). """ from pylab import nanmax, inf if self.midas: stimuli = self.midas.names_stimuli if len(stimuli) == 0: stimuli = self._get_inputs() else: stimuli = self._get_inputs() func_path = nx.algorithms.floyd_warshall(self) # compute the longest path from Stimuli by using the floyd warshall # algorithm. inputs/stimuli has rank 0. ranks = [[x for x in func_path[stimulus].values() if x !=inf] for stimulus in stimuli] allranks = [] for r in ranks: allranks = allranks + r #concatenate all ranks includeing empty [] maxrank = nanmax(allranks) return maxrank
def _get_dot_mode(self): return self._dot_mode def _set_dot_mode(self, value): valid = ['free', 'signals_bottom', 'end_signals_bottom'] assert value in valid, "unknown value. must be in %s" % valid self._dot_mode = value dot_mode = property(fget=_get_dot_mode, fset=_set_dot_mode, doc="Read/Write attribute to use with plotdot2 method (experimental).") def _add_and_gates(self, node, maxInputsPerGate=2): """See expand_and_gates docstring""" preds = self.predecessors(node) preds = [pred for pred in preds if "^" not in pred] assert maxInputsPerGate>=2 and maxInputsPerGate<=5, "maxInputsPerGate must be >2 and less than 5" #todo: order predecessirs according to nameSpecies order self.logging.debug( "node %s, pred=%s " % (node, preds)) if len(preds) == 1: self.logging.debug("Nothing to do with %s (only 1 predecessor)" % node) return for inputsPerGates in range(2, maxInputsPerGate+1): self.logging.debug(inputsPerGates) if inputsPerGates>len(preds): continue for combi in itertools.combinations(preds, inputsPerGates): self.logging.debug("adding %s input gate from %s to %s" % (inputsPerGates, combi, node)) shape = "circle" if len(combi) == 3: shape = "triangle" elif len(combi) >= 4: shape = "square" andNode = self._nodes2reac(list(combi), node) self.logging.debug("add_and_gates: %s " % andNode) self.add_node(andNode, shape=shape) for combinode in combi: attr = self.edge[combinode][node].copy() attr['link'] = self.edge[combinode][node]['link'] #attr['weight'] = pylab.nan # edge from and to specy always + and nan weight self.add_edge(combinode, andNode, None, **attr) attr['link'] = "+" # edge from and to specy always + #attr['weight'] = pylab.nan # edge from and to specy always + and nan weight attr['color'] = 'black' # and output is always black self.add_edge(andNode, node, None, **attr) def _nodes2reac(self, inputsNodes, output): inputs = [] for node in inputsNodes: if self.edge[node][output]['link']=="-": inputs.append("!"+node) else: inputs.append(node) reac = "^".join(inputs) reac += "=" + output return reac def _find_nodes_with_multiple_inputs(self): """return a list of nodes that have multiple predecessors""" nodes = [] for node in self.nodes(): if len(self.predecessors(node))>1 and "^" not in str(node): nodes.append(node) else: if len(self.predecessors(node))>1 and "^" in str(node): self.logging.debug("ignore ", node) return nodes def _find_and_nodes(self): andNodes = [x for x in self.nodes() if "^" in str(x)] return andNodes
[docs] def expand_or_gates(self): """Expand OR gates given AND gates If a graph contains AND gates (without its OR gates), you can add back the OR gates automatically using this function. .. plot:: :include-source: :width: 50% from cellnopt.core import * from pylab import subplot, title c1 = CNOGraph() c1.add_edge("A", "C", link="-") c1.add_edge("B", "C", link="+") c1.expand_and_gates() subplot(1,3,1) title("OR and AND gates") c1.plotdot(hold=True) c1.remove_edge("A", "C") c1.remove_edge("B", "C") subplot(1,3,2) c1.plotdot(hold=True) title("AND gates only") c1.expand_or_gates() subplot(1,3,3) c1.plotdot(hold=True) title("after call to \\n expand_or_gates function") .. seealso:: :meth:`~cellnopt.core.cnograph.CNOGraph.expand_and_gates` """ for this in self._find_and_nodes(): p = self.predecessors(this) s = self.successors(this) assert len(s) == 1 for node in p: link = self.edge[node][this]['link'] self.add_edge(node, s[0], link=link)
[docs] def expand_and_gates(self, maxInputsPerGate=2): """Expands the network to incorporate AND gates :param int maxInputsPerGate: restrict maximum number of inputs used to create AND gates (default is 2) The CNOGraph instance can be used to model a boolean network. If a node has several inputs, then the combinaison of the inputs behaves like an OR gate that is we can take the minimum over the inputs. In order to include AND behaviour, we introduce a special node called AND gate. This function adds AND gates whenever a node has several inputs. The AND gates can later on be used in a boolean formalism. In order to recognise AND gates, we name them according to the following rule. If a node A has two inputs B and C, then the AND gate is named:: B^C=A and 3 edges are added: B to the AND gates, C to the AND gates and the AND gate to A. If an edge is a "-" link then, an **!** character is introduced. In this expansion process, AND gates themselves are ignored. If there are more than 2 inputs, all combinaison of inputs may be considered but the default parameter **maxInputsPerGate** is set to 2. For instance, with 3 inputs A,B,C you may have the following combinaison: A^B, A^C, B^C. The link A^B^C will be added only if **maxInputsPerGate** is set to 3. .. plot:: :include-source: :width: 50% from cellnopt.core import * from pylab import subplot, title c = CNOGraph() c.add_edge("A", "C", link="+") c.add_edge("B", "C", link="+") subplot(1,2,1) title("Original network") c.plotdot(hold=True) c.expand_and_gates() subplot(1,2,2) c.plotdot(hold=True) title("Expanded network") .. seealso:: :meth:`remove_and_gates`, :meth:`clean_orphan_ands`, :meth:`expand_or_gates`. .. note:: this method adds all AND gates in one go. If you want to add a specific AND gate, you have to do it manually. You can use the :meth:`add_reaction` for that purpose. .. note:: propagate data from edge on the AND gates. """ nodes2expand = self._find_nodes_with_multiple_inputs() for node in nodes2expand: self._add_and_gates(node, maxInputsPerGate)
[docs] def add_cycle(self, nodes, **attr): """Add a cycle :param list nodes: a list of nodes. A cycle will be constructed from the nodes (in order) and added to the graph. :param dict attr: must provide the "link" keyword. Valid values are "+", "-" the links of every edge in the cycle will be identical. .. plot:: :include-source: :width: 50% from cellnopt.core import * c = CNOGraph() c.add_edge("A", "C", link="+") c.add_edge("B", "C", link="+") c.add_cycle(["B", "C", "D"], link="-") c.plotdot() .. warning:: added cycle overwrite previous edges """ if "link" not in attr.keys(): raise KeyError("link keyword must be provided") attr = self.set_default_edge_attributes(**attr) super(CNOGraph, self).add_cycle(nodes, **attr)
[docs] def add_path(self): """networkx method not to be used""" raise NotImplementedError
[docs] def to_directed(self): """networkx method not to be used""" raise NotImplementedError
[docs] def add_star(self): """networkx method not to be used""" raise NotImplementedError
[docs] def remove_edges_from(self): """networkx method not to be used""" raise NotImplementedError
[docs] def add_weighted_edges_from(self): """networkx method not to be used""" raise NotImplementedError
[docs] def add_edges_from(self, ebunch, attr_dict=None, **attr): """add list of edges with same parameters :: c.add_edges_from([(0,1),(1,2)], data=[1,2]) .. seealso:: :meth:`add_edge` for details. """ super(CNOGraph, self).add_edges_from(ebunch, attr_dict=None, **attr) #for e in ebunch: # self.add_edge(e[0], e[1], attr_dict=attr_dict, **attr)
[docs] def add_nodes_from(self, nbunch, attr_dict=None, **attr): """Add a bunch of nodes :param list nbunch: list of nodes. Each node being a string. :param dict attr_dict: dictionary, optional (default= no attributes) Dictionary of edge attributes. Key/value pairs will update existing data associated with the edge. :param attr: keyword arguments, optional edge data (or labels or objects) can be assigned using keyword arguments. keywords provided will overwrite keys provided in the **attr_dict** parameter .. warning:: color, fillcolor, shape, style are automatically set. .. seealso:: :meth:`add_node` for details. """ super(CNOGraph, self).add_nodes_from(nbunch, attr_dict=None, **attr) #for n in nbunch: # self.add_node(n, attr_dict=attr_dict, **attr)
[docs] def remove_nodes_from(self, nbunch): """Removes a bunch of nodes .. warning:: need to be tests with and gates.""" super(CNOGraph, self).remove_nodes_from(nbunch)
def _get_compressable_nodes(self): compressables = [x for x in self.nodes() if self.is_compressable(x)] if self._compress_ands == True: return compressables else: return [x for x in compressables if "^" not in x] compressable_nodes = property(fget=_get_compressable_nodes, doc="Returns list of compressable nodes (Read-only).") def _ambiguous_multiedge(self, node): """Test whether the removal of a node may lead to multi edges ambiguity e.g., A-->B and A--| B """ edges = [(x[0], x[1], x[2]['link']) for x in self.edges(data=True)] for A in self.predecessors(node): for B in self.successors(node): link = self.edge[node][B]['link'] if link == "+": link = "-" elif link == "-": link = "+" if (A, B, link) in edges: return True return False
[docs] def is_compressable(self, node): """Returns True if the node can be compressed, False otherwise :param str node: a valid node name :return: boolean value Here are the rules for compression. The main idea is that a node can be removed if the boolean logic is preserved (i.e. truth table on remaining nodes is preserved). A node is compressable if it is not part of the stimuli, inhibitors, or signals specified in the MIDAS file. If a node has several outputs and inputs, it cannot be compressed. If a node has one input or one output, it may be compressed. However, we must check the following possible ambiguity that could be raised by the removal of the node: once removed, the output of the node may have multiple input edges with different types of inputs edges that has a truth table different from the original truth table. In such case, the node cannot be compressed. Finally, a node cannot be compressed if one input is also an output (e.g., cycle). .. plot:: :include-source: :width: 50% from cellnopt.core import * from pylab import subplot,show, title c = cnograph.CNOGraph() c.add_edge("a", "c", link="-") c.add_edge("b", "c", link="+") c.add_edge("c", "d", link="+") c.add_edge("b", "d", link="-") c.add_edge("d", "e", link="-") c.add_edge("e", "g", link="+") c.add_edge("g", "h", link="+") c.add_edge("h", "g", link="+") # multiple inputs/outputs are not removed c.add_edge("s1", "n1", link="+") c.add_edge("s2", "n1", link="+") c.add_edge("n1", "o1", link="+") c.add_edge("n1", "o2", link="+") c._stimuli = ["a", "b", "s1", "s2"] c._signals = ["d", "g", "o1", "o2"] subplot(1,2,1) c.plotdot(hold=True) title("Initial graph") c.compress() subplot(1,2,2) c.plotdot(hold=True) title("compressed graph") show() """ if node not in self.nodes(): msg = "node %s is not in the graph" % node raise ValueError(msg) # there is always a MIDAS file for now but we may remove it later on specialNodes = self.inhibitors + self.signals + self.stimuli notcompressable = [x for x in self.nodes() if x in specialNodes] if node in notcompressable: return False succs = self.successors(node) preds = self.predecessors(node) # if one input is an input, no compression if len(set(succs).intersection(set(preds))) > 0: self.logging.debug('skipped node (retroaction ? %s) ' % node) #print("%s case cycle input is in output" % node) return False # if no output or no input, can be compressed. # somehow this is redundant with NONC algorithm, which is not required # anymore. if len(self.successors(node)) == 0 or len(self.predecessors(node))==0: self.logging.debug('skipped node %s (input/output case ' % node) return True # if multiple input AND multiple output, no ambiguity: nothing to compress elif len(self.successors(node)) >1 and len(self.predecessors(node))>1: self.logging.debug('skipped node %s >1,>1 case ' % node) return False # if one input and several outputs OR several input and one ouptut, we # can compress. However, if the compressable node once removed is an # edge that already exists, then we will have multiedges, which is not # handled in a DIgraph. So, we cannot comrpess that particular case. # if one input only and one output only, no ambiguity: we can compress elif len(self.predecessors(node)) == 1 and len(self.successors(node))==1: ambiguous = self._ambiguous_multiedge(node) if ambiguous == True: self.logging.debug('%s could be compressed but ambiguity with existing edge so not removed ' % node) return False else: self.logging.debug('Add node %s =1,=1 to be removed ' % node) return True # one output but several input may be ambiguous. If output is inhibitor # and input contains an mix of inhibitor/activation then we can not # compress elif len(preds) > 1 and len(succs)==1: input_links = [self[p][node]['link'] for p in preds] output_links = self[node][succs[0]]['link'] if (output_links == "-") and len(set(input_links))>=2: self.logging.debug('skipped node %s output=inhibitor and ambiguous inputs' % node) return False else: ambiguous = self._ambiguous_multiedge(node) if ambiguous == True: self.logging.debug('%s could be compressed but ambiguity with existing edge so not removed ' % node) return False else: self.logging.debug('Add node %s >1,=1 case to be removed' % node) return True # one input and several ouptut, no ambiguity: we can compress elif len(preds) == 1 and len(succs)>1: ambiguous = self._ambiguous_multiedge(node) if ambiguous == True: self.logging.debug('%s could be compressed but ambiguity with existing edge so not removed ' % node) return False else: self.logging.debug('Add node %s =1,>1 case to be removed' % node) return True else: self.logging.debug('do not remove node %s' % node) return False
[docs] def relabel_nodes(self, mapping): """see :meth:`rename_node` """ return self.rename_node(mapping)
[docs] def rename_node(self, mapping): """Function to rename a node, while keeping all its attributes. :param dict mapping: a dictionary mapping old names (keys) to new names (values ) :return: new cnograph object if we take this example:: c = CNOGraph(); c.add_reaction("a=b"); c.add_reaction("a=c"); c.add_reaction("b=d"); c.add_reaction("c=d"); c.expand_and_gates() Here, an AND gate has been created. c.nodes() tells us that its name is "b^c=d". If we rename the node b to blong, the AND gate name is unchanged if we use the nx.relabel_nodes function. Visually, it is correct but internally, the "b^c=d" has no more meaning since the node "b" does not exist anymore. This may lead to further issues if we for instance split the node c:: c = nx.relabel_nodes(c, {"b": "blong"}) c.split_node("c", ["c1", "c2"]) This function calls relabel_node taking care of the AND nodes as well. .. warning:: this is not inplace modifications. """ for this in self._find_and_nodes(): gate = ANDGate(this) previous = gate.name[:] # rename the and gate if needed looping over the proposed mapping for oldname, newname in mapping.iteritems(): #if oldname in gate.get_lhs_species(): gate.rename_species(oldname, newname) # add to mapping if previous != gate.name: mapping.update({previous:gate.name}) c = nx.relabel_nodes(self, mapping) c._stimuli = self._stimuli[:] c._inhibitors = self._inhibitors[:] c._signals = self._signals[:] c._compressed = self._compressed[:] for old, new in mapping.iteritems(): print("reanming {} -> {}".format(old, new)) if old in c._stimuli: c._stimuli.append(new) c._stimuli.remove(old) if old in c._signals: c._signals.append(new) c._signals.remove(old) if old in c._inhibitors: c._inhibitors.append(new) c._inhibitors.remove(old) return c # need to copt with the and reactions if any
[docs] def centrality_eigenvector(self,max_iter=1000, tol=0.1): res = nx.eigenvector_centrality(self,max_iter,tol=tol) nx.set_node_attributes(self, 'eigenvector', res) nx.set_node_attributes(self, 'centrality_eigenvector', res) import operator degcent_sorted = sorted(res.items(), key=operator.itemgetter(1), reverse=True) #for k,v in degcent_sorted: # self.logging.info("Highest degree centrality %s %s", v,k) return res
[docs] def centrality_degree(self): """Compute the degree centrality for nodes. The degree centrality for a node v is the fraction of nodes it is connected to. :return: list of nodes with their degree centrality. It is also added to the list of attributes with the name "degree_centr" .. plot:: :include-source: :width: 50% from cellnopt.core import * c = CNOGraph(cnodata("PKN-ToyPB.sif"), cnodata("MD-ToyPB.csv")) c.centrality_degree() c.plotdot(node_attribute="centrality_degree") """ res = nx.degree_centrality(self) nx.set_node_attributes(self, 'degree', res) nx.set_node_attributes(self, 'centrality_degree', res) import operator degcent_sorted = sorted(res.items(), key=operator.itemgetter(1), reverse=True) #for k,v in degcent_sorted: # self.logging.info("Highest degree centrality %s %s", v,k) return res
[docs] def degree_histogram(self, show=True, normed=False): """Compute histogram of the node degree (and plots the histogram) .. plot:: :include-source: :width: 50% from cellnopt.core import * c = CNOGraph(cnodata("PKN-ToyPB.sif"), cnodata("MD-ToyPB.csv")) c.degree_histogram() """ degree = self.degree().values() Mdegree = max(degree) if show == True: pylab.clf() res = pylab.hist(degree, bins=range(0,Mdegree+1), align='left', rwidth=0.8, normed=normed) xlims = pylab.xlim() ylims = pylab.ylim() pylab.axis([0, xlims[1], ylims[0], ylims[1]*1.1]) pylab.grid() pylab.title("Degree distribution") return res
[docs] def centrality_closeness(self, **kargs): """Compute closeness centrality for nodes. Closeness centrality at a node is 1/average distance to all other nodes. :param v: node, optional Return only the value for node v :param str distance: string key, optional (default=None) Use specified edge key as edge distance. If True, use 'weight' as the edge key. :param bool normalized: optional If True (default) normalize by the graph size. :return: Dictionary of nodes with closeness centrality as the value. .. plot:: :include-source: :width: 50% from cellnopt.core import * c = CNOGraph(cnodata("PKN-ToyPB.sif"), cnodata("MD-ToyPB.csv")) c.centrality_closeness() c.plotdot(node_attribute="centrality_closeness") """ res = nx.centrality.closeness_centrality(self, **kargs) nx.set_node_attributes(self, 'closeness', res) nx.set_node_attributes(self, 'centrality_closeness', res) import operator degcent_sorted = sorted(res.items(), key=operator.itemgetter(1), reverse=True) #for k,v in degcent_sorted: # self.logging.info("Highest closeness centrality %s %s", v,k) return res
[docs] def centrality_betweeness(self, k=None, normalized=True, weight=None, endpoints=False, seed=None): r"""Compute the shortest-path betweeness centrality for nodes. Betweenness centrality of a node `v` is the sum of the fraction of all-pairs shortest paths that pass through `v`: .. math:: c_B(v) =\sum_{s,t \in V} \frac{\sigma(s, t|v)}{\sigma(s, t)} where :math:`V` is the set of nodes, :math:`\sigma(s, t)` is the number of shortest :math:`(s, t)`-paths, and :math:`\sigma(s, t|v)` is the number of those paths passing through some node :math:`v` other than :math:`s, t`. If :math:`s = t`, :math:`\sigma(s, t) = 1`, and if :math:`v \in {s, t}`, :math:`\sigma(s, t|v) = 0` . :param int k: (default=None) If k is not None use k node samples to estimate betweeness. The value of k <= n where n is the number of nodes in the graph. Higher values give better approximation. :param bool normalized: If True the betweeness values are normalized by :math:`2/((n-1)(n-2))` for graphs, and :math:`1/((n-1)(n-2))` for directed graphs where :math:`n` is the number of nodes in G. :param str weight: None or string, optional If None, all edge weights are considered equal. .. plot:: :include-source: :width: 50% from cellnopt.core import * c = CNOGraph(cnodata("PKN-ToyPB.sif"), cnodata("MD-ToyPB.csv")) c.centrality_betweeness() c.plotdot(node_attribute="centrality_betweeness") .. seealso:: networkx.centrality.centrality_betweeness """ res = nx.centrality.betweenness_centrality(self,k=k,normalized=normalized, weight=weight, endpoints=endpoints, seed=seed) nx.set_node_attributes(self, 'centrality_betweeness', res) nx.set_node_attributes(self, 'betweeness', res) import operator degcent_sorted = sorted(res.items(), key=operator.itemgetter(1), reverse=True) #for k,v in degcent_sorted: # self.logging.info("Highest betweeness centrality %s %s", v,k) return res
[docs] def export2gexf(self, filename): """Export into GEXF format :param str filename: This is the networkx implementation and requires the version 1.7 This format is quite rich and can be used in external software such as Gephi. .. warning:: color and labels are lost. information is stored as attributes.and should be as properties somehow. Examples: c.node['mkk7']['viz'] = {'color': {'a': 0.6, 'r': 239, 'b': 66,'g': 173}} """ from networkx.readwrite import write_gexf write_gexf(self, filename)
[docs] def export2sif(self, filename): """Export CNOGraph into a SIF file. Takes into account and gates. If a species called "A^B=C" is found, it is an AND gate that is encoded in a CSV file as:: A 1 and1 B 1 and1 and1 1 C :param str filename: .. todo:: could use SIF class instead to simplify the code """ andCounter = 0 fh = open(filename, "w") for edge in self.edges(data=True): n1 = edge[0] n2 = edge[1] if "^" not in n1 and "^" not in n2: link = edge[2]['link'] if link == "+": fh.write("%s 1 %s\n" % (n1,n2)) else: fh.write("%s -1 %s\n" % (n1,n2)) for node in self._find_and_nodes(): andCounter += 1 andNode = "and%s" % andCounter succs = self.successors(node) preds = self.predecessors(node) for succ in succs: link = self.edge[node][succ]['link'] if link == "+": fh.write("%s 1 %s\n" % (andNode,succ)) else: fh.write("%s -1 %s\n" % (andNode,succ)) for pred in preds: link = self.edge[pred][node]['link'] if link == "+": fh.write("%s 1 %s\n" % (pred,andNode)) else: fh.write("%s -1 %s\n" % (pred,andNode)) fh.close()
[docs] def export2SBMLQual(self, filename, modelName="CellNOpt_model"): """Export the topology into SBMLqual and save in a file This requires only the topology information (i.e. MIDAS content is ignored). .. seealso:: :meth:`cellnopt.core.sif.SIF.export2SBMLQual` """ s = SIF() for reac in self.reacID: s.add_reaction(reac) _res = s.export2SBMLQual(filename=filename, modelName=modelName)
[docs] def lookfor(self, specyName): """Prints information about a species If not found, try to find species by ignoring cases. """ #try to find the specy first: if specyName not in self.nodes(): print("did not find the requested specy") proposals = [node for node in self.nodes() if specyName.lower() in node.lower()] if len(proposals): print("try one of ") for p in proposals: print(proposals) else: print("predecessors") print(self.predecessors(specyName)) print("successors") print(self.successors(specyName))
[docs] def summary(self): """Plot information about the graph""" import networkx as nx flow = nx.flow_hierarchy(self) print("Flow hierarchy = %s (fraction of edges not participating in cycles)" % flow) print("Average degree = " + str(sum(self.degree().values())/float(len(self.nodes()))))
[docs] def merge_nodes(self, nodes, node): """Merge several nodes into a single one .. todo:: check that if links of the inputs or outputs are different, there is no ambiguity.. .. plot:: :include-source: :width: 50% from cellnopt.core import * from pylab import subplot c = CNOGraph() c.add_edge("AKT2", "B", link="+") c.add_edge("AKT1", "B", link="+") c.add_edge("A", "AKT2", link="+") c.add_edge("A", "AKT1", link="+") c.add_edge("C", "AKT1", link="+") subplot(1,2,1) c.plotdot(hold=True) c.merge_nodes(["AKT1", "AKT2"], "AKT") subplot(1,2,2) c.plotdot(hold=True) """ assert len(nodes)>1, "nodes must be a list of species" for n in nodes: if n not in self.nodes(): raise ValueError("%s not found in the graph !" % n) self.add_node(node) for n in nodes: for pred in self.predecessors(n): attrs = self.edge[pred][n] if "^" in pred: pred = self._rename_node_in_reaction(pred, n, node) self.add_reaction(pred) else: self.add_edge(pred, node, **attrs) for succ in self.successors(n): attrs = self.edge[n][succ] if "^" in succ: succ = self._rename_node_in_reaction(succ, n, node) self.add_reaction(succ) else: self.add_edge(node, succ, **attrs) for n in nodes: self.remove_node(n) # assume that all nodes are in signals if first one is in signal. # FIXME: not robust if nodes[0] in self._signals: for this in nodes: self._signals.remove(this) self._signals.append(node) if nodes[0] in self._stimuli: for this in nodes: self._stimuli.remove(this) self._stimuli.append(node) if nodes[0] in self._inhibitors: for this in nodes: self._inhibitors.remove(this) self._inhibitors.append(node)
[docs] def split_node(self, node, nodes): """ .. plot:: :include-source: :width: 50% from cellnopt.core import * from pylab import subplot c = CNOGraph() c.add_reaction("!A=C") c.add_reaction("B=C") c.add_reaction("!b1=B") c.add_reaction("b2=B") c.expand_and_gates() subplot(1,2,1) c.plotdot(hold=True) c.split_node("B", ["B1", "B2", "B3"]) subplot(1,2,2) c.plotdot(hold=True) """ for n in nodes: for pred in self.predecessors(node): attrs = self.edge[pred][node] if "^" in pred: pred = self._rename_node_in_reaction(pred, node, n) self.add_reaction(pred) else: self.add_edge(pred, n, **attrs) for succ in self.successors(node): attrs = self.edge[node][succ] # special case of the AND gates if "^" in succ: succ = self._rename_node_in_reaction(succ, node, n) self.add_reaction(succ) else: # normal case self.add_edge(n, succ, **attrs) self.remove_node(node) # remove AND gates as well: for this in self._find_and_nodes(): gate = ANDGate(this) if node in gate.get_lhs_species(): self.remove_node(this) if node in self._signals: self._signals.extend(nodes) self._signals.remove(node) if node in self._stimuli: self._stimuli.extend(nodes) self._stimuli.remove(node) if node in self._inhibitors: self._inhibitors.extend(nodes) self._inhibitors.remove(node)
def _rename_node_in_reaction(self, reaction, old, new): """This function rename a species within a reaction. It takes into account that the species may be inhibited or may be part of an AND reaction. """ lhs, rhs = reaction.split("=") # rename LHS taking ! and AND into account species = lhs.split("^") new_species = [] for name in species: if name.startswith("!"): name = name[1:] if name == old: name = new new_species.append("!"+name) elif name == old: new_species.append(new) else: new_species.append(name) if len(new_species) == 1: lhs = new_species else: lhs = "^".join(new_species) #rename RHS if needed if rhs == old: rhs = new new_reaction = "=".join([lhs,rhs]) return new_reaction # Repeats the compression until no further compression # can be performed (or the max number of compression cycles has been reached)
[docs] def recursive_compress(self, max_num_iter = 25): """Recursive compression. Sometimes, when networks are large and complex, calling the :meth:`compress` only once may not be enough to remove all compressable nodes. Calling this function guarantees that all compressable nodes are removed. """ #the nodes are sorted to obtain every time the same results: n = 0 while self.compressable_nodes and n <= max_num_iter: self.compress() n+=1 if self.compressable_nodes : print("Warning: There still are compressable nodes after {mni} compression cycles".format(mni=max_num_iter))
[docs] def sif(self): """Return a SIF instance corresponding to this graph .. warning:: need to fix the reacID attribute to get AND gates """ s = SIF() for r in self.reacID: s.add_reaction(r) return s
[docs] def findnonc(self): """Finds the Non-Observable and Non-Controllable nodes #. Non observable nodes are those that do not have a path to any measured species in the PKN #. Non controllable nodes are those that do not receive any information from a species that is perturbed in the data. Such nodes can be removed without affecting the readouts. :param G: a CNOGraph object :param stimuli: list of stimuli :param stimuli: list of signals :return: a list of names found in G that are NONC nodes .. doctest:: >>> from cellnopt.core import * >>> from cellnopt.core.nonc import findNONC >>> model = cnodata('PKN-ToyMMB.sif') >>> data = cnodata('MD-ToyMMB.csv') >>> c = CNOGraph(model, data) >>> namesNONC = c.nonc() :Details: Using a floyd Warshall algorithm to compute path between nodes in a directed graph, this class identifies the nodes that are not connected to any signals (Non Observable) and/or any stimuli (Non Controllable) excluding the signals and stimuli, which are kept whatever is the outcome of the FW algorithm. """ # some aliases from numpy import inf assert (self.stimuli!=None and self.signals!=None) dist = nx.algorithms.floyd_warshall(self) namesNONC = [] for node in self.nodes(): # search for paths from the species to the signals spe2sig = [(node, dist[node][s]) for s in self.signals if dist[node][s]!=inf] # and from the nstimuli to the species sti2spe = [(node, dist[s][node]) for s in self.stimuli if dist[s][node]!=inf] if len(spe2sig)==0 or len(sti2spe)==0: if node not in self.signals and node not in self.stimuli: namesNONC.append(node) namesNONC = list(set(namesNONC)) # required ? return namesNONC
[docs] def random_poisson_graph(self, n=10, mu=3, remove_unconnected=False): from scipy.stats import poisson z = [poisson.rvs(mu) for i in range(0,n)] G = nx.expected_degree_graph(z) self.clear() self.add_edges_from(G.edges(), link="+") if remove_unconnected==False: self.add_nodes_from(G.nodes())
[docs] def remove_self_loops(self, key=None): for e in self.edges(): if e[0]==e[1]: self.remove_edge(e[0], e[1], key=key)
[docs] def hcluster(self): """ .. plot:: :include-source: :width: 50% from cellnopt.core import * c = CNOGraph(cnodata("PKN-ToyPB.sif"), cnodata("MD-ToyPB.csv")) c.hcluster() .. warning:: experimental """ from scipy.cluster import hierarchy from scipy.spatial import distance path_length=nx.all_pairs_shortest_path_length(self.to_undirected()) n = len(self.nodes()) distances=np.zeros((n,n)) nodes = self.nodes() for u,p in path_length.iteritems(): for v,d in p.iteritems(): distances[nodes.index(u)-1][nodes.index(v)-1] = d sd = distance.squareform(distances) hier = hierarchy.average(sd) pylab.clf(); hierarchy.dendrogram(hier) pylab.xticks(pylab.xticks()[0], nodes)
CNOGraph.plot.__func__.__doc__ = CNOGraph.plotdot.__doc__
[docs]class CNOGraphMultiEdges(CNOGraph, nx.MultiDiGraph): """A multiDigraph version of CNOGraph. .. warning:: experimental .. plot:: :width: 50% :include-source: >>> from cellnopt.core import * >>> c = cnograph.CNOGraphMultiEdges(cnodata("PKN-ToyPCB.sif"), cnodata("MD-ToyPCB.csv")) >>> c.add_edge("PAK", "Mek", link="-") >>> c.plot() .. plot:: :width: 50% :include-source: >>> from cellnopt.core import * >>> c2 = cnograph.CNOGraphMultiEdges() >>> c2.add_edge("A","B", link="+", edgecolor=.1) >>> c2.add_edge("A","B", link="+", edgecolor=.2) >>> c2.add_edge("A","B", link="-", edgecolor=.3) >>> c2.add_edge("A","B", link="-", edgecolor=.4) >>> c2.plot(edge_attribute="edgecolor", colorbar=True, cmap="spring") .. todo:: self.reacID attribute does not work """ def __init__(self, model=None, data=None, verbose=False, **kargs): super(CNOGraphMultiEdges, self).__init__(**kargs) c = CNOGraph(model, data) self.midas = c.midas for e in c.edges(data=True): self.add_edge(e[0], e[1], **e[2])
[docs] def reset_edge_attributes(self): """set all edge attributes to default attributes required to overwrite the cnograph method that do to handle the multigraph structure .. seealso:: :meth:`~cellnopt.core.cnograph.CNOGraph.set_default_edge_attributes` """ for edge in self.edges(): for key in self.edge[edge[0]][edge[1]].keys(): attrs = self.edge[edge[0]][edge[1]][key] attrs = self.set_default_edge_attributes(**attrs) self.edge[edge[0]][edge[1]][key] = attrs
def _set_edge_attribute_color(self, edge_attribute, cmap): """Need to overwrite the cnograph method to handle the multigraph structure""" import matplotlib cmap = matplotlib.cm.get_cmap(cmap) sm = matplotlib.cm.ScalarMappable( norm=matplotlib.colors.Normalize(vmin=0,vmax=1), cmap=cmap) M = max([self.edge[edge[0]][edge[1]][key][edge_attribute] for edge in self.edges() for key in self.edge[edge[0]][edge[1]].keys()]) for edge in self.edges(): for key in self.edge[edge[0]][edge[1]].keys(): value = self.edge[edge[0]][edge[1]][key][edge_attribute]/float(M) rgb = sm.to_rgba(value) colorHex = matplotlib.colors.rgb2hex(rgb) self.edge[edge[0]][edge[1]][key]['color'] = colorHex def _ambiguous_multiedge(self, node): """This is a multiedge so it is always ambiguous ? """ return True def is_compressable(self, node): return False def _get_compressable_nodes(self): return [] def _get_compressable(self): return [] compressable_nodes = property(_get_compressable)
[docs] def is_compressable(self, node): return False #def _get_compressable_nodes(self): # return [] #def _get_compressable(self): # return [] #compressable_nodes = property(_get_compressable)
[docs] def remove_edge(self, u, v, key=None): """Remove the edge between u and v. :param str u: node u :param str u: node v Calls :meth:`clean_orphan_ands` afterwards """ super(CNOGraph, self).remove_edge(u,v, key=key) #if "+" not in n: self.clean_orphan_ands()
def _set_edge_attribute_label(self, this, edge_attribute): for e in this.edges(): for key in self.edge[e[0]][e[1]].keys(): this.edge[e[0]][e[1]][key]['label'] = this.edge[e[0]][e[1]][key][edge_attribute]
class ANDGate(object): def __init__(self, name, verbose=False): self._name = None self.name = name[:] self.verbose = verbose def _get_name(self): return self._name def _set_name(self, name): if "=" not in name: raise ValueError("An AND reaction must contain the = character") lhs, rhs = name.split("=") if "^" not in lhs or "^" in rhs: raise ValueError("An AND reaction must contain the ^ character in the LHS e.g.: A^B=C") def get_lhs_species(self): species = self.name.split("=")[0] return species.split("^") def get_rhs_species(self): return self.name.split("=")[1] def rename_species(self, oldname, newname): if oldname not in self.name: if self.verbose: print("WARNING, %s not found" % oldname) return # first, let us check the RHS. easy to rename rhs = self.get_rhs_species() if rhs == oldname: rhs = newname[:] lhs_species = self.get_lhs_species() new_lhs = [species if species!=oldname else newname for species in lhs_species] new_lhs = "^".join(new_lhs) self.name = "=".join([new_lhs, rhs])