CellNOpt homepage|cellnopt 0.1.3 documentation

Source code for cno.io.cnograph

# -*- python -*-
#
#  This file is part of the CNO package
#
#  Copyright (c) 2012-2013 - EMBL-EBI
#
#  File author(s): Thomas Cokelaer (cokelaer@ebi.ac.uk)
#
#  Distributed under the GLPv3 License.
#  See accompanying file LICENSE.txt or copy at
#      http://www.gnu.org/licenses/gpl-3.0.html
#
#  website: github.com/cellnopt/cellnopt
#
##############################################################################
""".. topic:: **One of the main data structures of cellnopt to manipulate networks**"""
from __future__ import print_function
import os
import copy
import tempfile
import itertools
import subprocess
import shutil
import json
import collections
from functools import wraps

import matplotlib
import pylab
import networkx as nx

from cno.misc.profiler import do_profile

try:
    import pygraphviz as gv
except ImportError:
    print("Warning:: Pygraphviz not found")
    pass

import numpy as np
from easydev import Logging, TempFile, DevTools

# cellnopt modules
from cno.io.sif import SIF
from cno.io.midas import XMIDAS
from cno.io.reactions import Reaction
from cno.misc import CNOError
import colormap

from cno.misc.profiler import do_profile
__all__ = ["CNOGraph", "CNOGraphAttributes"]



def modifier(func):
    @wraps(func)
    def wrapper(self, *args, **kargs):
        return func(self, *args, **kargs)
    return wrapper


class Attributes(dict):
    """Simple dictionary to handle attributes (nodes or eges)"""
    def __init__(self, color, **kargs):
        self['color'] = color
        for k,v in kargs.items():
            self[k] = v


class EdgeAttributes(Attributes):
    """Simple dictionary to handle edge attributes"""
    def __init__(self, penwidth=1, color="black", arrowhead="normal", **kargs):
        super(EdgeAttributes, self).__init__(color=color, **kargs)
        self['penwidth'] = penwidth
        self['arrowhead'] = arrowhead


class NodeAttributes(dict):
    """Simple dictionary to handle node attributes

    Used by the :class:`CNOGraph`.

    ::

        >>> a = Attributes(color="red", fillcolor="white")
        >>> assert a['color'] == "red"
        True

    """
    def __init__(self, color="black", fillcolor="white", shape="rectangle",
                 style="filled,bold", **kargs):
        """

        :param color: color of the border
        :param fillcolor: color inside the shape
        :param shape color inside the shape
        :param style: color inside the shape
        """
        super(NodeAttributes, self).__init__(color=color, **kargs)

        #self['color'] = color
        self['fillcolor'] = fillcolor
        self['shape'] = shape
        self['style'] = style


