Source code for MetaNetwork

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
.. module:: dynetml2other
:synopsis: Imports DyNetML into NetworkX, igraph, or Python dictionaries.

.. moduleauthor:: Peter M. Landwehr <plandweh@cs.cmu.edu>

"""

__author__ = 'Peter M. Landwehr <plandweh@cs.cmu.edu>'

import codecs
from collections import defaultdict
import dynetmlparsingutils as dmlpu
from lxml import etree
import os


[docs]class MetaNetwork: """ The MetaNetwork class is a container for a meta-network extracted from DyNetML. The base class stores network data \ in dictionaries; using a graphing library to store network data requires that you use one of the two SubClasses. \ Manipulating networks will *not* guarantee corresponding modifications of data contained in __node_tree; you will \ need to make any desired transformations to the nodes yourself. :ivar properties: A dictionary of the different properties associated with the meta-network. :ivar propertyIdentities: A dictionary of the two defining traits of each property associated with the network :ivar __node_tree: A three-layer dictionary laying out node sets grouped into classes, nodes with node sets, and \ properties associated with each node: self.nodesets[node class][node set][node][node property] = <property>. \ *Note* that node attributes are also stored in the dictionary. If an attribute and a property colide on some \ value, the property value will be retained. __node_tree is private in order to regulate how properties are added \ and to help insure that networks are valid. This may change in future updates. :ivar networks: A dictionary of the different networks in the meta-network. :ivar sources: A dictionary of source materials; it exists exclusively in networks generated by AutoMap, and is \ not yet fully handled. """ def __init__(self): """Initializes a MetaNetwork""" self.attributes = {} self.properties = {} self.propertyIdentities = {} self.__node_tree = dmlpu.node_tree() self.networks = {} self.sources = {} def __validate_tree_branch(self, nodeclass_name, nodeset_name=None, node_name=None): """ Verify that a particular nodeclass, nodeset, or node exists in __node_tree :param unicde|str nodeclass_name: The name of a nodeclass :param unicode|str|None nodeset_name: The name of a nodeset :param unicode|str|None node_name: The name of a node """ dmlpu.check_type(nodeclass_name, 'nodeclass_name', (str, unicode)) dmlpu.check_type(nodeset_name, 'nodeset_name', (str, unicode, None)) dmlpu.check_type(node_name, 'node_name', (str, unicode, None)) dmlpu.check_key(nodeclass_name, 'nodeclass_name', self.__node_tree, 'self.__node_tree') if nodeset_name is not None: dmlpu.check_key(nodeset_name, 'nodeset_name', self.__node_tree[nodeclass_name], nodeclass_name) if node_name is not None: dmlpu.check_key(node_name, 'node_name', self.__node_tree[nodeclass_name][nodeset_name][1], 'nodeset_name')
[docs] def load_from_dynetml(self, mn_text, properties_to_include=None, properties_to_ignore=None, nodeclasses_to_include=None, nodeclasses_to_ignore=None, networks_to_include=None, networks_to_ignore=None): """ Parses XML containing a meta-network and loads the contents. If no properties, nodeclasses, or networks are specified to be included, all of them will be included. Lists of included and ignored elements are exclusive - only specify a set to include or a set to ignore for each category. :param str|unicode mn_text: XML containing a meta-network :param list properties_to_include: a list of nodeclass properties that should be included :param list properties_to_ignore: a list of nodeclass properties that should be ignored :param list nodeclasses_to_include: a list of nodeclasses that should be included :param list nodeclasses_to_ignore: a list of nodeclasses that should be ignored :param list networks_to_include: a list of networks that should be included :param list networks_to_ignore: a list of networks that should be ignored """ dmlpu.check_type(mn_text, 'mn_text', (unicode, str)) mn_tag = etree.XML(mn_text) if mn_tag.tag != 'MetaNetwork': return for attrib_key in mn_tag.attrib: self.attributes[attrib_key] = dmlpu.format_prop(mn_tag.attrib[attrib_key]) self.load_from_tag(mn_tag, properties_to_include, properties_to_ignore, nodeclasses_to_include, nodeclasses_to_ignore, networks_to_include, networks_to_ignore)
[docs] def load_from_tag(self, mn_tag, properties_to_include=None, properties_to_ignore=None, nodeclasses_to_include=None, nodeclasses_to_ignore=None, networks_to_include=None, networks_to_ignore=None): """ Parses the content of an :class:`lxml._Element` containing a meta-network and loads the contents. If no properties, nodeclasses, or networks are specified to be included, all of them will be included. Lists of included and ignored elements are exclusive - only specify a set to include or a set to ignore for each category. :param lxml._Element mn_tag: A tag containing a dynamic meta-network :param list properties_to_include: a list of nodeclass properties that should be included :param list properties_to_ignore: a list of nodeclass properties that should be ignored :param list nodeclasses_to_include: a list of nodeclasses that should be included :param list nodeclasses_to_ignore: a list of nodeclasses that should be ignored :param list networks_to_include: a list of networks that should be included :param list networks_to_ignore: a list of networks that should be ignored """ prop_inclusion_test = dmlpu.validate_and_get_inclusion_test( (properties_to_include, 'properties_to_include'), (properties_to_ignore, 'properties_to_ignore')) nodeclass_inclusion_test = dmlpu.validate_and_get_inclusion_test( (nodeclasses_to_include, 'nodeclasses_to_include'), (nodeclasses_to_ignore, 'nodeclasses_to_ignore')) network_inclusion_test = dmlpu.validate_and_get_inclusion_test( (networks_to_include, 'networks_to_include'), (networks_to_ignore, 'networks_to_ignore')) for attrib_key in mn_tag.attrib: self.attributes[attrib_key] = dmlpu.format_prop(mn_tag.attrib[attrib_key]) properties_tag = mn_tag.find('properties') if properties_tag is not None: for prop in properties_tag.iterfind('property'): self.properties[prop.attrib['id']] = dmlpu.format_prop(prop.attrib['value']) # TODO: Deal with source tag in AutoMap output. self.propertyIdentities = \ dmlpu.get_property_identities_dict(mn_tag.find('propertyIdentities'), prop_inclusion_test) self.__node_tree = dmlpu.get_nodeclass_dict(mn_tag.find('nodes'), prop_inclusion_test, nodeclass_inclusion_test) for nk_tag in mn_tag.find('networks').iterfind('network'): if not network_inclusion_test(nk_tag.attrib['id']): continue self._parse_and_add_graph_tag(nk_tag)
[docs] def get_node_tree(self): """ :returns: __node_tree :rtype: :class:`dynetmlparsingutils.node_tree)` """ return self.__node_tree
[docs] def get_nodeclass(self, nodeclass_name): """ :returns: __node_tree[nodeclass_name] :rtype: :class:`dynetmlparsingutils.nodeclass_dict` """ self.__validate_tree_branch(nodeclass_name) return self.__node_tree[nodeclass_name]
[docs] def get_nodeset(self, nodeclass_name, nodeset_name): """ :param str|unicode nodeclass_name: name of the parent of nodeset_name :param str|unicode nodeset_name: name of the nodeset to be returned :return: self.__node_tree[nodeclass_name][nodeset_name] :rtype: :class:`dmlpu.nodeset_tuple` """ self.__validate_tree_branch(nodeclass_name, nodeset_name) return self.__node_tree[nodeclass_name][nodeset_name]
[docs] def get_node(self, nodeclass_name, nodeset_name, node_name): """ :param str|unicode nodeclass_name: name of the parent of nodeset_name :param str|unicode nodeset_name: name of the parent of node_name :param str|unicode node_name: name of the node to be returned :return: self.__node_tree[nodeclass_name][nodeset_name][node] :rtype: :class:`dynetmlparsingutils.node_tuple` """ self.__validate_tree_branch(nodeclass_name, nodeset_name, node_name) return self.__node_tree[nodeclass_name][nodeset_name][node_name]
[docs] def set_node_property(self, nodeclass_name, nodeset_name, node_name, property_name, value): """ Set the value of a node property :param str|unicode nodeclass_name: the name of the parent of nodeset_name :param str|unicode nodeset_name: the name of the parent of node_name :param str|unicode node_name: the name of the node whose property is being set :param str|unicode property_name: the name of the property to be set :param str|unicode|bool|float|datetime.datetime value: the value of the parameter """ self.__validate_tree_branch(nodeclass_name, nodeset_name, node_name) dmlpu.check_type(property_name, 'property_name', (str, unicode)) dmlpu.check_key(property_name, 'property_name', self.__node_tree[nodeclass_name][nodeset_name][0], nodeset_name) self.__node_tree[nodeclass_name][nodeset_name][1][node_name][property_name] = \ dmlpu.format_prop(value, self.__node_tree[nodeclass_name][nodeset_name][0][property_name][0])
[docs] def create_nodeset_property(self, nodeclass_name, nodeset_name, property_name, type_str, singlevalued_bool): """ Create a new nodeset property :param str|unicode nodeclass_name: the name of the parent of nodeset_name :param str|unicode nodeset_name: the name of the nodeset that should get the new property :param str|unicode property_name: the name of the property to be added :param str|unicode type_str: the type of the parameter :param bool singlevalued_bool: whether or not the parameter should be single-valued """ self.__validate_tree_branch(nodeclass_name, nodeset_name) dmlpu.check_key(property_name, 'property_name', self.__node_tree[nodeclass_name][nodeset_name][0], nodeset_name, False) dmlpu.check_type(type_str, 'type_str', (str, unicode)) if type_str not in ['number', 'date', 'text', 'categoryText', 'URI']: raise ValueError('type_str must be "number", "date" "text", "categoryText", or "URI"; got {0}'. format(type_str)) dmlpu.check_type(singlevalued_bool, 'singlevalued_bool', bool) self.__node_tree[nodeclass_name][nodeset_name][0][property_name] = type_str, singlevalued_bool
[docs] def create_node(self, nodeclass_name, nodeset_name, node_name, property_dict=None): """ Create a node in a give nodeset in a give nodeclass. If a set of properties are specified, add the properties. :param str|unicode nodeclass_name: name of the parent of nodeset_name :param str|unicode nodeset_name: name of the parent of node_name :param str|unicode node_name: name of the node to be created :param dict|None property_dict: A dictionary of properties to assign the new node. """ self.__validate_tree_branch(nodeclass_name, nodeset_name) dmlpu.check_type(node_name, 'node_name', (str, unicode)) dmlpu.check_key(node_name, 'node_name', self.__node_tree[nodeclass_name][nodeset_name][1], nodeset_name, False) dmlpu.check_type(property_dict, 'property_dict', (dict, None)) for property_name in property_dict.keys(): dmlpu.check_key( property_name, 'property_name', self.__node_tree[nodeclass_name][nodeset_name][0], '{0} properties'.format(nodeset_name)) self.__node_tree[nodeclass_name][nodeset_name][1][node_name] = property_dict
[docs] def rename_node(self, nodeclass_name, nodeset_name, node_name, new_node_name): """ Rename a particular node, both in the node tree and in the networks containing it. :param str|unicode nodeclass_name: name of the parent of nodeset_name :param str|unicode nodeset_name: name of the parent of node_name :param str|unicode node_name: current node name :param str|unicode new_node_name: new name for the node """ self.__validate_tree_branch(nodeclass_name, nodeset_name, node_name) dmlpu.check_type(new_node_name, 'new_node_name', (str, unicode)) dmlpu.check_key(new_node_name, 'new_node_name', self.__node_tree[nodeclass_name][nodeset_name][1], nodeset_name, False) self.__node_tree[nodeclass_name][nodeset_name][1][new_node_name] = \ self.__node_tree[nodeclass_name][nodeset_name][1][node_name] del self.__node_tree[nodeclass_name][nodeset_name][1][node_name] # We assume 'id' exists. If it doesn't, the data has bigger problems. self.__node_tree[nodeclass_name][nodeset_name][new_node_name]['id'] = new_node_name self._rename_network_nodes(nodeclass_name, nodeset_name, node_name, new_node_name)
[docs] def union_nodesets(self, nodeclass_name, *args): """ Takes two nodesets and combines them in a new nodeset. Properties and entries from the first nodeset override \ those from the second nodeset. :param str|unicode nodeclass_name: The name of the parent nodeclass of nodeset_one_name and nodeset_two_name :param str|unicode nodeset_one_name: The name of the first nodeset in the union. :param str|unicode nodset_two_name: The name of the second ndeset in the union. :param other_nodesets: union_nodesets takes a \*args argument, so any number of nodesets is acceptable :param str|unicode new_nodeset_name: The name of the new nodeset to be created """ if len(args) < 3: raise ValueError('Need at least 3 argumets; got {0}'.format(len(args))) for nodeset_name in args[:-1]: self.__validate_tree_branch(nodeclass_name, nodeset_name) dmlpu.check_type(args[-1], 'new_nodeset_name', (str, unicode)) dmlpu.check_key(args[-1], 'union_nodeset', self.__node_tree[nodeclass_name], nodeclass_name, False) merge_nodeset = self.__node_tree[nodeclass_name][args[-2]] for i in range(len(args)-3, -1, -1): for entry in self.__node_tree[nodeclass_name][args[i]][1]: merge_nodeset[1][entry] = self.__node_tree[nodeclass_name][args[i]][1][entry] for entry in self.__node_tree[nodeclass_name][args[i]][0]: merge_nodeset[0][entry] = self.__node_tree[nodeclass_name][args[i]][0][entry] self.__node_tree[nodeclass_name][args[-1]] = merge_nodeset
[docs] def write_dynetml(self, out_file_path): """:param str|unicode out_file_path: Write the meta-network to this path.""" dmlpu.check_type(out_file_path, 'out_file_path', (str, unicode)) if os.path.exists(out_file_path) and os.path.isdir(out_file_path): raise IOError('out_file_path cannot be a directory') # bs = self.convert_to_dynetml(True) # # with codecs.open(out_file_path, 'w', 'utf8') as outfile: # outfile.write(bs.prettify()) xml_root = self.convert_to_dynetml() with codecs.open(out_file_path, 'w', 'utf8') as outfile: outfile.write('<?xml version="1.0" standalone="yes"?>\n\n') outfile.write(etree.tostring(xml_root, pretty_print=True))
[docs] def convert_to_dynetml(self): """Converts the graph to DyNetML and returns an :class:`lxml._Element`""" # bs = BeautifulSoup(features='xml') # bs.append(bs.new_tag('MetaNetwork')) # # for attr in self.attributes: # bs.MetaNetwork[attr] = dmlpu.unformat_prop(self.attributes[attr]) # # bs.MetaNetwork.append(dmlpu.get_property_identities_tag(self.propertyIdentities)) # # bs.MetaNetwork.append(bs.new_tag('properties')) # for key in self.properties: # prop_tag = bs.new_tag('property') # prop_tag['id'] = key # prop_tag['value'] = dmlpu.unformat_prop(self.properties[key]) # bs.MetaNetwork.properties.append(prop_tag) # # bs.MetaNetwork.append(bs.new_tag('nodes')) # for class_type in self.__node_tree: # for class_id in self.__node_tree[class_type]: # nodeclass_tag = bs.new_tag('nodeclass', type=class_type, id=class_id) # nodeclass_tag.append(dmlpu.get_property_identities_tag(self.__node_tree[class_type][class_id][0])) # # for key in self.__node_tree[class_type][class_id][1]: # node_tag = bs.new_tag('node', id=key) # for attr in self.__node_tree[class_type][class_id][1][key][0]: # node_tag[attr] = dmlpu.unformat_prop(self.__node_tree[class_type][class_id][1][key][0][attr]) # node_tag.append(dmlpu.get_properties_tag(self.__node_tree[class_type][class_id][1][key][1])) # nodeclass_tag.append(node_tag) # # bs.MetaNetwork.nodes.append(nodeclass_tag) # # networks_tag = self._get_networks_tag() # bs.MetaNetwork.networks.append(networks_tag) # # if not is_entire_file: # bs = bs.MetaNetwork # # return bs mn = etree.Element('MetaNetwork') for attr in self.attributes: mn.attrib[attr] = dmlpu.unformat_prop(self.attributes[attr]) etree.SubElement(mn, dmlpu.get_property_identities_tag(self.propertyIdentities)) properties_tag = etree.SubElement(mn, 'properties') for key in self.properties: prop_tag = etree.SubElement(properties_tag, 'property') prop_tag.attrib['id'] = key prop_tag['value'] = dmlpu.unformat_prop(self.properties[key]) nodes_tag = etree.SubElement(mn, 'nodes') for class_type in self.__node_tree: for class_id in self.__node_tree[class_type]: nodeclass_tag = etree.SubElement(nodes_tag, 'nodeclass', attrib={'type': class_type, 'id': class_id}) etree.SubElement(nodeclass_tag, dmlpu.get_property_identities_tag(self.__node_tree[class_type][class_id][0])) etree.SubElement(mn, self._get_networks_tag()) return mn
[docs] def pretty_print(self): """Pretty print the meta-network""" print ' == Meta-Network ==' print ' == Properties ==' for prop in self.properties: print u' {0}: {1}'.format(prop, self.properties[prop]).encode('utf8') print ' == Nodeclasses & Nodesets ==' for n_c in self.__node_tree: print u' Nodeclass {0}:'.format(n_c).encode('utf8') for n_s in self.__node_tree[n_c]: print u' Nodeset {0}:'.format(n_s).encode('utf8') print u' |-' for prop_ident in self.__node_tree[n_c][n_s][0]: print u' | {0}: {1}'.format(prop_ident, self.__node_tree[n_c][n_s][0][prop_ident]) print u' |-\n' for node in self.__node_tree[n_c][n_s][1]: print u' Node {0}'.format(node).encode('utf8') for attr in self.__node_tree[n_c][n_s][1][node][0]: print u' {0}: {1}'.format(attr, self.__node_tree[n_c][n_s][1][node][0][attr]).encode('utf8') for prop in self.__node_tree[n_c][n_s][1][node][1]: print u' {0}: {1}'.format(prop, self.__node_tree[n_c][n_s][1][node][1][prop]).encode('utf8') self._pretty_print_networks()
def _rename_network_nodes(self, nodeclass_name, nodeset_name, node_name, new_node_name): """ Rename a node in all the networks containing it. :param str|unicode nodeclass_name: name of the parent of nodeset_name :param str|unicode nodeset_name: name of the parent of node_name :param str|unicode node_name: current node name :param str|unicode new_node_name: new name for the node """ for nk in self.networks: if nk[0]['sourceType'] == nodeclass_name and nk[0]['source'] == nodeset_name or \ nk[0]['targetType'] == nodeclass_name and nk[0]['target'] == nodeset_name: nk[new_node_name] = nk[node_name] del nk[node_name] for src in nk: if node_name in nk[src]: nk[src][new_node_name] = nk[src][node_name] del nk[src][node_name] def _get_networks_tag(self): """Generates an :class:`lxml._Element` from the networks""" # bs = BeautifulSoup() # networks_tag = bs.new_tag('networks') # for key in self.networks: # network_tag = bs.new_tag('network') # network_tag['sourceType'] = self.networks[key][0]['sourceType'] # network_tag['source'] = self.networks[key][0]['source'] # network_tag['targetType'] = self.networks[key][0]['targetType'] # network_tag['target'] = self.networks[key][0]['target'] # network_tag['id'] = key # network_tag['isDirected'] = dmlpu.unformat_prop(self.networks[key][0]['isDirected']) # network_tag['allowSelfLoops'] = dmlpu.unformat_prop(self.networks[key][0]['allowSelfLoops']) # network_tag['isBinary'] = dmlpu.unformat_prop(self.networks[key][0]['isBinary']) # # if self.networks[key][0]['isBinary']: # for edge in self.networks[key].edges_iter(): # network_tag.append(bs.new_tag('link', source=edge[0], target=edge[1])) # else: # for edge in self.networks[key].edges_iter(data=True): # network_tag.append(bs.new_tag('link', source=edge[0], target=edge[1], value=edge[2]['weight'])) # # networks_tag.append(network_tag) # # return networks_tag networks_tag = etree.Element('networks') for key in self.networks: network_tag = etree.SubElement(networks_tag, 'network', attrib={ 'sourceType': self.networks[key][0]['sourceType'], 'source': self.networks[key][0]['source'], 'targetType': self.networks[key][0]['targetType'], 'target': self.networks[key][0]['target'], 'id': key, 'isDirected': dmlpu.unformat_prop(self.networks[key][0]['isDirected']), 'allowSelfLoops': dmlpu.unformat_prop(self.networks[key][0]['allowSelfLoops']), 'isBinary': dmlpu.unformat_prop(self.networks[key][0]['isBinary'])}) if self.networks[key][0]['isBinary']: for edge in self.networks[key].edges_iter(): etree.SubElement(network_tag, 'link', attrib={'source': edge[0], 'target': edge[1]}) else: for edge in self.networks[key].edges_iter(data=True): etree.SubElement(network_tag, 'link', attrib={'source': edge[0], 'target': edge[1], 'value': edge[2]['weight']}) return networks_tag def _parse_and_add_graph_tag(self, nk_tag): """:param lxml._Element nk_tag: The tag to be parsed and added to the MetaNetwork""" g = {}, defaultdict(dict) g[0]['sourceType'] = nk_tag.attrib['sourceType'] g[0]['source'] = nk_tag.attrib['source'] g[0]['targetType'] = nk_tag.attrib['targetType'] g[0]['target'] = nk_tag.attrib['target'] g[0]['id'] = nk_tag.attrib['id'] g[0]['isDirected'] = nk_tag.attrib['isDirected'] == 'true' g[0]['allowSelfLoops'] = nk_tag.attrib['allowSelfLoops'] == 'true' g[0]['isBinary'] = nk_tag.attrib['isBinary'] == 'true' #for attrib_key in nk_tag.attrib: # g[0][attrib_key] = format_prop(nk_tag.attrib[attrib_key]) if g[0]['isDirected']: for link in nk_tag.iterfind('link'): weight = float(link.attrib['value']) if 'value' in link.attrib else 1.0 g[1][link.attrib['source']][link.attrib['target']] = weight else: for link in nk_tag.iterfind('link'): weight = float(link.attrib['value']) if 'value' in link.attrib else 1.0 g[1][link.attrib['source']][link.attrib['target']] = weight g[1][link.attrib['target']][link.attrib['source']] = weight self.networks[nk_tag.attrib['id']] = g def _pretty_print_networks(self): """Pretty-print the networks""" print ' == Networks ==' network_count = 0 for nk_key in self.networks: nk = self.networks[nk_key] print u' Network {0}: {1}'.format(network_count, nk_key).encode('utf8') for prop in nk[0]: print u' {0}: {1}'.format(prop, nk[0][prop]).encode('utf8') nodes = set() edges = list() for src in nk[1]: nodes.add(src) for target in nk[1][src]: nodes.add(target) edges.append([src, target]) if nk[0]['isDirected']: for edge in edges: edge.sort() print ' {0} nodes'.format(len(nodes)) print ' {0} edges'.format(len(set(edges))) network_count += 1