# -*- coding: utf-8 -*-
from collections import defaultdict
import fnmatch
from itertools import izip_longest
import json
import os
import pprint
import re
import enum
import yaml
from . import colors
import sys
import logging
log = logging.getLogger(__name__)

# Imports of std python packages are normally not interesting, so remember
# the directories the standard library lives in (lower-cased):
#   * the parent of the directory containing the ``pprint`` module, and
#   * the directory containing the ``os`` module.
# In virtualenvs that see the system libs, these will be different.
PYLIB_PATH = set([
    os.path.dirname(os.path.dirname(pprint.__file__)).lower(),
    os.path.dirname(os.__file__).lower(),
])
class imp(enum.Enum):
    """Kind of a module, stored in ``Source.kind``.

    ``UNKNOWN`` is the default kind of a ``Source``.

    NOTE(review): the member names and values look like the module-type
    constants of the standard library ``imp`` module -- confirm before
    relying on that correspondence.
    """
    # Definition order is kept as-is; it is the iteration order wherever
    # Enum iterates in definition order.
    C_BUILTIN = 6
    C_EXTENSION = 3
    IMP_HOOK = 9
    PKG_DIRECTORY = 5
    PY_CODERESOURCE = 8
    PY_COMPILED = 2
    PY_FROZEN = 7
    PY_RESOURCE = 4
    PY_SOURCE = 1
    UNKNOWN = 0
class Source(object):
    """A single module in the dependency graph.

    Attributes:
        name: module name; it is the identity used by ``==`` and ``hash``.
        kind: an ``imp`` member (``imp.UNKNOWN`` unless told otherwise).
        path: file path of the module, or ``None`` when not known.
        imports: set of names of the modules this module imports.
        imported_by: set of names of the modules that import this module.
        bacon: "bacon distance"; starts at the largest plain int and is only
            ever lowered by a merge (``+=``) or by the owning graph.
        excluded: true when the module is marked for exclusion.
        args: option dict; ``verbose`` and ``noise_level`` are read here.
    """

    def __init__(self, name, kind=imp.UNKNOWN, path=None, imports=(), exclude=False, args=None):
        self.args = args or {}
        if name == "__main__" and path:
            # An entry point that comes with a path is named after that
            # path: both kinds of path separators become dots.
            self.name = path.replace('\\', '/').replace('/', '.')
            if self.args.get('verbose', 0) >= 2:  # pragma: nocover
                # Single pre-formatted argument: prints the same text whether
                # ``print`` is a statement or a function.
                print("changing __main__ => %s" % self.name)
        else:
            self.name = name
        self.kind = kind
        self.path = path        # needed here..?
        self.imports = set(imports)     # modules we import
        self.imported_by = set()        # modules that import us
        # ``sys.maxint`` where it exists, otherwise the closest equivalent.
        self.bacon = getattr(sys, 'maxint', sys.maxsize)    # bacon distance
        self.excluded = exclude

    @property
    def name_parts(self):
        """The module name split on dots."""
        return self.name.split('.')

    @property
    def path_parts(self):
        """The lower-cased path split on (normalised) slashes.

        A missing path is treated as the empty string, giving ``['']``.
        """
        p = self.path or ""
        return p.replace('\\', '/').lower().split('/')

    @property
    def in_degree(self):
        """Number of incoming arrows (the number of modules we import)."""
        return len(self.imports)

    @property
    def out_degree(self):
        """Number of outgoing arrows (the number of modules importing us)."""
        return len(self.imported_by)

    @property
    def degree(self):
        """Sum of ``in_degree`` and ``out_degree``."""
        return self.in_degree + self.out_degree

    def is_noise(self):
        """Is this module just noise? (too common either at top or bottom of
        the graph).

        Only modules lacking incoming or outgoing arrows (or both) can be
        noise; they are noise when their degree exceeds
        ``args['noise_level']``.

        Raises:
            KeyError: if ``args`` has no ``noise_level`` entry.
        """
        noise = self.args['noise_level']
        if not (self.in_degree and self.out_degree):
            return self.degree > noise
        return False

    def __json__(self):
        """Return a json-serialisable dict describing this module.

        ``excluded``, ``imports`` and ``imported_by`` are only present when
        they are non-empty/true; the two name collections are sorted.
        """
        res = dict(
            name=self.name,
            path=self.path,
            kind=str(self.kind),
            bacon=self.bacon,
        )
        if self.excluded:
            res['excluded'] = 'EXCLUDED'
        if self.imports:
            res['imports'] = sorted(self.imports)
        if self.imported_by:
            res['imported_by'] = sorted(self.imported_by)
        return res

    def __str__(self):
        return "%s (%s, %s)" % (self.name, self.path, self.kind)

    def __hash__(self):
        return hash(self.name)

    def __eq__(self, other):
        """Two sources are equal when their names are equal."""
        try:
            return self.name == other.name
        except AttributeError:
            # Not something with a name: let Python try the other operand.
            return NotImplemented

    def __ne__(self, other):
        # Spelled out because ``!=`` is not derived from ``__eq__`` on
        # every Python version; without it ``!=`` compares identities.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __repr__(self):
        return json.dumps(self.__json__(), indent=4)

    def __iadd__(self, other):
        """Merge the information in ``other`` (same module) into ``self``.

        Path and kind are taken from ``other`` only when ``self`` does not
        have them, the name sets are united, the smaller bacon distance wins
        and the result is excluded if either side was.

        Raises:
            AssertionError: if the two sources have different names.
        """
        # When name, imports and bacon already agree the merge is routine,
        # so the debug dump is skipped.  The merge itself must still run:
        # ``other`` may carry a path, importers or an exclusion flag that
        # ``self`` lacks.
        routine = (self.name == other.name and
                   self.imports == other.imports and
                   self.bacon == other.bacon)
        if not routine:
            log.debug("iadd lhs: %r", self)
            log.debug("iadd rhs: %r", other)
        assert self.name == other.name
        self.path = self.path or other.path
        # Enum members are always truthy, so ``self.kind or other.kind``
        # could never replace UNKNOWN; compare explicitly instead.
        if self.kind == imp.UNKNOWN:
            self.kind = other.kind
        self.imports |= other.imports
        self.imported_by |= other.imported_by
        self.bacon = min(self.bacon, other.bacon)
        self.excluded = self.excluded or other.excluded
        if not routine:
            log.debug("iadd result: %r", self)
        return self

    @property
    def label(self):
        """Convert a module name to a formatted node label.  This is a default
        policy - please override.

        Dotted names longer than 14 characters get the literal text
        ``\\.\\n`` in place of every dot; other names are returned unchanged.
        """
        if len(self.name) > 14 and '.' in self.name:
            return '\\.\\n'.join(self.name.split('.'))
        return self.name

    @property
    def basename(self):
        """Name of the package this module belongs to.

        A package (kind ``PKG_DIRECTORY`` or a path ending in
        ``__init__.py``) is its own basename.  Any other module gets the
        part of its name before the last dot, or ``''`` if there is no dot.
        """
        # ``path`` defaults to None, so guard before calling a str method.
        path = self.path or ''
        if self.kind == imp.PKG_DIRECTORY or path.endswith('__init__.py'):
            return self.name
        parent, _dot, _leaf = self.name.rpartition('.')
        return parent