[docs]class CNOGraphAttributes(object): """ define attributes of the cnograp when calling the plotting function. """ def __init__(self): self.attributes = { 'activation': EdgeAttributes(color='black', arrowhead="normal"), 'inhibition': EdgeAttributes(color='red', arrowhead="tee"), 'stimuli': NodeAttributes(fillcolor='#9ACD32'), 'inhibitors': NodeAttributes(fillcolor='orangered'), 'signals': NodeAttributes(fillcolor='lightblue'), 'nonc': NodeAttributes( fillcolor='gray', style='diagonals, filled'), 'compressed': NodeAttributes(fillcolor="white",style='dashed', penwidth=2), 'others': NodeAttributes(fillcolor="white",style='filled,bold',penwidth=2), 'and': NodeAttributes(shape='circle',style='filled',width=.1, height=.1, fixedsize=True, label='') }
[docs] def keys(self): return self.attributes.keys()
def __getitem__(self, key): return self.attributes[key]
[docs]class CNOGraph(nx.DiGraph): """Data structure (Digraph) used to manipulate networks The networks can represent for instance a protein interaction network (PIN). CNOGraph is a data structure dedicated to the analysis of phosphorylation data within protein-protein interaction networks but can be used in a more general context. Note that CNOGraph inherits from the **directed graph** data structure of networkx. However, we impose links between nodes to be restricted to two types: * "+" for activation * "-" for inhibition. An empty instance can be created as follows:: c = CNOGraph() and edge can be added as follows:: c.add_edge("A", "B", link="+") c.add_edge("A", "C", link="-") An even simpler way is to add :class:`cno.io.reactions.Reaction`, which can be strings or instance of the Reaction class. The methods :meth:`add_node` and :meth:`add_edge` methods can be used to populate the graph. However, it is also possible to read a network stored in a file in :class:`cno.io.sif.SIF` format:: >>> from cno import CNOGraph, cnodata >>> pknmodel = cnodata("PKN-ToyPB.sif") >>> c = CNOGraph(pknmodel) The SIF model can be a filename, or an instance of :class:`~cno.io.sif.SIF`. Note for CellNOpt users that if **and** nodes are contained in the original SIF files, they are transformed int AND gates using "^" as the logical AND. Other imports are available, in particular :meth:`read_sbmlqual`. You can add or remove nodes/edges in the CNOGraph afterwards using NetworkX methods. When instanciating a CNOGraph instance, you can also populate data from a :class:`~cno.io.midas.XMIDAS` data instance or a MIDAS filename. MIDAS file contains measurements made on proteins in various experimental conditions (stimuli and inhibitors). The names of the simuli, inhibitors and signals are used to color the nodes in the plotting function. However, the data itself is not used. If you don't use any MIDAS file as input, you can set the stimuli/inhibitors/signals manually by filling the hidden attributes _stimuli, _signals and _inhibitors with list of nodes contained in the graph. .. rubric:: Node and Edge attributes The node and edge attributes can be accessed as follows (and changed):: >>> c.node['egf'] {'color': u'black', u'fillcolor': u'white', 'penwidth': 2, u'shape': u'rectangle', u'style': u'filled,bold'} >>> c.edge['egf']['egfr'] {u'arrowhead': u'normal', u'color': u'black', u'compressed': [], 'link': u'+', u'penwidth': 1} .. rubric:: OPERATORS CNOGraph is a data structure with useful operators (e.g. union). Note, however, that these operators are applied on the topology only (MIDAS information is ignored). For instance, you can add graphs with the **+** operator or check that there are identical :: c1 = CNOGraph() c1.add_reaction("A=B") c2 = CNOGraph() c2.add_reaction("A=C") c3 = c1 +c2 Let us illustrate the + operation with another example. Let us consider the following graphs: .. plot:: :include-source: :width: 30% from cno import CNOGraph c1 = CNOGraph() c1.add_edge("A","B", link="+") c1.add_edge("A","C", link="-") c1.plot() .. plot:: :include-source: :width: 30% from cno import CNOGraph c2 = CNOGraph() c2.add_edge("A","E", link="+") c2.add_edge("C","E", link="+") c2.plot() :: (c1+c2).plot() .. plot:: :width: 50% from cno import CNOGraph c1 = CNOGraph() c1.add_edge("A","B", link="+") c1.add_edge("A","C", link="-") c1.plot() c2 = CNOGraph() c2.add_edge("A","E", link="+") c2.add_edge("C","E", link="+") c2.plot() (c1+c2).plot() You can also substract a graph from another one:: c3 = c1 - c2 c3.nodes() The new graph should contains only one node (B). Additional functionalities such as :meth:`intersect`, :meth:`union` and :meth:`difference` can be used to see the difference between two graphs. .. rubric:: PLOTTING There are plotting functionalities to look at the graph, which are based on graphviz library. For instance, the :meth:`plot` function is quite flexible. If a MIDAS file is provided, the default behaviour follow CellNOptR convention, where stimuli are colored in green, inhibitors in red and measurements in blue: .. plot:: :include-source: :width: 50% from cno import CNOGraph, cnodata pknmodel = cnodata("PKN-ToyPB.sif") data = cnodata("MD-ToyPB.csv") c = CNOGraph(pknmodel, data) c.plot() If you did not use any MIDAS file as input parameter, you can still populate the hidden fields :attr:`_stimuli`, :attr:`_inhibitors`, :attr:`_signals`. You can also overwrite this behaviour by using the node_attribute parameter when calling :meth:`plot`. For instance, if you call :meth:`centrality_degree`, which computes and populate the node attribute **degree**. You can then call plot as follows to replace the default color: .. plot:: :include-source: :width: 50% from cno import CNOGraph, cnodata pknmodel = cnodata("PKN-ToyPB.sif") data = cnodata("MD-ToyPB.csv") c = CNOGraph(pknmodel, data) c.centrality_degree() c.plot(node_attribute="centrality_degree", colorbar) Similarly, you can tune the color of the edge attribute. See the :meth:`plot` for more details. .. seealso:: tutorial, user guide .. seealso:: The :class:`cno.io.xcnograph.XCNOGraph` provides many more tools for plotting various information on the graph structure. """ def __init__(self, model=None, data=None, verbose=False, **kargs): """.. rubric:: Constructor :param str model: optional network in SIF format. Can be the filename or instance of :class:`~cno.io.sif.SIF` :param data: optional data file in MIDAS format. Can be a filename or instance of :class:`~cno.io.midas.XMIDAS` :param bool verbose: :param str celltype: if a MIDAS file contains more that 1 celltype, you must provide a celltype name """ super(CNOGraph, self).__init__(**kargs) self.kargs = kargs.copy() self._graph_type = 'digraph' # This is a DIgraph attribute # self.graph is a DiGraph attribute that is overwritten sometinmes self.graph_options = { 'graph': { "title": "CNOGraph output generated by cno", "dpi":200, 'rankdir':'TB', # TB, LR, RL #'ordering': "out", 'splines':True, 'fontsize': 22, #'nodesep': .5, #'ranksep':.6, 'ratio':'auto', # numeric, 'fill','compress','expand','auto' # 0.8 is good for laptop screens. 2 is good for 'size': "15,15", # 'fontname': 'helvetica', }, 'node':{ #'width':1, #'fontsize':40, #'height':1, #'width':2, 'fontname':'bold' }, 'edge': { 'minlen':1, 'color':'black' }, 'ipython': { 'width': 500, 'rank_method': 'cno' } } # cellnoptR has always the same layout: #s.model.graph_options['graph']['nodesep'] = 0.5 #s.model.plot(rank_method='same') self.plot_options = { 'colorbar.orientation': 'horizontal', 'colorbar.shrink': 0.5, 'colorbar.fraction': 0.15, 'colorbar.pad': 0.1, } #: nodes and edges attributes. See :class:`CNOGraphAttributes` self.attributes = CNOGraphAttributes() self._set_default_node_attribute = 'cno' self.and_symbol = "^" self.or_symbol = "+" self._midas = None self._verbose = verbose self.logging = Logging(self.verbose) #: stimuli self._stimuli = [] #: inhibitors self._inhibitors = [] self._compressed = [] self._signals =[] self._nonc = None self._ranks = None # the model if hasattr(model, '__class__') and \ model.__class__.__name__ in ['CNOGraph', 'XCNOGraph']: for node in model.nodes(): self.add_node(str(node)) for edge in model.edges(data=True): if "link" in edge[2]: self.add_edge(str(edge[0]), str(edge[1]), link=edge[2]['link']) else: self.add_edge(str(edge[0]), str(edge[1]), link="+") self.set_default_node_attributes() # must be call if sif or midas modified. self.filename = None if model.midas is not None: self.midas = model.midas.copy() elif model is None: self.filename = None elif isinstance(model, str): if model.endswith('.sif'): self.read_sif(model) self.filename = model[:] elif model.endswith(".xml"): self.read_sbmlqual(model) self.filename = model[:] else: raise CNOError("Only filenames with .sif and .xml (SBML-qual) extension are recognised.") elif isinstance(model, SIF): self.read_sif(model) self.filename = 'undefined' # the data if self.midas is None: self.midas = data self._colormap = colormap.Colormap() self._changed = True def _set_verbose(self, verbose): self.logging.debugLevel = verbose self.midas.logging.debugLevel = verbose def _get_verbose(self): return self._verbose verbose = property(_get_verbose, _set_verbose) # SOME PROPERTIES def _get_midas(self): return self._midas def _set_midas(self, data): self._changed = True if isinstance(data, str): self._midas = XMIDAS(data, cellLine=self.kargs.get("cellLine", None), verbose=self.verbose) elif isinstance(data, XMIDAS): self._midas = copy.deepcopy(data) elif data is None: self._midas = data else: msg = "Incorrect data, Must a valid MIDAS file or instance of XMIDAS class {}" raise ValueError(msg.format(data)) self.check_data_compatibility() self.set_default_node_attributes() midas = property(fget=_get_midas, fset=_set_midas, doc="MIDAS Read/Write attribute.") def _get_stimuli(self): stimuli = list(self._stimuli[:]) if self.midas: stimuli += self.midas.names_stimuli[:] return stimuli stimuli = property(_get_stimuli, doc="list of stimuli found in the :attr:`midas` and hidden attribute :meth:`_stimuli`") def _get_inhibitors(self): inhibitors = list(self._inhibitors) if self.midas: inhibitors += self.midas.names_inhibitors[:] return inhibitors inhibitors = property(_get_inhibitors, doc="list of inhibitors found in the :attr:`midas` and hidden attribute :attr:`_inhibitors`") def _get_signals(self): signals = list(self._signals) # do not reference if self.midas: signals += list(self.midas.names_signals) return signals signals = property(_get_signals, doc="list of signals found in the :attr:`midas` and hidden attribute :meth:`_signals`") # METHODS
[docs] def read_sif(self, model): """If the SIF file changes, we need to rebuild the graph.""" # takes the SIF input file and build up the CNOGraph. remove all nodes # before self.clear() self.logging.debug("Reading the model") #if isinstance(model, (str, unicode)): if isinstance(model, (str)): sif = SIF(model) elif isinstance(model, SIF): sif = model else: raise ValueError("The sif input must be a filename to a SIF file or an instance of the SIF class") # add all reactions self.add_reactions(sif.reactions) # now, we need to set the attributes, only if we have a cnolist, # otherwise color is the default (white) self.set_default_node_attributes() # must be call if sif or midas modified. self.logging.debug("Model loaded")
def _add_simple_reaction(self, reac, node_dict=None, edge_dict=None): """A=B or !A=B""" #reac = Reaction(reac) # validate the reaction #reac = reac.name self._changed = True this = self.attributes.attributes['others'] if node_dict: this.update(node_dict) node_dict = this lhs, rhs = reac.split("=", 1) if rhs == "": self.add_node(lhs, attr_dict=node_dict) else: if lhs.startswith("!"): link = "-" lhs = lhs[1:] else: link = "+" if self.has_edge(lhs, rhs): if self[lhs][rhs]['link'] == link: self.logging.info("skip existing reactions %s %s %s" % (lhs, link, rhs)) else: self.add_edge(lhs, rhs, attr_dict=edge_dict, link=link) else: self.add_edge(lhs,rhs, attr_dict=edge_dict, link=link) self._changed = True
[docs] def add_reactions(self, reactions, node_dict=None, edge_dict=None): self._changed = True for reac in reactions: self.add_reaction(reac, node_dict=node_dict, edge_dict=edge_dict)
[docs] def is_reaction_in(self, reaction): """Return boolean with present of a reaction in the network :param reaction: a valid reaction .. todo:: Could be a string of reaction """ reaction = Reaction(reaction) if reaction in self.reactions: return True else: return False
@modifier
[docs] def add_reaction(self, reac, node_dict=None, edge_dict=None): """Add nodes and edges given a reaction :param str reac: a valid reaction. See below for examples Here are some valid reactions that includes NOT, AND and OR gates. + is an OR and ^ character is an AND gate:: >>> s.add_reaction("A=B") >>> s.add_reaction("A+B=C") >>> s.add_reaction("A^C=E") >>> s.add_reaction("!F+G=H") .. plot:: :width: 50% :include-source: from cno import CNOGraph c = CNOGraph() c.add_reaction("a+b^c+e+d^h=Z") c.plot() .. warning:: component of AND gates are ordered alphabetically. """ # add the nodes first so that the attributes are ste properly # make sure that (1) there is no extra spaces and (2) this is a string, not # a unicode reac = Reaction(str(reac.strip())) # no need to sort the reaction here since we will split the ORs and sort the # AND gates. # make sure that nodes are created with the default (or use attributes) # before creating the edges. for node in reac.species: self.add_node(node, attr_dict=node_dict) # if there is an OR gate, easy, just need to add simple reactions # A+!B=C is split into A=C and !B=C for this_lhs in reac.lhs.split("+"): # + has priority upon ^ unlike in maths so we can split with + # A+B^C^D+E=C means 3 reactions: A=C, E=C and B^C^D=C if self.isand(this_lhs) is False: name = this_lhs + "=" + reac.rhs self._add_simple_reaction(name, node_dict=node_dict, edge_dict=edge_dict) else: and_gate_name = this_lhs + "=" + reac.rhs reac = Reaction(and_gate_name) reac.sort() # and gates need a little bit more work # the and gate to the output self.add_edge(reac.name, reac.rhs, attr_dict=edge_dict, link="+") # now the inputs to the and gate for this in this_lhs.split(self.and_symbol): name = this + "=" + reac.name self._add_simple_reaction(name, node_dict=node_dict, edge_dict=edge_dict) self._changed = True #@do_profile()
[docs] def get_default_edge_attributes(self, **attr): #if "compressed" not in attr.keys(): # attr["compressed"] = [] link = attr.get("link", "+") if link == '+': attr['color'] = 'black' attr['arrowhead'] = 'normal' attr['penwidth'] = attr.get('penwidth', 1) else: attr['color'] = 'red' attr['arrowhead'] = 'tee' attr['penwidth'] = attr.get('penwidth', 1) return attr
[docs] def reset_edge_attributes(self): """set all edge attributes to default attributes .. seealso:: :meth:`get_default_edge_attribute` if we set an edge label, which is an AND ^, then plot fails in this function c.edge["alpha^NaCl=HOG1"]['label'] = "?" """ for edge in self.edges(): attrs = self.edge[edge[0]][edge[1]] attrs = self.get_default_edge_attributes(**attrs) self.edge[edge[0]][edge[1]] = attrs
[docs] def add_edges_from(self, ebunch, attr_dict=None, **attr): self._changed = True for edge in ebunch: self.add_edge(edge[0], edge[1], attr_dict=attr_dict, **attr) #@do_profile()
[docs] def add_edge(self, u, v, attr_dict=None, **attr): """adds an edge between node u and v. :param str u: source node :param str v: target node :param str link: compulsary keyword. must be "+" or "-" :param dict attr_dict: dictionary, optional (default= no attributes) Dictionary of edge attributes. Key/value pairs will update existing data associated with the edge. :param attr: keyword arguments, optional edge data (or labels or objects) can be assigned using keyword arguments. keywords provided will overwrite keys provided in the **attr_dict** parameter .. warning:: color, penwidth, arrowhead keywords are populated according to the value of the link. * If link="+", then edge is black and arrowhead is normal. * If link="-", then edge is red and arrowhead is a tee .. plot:: :include-source: :width: 50% from cno import CNOGraph c = CNOGraph() c.add_edge("A","B",link="+") c.add_edge("A","C",link="-") c.add_edge("C","D",link="+", mycolor="blue") c.add_edge("C","E",link="+", data=[1,2,3]) If you want multiple edges, use add_reaction() method. c.add_reaction("A+B+C=D") equivalent to :: c.add_edge("A", "D", link="+") c.add_edge("B", "D", link="+") c.add_edge("C", "D", link="+") Attributes on the edges can be provided using the parameters **attr_dict** (a dictionary) and/or ****attr**, which is a list of key/value pairs. The latter will overwrite the key/value pairs contained in the dictionary. Consider this example:: c = CNOGraph() c.add_edge("a", "c", attr_dict={"k":1, "data":[0,1,2]}, link="+", k=3) c.edges(data=True) [('a', 'c', {'arrowhead': 'normal', 'color': 'black', 'compressed': [], 'k':3 'link': '+', 'penwidth': 1})] The field "k" in the dictionary (attr_dict) is set to 1. However, it is also provided as an argument but with the value 3. The latter is the one used to populate the edge attributes, which can be checked by printing the data of the edge (c.edges(data=True()) .. seealso:: special attributes are automatically set by :meth:`get_default_edge_attributes`. the color of the edge is black if link is set to "+" and red otherwie. """ # cast u to str to search for + sign if "+" in str(u): lhs = u.split("+") for x in lhs: if x.startswith("!"): attr["link"] = "-" attr['color'] = 'red' attr['arrowhead'] = 'tee' super(CNOGraph, self).add_edge(x[1:], v, attr_dict, **attr) else: attr["link"] = "+" super(CNOGraph, self).add_edge(x, v, attr_dict, **attr) else: attr = self.get_default_edge_attributes(**attr) super(CNOGraph, self).add_edge(u, v, attr_dict, **attr)
[docs] def clear(self): """Remove nodes and edges and MIDAS instance""" super(CNOGraph, self).clear() self._changed = True self.midas = None self._stimuli = [] self._signals = [] self._inhibitors = [] self._ranks = None
@modifier
[docs] def clean_orphan_ands(self): """Remove AND gates that are not AND gates anymore When removing an edge or a node, AND gates may not be valid anymore either because the output does not exists or there is a single input. This function is called when :meth:`remove_node` or :meth:`remove_edge` are called. However, if you manipulate the nodes/edges manually you may need to call this function afterwards. """ self._changed = True for node in self._find_and_nodes(): if len(self.successors(node))==0 or len(self.predecessors(node))<=1: self.remove_node(node) continue
[docs] def check_data_compatibility(self): """When setting a MIDAS file, need to check that it is compatible with the graph, i.e. species are found in the model.""" if self.midas: msg = "The %s %s was found in the MIDAS file but is " msg += "not present in the model. Adding that node to your model" for x in self.midas.names_cues: if x not in self.nodes(): self.add_node(x) self.logging.warning(msg % ('cues', x)) for x in self.midas.names_signals: if x not in self.nodes(): self.add_node(x) self.logging.warning(msg % ('signals', x))
@modifier
[docs] def remove_and_gates(self): """Remove the AND nodes added by :meth:`expand_and_gates`""" self._changed = True for n in self._find_and_nodes(): self.remove_node(n)
def __eq__(self, other): # we must look at the data to figure out the link + or - but should ignore # all other keys edges1 = sorted(self.edges(data=True)) edges2 = sorted(other.edges(data=True)) edges1 = [(e[0], e[1], {'link':e[2]['link']}) for e in edges1] edges2 = [(e[0], e[1], {'link':e[2]['link']}) for e in edges2] res = edges1 == edges2 return res def __add__(self, other): """allows a+b operation combines the _inhibitors, _signals, _stimuli but keep only the first midas file ! """ G = self.copy() G.add_nodes_from(other.nodes(data=True)) edges = other.edges(data=True) for e1,e2,d in edges: G.add_edge(e1,e2,None, **d) G._inhibitors += other._inhibitors G._signals += other._signals G._stimuli += other._stimuli # TODO: merge the MIDAS files. ? return G def __sub__(self, other): G = self.copy() G.remove_nodes_from([n for n in G if n in other.nodes()]) return G def __str__(self): nodes = len([x for x in self.nodes() if self.and_symbol not in str(x)]) andnodes = len([x for x in self.nodes() if self.and_symbol in str(x)]) msg = "The model contains %s nodes (and %s AND node)\n" % (nodes, andnodes) self.logging.warning("Edge counting valid only if and node have only 2 inputs") edges = len([e for e in self.edges() if self.and_symbol not in str(e[0]) and self.and_symbol not in str(e[1])]) andedges = len([e for e in self.edges() if self.and_symbol in str(e[0]) or self.and_symbol in str(e[1])])/3 msg += "%s Hyperedges found (%s+%s) \n" % (edges+andedges, edges, andedges) return msg #def __rsub__(self, other): # self.remove_nodes_from([n for n in self if n in other.nodes()])
[docs] def union(self, other): """Return graph with elements from this instance and the input graph. .. plot:: :include-source: :width: 50% from cno import CNOGraph from pylab import subplot, title c1 = CNOGraph() c1.add_edge("A", "B", link="+") c1.add_edge("A", "C", link="-") c1.add_edge("C", "E", link="+") subplot(1,3,1) title(r"graph $C_1$") c1.plot(hold=True) c2 = CNOGraph() c2.add_edge("A", "B", link="+") c2.add_edge("B", "D", link="+") c2.add_edge("B", "F", link="+") subplot(1,3,2) c2.plot(hold=True) title(r"graph $C_2$") c3 = c1.union(c2) subplot(1,3,3) c3.plot(hold=True) title(r"graph $C_3 = C_1 \cup C_2$") """ c = self + other return c
[docs] def difference(self, other): """Return a CNOGraph instance that is the difference with the input graph (i.e. all elements that are in this set but not the others.) .. plot:: :include-source: :width: 50% from cno import CNOGraph from pylab import subplot, title c1 = CNOGraph() c1.add_edge("A", "B", link="+") c1.add_edge("A", "C", link="-") c1.add_edge("C", "E", link="+") subplot(1,3,1) title("graph C1") c1.plot(hold=True) c2 = CNOGraph() c2.add_edge("A", "B", link="+") c2.add_edge("B", "D", link="+") c2.add_edge("B", "F", link="+") subplot(1,3,2) c2.plot(hold=True) title("graph C2") c3 = c1.difference(c2) subplot(1,3,3) c3.plot(hold=True) title("graph C3=C1-C2") .. note:: this method should be equivalent to the - operator. So c1-c2 == c1.difference(c2) """ G = self.copy() G.remove_nodes_from([n for n in G if n in other.nodes()]) return G
[docs] def intersect(self, other): """Return a graph with only nodes found in "other" graph. .. plot:: :include-source: :width: 50% from cno import CNOGraph from pylab import subplot, title c1 = CNOGraph() c1.add_edge("A", "B", link="+") c1.add_edge("A", "C", link="-") c1.add_edge("C", "E", link="+") subplot(1,3,1) title(r"graph $C_1$") c1.plot(hold=True) c2 = CNOGraph() c2.add_edge("A", "B", link="+") c2.add_edge("B", "D", link="+") c2.add_edge("B", "F", link="+") subplot(1,3,2) c2.plot(hold=True) title(r"graph $C_2$") c3 = c1.intersect(c2) subplot(1,3,3) c3.plot(hold=True) title(r"graph $C_3 = C_1 \cap C_2$") """ G = self.copy() G.remove_nodes_from([n for n in G if n not in other]) return G
[docs] def draw(self, prog="dot", attribute="fillcolor", hold=False, **kargs): """Draw the network using matplotlib. Not exactly what we want but could be useful. :param str prog: one of the graphviz program (default dot) :param bool hold: hold previous plot (default is False) :param str attribute: attribute to use to color the nodes (default is "fillcolor"). :param node_size: default 1200 :param width: default 2 Uses the fillcolor attribute of the nodes Uses the link attribute of the edges .. seealso:: :meth:`plot` that is dedicated to this kind of plot using graphviz """ self.logging.warning("Not for production. Use plot() instead") pos = nx.drawing.graphviz_layout(self, prog=prog) node_size = kargs.get('node_size', 1200) kargs['node_size'] = node_size width = kargs.get('width', 2) kargs['width'] = width # node attributes nodes = sorted(self.nodes()) node_colors = [self.node[x][attribute] if attribute in self.node[x].keys() else "gray" for x in nodes] # edge attributes edges = self.edges(data=True) colors = {'-':'red', '+':'black'} edge_colors = [colors[x[2]['link']] for x in edges] nx.draw(self, prog=prog, hold=hold, nodelist=nodes, edge_color=edge_colors, node_color=node_colors, pos=pos, **kargs)
def _check_dot_prog(self, prog): DevTools().check_param_in_list(prog, ["twopi", "gvcolor", "wc", "ccomps", "tred", "sccmap", "fdp", "circo", "neato", "acyclic", "nop", "gvpr", "dot", "sfdp"]) def _get_cmap(self, cmap=None): if cmap == "heat": cmap = self._colormap.get_cmap_heat_r() elif cmap == "green": cmap = self._colormap.get_cmap_red_green() else: try: cmap = colormap.cmap_builder(cmap) except: pass # already a valid cmap ? return cmap
[docs] def plot(self, prog="dot", viewer="pylab", hold=False, show=True, filename=None, node_attribute=None, edge_attribute=None, cmap='heat', colorbar=False, remove_dot=True, normalise_cmap=True, edge_attribute_labels=True, rank_method='inout' ): """plotting graph using dot program (graphviz) and networkx By default, a temporary file is created to hold the image created by graphviz, which is them shown using pylab. You can choose not to see the image (show=False) and to save it in a local file instead (set the filename). The output format is PNG. You can play with networkx.write_dot to save the dot and create the SVG yourself. :param str prog: the graphviz layout algorithm (default is dot) :param viewer: pylab :param bool show: show the plot (True by default) :param bool remove_dot: if True, remove the temporary dot file. :param edge_attribute_labels: is True, if the label are available, show them. otherwise, if edge_attribute is provided, set lael as the edge_attribute :param rank_method: If none, let graphviz do the job. Issue is that (i) input stimuli may not be aligned and output neither. The rank_method set to **cno** constraints the stimuli and measured species that are sinks all others are free. The **same** constraint all nodes with same rank to be in the same subgraph. Additional attributes on the graph itself can be set up by populating the graph attribute with a dictionary called "graph":: c.graph['graph'] = {"splines":True, "size":(20,20), "dpi":200} Useful other options are:: c.edge["tnfa"]["tnfr"]["penwidth"] = 3 c.edge["tnfa"]["tnfr"]["label"] = " 5" If you use edge_attribute and show_edge_labels, label are replaced by the content of edge_attribute. If you still want differnt labels, you must stet show_label_edge to False and set the label attribute manually :: c = cnograph.CNOGraph() c.add_reaction("A=C") c.add_reaction("B=C") c.edge['A']['C']['measure'] = 0.5 c.edge['B']['C']['measure'] = 0.1 c.expand_and_gates() c.edge['A']['C']['label'] = "0.5 seconds" # compare this that shows only one user-defined label c.plot() # with that show all labels c.plot(edge_attribute="whatever", edge_attribute_labels=False) See the graphviz homepage documentation for more options. :: c.plot(filename='test.svg', viewer='yout_favorite_viewer', remove_dot=False, rank_method='cno') .. note:: edge attribute in CNOGraph (Directed Graph) are not coded in the same way in CNOGraphMultiEdges (Multi Directed Graph). So, this function does not work for MultiGraph .. todo:: use same colorbar as in midas. rigtht now; the vmax is not correct. .. todo:: precision on edge_attribute to 2 digits. if filename provided with extension different from png, pylab must be able to read the image. If not, you should set viewer to something else. c.plot() # uses default layout, with PNG format viewed with pylab c.plot(rank_method='same') # uses ranks of each node with PNG format viewed with pylab c.plot(rank_method='cno', filename='test.svg', viewer='browse') # uses browse (defautl viewer) c.plot(rank_method='cno', filename='test.svg', viewer='browse') """ # graph is a DiGraph attribute # that is sometimes replaced by {} inside networkx so we need to overwrite it here # each time we want to plot the graph. if len(self) == 0: self.logging.error("empty graph, nothing to plot") return if rank_method: assert rank_method in ['inout', 'all', 'cno'] self._check_dot_prog(prog) # Get the default/user attributes for the graph/nodes/edges for graphviz # FIXME is this still required ? #self.graph = self.graph_options.copy() # Set the colormap cmap = self._get_cmap(cmap) # update the node attributes if required with default color # required is we manipualte the _signals, _inbitors and so on self.set_default_node_attributes() # or ues the requried node attribute. M = 1 if node_attribute is not None: # TODO check that it exists # cmap = matplotlib.cm.get_cmap(cmap) sm = matplotlib.cm.ScalarMappable( norm = matplotlib.colors.Normalize(vmin=0, vmax=1), cmap=cmap) # node[0] is the name, node[1] is the data data = [node[1][node_attribute] for node in self.nodes(data=True) if node_attribute in node[1].keys()] if normalise_cmap is True: M = max(data) # color could be encoded as values between 0 and 1 # or hexa. Default to 1. If not all are provided, # no errors raised. for node in self.nodes(): # default self.node[node]['fillcolor'] = "#FFFFFF" try: value = self.node[node][node_attribute]/float(M) rgb = sm.to_rgba(value) colorHex = matplotlib.colors.rgb2hex(rgb) self.node[node]['fillcolor'] = colorHex except: try: color = self.node[node][node_attribute] self.node[node]['fillcolor'] = colormap.Color(color).hex except: pass # update the edge attribute if edge_attribute: M = self._set_edge_attribute_color(edge_attribute, cmap) # set edge with edge_attribute set to 0 as invisible edges reactions = [self.edge2reaction(edge) for edge in self.edges(data=True) if edge[2][edge_attribute] > 0] self.set_edge_visibility_from_reactions(reactions) # create temp files # FIXME we create png here ?? do we use outfile ? infile = tempfile.NamedTemporaryFile(suffix=".dot", delete=False) if filename is None: outfile = tempfile.NamedTemporaryFile(suffix=".png", delete=False) filename = outfile.name # Some nodes may belong to 2 colors. Creating subgraph is one way to go # around. Having graphviz 2.30 we could use striped node. if node_attribute is None: for node in self.nodes(): if node in self.signals and node in self.inhibitors: self.node[node]['style'] = "diagonals,filled" self.node[node]['color'] = "red" self.node[node]['fillcolor'] = "lightblue" if node in self.stimuli and node in self.inhibitors: self.node[node]['style'] = "diagonals,filled" self.node[node]['color'] = "red" self.node[node]['fillcolor'] = "#9ACD32" # to not change the current graph, let us copy it # FIXME we use 'this' variable and use it for edges # but not for the nodes... why ? this = self.copy() if edge_attribute_labels and edge_attribute is not None: self._set_edge_attribute_label(this, edge_attribute) count = 0 ret = -1 while count < 10: # this is a hack for graphviz 2.30 H = self._get_ranked_agraph(rank_method) H.write(infile.name) frmt = os.path.splitext(filename)[1][1:] try: if filename.endswith('svg') and 'dpi' in H.graph_attr.keys(): del H.graph_attr['dpi'] if rank_method != 'cno': H.draw(path=filename, prog=prog, format=frmt) else: H.to_dot(infile.name, frmt=frmt) args=[" -T"+frmt, infile.name] args=' '.join(args) data = H._run_prog(prog, args) fh = open(filename, "w") fh.write(data) fh.close() count = 1000 ret = 0 except Exception as err: print(err.message) self.logging.warning("%s program failed. Trying again" % prog) count += 1 if ret !=0: if rank_method is not None: self.logging.warning("%s program failed to create image" % prog) H = self._get_ranked_agraph('cno') #H = nx.to_agraph(this) if filename.endswith('svg') and 'dpi' in H.graph_attr.keys(): del H.graph_attr['dpi'] frmt = os.path.splitext(filename)[1][1:] #H.draw(path=filename, prog=prog, format=frmt) H.to_dot(infile.name, frmt=frmt) args=[" -T"+frmt, infile.name] args=' '.join(args) #print(os.path.exists(infile.name)) data = H._run_prog(prog, args) fh = open(filename, "w") fh.write(data) # Here is the visualisation only iif image was created if viewer=="pylab" and show is True and frmt!='svg': if hold == False: if hold is False: pylab.clf() f = pylab.gcf() f.set_facecolor("white") a = f.add_axes([0.05,0.04,0.8,0.9]) a.imshow(pylab.imread(filename)) a.axis('off') else: a = pylab.imshow(pylab.imread(filename)) pylab.axis('off') if colorbar: cbar = pylab.linspace(0, 1, 100) e = f.add_axes([.86, .05, .05, .9]) e.pcolor(pylab.array([cbar, cbar]).transpose(), cmap=cmap, vmin=0, vmax=1); e.yaxis.tick_right() e.set_xticks([],[]) e.set_yticks([0,20,40,60,80,100]) if normalise_cmap is True: # TODO precision for small number ?? from easydev.tools import precision e.set_yticklabels([precision(x,3) for x in pylab.linspace(0, M, 6)]) else: e.set_yticklabels([0, 0.2, 0.4, 0.6, 0.8, 1]) elif show is True: try: if viewer=='pylab': viewer = 'browse' subprocess.call("%s %s &" % (viewer, filename), shell=True) except: # for MAC users ? if viewer!='pylab': subprocess.call("open -a Preview %s &" % (viewer, filename), shell=True) if remove_dot == True: infile.delete = True infile.close() else: print("Creating dot file named 'model.dot'") shutil.move(infile.name, "model.dot") infile.close() if filename == False: outfile.delete = True outfile.close() if node_attribute is not None: self.set_default_node_attributes() #if show == True: # try: # from biokit.dev.mpl_focus import ZoomPan # ax = pylab.gca() # zp = ZoomPan() # _ = zp.zoom_factory(ax, base_scale = 1.2) # _ = zp.pan_factory(ax) # except: # pass
def _repr_png_(self): """Returns an Image for display in an IPython console""" fh = TempFile(suffix='.png') self.plot(show=False, filename=fh.name, rank_method=self.graph_options['ipython']['rank_method']) return fh.name def _repr_svg_(self): """Returns an Image for display in an IPython console""" from IPython.display import SVG fh = TempFile(suffix='.svg') self.plot(show=False, filename=fh.name) return fh.name @property
[docs] def png(self): from IPython.display import Image data = Image(self._repr_png_(), embed=True, width=self.graph_options['ipython']['width']) return data
@property
[docs] def svg(self): from IPython.display import SVG data = SVG(self._repr_svg_() ) return data
@property def _ipython_display_(self): from IPython.display import Image data = Image(self._repr_png_(), embed=True, width=200) return data def _plot_rmse_fromR(self, filename, F=.3, scale=2, col=None): """ filename should be a CSV with first column being node names and second column the values. This function populates the node attribute "mse", which should be the average RMSE over all experiment for each species. :param col: if col is None, average all columns c.plot() """ import pandas as pd df = pd.read_csv(filename, index_col=0, header=None) if col==None: df = df.mean(axis=1) elif col in df.columns: df = df[col] else: print("Invalid column provided. Use one of {}".format(df.columns)) for this in self.nodes(): if this in self.signals: mse = df.ix[this] #.values[0] self.node[this]['mse'] = (1-(mse/F)**scale) self.node[this]['label'] = this+"\n"+str(int(mse*1000)/1000.) else: self.node[this]['mse'] = 1 cm = colormap.Colormap() self.plot(node_attribute="mse", cmap=cm.get_cmap_heat()) def _set_edge_attribute_label(self, this, edge_attribute): for e in this.edges(): this.edge[e[0]][e[1]]['label'] = this.edge[e[0]][e[1]][edge_attribute] def _set_edge_attribute_color(self, edge_attribute, cmap): import matplotlib cmap = matplotlib.cm.get_cmap(cmap) sm = matplotlib.cm.ScalarMappable( norm=matplotlib.colors.Normalize(vmin=0, vmax=1), cmap=cmap) # normalise if needed only values = [self.edge[edge[0]][edge[1]][edge_attribute] for edge in self.edges()] if max(values)>1: M = max([self.edge[edge[0]][edge[1]][edge_attribute] for edge in self.edges()]) else: M = 1. for edge in self.edges(): value = self.edge[edge[0]][edge[1]][edge_attribute] / M rgb = sm.to_rgba(value) colorHex = matplotlib.colors.rgb2hex(rgb) self.edge[edge[0]][edge[1]]['color'] = colorHex return M def _get_ranked_agraph(self, rank_method=None): """and gates should have intermediate ranks""" if rank_method == 'cno': H = AGraphCNO(self) else: H = nx.to_agraph(self) for k, v in self.graph_options['graph'].items(): H.graph_attr[k] = v for k, v in self.graph_options['edge'].items(): H.edge_attr[k] = v for k, v in self.graph_options['node'].items(): H.node_attr[k] = v if rank_method is None: return H if self.midas is None: return H if rank_method == 'cno': return H # order the graph for ranks allranks = self.get_same_rank() # this has been checkd on MMB to and seems correct ranks = {} M = max(allranks.keys()) for k, v in allranks.items(): if rank_method in ['inout', 'all']: #H.strict = True ranks[k] = sorted([x for x in v if '=' not in x], key=lambda x: x.lower()) else: ranks[k] = [x for x in v if '=' not in x] selfloops = [x[1] for x in self.selfloop_edges()] # Note: if name is set to "cluster"+name, black box is added # Group by cluster with same ranks # Note2 that subgraph should be added before the invisiable edges. # The inverse resets the attribute e.g., style="invis" for rank in ranks.keys(): name = str(rank) # may be numbers if rank == 0: # label will be used if name == 'cluster_source' species = [x for x in ranks[rank] if x not in selfloops] aa = H.add_subgraph(species, rank='source', name='source', label='stimuli') if len(species)>=2: for i, node1 in enumerate(species[0:-1]): node2 = species[i+1] if self._graph_type != 'multigraph': aa.add_edge(node1, node2, style="invis") elif rank == M: species = sorted([x for x in ranks[rank] if x not in selfloops]) aa = H.add_subgraph(species, name="sink", rank='sink') if len(species)>=2: for i, node1 in enumerate(species[0:-1]): node2 = species[i+1] if self._graph_type != 'multigraph': aa.add_edge(node1, node2, style="invis") else: if rank_method in ["all", 'cnor']: # something funky here with self loop # self loop should be ignored here species = sorted([x for x in ranks[rank] if x not in selfloops]) H.add_subgraph(species, name=name, rank='all') return H def _get_nonc(self): if self._nonc is None: nonc = self.findnonc() self._nonc = nonc return self._nonc nonc = property(fget=_get_nonc, doc="Returns list of Non observable and non controlable nodes (Read-only).") def _get_reactions(self): # todo: could use edge2reactions # tood: sort reactions inside SIF sif = self.to_sif() return sorted(sif.reactions) reactions = property(_get_reactions, doc="return the reactions (edges)") def _get_namesSpecies(self): nodes = self.nodes() nodes = [x for x in nodes if "+" not in x and "=" not in x] return sorted(nodes) species = property(fget=_get_namesSpecies, doc="Return sorted list of species (ignoring and gates) Read-only attribute.") #@do_profile()
[docs] def swap_edges(self, nswap=1, inplace=True, self_loop=False): """Swap two edges in the graph while keeping the node degrees fixed. A double-edge swap two randomly chosen edges u-v and x-y and creates the new edges u-x and v-y:: u v u v | | becomes | | x y y x If either the edge u-x or v-y already exist no swap is performed and another attempt is made to find a suitable edge pair. :param int nswap: number of swaps to perform (Defaults to 1) :return: nothing .. warning:: the graph is modified in place. .. warning:: and gates are removed at the beginning since they do not make sense with the new topolgy. They can easily be added back a proposal swap is ignored in 3 cases: #. if the summation of in_degree is changed #. if the summation of out_degree is changed #. if resulting graph is disconnected .. note:: what about self loop ? if proposed, there are ignored except if required to be kept """ self._changed = True self.remove_and_gates() Ninh = [x[2]["link"] for x in self.edges(data=True)].count('-') I = sum(self.in_degree().values()) O = sum(self.out_degree().values()) # find 2 nodes that have at least one successor count = 0 trials = 0 status = {'and':0, 'selfloop':0, 'indegree':0, 'outdegree':0, 'unconnected':0} if inplace is False: local = self.copy() while count < nswap and trials<nswap*5: trials += 1 if inplace is False: edges = local.edges() else: edges = self.edges() np.random.shuffle(edges) e1, e2 = edges[0:2] # ignore self loop: if (e1[0] == e2[1] or e2[0] == e1[1]) and self_loop is False: status['selfloop']+=1 continue if inplace is False: d1 = local.edge[e1[0]][e1[1]].copy() d2 = local.edge[e2[0]][e2[1]].copy() G = nx.DiGraph(local) else: d1 = self.edge[e1[0]][e1[1]].copy() d2 = self.edge[e2[0]][e2[1]].copy() G = nx.DiGraph(self) #self.copy() G.add_edge(e1[0], e2[1], None, **d1) G.add_edge(e2[0], e1[1], None, **d2) G.remove_edge(e1[0], e1[1]) G.remove_edge(e2[0], e2[1]) #print(count, len(G)) if sum(G.in_degree().values()) != I: # the link already exists status['indegree'] +=1 continue # This is slow, let us use something different #if nx.is_connected(G.to_undirected()) == False: # status['unconnected'] += 1 # continue if 0 in G.degree().values(): status['unconnected'] += 1 continue if sum(G.out_degree().values()) != O: status['outdegree'] +=1 continue # seems okay , so let us swap the edges now. if inplace is True: self.add_edge(e1[0], e2[1], None, **d1) self.add_edge(e2[0], e1[1], None, **d2) self.remove_edge(e1[0], e1[1]) self.remove_edge(e2[0], e2[1]) else: local.add_edge(e1[0], e2[1], None, **d1) local.add_edge(e2[0], e1[1], None, **d2) local.remove_edge(e1[0], e1[1]) local.remove_edge(e2[0], e2[1]) # number of inhibitory link must remain identical #Ninh2 = [x[2]["link"] for x in self.edges(data=True)].count('-') #assert Ninh2 == Ninh # seems to be true #assert nx.is_connected(self.to_undirected()) == True count +=1 status['count'] = count status['trials'] = trials self._status_swap_edges = status if inplace is False: return local, status else: self._changed = True
[docs] def adjacency_matrix(self, nodelist=None, weight=None): """Return adjacency matrix. :param list nodelist: The rows and columns are ordered according to the nodes in nodelist. If nodelist is None, then the ordering is produced by :meth:`nodes` method. :param str weight: (default=None) The edge data key used to provide each value in the matrix. If None, then each edge has weight 1. Otherwise, you can set it to "weight" :returns: numpy matrix Adjacency matrix representation of CNOGraph. .. note:: alias to :meth:`networkx.adjacency_matrix` .. seealso:: :meth:`adjacency_iter` and :meth:`adjacency_list` """ return nx.adjacency_matrix(self, nodelist=nodelist).astype(int)
[docs] def remove_edge(self, u, v): """Remove the edge between u and v. :param str u: node u :param str u: node v Calls :meth:`clean_orphan_ands` afterwards """ self._changed = True super(CNOGraph, self).remove_edge(u,v) self.clean_orphan_ands()
@modifier
[docs] def remove_node(self, n): """Remove a node n :param str node: the node to be removed Edges linked to **n** are also removed. **AND** gate may now be irrelevant (only one input or no input) and are therefore removed as well. .. seealso:: :meth:`clean_orphan_ands` """ self._changed = True super(CNOGraph, self).remove_node(n) if "^" not in str(n): self.clean_orphan_ands()
[docs] def add_nodes_from(self, nbunch, attr_dict=None, **attr): self._changed = True for node in nbunch: if isinstance(node, str): self.add_node(node, attr_dict=attr_dict, **attr) elif isinstance(node, tuple): name = node[0] attr.update(node[1]) self.add_node(name, attr_dict=attr_dict, **attr)
[docs] def add_node(self, node, attr_dict=None, **attr): """Add a node :param str node: a node to add :param dict attr_dict: dictionary, if None, replace by default shape/color. recognised keys are those from graphviz. Default keys set are color, fillcolor, shape, style, penwidth :param attr: additional keyword provided will overwrite keys found in **attr_dict** parameter :: c = CNOGraph() c.add_node("A", data=[1,2,3,]) c.add_node("B", shape='circle') .. warning:: **attr** replaces any key found in attr_dict. See :meth:`add_edge` for details. .. todo:: currently nodes that contains a ^ sign are interpreted as AND gate and will appear as small circle. One way to go around is to use the label attribute. you first add the node with a differnt name and populate the label with the correct nale (the one that contain the ^ sign); When calling the plot function, they should all appear as expected. """ self._changed = True if attr_dict is None: attr_dict = self.get_node_attributes(node) super(CNOGraph, self).add_node(node, attr_dict=attr_dict, **attr)
[docs] def preprocessing(self, expansion=True, compression=True, cutnonc=True, maxInputsPerGate=2): """Performs the 3 preprocessing steps (cutnonc, expansion, compression) :param bool expansion: calls :meth:`expand_and_gates` method :param bool compression: calls :meth:`compress` method :param bool cutnon: calls :meth:`cutnonc` method :param int maxInputPerGates: parameter for the expansion step .. plot:: :width: 80% :include-source: from cno import CNOGraph, cnodata c = CNOGraph(cnodata("PKN-ToyPB.sif"), cnodata("MD-ToyPB.csv")) c.preprocessing() c.plot() """ self._changed = True if cutnonc: self.cutnonc() if compression: self.compress() if expansion: self.expand_and_gates(maxInputsPerGate=maxInputsPerGate)
[docs] def cutnonc(self): """Finds non-observable and non-controllable nodes and removes them. .. plot:: :include-source: :width: 50% from cno import CNOGraph, cnodata c = CNOGraph(cnodata("PKN-ToyPB.sif"), cnodata("MD-ToyPB.csv")) c.cutnonc() c.plot() """ self._changed = True nonc = self.nonc[:] for node in nonc: self.collapse_node(node)
[docs] def compress(self, recursive=True, iteration=1, max_iteration=5): """Finds compressable nodes and removes them from the graph A compressable node is a node that is not part of the special nodes (stimuli/inhibitors/readouts mentionned in the MIDAS file). Nodes that have multiple inputs and multiple outputs are not compressable either. .. plot:: :include-source: :width: 50% from cno import CNOGraph, cnodata c = CNOGraph(cnodata("PKN-ToyPB.sif"), cnodata("MD-ToyPB.csv")) c.cutnonc() c.compress() c.plot() .. seealso:: :meth:`compressable_nodes`, :meth:`is_compressable` """ self._changed = True assert max_iteration >=1 #Patched to proceed on sorted lists and provide always the same results. #for node in self.compressable_nodes: for node in sorted(self.compressable_nodes): if self.is_compressable(node) == True: # update the graph G self._compressed.append(node) self.collapse_node(node) # proceed on a sorted list and provide # always the same results in theory. # for node in sorted(self.nodes()): if self.degree(node) == 0 and node not in self.stimuli and \ node not in self.signals and node not in self.inhibitors: """ FIXME It consists in finding the list of nodes that belong to the network but are in no edges (if a node A is removed during compression, it appears in no edges, but you can still find it in CNOGraph.nodes()). For this set of nodes that do not belong to the network, I add fake edges like A + mock1 and so on. Doing like this I am able to save the compressed (or the repaired) network, while using the same MIDAS file. I was afraid to touch the code because I am not sure whether this is the intended behaviour.""" self.logging.info("Found an orphan, which has been removed (%s)" % node) self.remove_node(node) if len(self.compressable_nodes) > 0: self.logging.warning("There are still compressable nodes. Call again") if recursive and iteration<max_iteration: self.compress(iteration=iteration+1) # finally, let us rename the AND gates if needed self._update_and_gate_names()
def _get_and_attribute(self, reaction): # attributes on the N edges of an AND gate must be identical preds = self.predecessors(reaction) attr = self.edge[preds[0]][reaction] return attr def _update_and_gate_names(self): """Update and gates names are updated. Individual names are sorted alphabetically """ ands_before = self._find_and_nodes() # for node in ands_before: preds = self.signed_predecessors(node) succs = self.successors(node) if len(succs) == 1: succ = succs[0] # build new AND gate and sort it alphabetically newname = Reaction("=".join(["^".join(preds), succ])) newname.sort() # sort the current node alphabetically as well. r1 = Reaction(node) r1.sort() if newname != r1.name: attr = self._get_and_attribute(node) attr2 = self.attributes.attributes['and'] self.add_reaction(newname.name) self.node[newname.name].update(**attr2) self.node[newname.name].update(**attr) self.remove_node(node) else: CNOError("an AND gate cannot have several successors. This should not happen")
[docs] def signed_predecessors(self, node): preds = self.predecessors(node) for i, pred in enumerate(preds): if self.edge[pred][node]['link'] == '-': preds[i] = "!" + pred return preds
def _get_collapse_edge(self, inputAttrs, outputAttrs): attrs = inputAttrs.copy() if inputAttrs['link'] == outputAttrs['link']: attrs['link'] = "+" attrs['color'] = "black" attrs['arrowhead'] = "normal" else: attrs['link'] = "-" attrs['color'] = "red" attrs['arrowhead'] = "tee" return attrs
[docs] def collapse_node(self, node): """Collapses a node (removes a node but connects input nodes to output nodes) This is different from :meth:`remove_node`, which removes a node and its edges thus creating non-connected graph. :meth:`collapse_node`, instead remove the node but merge the input/output edges IF possible. If there are multiple inputs AND multiple outputs the node is not removed. :param str node: a node to collapse. * Nodes are collapsed if there is at least one input or output. * Node are not removed if there is several inputs and several ouputs. * if the input edge is -, and the next is + or viceversa then the final edge if - * if the input edge is - and output is - then final edge is + """ # Removing a node that has several entries and several outputs is not # implemented because there is no such situation in CNO. self._changed = True successors = self.successors(node) predecessors = self.predecessors(node) # todo: may be simplified ? if len(successors) == 1 and len(predecessors)==1: self.logging.debug("Compressing %s 1,1 mode" % node) # compressed node attr1 = self.edge[node][successors[0]] # attr2 = self.edge[predecessors[0]][node] attrs = self._get_collapse_edge(attr1, attr2) if predecessors[0] != successors[0]: self.add_edge(predecessors[0], successors[0], None, **attrs) elif len(successors) == 1: for predecessor in predecessors: attr = self.edge[predecessor][node] if predecessor != successors[0]: attr2 = self.edge[node][successors[0]] attrs = self._get_collapse_edge(attr, attr2) self.add_edge(predecessor, successors[0], None, **attrs) elif len(predecessors) == 1: for successor in successors: attr = self.edge[node][successor] if predecessors[0] != successor: attr2 = self.edge[predecessors[0]][node] attrs = self._get_collapse_edge(attr, attr2) self.add_edge(predecessors[0], successor, None, **attrs) else: if len(successors) > 1 and len(predecessors) > 1: self.logging.debug(node, successors, predecessors) self.logging.warning("N succ >1 and N pred >1. Node not removed. use remove_node() if you really want to remove it") return #raise ValueError("invalid node to remove several in/out") else: self.logging.debug("%s %s %s" % (node, successors, predecessors)) self.logging.warning("unknown case (no output or input ?). Node %s removed"% node) #raise ValueError("invalid node to remove several in/out") self.remove_node(node) self._changed = True
[docs] def get_node_attributes(self, node): """Returns attributes of a node using the MIDAS attribute Given a node, this function identifies the type of the input node and returns a dictionary with the relevant attributes found in :attr:`node_attributes.attributes`. For instance, if a midas file exists and if **node** belongs to the stimuli, then the dicitonary returned contains the color green. :param str node: :returns: dictionary of attributes. """ # default attr = self.attributes['others'].copy() # otherwisen if self.midas: if node in self.stimuli: attr = self.attributes['stimuli'].copy() elif node in self.signals: attr = self.attributes['signals'].copy() elif node in self.inhibitors: attr = self.attributes['inhibitors'].copy() elif node in self.nonc: # or node in self.findnonc(): attr = self.attributes['nonc'].copy() elif '^' in node: attr = self.attributes['and'].copy() elif node in self.compressable_nodes: attr = self.attributes['compressed'].copy() else: if '^' in str(node): attr = self.attributes['and'].copy() if node in self._stimuli: attr = self.attributes['stimuli'].copy() if node in self._signals: attr = self.attributes['signals'].copy() if node in self._inhibitors: attr = self.attributes['inhibitors'].copy() return attr
[docs] def predecessors_as_reactions(self, node): """Build up reactions for a given node from predecessors only""" predecessors = self.predecessors(node) reactions = [] for pred in predecessors: if self.isand(pred): reactions.append(pred) elif self[pred][node]['link'] == '+': reactions.append(pred + "=" +node) else: reactions.append("!" + pred + "=" +node) return reactions
[docs] def set_default_node_attributes(self): """Set all node attributes to default attributes .. seealso:: :meth:`get_node_attributes` """ if self._set_default_node_attribute == 'cno': for node in self.nodes(): attrs = self.get_node_attributes(node) for k, v in attrs.items(): self.node[node][k] = v
[docs] def set_node_attributes(self, attr_dict ): """Experimental dictionaries where keys are nodes and keys are dictionaries of attributes. Does not need to be {'Akt': {'color':red', 'strength':0.5}} Does not need to have a value for each node could set default values may be ? """ for node in attr_dict.keys(): if node in self.nodes(): for k,v in attr_dict[node].items(): self.node[node][k] = v
def _remove_edge_attribute(self, name): for edge in self.edges(data=True): try: del self.edge[edge[0]][edge[1]][name] except: pass
[docs] def set_edge_attribute(self, name, values): # FIXME: not used can be deleted # Used in plot_optimised_model in base.py # values is a dictionary of reaction as keys for reaction in values.keys(): if reaction in values.keys(): edges = self.reac2edges(reaction) for edge in edges: e0, e1 = edge[0], edge[1] self.edge[e0][e1][name] = values[reaction]
[docs] def reac2edges(self, reaction): """Here no informatiom about links is returned""" #if reaction not in self.reactions: # raise CNOError("Unknown reaction {0}".format(reaction)) reac = Reaction(reaction) dic = reac.get_signed_lhs_species() if "^" in reaction: inputs = reac.lhs_species out = reac.rhs def get_links(this): if this in dic['+']: return '+' else: return '-' return [(this, reaction, get_links(this)) for this in inputs] + [(reaction,out)] else: #simple case if dic['+']: link = '+' else: link ='-' e0 = reac.lhs e1 = reac.rhs return [(e0.replace("!",""), e1.replace("!",""), link)]
[docs] def set_edge_visibility_from_reactions(self, reactions): # First, reset the style, which may have been set already self._remove_edge_attribute('style') # AND gate should be handled differently for edge in self.edges(data=True): if self.and_symbol in edge[1]: reaction = edge[0] + "=" + edge[1] if edge[2]['link'] == "-": reaction = "!" + reaction elif self.and_symbol in edge[0]: reaction = edge[0] + "=" + edge[1] if edge[2]['link'] == "-": reaction = "!" + reaction else: reaction = self.edge2reaction(edge) if self._graph_type != 'multigraph': if reaction not in reactions: self.edge[edge[0]][edge[1]]['style'] = 'invis'
[docs] def get_same_rank(self): """Return ranks of the nodes. """ # no need to run this function, which could be long if already computed if self._ranks is not None: return self._ranks.copy() import time t1 = time.time() # some aliases try: stimuli = self.stimuli if len(stimuli) == 0: stimuli = self._get_inputs() except: stimuli = self._get_inputs() try: signals = self.signals if len(signals) == 0: signals = self._get_outputs() except: signals = self._get_outputs() func_path = nx.algorithms.floyd_warshall(self) maxrank = int(self.get_max_rank()) # start populating the ranks starting with the obvious one: stimuli and # signals ranks = collections.defaultdict(list) ranks[0] = stimuli for node in sorted(self.nodes(), key=lambda x: str(x).lower()): # skip and gate if self.isand(node): continue # skip end signals for now if node in signals and len(self.successors(node))==0: continue elif node not in stimuli: distances = [func_path[s][node] for s in stimuli] distances = [x for x in distances if x != pylab.inf] if len(distances) != 0: M = int(np.nanmax([abs(x) for x in distances if x != pylab.inf])) ranks[M].append(node) else: self.logging.debug('warning, rank %s is empyt'% node) # now the end signal for node in sorted(self.nodes(), key=lambda x: str(x).lower()): if node in signals and len(self.successors(node))==0: ranks[maxrank+1].append(node) t2 = time.time() if t2-t1 > 1: print("get_same_rank %s seconds" % str(t2-t1)) self._ranks = ranks.copy() return ranks
def _get_inputs(self): return [x for x in self.nodes() if len(self.predecessors(x))==0] def _get_outputs(self): self.logging.warning('WARNING. no signals found, tryring to build a list (node with no successors)') return [x for x in self.nodes() if len(self.successors(x))==0]
[docs] def get_max_rank(self): """Get the maximum rank from the inputs using floyd warshall algorithm If a MIDAS file is provided, the inputs correspond to the stimuli. Otherwise, (or if there is no stimuli in the MIDAS file), use the nodes that have no predecessors as inputs (ie, rank=0). """ stimuli = self.stimuli if len(stimuli) == 0: stimuli = self._get_inputs() func_path = nx.algorithms.floyd_warshall(self) # compute the longest path from Stimuli by using the floyd warshall # algorithm. inputs/stimuli has rank 0. ranks = [[x for x in func_path[stimulus].values() if x !=pylab.inf] for stimulus in stimuli] allranks = [] for r in ranks: allranks = allranks + r #concatenate all ranks includeing empty [] maxrank = np.nanmax(allranks) return maxrank
def _add_and_gates(self, node, maxInputsPerGate=2): """See expand_and_gates docstring""" preds = self.predecessors(node) preds = [pred for pred in preds if self.isand(pred) is False] assert maxInputsPerGate>=2 and maxInputsPerGate<=5, "maxInputsPerGate must be >2 and less than 5" #todo: order predecessirs according to nameSpecies order self.logging.debug( "node %s, pred=%s " % (node, preds)) if len(preds) == 1: self.logging.debug("Nothing to do with %s (only 1 predecessor)" % node) return for inputsPerGates in range(2, maxInputsPerGate+1): self.logging.debug(inputsPerGates) if inputsPerGates>len(preds): continue for combi in itertools.combinations(preds, inputsPerGates): self.logging.debug("adding %s input gate from %s to %s" % (inputsPerGates, combi, node)) shape = "circle" if len(combi) == 3: shape = "triangle" elif len(combi) >= 4: shape = "square" andNode = self._nodes2reac(list(combi), node) self.logging.debug("add_and_gates: %s " % andNode) self.add_node(andNode, shape=shape) for combinode in combi: attr = self.edge[combinode][node].copy() attr['link'] = self.edge[combinode][node]['link'] #attr['weight'] = pylab.nan # edge from and to specy always + and nan weight self.add_edge(combinode, andNode, None, **attr) attr['link'] = "+" # edge from and to specy always + #attr['weight'] = pylab.nan # edge from and to specy always + and nan weight attr['color'] = 'black' # and output is always black self.add_edge(andNode, node, None, **attr) self._changed = True def _nodes2reac(self, inputsNodes, output): inputs = [] for node in inputsNodes: if self.edge[node][output]['link']=="-": inputs.append("!"+str(node)) else: inputs.append(node) reac = self.and_symbol.join([str(x) for x in inputs]) reac += "=" + str(output) reac = Reaction(reac) reac.sort() return reac.name
[docs] def edge2reaction(self, edge): e1 = edge[0] e2 = edge[1] link = edge[2]['link'] if link == '+': return "=".join([e1, e2]) else: return "=".join(["!"+e1, e2])
[docs] def isand(self, node): if self.and_symbol in str(node): return True else: return False
def _find_nodes_with_multiple_inputs(self): """return a list of nodes that have multiple predecessors""" nodes = [] for node in self.nodes(): if len(self.predecessors(node)) > 1 and self.isand(node) is False: nodes.append(node) else: if len(self.predecessors(node)) > 1 and self.isand(node): self.logging.debug("ignore " + str(node)) return nodes def _find_and_nodes(self): andNodes = [node for node in self.nodes() if self.isand(node)] return andNodes
[docs] def expand_or_gates(self): """Expand OR gates given AND gates If a graph contains AND gates (without its OR gates), you can add back the OR gates automatically using this function. .. plot:: :include-source: :width: 50% from cno import CNOGraph from pylab import subplot, title c1 = CNOGraph() c1.add_edge("A", "C", link="-") c1.add_edge("B", "C", link="+") c1.expand_and_gates() subplot(1,3,1) title("OR and AND gates") c1.plot(hold=True) c1.remove_edge("A", "C") c1.remove_edge("B", "C") subplot(1,3,2) c1.plot(hold=True) title("AND gates only") c1.expand_or_gates() subplot(1,3,3) c1.plot(hold=True) title("after call to \\n expand_or_gates function") .. seealso:: :meth:`~cno.io.cnograph.CNOGraph.expand_and_gates` """ for this in self._find_and_nodes(): p = self.predecessors(this) s = self.successors(this) assert len(s) == 1 for node in p: link = self.edge[node][this]['link'] self.add_edge(node, s[0], link=link) self._changed = True
[docs] def expand_and_gates(self, maxInputsPerGate=2): """Expands the network to incorporate AND gates :param int maxInputsPerGate: restrict maximum number of inputs used to create AND gates (default is 2) The CNOGraph instance can be used to model a boolean network. If a node has several inputs, then the combinaison of the inputs behaves like an OR gate that is we can take the minimum over the inputs. In order to include AND behaviour, we introduce a special node called AND gate. This function adds AND gates whenever a node has several inputs. The AND gates can later on be used in a boolean formalism. In order to recognise AND gates, we name them according to the following rule. If a node A has two inputs B and C, then the AND gate is named:: B^C=A and 3 edges are added: B to the AND gates, C to the AND gates and the AND gate to A. If an edge is a "-" link then, an **!** character is introduced. In this expansion process, AND gates themselves are ignored. If there are more than 2 inputs, all combinaison of inputs may be considered but the default parameter **maxInputsPerGate** is set to 2. For instance, with 3 inputs A,B,C you may have the following combinaison: A^B, A^C, B^C. The link A^B^C will be added only if **maxInputsPerGate** is set to 3. .. plot:: :include-source: :width: 50% from cno import CNOGraph from pylab import subplot, title c = CNOGraph() c.add_edge("A", "C", link="+") c.add_edge("B", "C", link="+") subplot(1,2,1) title("Original network") c.plot(hold=True) c.expand_and_gates() subplot(1,2,2) c.plot(hold=True) title("Expanded network") .. seealso:: :meth:`remove_and_gates`, :meth:`clean_orphan_ands`, :meth:`expand_or_gates`. .. note:: this method adds all AND gates in one go. If you want to add a specific AND gate, you have to do it manually. You can use the :meth:`add_reaction` for that purpose. .. note:: propagate data from edge on the AND gates. """ self._changed = True nodes2expand = self._find_nodes_with_multiple_inputs() for node in nodes2expand: self._add_and_gates(node, maxInputsPerGate)
[docs] def add_cycle(self, nodes, **attr): """Add a cycle :param list nodes: a list of nodes. A cycle will be constructed from the nodes (in order) and added to the graph. :param dict attr: must provide the "link" keyword. Valid values are "+", "-" the links of every edge in the cycle will be identical. .. plot:: :include-source: :width: 50% from cno import CNOGraph c = CNOGraph() c.add_edge("A", "C", link="+") c.add_edge("B", "C", link="+") c.add_cycle(["B", "C", "D"], link="-") c.plot() .. warning:: added cycle overwrite previous edges """ self._changed = True if "link" not in attr.keys(): raise KeyError("link keyword must be provided") attr = self.get_default_edge_attributes(**attr) super(CNOGraph, self).add_cycle(nodes, **attr)
[docs] def add_path(self): """networkx method not to be used""" raise NotImplementedError
[docs] def add_star(self): """networkx method not to be used""" raise NotImplementedError
[docs] def remove_edges_from(self, edges): for edge in edges: self.remove_edge(edge)
[docs] def add_weighted_edges_from(self): """networkx method not to be used""" raise NotImplementedError
[docs] def remove_nodes_from(self, nbunch): """Removes a bunch of nodes .. warning:: need to be tests with and gates.""" self._changed = True for node in nbunch: self.remove_node(node)
def _get_compressable_nodes(self): if self._changed is True: compressables = [x for x in self.nodes() if self.is_compressable(x)] self._compressable_nodes = compressables[:] self._changed = False else: compressables = self._compressable_nodes[:] return [x for x in compressables if self.isand(x) is False] compressable_nodes = property(fget=_get_compressable_nodes, doc="Returns list of compressable nodes (Read-only).") def _ambiguous_multiedge(self, node): """Test whether the removal of a node may lead to multi edges ambiguity e.g., A-->B and A--| B """ edges = [(x[0], x[1], x[2]['link']) for x in self.edges(data=True)] for A in self.predecessors(node): for B in self.successors(node): link = self.edge[node][B]['link'] if link == "+": link = "-" elif link == "-": link = "+" if (A, B, link) in edges: return True return False
[docs] def is_compressable(self, node): """Returns True if the node can be compressed, False otherwise :param str node: a valid node name :return: boolean value Here are the rules for compression. The main idea is that a node can be removed if the boolean logic is preserved (i.e. truth table on remaining nodes is preserved). A node is compressable if it is not part of the stimuli, inhibitors, or signals specified in the MIDAS file. If a node has several outputs and inputs, it cannot be compressed. If a node has one input or one output, it may be compressed. However, we must check the following possible ambiguity that could be raised by the removal of the node: once removed, the output of the node may have multiple input edges with different types of inputs edges that has a truth table different from the original truth table. In such case, the node cannot be compressed. Finally, a node cannot be compressed if one input is also an output (e.g., cycle). .. plot:: :include-source: :width: 50% from cno import CNOGraph from pylab import subplot,show, title c = cnograph.CNOGraph() c.add_edge("a", "c", link="-") c.add_edge("b", "c", link="+") c.add_edge("c", "d", link="+") c.add_edge("b", "d", link="-") c.add_edge("d", "e", link="-") c.add_edge("e", "g", link="+") c.add_edge("g", "h", link="+") c.add_edge("h", "g", link="+") # multiple inputs/outputs are not removed c.add_edge("s1", "n1", link="+") c.add_edge("s2", "n1", link="+") c.add_edge("n1", "o1", link="+") c.add_edge("n1", "o2", link="+") c._stimuli = ["a", "b", "s1", "s2"] c._signals = ["d", "g", "o1", "o2"] subplot(1,2,1) c.plot(hold=True) title("Initial graph") c.compress() subplot(1,2,2) c.plot(hold=True) title("compressed graph") show() """ if node not in self.nodes(): msg = "node %s is not in the graph" % node raise ValueError(msg) # there is always a MIDAS file for now but we may remove it later on specialNodes = self.inhibitors + self.signals + self.stimuli notcompressable = [x for x in self.nodes() if x in specialNodes] notcompressable += [x for x in self.nodes() if self.isand(x)] if node in notcompressable: return False succs = self.successors(node) preds = self.predecessors(node) # if one input is an input, no compression if len(set(succs).intersection(set(preds))) > 0: self.logging.debug('skipped node (retroaction ? %s) ' % node) #print("%s case cycle input is in output" % node) return False # if input and ouputs are provided, they should contain not AND on # both the input and output # see https://github.com/cellnopt/cellnopt/issues/39 a = sum([1 for x in succs if self.isand(x)]) b = sum([1 for x in preds if self.isand(x)]) if a>=1 and b>=1: return False # if no output or no input, can be compressed. # somehow this is redundant with NONC algorithm, which is not required # anymore. if len(self.successors(node)) == 0 or len(self.predecessors(node))==0: self.logging.debug('skipped node %s (input/output case ' % node) return True # if multiple input AND multiple output, no ambiguity: nothing to compress elif len(self.successors(node)) >1 and len(self.predecessors(node))>1: self.logging.debug('skipped node %s >1,>1 case ' % node) return False # if one input and several outputs OR several input and one ouptut, we # can compress. However, if the compressable node once removed is an # edge that already exists, then we will have multiedges, which is not # handled in a DIgraph. So, we cannot comrpess that particular case. # if one input only and one output only, no ambiguity: we can compress elif len(self.predecessors(node)) == 1 and len(self.successors(node))==1: ambiguous = self._ambiguous_multiedge(node) if ambiguous == True: self.logging.debug('%s could be compressed but ambiguity with existing edge so not removed ' % node) return False else: self.logging.debug('Add node %s =1,=1 to be removed ' % node) return True # one output but several input may be ambiguous. If output is inhibitor # and input contains an mix of inhibitor/activation then we can not # compress elif len(preds) > 1 and len(succs)==1: input_links = [self[p][node]['link'] for p in preds] output_links = self[node][succs[0]]['link'] if (output_links == "-") and len(set(input_links))>=2: self.logging.debug('skipped node %s output=inhibitor and ambiguous inputs' % node) return False else: ambiguous = self._ambiguous_multiedge(node) if ambiguous == True: self.logging.debug('%s could be compressed but ambiguity with existing edge so not removed ' % node) return False else: self.logging.debug('Add node %s >1,=1 case to be removed' % node) return True # one input and several ouptut, no ambiguity: we can compress elif len(preds) == 1 and len(succs)>1: ambiguous = self._ambiguous_multiedge(node) if ambiguous == True: self.logging.debug('%s could be compressed but ambiguity with existing edge so not removed ' % node) return False else: self.logging.debug('Add node %s =1,>1 case to be removed' % node) return True else: self.logging.debug('do not remove node %s' % node) return False
[docs] def relabel_nodes(self, mapping, inplace=True): """Function to rename a node, while keeping all its attributes. :param dict mapping: a dictionary mapping old names (keys) to new names (values ) :return: new :class:`CNOGraph` object if we take this example:: c = CNOGraph(); c.add_reaction("a=b"); c.add_reaction("a=c"); c.add_reaction("b=d"); c.add_reaction("c=d"); c.expand_and_gates() Here, an AND gate has been created. c.nodes() tells us that its name is "b^c=d". If we rename the node b to blong, the AND gate name is unchanged if we use the nx.relabel_nodes function. Visually, it is correct but internally, the "b^c=d" has no more meaning since the node "b" does not exist anymore. This may lead to further issues if we for instance split the node c:: c = nx.relabel_nodes(c, {"b": "blong"}) c.split_node("c", ["c1", "c2"]) This function calls relabel_node taking care of the AND nodes as well. .. warning:: inplace modifications .. todo:: midas must also be modified """ # keep names of the AND gates for book keeping # ands_before = self._find_and_nodes() # mapping_ands = {} for this in self._find_and_nodes(): reac = Reaction(this) reac.rename_species(mapping) # add to mapping if reac.name != this: mapping.update({this:reac.name}) self.mapping = mapping # ideally, nx.relabels_nodes(self, mapping, copy=False) could # do the work but somehow loses the edge data dictionary.,, # so, we will do it ourself: edge_data = [x for x in self.edges(data=True) if x[0] in mapping.keys() or x[1] in mapping.keys()] self.edge_data = edge_data for edge in edge_data: # do not remove the edge first otherwise AND may be lost # since clean_orphans may be called. # now add new edges. First, if "=" in edge[0]: # an AND gate r = Reaction(edge[0]) r.rename_species(mapping) e0 = r.name elif edge[0] in mapping.keys(): e0 = mapping[edge[0]] else: # normal node e0 = edge[0] if "=" in edge[1]: # an AND gate r = Reaction(edge[1]) r.rename_species(mapping) e1 = r.name elif edge[1] in mapping.keys(): e1 = mapping[edge[1]] else: e1 = edge[1] self.add_edge(e0, e1, **edge[2]) # Now, handle the MIDAS file if any by renaming all species updating # the stimuli/inhibitors/signals if needed. Need to keep track of user # defined names self._stimuli = [mapping[x] if x in mapping.keys() else x for x in self._stimuli] self._signals = [mapping[x] if x in mapping.keys() else x for x in self._signals] self._inhibitors = [mapping[x] if x in mapping.keys() else x for x in self._inhibitors] self._compressed = [mapping[x] if x in mapping.keys() else x for x in self._compressed] # copy the node data for k, v in mapping.items(): self.node[v] = self.node[k] if self.midas is not None: for k, v in mapping.items(): if "^" in k: del mapping[k] try: self.midas.rename_species(mapping) except: pass try: self.midas.rename_inhibitors(mapping) except: pass try: self.midas.rename_stimuli(mapping) except: pass # finally remove the old nodes self.remove_nodes_from(mapping.keys()) self._changed = True
[docs] def centrality_eigenvector(self, max_iter=1000, tol=0.1): res = nx.eigenvector_centrality(self,max_iter,tol=tol) nx.set_node_attributes(self, 'eigenvector', res) nx.set_node_attributes(self, 'centrality_eigenvector', res) import operator degcent_sorted = sorted(res.items(), key=operator.itemgetter(1), reverse=True) #for k,v in degcent_sorted: # self.logging.info("Highest degree centrality %s %s", v,k) return res
[docs] def centrality_degree(self): """Compute the degree centrality for nodes. The degree centrality for a node v is the fraction of nodes it is connected to. :return: list of nodes with their degree centrality. It is also added to the list of attributes with the name "degree_centr" .. plot:: :include-source: :width: 50% from cno import CNOGraph, cnodata c = CNOGraph(cnodata("PKN-ToyPB.sif"), cnodata("MD-ToyPB.csv")) c.centrality_degree() c.plot(node_attribute="centrality_degree") """ res = nx.degree_centrality(self) nx.set_node_attributes(self, 'degree', res) nx.set_node_attributes(self, 'centrality_degree', res) import operator degcent_sorted = sorted(res.items(), key=operator.itemgetter(1), reverse=True) #for k,v in degcent_sorted: # self.logging.info("Highest degree centrality %s %s", v,k) return res
[docs] def centrality_closeness(self, **kargs): """Compute closeness centrality for nodes. Closeness centrality at a node is 1/average distance to all other nodes. :param v: node, optional Return only the value for node v :param str distance: string key, optional (default=None) Use specified edge key as edge distance. If True, use 'weight' as the edge key. :param bool normalized: optional If True (default) normalize by the graph size. :return: Dictionary of nodes with closeness centrality as the value. .. plot:: :include-source: :width: 50% from cno import CNOGraph, cnodata c = CNOGraph(cnodata("PKN-ToyPB.sif"), cnodata("MD-ToyPB.csv")) c.centrality_closeness() c.plot(node_attribute="centrality_closeness") """ res = nx.centrality.closeness_centrality(self, **kargs) nx.set_node_attributes(self, 'closeness', res) nx.set_node_attributes(self, 'centrality_closeness', res) import operator degcent_sorted = sorted(res.items(), key=operator.itemgetter(1), reverse=True) #for k,v in degcent_sorted: # self.logging.info("Highest closeness centrality %s %s", v,k) return res
[docs] def centrality_betweeness(self, k=None, normalized=True, weight=None, endpoints=False, seed=None): r"""Compute the shortest-path betweeness centrality for nodes. Betweenness centrality of a node `v` is the sum of the fraction of all-pairs shortest paths that pass through `v`: .. math:: c_B(v) =\sum_{s,t \in V} \frac{\sigma(s, t|v)}{\sigma(s, t)} where :math:`V` is the set of nodes, :math:`\sigma(s, t)` is the number of shortest :math:`(s, t)`-paths, and :math:`\sigma(s, t|v)` is the number of those paths passing through some node :math:`v` other than :math:`s, t`. If :math:`s = t`, :math:`\sigma(s, t) = 1`, and if :math:`v \in {s, t}`, :math:`\sigma(s, t|v) = 0` . :param int k: (default=None) If k is not None use k node samples to estimate betweeness. The value of k <= n where n is the number of nodes in the graph. Higher values give better approximation. :param bool normalized: If True the betweeness values are normalized by :math:`2/((n-1)(n-2))` for graphs, and :math:`1/((n-1)(n-2))` for directed graphs where :math:`n` is the number of nodes in G. :param str weight: None or string, optional If None, all edge weights are considered equal. .. plot:: :include-source: :width: 50% from cno import CNOGraph, cnodata c = CNOGraph(cnodata("PKN-ToyPB.sif"), cnodata("MD-ToyPB.csv")) c.centrality_betweeness() c.plot(node_attribute="centrality_betweeness") .. seealso:: networkx.centrality.centrality_betweeness """ res = nx.centrality.betweenness_centrality(self,k=k,normalized=normalized, weight=weight, endpoints=endpoints, seed=seed) nx.set_node_attributes(self, 'centrality_betweeness', res) nx.set_node_attributes(self, 'betweeness', res) import operator degcent_sorted = sorted(res.items(), key=operator.itemgetter(1), reverse=True) #for k,v in degcent_sorted: # self.logging.info("Highest betweeness centrality %s %s", v,k) return res
[docs] def to_gexf(self, filename): """Export into GEXF format :param str filename: This is the networkx implementation and requires the version 1.7 This format is quite rich and can be used in external software such as Gephi. .. warning:: color and labels are lost. information is stored as attributes.and should be as properties somehow. Examples: c.node['mkk7']['viz'] = {'color': {'a': 0.6, 'r': 239, 'b': 66,'g': 173}} """ from networkx.readwrite import write_gexf write_gexf(self, filename)
[docs] def to_directed(self): """networkx method not to be used""" raise NotImplementedError
[docs] def to_sif(self, filename=None): """Export CNOGraph into a SIF file. Takes into account and gates. If a species called "A^B=C" is found, it is an AND gate that is encoded in a CSV file as:: A 1 and1 B 1 and1 and1 1 C :param str filename: """ sif = SIF() for edge in self.edges(data=True): n1 = edge[0] n2 = edge[1] link = edge[2]['link'] reaction = "" if self.isand(n1) is False and self.isand(n2) is False: if link == "-": reaction += "!" reaction += n1 + "=" + n2 sif.add_reaction(reaction) for edge in self._find_and_nodes(): sif.add_reaction(edge) if filename: sif.save(filename) else: return sif
[docs] def to_json(self, filename=None): """Export the graph into a JSON format :param str filename: .. seealso:: :meth:`loadjson` """ from networkx.readwrite import json_graph data = json_graph.node_link_data(self) if filename is not None: json.dump(data, open(filename, "w")) else: return data
[docs] def read_sbmlqual(self, filename): sif = SIF() sif.read_sbmlqual(filename) self.clear() for reac in sif.reactions: self.add_reaction(reac)
[docs] def to_sbmlqual(self, filename=None): """Export the topology into SBMLqual and save in a file :param str filename: if not provided, returns the SBML as a string. :return: nothing if filename is not provided .. seealso:: :meth:`cno.io.sbmlqual` """ self._changed = True s = SIF() for reac in self.reactions: s.add_reaction(reac) return s.to_sbmlqual(filename=filename)
[docs] def read_json(self, filename): """Load a network in JSON format as exported from :meth:`to_json` :param str filename: .. seealso:: :meth:`to_json` """ self._changed = True from networkx.readwrite import json_graph graph = json_graph.node_link_graph(json.loads(open(filename).read())) self.clear() for node in graph.nodes(): self.add_node(node) for edge in graph.edges(data=True): self.add_edge(edge[0], edge[1], link=edge[2]['link']) return graph
[docs] def lookfor(self, specyName): """Prints information about a species If not found, try to find species by ignoring cases. """ #try to find the specy first: if specyName not in self.nodes(): print("did not find the requested specy",) proposals = [node for node in self.nodes() if specyName.lower() in node.lower()] if len(proposals): print("try one of ") for p in proposals: print(proposals) else: print("predecessors") print(self.predecessors(specyName)) print("successors") print(self.successors(specyName))
[docs] def get_stats(self): stats = {} flow = nx.flow_hierarchy(self) stats['flow'] = flow stats['mean_degree'] = sum(self.degree().values())/float(len(self.nodes())) return stats
[docs] def summary(self): """Plot information about the graph""" stats = self.get_stats() print("Flow hierarchy = %s (fraction of edges not participating in cycles)" % stats['flow']) print("Average degree = " + str(sum(self.degree().values())/float(len(self.nodes()))))
[docs] def merge_nodes(self, nodes, node): """Merge several nodes into a single one .. plot:: :include-source: :width: 50% from cno import CNOGraph from pylab import subplot c = CNOGraph() c.add_edge("AKT2", "B", link="+") c.add_edge("AKT1", "B", link="+") c.add_edge("A", "AKT2", link="+") c.add_edge("A", "AKT1", link="+") c.add_edge("C", "AKT1", link="+") subplot(1,2,1) c.plot(hold=True) c.merge_nodes(["AKT1", "AKT2"], "AKT") subplot(1,2,2) c.plot(hold=True) """ self._changed = True assert len(nodes)>1, "nodes must be a list of species" for n in nodes: if n not in self.nodes(): raise ValueError("%s not found in the graph !" % n) self.add_node(node) for n in nodes: for pred in self.predecessors(n): attrs = self.edge[pred][n] if self.isand(pred): pred = self._rename_node_in_reaction(pred, n, node) self.add_reaction(pred) else: self.add_edge(pred, node, **attrs) for succ in self.successors(n): attrs = self.edge[n][succ] if self.isand(succ): succ = self._rename_node_in_reaction(succ, n, node) self.add_reaction(succ) else: self.add_edge(node, succ, **attrs) for n in nodes: self.remove_node(n) # assume that all nodes are in signals if first one is in signal. # FIXME: not robust if nodes[0] in self._signals: for this in nodes: self._signals.remove(this) self._signals.append(node) if nodes[0] in self._stimuli: for this in nodes: self._stimuli.remove(this) self._stimuli.append(node) if nodes[0] in self._inhibitors: for this in nodes: self._inhibitors.remove(this) self._inhibitors.append(node) self._changed = True
[docs] def split_node(self, node, nodes): """ .. plot:: :include-source: :width: 50% from cno import CNOGraph from pylab import subplot c = CNOGraph() c.add_reaction("!A=C") c.add_reaction("B=C") c.add_reaction("!b1=B") c.add_reaction("b2=B") c.expand_and_gates() subplot(1,2,1) c.plot(hold=True) c.split_node("B", ["B1", "B2", "B3"]) subplot(1,2,2) c.plot(hold=True) """ for n in nodes: for pred in self.predecessors(node): attrs = self.edge[pred][node] if self.isand(pred): pred = self._rename_node_in_reaction(pred, node, n) self.add_reaction(pred) else: self.add_edge(pred, n, **attrs) for succ in self.successors(node): attrs = self.edge[node][succ] # special case of the AND gates if self.isand(succ): succ = self._rename_node_in_reaction(succ, node, n) self.add_reaction(succ) else: # normal case self.add_edge(n, succ, **attrs) self.remove_node(node) # remove AND gates as well: for this in self._find_and_nodes(): gate = Reaction(this) if node in gate.lhs_species: self.remove_node(this) if node in self._signals: self._signals.extend(nodes) self._signals.remove(node) if node in self._stimuli: self._stimuli.extend(nodes) self._stimuli.remove(node) if node in self._inhibitors: self._inhibitors.extend(nodes) self._inhibitors.remove(node) self._changed = True
def _rename_node_in_reaction(self, reaction, old, new): """This function rename a species within a reaction.""" reac = Reaction(reaction) reac.rename_species({old:new}) return reac.name
[docs] def findnonc(self): """Finds the Non-Observable and Non-Controllable nodes #. Non observable nodes are those that do not have a path to any measured species in the PKN #. Non controllable nodes are those that do not receive any information from a species that is perturbed in the data. Such nodes can be removed without affecting the readouts. :param G: a CNOGraph object :param stimuli: list of stimuli :param stimuli: list of signals :return: a list of names found in G that are NONC nodes .. doctest:: >>> from cno import CNOGraph, cnodata >>> model = cnodata('PKN-ToyMMB.sif') >>> data = cnodata('MD-ToyMMB.csv') >>> c = CNOGraph(model, data) >>> namesNONC = c.nonc() :Details: Using a floyd Warshall algorithm to compute path between nodes in a directed graph, this class identifies the nodes that are not connected to any signals (Non Observable) and/or any stimuli (Non Controllable) excluding the signals and stimuli, which are kept whatever is the outcome of the FW algorithm. """ # some aliases assert (self.stimuli!=None and self.signals!=None) dist = nx.algorithms.floyd_warshall(self) namesNONC = [] for node in self.nodes(): # search for paths from the species to the signals spe2sig = [(node, dist[node][s]) for s in self.signals if dist[node][s]!=np.inf] # and from the nstimuli to the species sti2spe = [(node, dist[s][node]) for s in self.stimuli if dist[s][node]!=np.inf] if len(spe2sig)==0 or len(sti2spe)==0: if node not in self.signals and node not in self.stimuli: namesNONC.append(node) namesNONC = list(set(namesNONC)) # required ? return namesNONC
[docs] def remove_self_loops(self): self._changed = True for e in self.edges(): if e[0] == e[1]: self.remove_edge(e[0], e[1])
class AGraphCNO(gv.AGraph): def __init__(self, N): """N should be a cnograph so that we can get the ranks""" self.cnograph = N directed = N.is_directed() strict = N.number_of_selfloops()==0 and not N.is_multigraph() super(AGraphCNO, self).__init__(name=N.name, strict=strict, directed=directed) # default graph attributes self.graph_attr.update(N.graph.get('graph',{})) self.node_attr.update(N.graph.get('node',{})) self.edge_attr.update(N.graph.get('edge',{})) # add nodes for n,nodedata in N.nodes(data=True): self.add_node(n,**nodedata) # loop over edges if N.is_multigraph(): for u,v,key,edgedata in N.edges_iter(data=True,keys=True): str_edgedata=dict((k,str(v)) for k,v in edgedata.items()) self.add_edge(u,v,key=str(key),**str_edgedata) else: for u,v,edgedata in N.edges_iter(data=True): str_edgedata=dict((k,str(v)) for k,v in edgedata.items()) self.add_edge(u,v,**str_edgedata) self._changed = True def to_dot(self, filename=None, frmt='png'): allranks = self.cnograph.get_same_rank() txt = "digraph G{\n" for k, v in self.cnograph.graph_options['graph'].items(): if 'svg' in frmt and k == 'dpi': continue txt += """ %s="%s";\n""" % (k,v) for rank in sorted(allranks.keys()): if rank == 0: rankname = "source" elif rank == max(allranks.keys()): rankname = "sink" else: rankname = 'same' if len(allranks[rank]): names = ";".join(sorted(allranks[rank])) txt += " {rank=%s;%s;}\n" % (rankname, names) for node in self.cnograph.nodes(data=True): attrs = node[1].copy() if "^" in node[0]: attrs['name'] = '"%s"' % node[0] attrs['label'] = '' attrs['penwidth'] = '' + 'shape="circle" width=.1 height=.1 fixedsize=True' else: attrs['name'] = node[0] attrs['label'] = node[0] try: attrs['penwidth'] = """penwidth="%s" """ % attrs['penwidth'] except: print("FIXME") attrs['penwidth'] = '' pass txt += """%(name)s [color="%(color)s", fillcolor="%(fillcolor)s" %(penwidth)s shape="%(shape)s" style="%(style)s" label="%(label)s" fontname=Helvetica fontsize=22.0 ];\n""" % attrs for edge in self.cnograph.edges(data=True): attrs = edge[2] attrs['in'] = '"%s"' % edge[0] attrs['out'] = '"%s"' % edge[1] attrs['label']= '' txt += """ %(in)s -> %(out)s [ color="%(color)s" label="%(label)s" weight="1.000000" penwidth="2" arrowhead="%(arrowhead)s" style="solid"];\n""" % (attrs) txt += "}" fh = open(filename, 'w') fh.write(txt) fh.close()