class DepGraph(object):
    """Graph of modules (``Source`` objects) and their import relations.

    Building the graph (``__init__``) also computes bacon distances, marks
    noisy/distant modules as excluded and finally removes every excluded
    module.  Iterating the graph yields ``(imported, importer)`` pairs.
    """

    # Module names that are never used as the importing side when iterating.
    skip_modules = """
        os sys qt time __future__ types re string bdb pdb __main__
        south
    """.split()

    def levelcounts(self):
        """Placeholder; does nothing."""
        pass

    def get_colors(self, src, colorspace=None):
        """Return the colors to draw ``src`` with.

        With a ``colorspace`` the decision is delegated to
        ``colorspace.color(src)``.  Otherwise a ``(background, foreground)``
        pair is returned; every new ``src.basename`` gets the next hue, and
        the pair is remembered so modules sharing a basename share colors.
        """
        if colorspace is not None:
            return colorspace.color(src)

        basename = src.basename
        if basename not in self.colors:
            hue = self.curhue
            # 37 is relative prime with 360, so hues do not repeat early.
            self.curhue = (self.curhue + 37) % 360
            bg = colors.name2rgb(hue)
            black = (0, 0, 0)
            white = (255, 255, 255)
            fg = colors.foreground(bg, black, white)
            self.colors[basename] = bg, fg
        return self.colors[basename]

    def _is_pylib(self, path):
        """True if ``path`` is one of the directories in ``PYLIB_PATH``."""
        log.info('path %r in PYLIB_PATH %r => %s', path, PYLIB_PATH, path in PYLIB_PATH)
        return path in PYLIB_PATH

    def proximity_metric(self, a, b):
        """Return the weight of the dependency from a to b. Higher weights
        usually have shorter straighter edges. Return 1 if it has normal
        weight. A value of 4 is usually good for ensuring that a related
        pair of modules are drawn next to each other.

        The weight is 1 plus the number of positions, among the first four
        path components both modules have, where the components are equal;
        the result is therefore an int between 1 and 5.
        """
        res = 1
        # range(4) caps the comparison at the first four components.
        for ap, bp, _n in zip(a.path_parts, b.path_parts, range(4)):
            if ap == bp:
                res += 1
        return res

    def dissimilarity_metric(self, a, b):
        """Return non-zero if references to this module are strange, and
        should be drawn extra-long. The value defines the length, in
        rank. This is also good for putting some vertical space between
        separate subsystems.

        The value is 4 minus the number of equal positions among the first
        four name parts, where a name with fewer than four parts is padded
        with ``None``.

        NOTE(review): because two padded positions compare equal, short
        unrelated names (``x`` vs ``y``) score 1, and identical four-part
        names score 0.  This mirrors the long-standing behaviour; confirm
        that it is intended before changing it.
        """
        a_parts = a.name_parts
        b_parts = b.name_parts
        res = 4
        for n in range(4):
            an = a_parts[n] if n < len(a_parts) else None
            bn = b_parts[n] if n < len(b_parts) else None
            if an == bn:
                res -= 1
        return res

    def _exclude(self, name):
        """True if ``name`` matches any pattern in the skip list."""
        return any(skip.match(name) for skip in self.skiplist)

    def __init__(self, depgraf, types, **args):
        """Build the graph.

        Args:
            depgraf: mapping of module name to a mapping of imported module
                name to that module's path.  Top-level names ending in
                ``.py`` are ignored.
            types: not used.
            **args: options.  ``exclude`` (glob patterns), ``show_cycles``,
                ``show_raw_deps``, ``max_bacon``, ``show_deps`` and
                ``verbose`` are read here; ``noise_level`` is read when
                looking for noise.

        Raises:
            KeyError: if a required option is missing, or if there is no
                ``__main__`` module to measure bacon distances from.
        """
        self.curhue = 150  # start with a green-ish color
        self.colors = {}
        self.cycles = []
        self.cyclenodes = set()
        self.cyclerelations = set()
        self.args = args
        self.sources = {}  # module_name -> Source
        self.skiplist = [re.compile(fnmatch.translate(arg)) for arg in args['exclude']]

        def strip_py(modname):
            # Imported modules are registered without a trailing '.py'.
            return modname[:-3] if modname.endswith('.py') else modname

        depgraf = {name: imports for (name, imports) in depgraf.items()
                   if not name.endswith('.py')}
        for name, imports in depgraf.items():
            log.debug("depgraph name=%r imports=%r", name, imports)
            src = Source(
                name=name,
                # Use the same (stripped) names the imported modules are
                # registered under below, so that every name in ``imports``
                # can be looked up in ``self.sources``.
                imports=[strip_py(iname) for iname in imports],
                args=args,
                exclude=self._exclude(name),
            )
            self.add_source(src)
            for iname, path in imports.items():
                iname = strip_py(iname)
                src = Source(
                    name=iname,
                    path=path,
                    args=args,
                    exclude=self._exclude(iname),
                )
                self.add_source(src)

        self.module_count = len(self.sources)
        self.verbose(1, "there are", self.module_count, "total modules")

        self.connect_generations()
        if self.args['show_cycles']:
            self.find_import_cycles()
        self.calculate_bacon()
        if self.args['show_raw_deps']:
            print(self)

        self.exclude_noise()
        self.exclude_bacon(self.args['max_bacon'])

        excluded = [v for v in self.sources.values() if v.excluded]
        self.skip_count = len(excluded)
        self.verbose(1, "skipping", self.skip_count, "modules")
        for module in excluded:
            self.verbose(2, "  ", module.name)

        self.remove_excluded()

        if not self.args['show_deps']:
            self.verbose(3, self)

    def verbose(self, n, *args):
        """Print ``args`` separated by spaces if the verbosity is >= ``n``."""
        if self.args['verbose'] >= n:
            # One pre-joined argument: same output whether ``print`` is a
            # statement or a function.
            print(' '.join(str(a) for a in args))

    def add_source(self, src):
        """Register ``src``, merging it into a known source of that name."""
        if src.name in self.sources:
            log.debug("ADD-SOURCE[+=]\n%r", src)
            self.sources[src.name] += src
        else:
            log.debug("ADD-SOURCE[=]\n%r", src)
            self.sources[src.name] = src

    def __getitem__(self, item):
        return self.sources[item]

    def __iter__(self):
        """Yield ``(imported, importer)`` pairs.

        Importers named in ``skip_modules`` or in the ``exclude`` option are
        skipped, and so are imported modules without a path or whose path
        ends in ``__init__.py``.
        """
        # The previous nested generator called itself without consuming the
        # result, which did nothing; this plain loop yields the same pairs
        # in the same order.
        visited = set(self.skip_modules) | set(self.args['exclude'])
        for src in self.sources.values():
            if src.name in visited:
                continue
            visited.add(src.name)
            for name in src.imports:
                impmod = self.sources[name]
                if impmod.path and not impmod.path.endswith('__init__.py'):
                    self.verbose(4, "Yielding", impmod, src)
                    yield impmod, src

    def __repr__(self):
        return json.dumps(self.sources, indent=4, sort_keys=True,
                          default=lambda obj: obj.__json__() if hasattr(obj, '__json__') else obj)

    def find_import_cycles(self):
        """Record import cycles.

        Every cycle found is appended to ``self.cycles`` as a list of names
        that starts and ends with the same module; its members are added to
        ``self.cyclenodes`` and its consecutive pairs to
        ``self.cyclerelations``.  Modules already known to be on a cycle are
        not traversed again.
        """
        def traverse(node, path):
            if node.name in self.cyclenodes:
                return
            if node.name in path:
                # found cycle
                cycle = path[path.index(node.name):] + [node.name]
                self.cycles.append(cycle)
                self.cyclenodes.update(cycle)
                self.cyclerelations.update(zip(cycle, cycle[1:]))
                return
            for impmod in node.imports:
                traverse(self.sources[impmod], path + [node.name])

        for src in self.sources.values():
            traverse(src, [])

    def connect_generations(self):
        """Fill in ``imported_by``: every known module that a source imports
        gets that source's name added to its ``imported_by`` set.
        """
        for src in self.sources.values():
            for child_name in src.imports:
                if child_name in self.sources:
                    self.sources[child_name].imported_by.add(src.name)

    def calculate_bacon(self):
        """Set ``bacon`` of every module reachable from ``__main__`` to its
        distance (in imports) from ``__main__``.

        A module whose ``bacon`` is already as small as the distance it is
        reached at is left alone and not followed further.

        Raises:
            KeyError: if there is no ``__main__`` module.
        """
        # Level by level instead of recursively: a module is first reached at
        # its shortest distance, so the result equals repeated relaxation
        # without being limited by the recursion depth.
        frontier = [self.sources['__main__']]
        distance = 0
        while frontier:
            reached = []
            for src in frontier:
                if src.bacon <= distance:
                    continue
                src.bacon = distance
                reached.extend(self.sources[name] for name in src.imports)
            frontier = reached
            distance += 1

    def exclude_noise(self):
        """Mark noisy modules (``Source.is_noise``) as excluded and add their
        names to the skip list.
        """
        for src in self.sources.values():
            if src.excluded:
                continue
            if src.is_noise():
                self.verbose(2, "excluding", src, "because it is noisy:", src.degree)
                src.excluded = True
                self._add_skip(src.name)

    def exclude_bacon(self, limit):
        """Exclude modules that are more than `limit` hops away from __main__.
        """
        for src in self.sources.values():
            if src.bacon > limit:
                src.excluded = True
                self._add_skip(src.name)

    def remove_excluded(self):
        """Remove all sources marked as excluded.

        For every source, names matching the skip list are also dropped from
        ``imports`` and ``imported_by``; both are lists afterwards.
        """
        # Iterate over a copy: entries are deleted from ``self.sources``
        # while looping.
        for src in list(self.sources.values()):
            if src.excluded:
                del self.sources[src.name]
            src.imports = [m for m in src.imports if not self._exclude(m)]
            src.imported_by = [m for m in src.imported_by if not self._exclude(m)]

    def _add_skip(self, name):
        """Add ``name`` (treated as a glob pattern) to the skip list."""
        self.skiplist.append(re.compile(fnmatch.translate(name)))