#!/usr/bin/python
# -*- coding: utf-8 -*-

"""
This module defines a class *GeoBase* to manipulate geographical
data (or not). It loads static files containing data, then provides
tools to play with it.

It relies on four other modules:

- *GeoUtils*: to compute haversine distances between points
- *LevenshteinUtils*: to compute distances between strings. We need
  a reliable way to do this, in order to recognize things like station names
  in schedule files where we do not have the station id
- *GeoGridModule*: to handle geographical indexation
- *SourcesManagerModule*: to handle data sources

Examples for airports::

    >>> geo_a = GeoBase(data='airports', verbose=False)
    >>> sorted(geo_a.findNearKey('ORY', 50)) # Orly, airports <= 50km
    [(0.0, 'ORY'), (18.8..., 'TNF'), (27.8..., 'LBG'), (34.8..., 'CDG')]
    >>> geo_a.get('CDG', 'city_code')
    'PAR'
    >>> geo_a.distance('CDG', 'NCE')
    694.5162...


Examples for stations::

    >>> geo_t = GeoBase(data='stations', verbose=False)
    >>>
    >>> # Nice, stations <= 3km
    >>> point = (43.70, 7.26)
    >>> [geo_t.get(k, 'name') for d, k in sorted(geo_t.findNearPoint(point, 3))]
    ['Nice-Ville', 'Nice-Riquier', 'Nice-St-Roch']
    >>>
    >>> geo_t.get('frpaz', 'name')
    'Paris-Austerlitz'
    >>> geo_t.distance('frnic', 'frpaz')
    683.526...

Whatever the reference data, a few duplicates remain,
even with the ``('iata_code', 'location_type')`` key:

    >>> geo = GeoBase(data='ori_por', key_fields=['iata_code', 'location_type'])
    In skipped zone, dropping line 1: "iata_code...".
    /!\ [lno ...] CRK+A is duplicated #1, first found lno ...
    /!\ [lno ...] RDU+A is duplicated #1, first found lno ...
    Import successful from ...
    Available fields for things: ...
"""

from __future__ import with_statement

import os.path as op
import heapq
from itertools import izip_longest, count, product
import csv
import json
from shutil import copy

from .SourcesManagerModule import SourcesManager
from .GeoUtils import haversine


# Stubs for LevenshteinUtils
#
handle_t = lambda s : s.replace('-', ' ').replace('\t', ' ')
clean    = lambda s : handle_t(s).strip().lower().split()

from difflib import SequenceMatcher

def mod_leven(a, b):
    '''Stub used when the C Levenshtein extension is unavailable.

    >>> mod_leven('antibes', 'antibS')
    0.92...
    '''
    a, b = clean(a), clean(b)

    if not a or not b:
        return 0.

    return SequenceMatcher(a='+'.join(a), b='+'.join(b)).ratio()
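
# A quick, hedged illustration of the helpers above (not part of the
# original module; expected values follow directly from the definitions):
#
#     >>> clean('Nice-Ville')  # '-' handled as a separator
#     ['nice', 'ville']
#     >>> mod_leven('paris orly', 'Paris-Orly')
#     1.0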


# Stubs for fuzzy
#
def soundex(name, length=4):
    """
    Soundex module conforming to Knuth's algorithm
    implementation 2000-12-24 by Gregory Jorgensen
    public domain
    """
    # digits holds the soundex values for the alphabet
    digits = '01230120022455012623010202'
    sndx = ''
    fc = ''

    # translate alpha chars in name to soundex digits
    for c in name.upper():
        if c.isalpha():
            if not fc:
                fc = c # remember first letter
            d = digits[ord(c) - ord('A')]
            # duplicate consecutive soundex digits are skipped
            if not sndx or (d != sndx[-1]):
                sndx += d

    # replace first digit with first alpha character
    sndx = fc + sndx[1:]

    # remove all 0s from the soundex code
    sndx = sndx.replace('0', '')

    # return soundex code padded to len characters
    return (sndx + (length * '0'))[:length]

# We stub nysiis and dmetaphone with the soundex algorithm
nysiis = soundex
dmeta  = lambda s: [soundex(s), None]
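
# Hedged illustration (not in the original module) of the phonetic stubs.
# With the digit table above, 'chicago' encodes to 'C220', consistent with
# the 'nysiis' doctest outputs of GeoBase.phoneticFind further down:
#
#     >>> soundex('chicago')
#     'C220'
#     >>> dmeta('chicago')
#     ['C220', None]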

try:
    # This wrapper will raise an ImportError
    # if libopentrep cannot be found
    # or if OpenTrepWrapper was not installed
    from OpenTrepWrapper import main_trep

except ImportError as err:
    # Could not import
    HAS_TREP_SUPPORT = False
else:
    # No problem here
    HAS_TREP_SUPPORT = True
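
# Downstream code can guard optional OpenTrep features on this flag.
# A minimal sketch (main_trep's signature is not shown here):
#
#     if HAS_TREP_SUPPORT:
#         pass # delegate the query to main_trep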


# Relative paths handling
DIRNAME = op.dirname(__file__)

def relative(rel_path, root=DIRNAME):
    """Handle relative paths.
    """
    return op.join(op.realpath(root), rel_path)
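
# By definition (illustrative, follows directly from the code above):
#
#     >>> relative('foo.csv') == op.join(op.realpath(DIRNAME), 'foo.csv')
#     True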

# The sources manager
S_MANAGER = SourcesManager()

# Special fields for latitude and longitude recognition
LAT_FIELD  = 'lat'
LNG_FIELD  = 'lng'
GEO_FIELDS = (LAT_FIELD, LNG_FIELD)

# Default grid size
GRID_RADIUS = 50 # kms

# Default min match for fuzzy searches
MIN_MATCH  = 0.75
RADIUS     = 50
NB_CLOSEST = 1

# Loading indicator
NB_LINES_STEP = 100000

# Defaults
DEFAULTS = {
    'source'        : None,  # not for the configuration file, use 'paths' instead
    'paths'         : None,
    'headers'       : [],
    'key_fields'    : None,
    'indices'       : [],
    'delimiter'     : '^',
    'subdelimiters' : {},
    'join'          : [],
    'quotechar'     : '"',
    'limit'         : None,
    'skip'          : None,
    'discard_dups'  : False,
    'verbose'       : True,
}
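
# Precedence when GeoBase.__init__ builds its properties (see below):
# DEFAULTS are overridden by the configuration file, which is in turn
# overridden by keyword arguments. For example (values illustrative),
# a conf file setting delimiter=',' beats the '^' default, and
# GeoBase(..., delimiter=';') beats both.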


# We only export the main class
__all__ = ['GeoBase', 'DEFAULTS']


class GeoBase(object):
    """
    This is the main and only class. After __init__,
    a file is loaded in memory, and the user may use
    the instance to get information.
    """
    def __init__(self, data, **kwargs):
        """Initialization

        The ``kwargs`` parameters given when creating the object
        may be:

        - source        : ``None`` by default, file-like to the source
        - paths         : ``None`` by default, path or list of paths to \
                the source. This will only be used if source is ``None``.
        - headers       : ``[]`` by default, list of fields in the data
        - key_fields    : ``None`` by default, list of fields defining the \
                key for a line, ``None`` means line numbers will be used \
                to generate keys
        - indices       : ``[]`` by default, an iterable of additional \
                indexed fields
        - delimiter     : ``'^'`` by default, delimiter for each field,
        - subdelimiters : ``{}`` by default, a ``{ 'field' : 'delimiter' }`` \
                dict to define subdelimiters
        - join          : ``[]`` by default, list of dicts defining join \
                clauses. A join clause is a dict \
                ``{ 'fields' : fields, 'with' : [base, fields]}``, for example \
                ``{ 'fields' : 'country_code', 'with' : ['countries', 'code']}``
        - quotechar     : ``'"'`` by default, this is the string defined for \
                quoting
        - limit         : ``None`` by default, put an int if you want to \
                load only the first lines
        - skip          : ``None`` by default, put an int if you want to \
                skip the first lines during loading
        - discard_dups  : ``False`` by default, boolean to discard key \
                duplicates or handle them
        - verbose       : ``True`` by default, toggle verbosity

        :param data: the type of data, ``'airports'``, ``'stations'``, \
                and many more available. ``'feed'`` will create an empty \
                instance.
        :param kwargs: additional parameters
        :raises:  ``ValueError``, if the data parameter is not recognized
        :returns: ``None``

        >>> geo_a = GeoBase(data='airports')
        Import successful from ...
        Available fields for things: ...
        >>> geo_t = GeoBase(data='stations')
        Import successful from ...
        Available fields for things: ...
        >>> geo_f = GeoBase(data='feed')
        No source specified, skipping loading...
        Available fields for things: ...
        >>> geo_c = GeoBase(data='odd')
        Traceback (most recent call last):
        ValueError: Wrong data type "odd". Not in ['airlines', ...]

        Import some custom data.

        >>> p = 'DataSources/Airports/GeoNames/airports_geonames_only_clean.csv'
        >>> fl = open(relative(p))
        >>> GeoBase(data='feed',
        ...         source=fl,
        ...         headers=['iata_code', 'name', 'city'],
        ...         key_fields='iata_code',
        ...         delimiter='^',
        ...         verbose=False).get('ORY', 'name')
        'Paris-Orly'
        >>> fl.close()
        >>> GeoBase(data='airports',
        ...         headers=['iata_code', 'cname', 'city'],
        ...         verbose=False).get('ORY', 'cname')
        'Paris-Orly'
        """
        # Main structure in which everything will be loaded
        # Dictionary of dictionary
        self._things  = {}
        self._indexed = {}
        self._ggrid   = None

        # Other bases for join clauses
        self._ext_bases = {}

        # A cache for the fuzzy searches
        self._fuzzy_cache = {}
        # Another cache: if the algorithms are failing on a single
        # example, we first look in this cache
        self._fuzzy_bias_cache = {}

        # This will be similar to _headers, but can be modified after loading
        # _headers is just for data loading
        self.fields = ['__key__', '__dup__', '__par__', '__lno__', '__gar__']
        self.data   = data
        self.loaded = None # loaded stuff information, depends on sources and paths

        # Defaults
        props = {}
        for k, v in DEFAULTS.iteritems():
            props[k] = v

        # Paths read from the configuration file are by default
        # relative to the sources dir; if paths are read
        # as a keyword argument, the default is that they are absolute paths
        if 'paths' in kwargs:
            default_is_relative = False
        else:
            default_is_relative = True

        allowed_conf = set(props.keys()) - set(['source'])
        allowed_args = set(props.keys())

        if data not in S_MANAGER:
            raise ValueError('Wrong data type "%s". Not in %s' % \
                             (data, sorted(S_MANAGER)))

        # The configuration may be empty
        conf = S_MANAGER.get(data)
        if conf is None:
            conf = {}

        # File configuration overrides defaults
        for option in conf:
            if option in allowed_conf:
                props[option] = conf[option]
            else:
                raise ValueError('Option "%s" for data "%s" not understood in file.' % \
                                 (option, data))

        # User input overrides default configuration or file configuration
        for option in kwargs:
            if option in allowed_args:
                props[option] = kwargs[option]
            else:
                raise ValueError('Option "%s" not understood in arguments.' % option)

        # If None, put the default instead
        for k, v in props.iteritems():
            if v is None:
                props[k] = DEFAULTS[k]

        # Final parameters affectation
        self._source        = props['source']
        self._headers       = props['headers']
        self._key_fields    = props['key_fields']
        self._indices       = props['indices']
        self._delimiter     = props['delimiter']
        self._subdelimiters = props['subdelimiters']
        self._join          = props['join']
        self._quotechar     = props['quotechar']
        self._limit         = props['limit']
        self._skip          = props['skip']
        self._discard_dups  = props['discard_dups']
        self._verbose       = props['verbose']
        self._paths         = props['paths']

        # Tweaks on types, fail on wrong values
        self._checkProperties(default_is_relative)

        # Loading data
        if self._source is not None:
            # As a keyword argument, source should be a file-like
            self._load(self._source, self._verbose)
            self.loaded = self._source

        elif self._paths:
            # Here we read the source from the configuration file
            for path in self._paths:
                file_ = S_MANAGER.handle_path(path, data, self._verbose)

                if file_ is None:
                    continue

                try:
                    with open(file_) as source_fl:
                        self._load(source_fl, self._verbose)
                except IOError:
                    if self._verbose:
                        print '/!\ Failed to open "%s", failing over...' % file_
                else:
                    self.loaded = file_
                    break
            else:
                # Here the loop did not break, meaning nothing was loaded
                # We will go here even if self._paths was []
                raise IOError('Nothing was loaded from:%s' % \
                              ''.join('\n(*) %s' % p['file'] for p in self._paths))

        if self._verbose:
            if isinstance(self.loaded, str):
                print "Import successful from %s" % self.loaded
            elif self.loaded is not None:
                print "Import successful from *file-like*"
            else:
                print 'No source specified, skipping loading...'

            print "Available fields for things: %s" % self.fields

        # Indices
        for fields in self._indices:
            self.addIndex(fields, verbose=self._verbose)

        # Join handling
        for fields, join_data in self._join.iteritems():
            self._loadExtBase(fields, join_data)

    def _checkProperties(self, default_is_relative):
        """Some checks on parameters.
        """
        # Tuplification
        self._headers = tuplify(self._headers)

        if self._key_fields is not None:
            self._key_fields = tuplify(self._key_fields)

        for i, v in enumerate(self._indices):
            self._indices[i] = tuplify(v)
        self._indices = tuplify(self._indices)

        # We remove the None values to avoid creating useless @raw fields
        for h in self._subdelimiters.keys():
            if self._subdelimiters[h] is None:
                del self._subdelimiters[h]
            else:
                self._subdelimiters[h] = tuplify(self._subdelimiters[h])

        # Paths conversion to dict
        self._paths = S_MANAGER.convert_paths_format(self._paths,
                                                     default_is_relative)

        # Some headers are not accepted
        for h in self._headers:
            if str(h).endswith('@raw') or str(h).startswith('__'):
                raise ValueError('Header "%s" cannot contain "@raw" or "__".' % h)

        # We remove None, convert to dict, tuplify keys *and* values
        new_join = {}

        for v in self._join:
            if v is not None:
                new_join[tuplify(v['fields'])] = tuplify(v['with'])

        self._join = new_join


    def _loadExtBase(self, fields, join_data):
        """External bases for join fields handling.
        """
        if len(join_data) == 0:
            raise ValueError('Empty join_data for fields "%s" (was "%s").' % \
                             (fields, join_data))
        elif len(join_data) == 1:
            # Here if the user did not specify the field
            # of the join on the external base, we assume
            # it has the same name
            # join_data <=> join_base [, join_fields]
            join_base, join_fields = join_data[0], fields
        else:
            join_base, join_fields = join_data[0], tuplify(join_data[1])

        # Creation of external bases
        self._join[fields] = join_base, join_fields

        # When joining on multiple fields, you have to provide
        # the same number of fields for current base to external
        if len(fields) != len(join_fields):
            raise ValueError('"%s" should be the same length as "%s" as join fields.' % \
                             (fields, join_fields))

        if join_base not in S_MANAGER:
            raise ValueError('Wrong join data type "%s". Not in %s' % \
                             (join_base, sorted(S_MANAGER)))

        if join_base in self._ext_bases:
            if self._verbose:
                print '(Join) skipped [already done] load for external base "%s" [with %s] for join on %s' % \
                        (join_base, join_fields, fields)
        else:
            # To avoid recursion, we force the join to be empty
            if join_base == self.data:
                self._ext_bases[join_base] = self

                if self._verbose:
                    print '(Join) auto-referenced base "%s" [with %s] for join on %s' % \
                            (join_base, join_fields, fields)
            else:
                self._ext_bases[join_base] = GeoBase(join_base,
                                                     join=[],
                                                     verbose=False)

                if self._verbose:
                    print '(Join) loaded external base "%s" [with %s] for join on %s' % \
                            (join_base, join_fields, fields)

        ext_b = self._ext_bases[join_base]

        for f in join_fields:
            if f not in ext_b.fields:
                raise ValueError('Wrong join field "%s". Not in %s' % \
                                 (f, ext_b.fields))

        # We index the field to optimize further findWith
        ext_b.addIndex(join_fields, verbose=self._verbose)

    def hasIndex(self, fields=None):
        """Tells if an iterable of fields is indexed.

        Default value is ``None`` for fields, this will test the
        presence of any index.

        :param fields:  the iterable of fields
        :returns:       a boolean

        >>> geo_o.hasIndex('iata_code')
        True
        >>> geo_o.hasIndex(('iata_code', 'asciiname'))
        False
        >>> geo_o.hasIndex()
        True
        """
        if fields is None:
            return not not self._indexed

        return tuplify(fields) in self._indexed

    def addIndex(self, fields, force=False, verbose=True):
        """Add an index on an iterable of fields.

        :param fields:  the iterable of fields
        :param force:   ``False`` by default, force index update \
                if it already exists
        :param verbose: toggle verbosity

        >>> geo_o.addIndex('iata_code', force=True, verbose=True)
        /!\ Index on ('iata_code',) already built, overriding...
        Built index for fields ('iata_code',)

        Index on multiple fields.

        >>> geo_o.addIndex(('icao_code', 'location_type'), verbose=True)
        Built index for fields ('icao_code', 'location_type')

        Do not force.

        >>> geo_o.addIndex('iata_code', force=False, verbose=True)
        /!\ Index on ('iata_code',) already built, exiting...
        """
        if not fields:
            if verbose:
                print '/!\ Fields %s were empty, index not added' % str(fields)
            return

        fields = tuplify(fields)

        if self.hasIndex(fields):
            if not force:
                if verbose:
                    print '/!\ Index on %s already built, exiting...' % str(fields)
                return

            elif verbose:
                print '/!\ Index on %s already built, overriding...' % str(fields)

        self._indexed[fields] = self._buildIndex(fields, verbose)

    def dropIndex(self, fields=None, verbose=True):
        """Drop an index on an iterable of fields.

        If fields is not given, all indexes are dropped.

        :param fields:  the iterable of fields, if ``None``,
            all indexes will be dropped

        >>> geo_o.hasIndex(('icao_code', 'location_type'))
        True
        >>> geo_o.dropIndex(('icao_code', 'location_type'))
        >>> geo_o.hasIndex(('icao_code', 'location_type'))
        False
        """
        if fields is None:
            # keys() builds a list copy here, so we may safely
            # delete entries while iterating
            for fs in self._indexed.keys():
                del self._indexed[tuplify(fs)]
        else:
            if self.hasIndex(fields):
                del self._indexed[tuplify(fields)]
            else:
                if verbose:
                    print 'No index to drop on "%s".' % str(fields)

    def updateIndex(self, fields=None, verbose=True):
        """Update index on fields.

        If fields is not given, all indexes are updated.

        :param fields:  the iterable of fields, if ``None``,
            all indexes will be updated
        :param verbose: toggle verbosity

        Here is an example, we drop the index then make a query.

        >>> geo_o.dropIndex('iata_code')
        >>> list(geo_o.findWith([('iata_code', 'NCE')])) # not indexed
        [(1, 'NCE'), (1, 'NCE@1')]

        Now we index and make the same query.

        >>> geo_o.addIndex('iata_code')
        Built index for fields ('iata_code',)
        >>> list(geo_o.findWith([('iata_code', 'NCE')])) # indexed
        [(1, 'NCE'), (1, 'NCE@1')]

        Now we add a new key to the data.

        >>> geo_o.setFromDict('NEW_KEY_2', {
        ...     'iata_code' : 'NCE',
        ... })

        If we run the query again, the result is wrong when
        using the index, because it is not up-to-date.

        >>> list(geo_o.findWith([('iata_code', 'NCE')])) # indexed
        [(1, 'NCE'), (1, 'NCE@1')]
        >>> list(geo_o.findWith([('iata_code', 'NCE')], index=False))
        [(1, 'NCE'), (1, 'NEW_KEY_2'), (1, 'NCE@1')]

        Now we update the index, then the query works.

        >>> geo_o.updateIndex('iata_code')
        Built index for fields ('iata_code',)
        >>> list(geo_o.findWith([('iata_code', 'NCE')])) # indexed, up to date
        [(1, 'NCE'), (1, 'NEW_KEY_2'), (1, 'NCE@1')]
        >>> geo_o.delete('NEW_KEY_2') # avoid messing other tests

        Note that ``updateIndex`` will not create an index if it
        does not already exist.

        >>> geo_f.updateIndex('iata_code')
        No index to update on "iata_code".
        """
        if fields is None:
            # keys() builds a list copy here, safe against mutation
            for fs in self._indexed.keys():
                self.dropIndex(fs, verbose=verbose)
                self.addIndex(fs, verbose=verbose)
        else:
            if self.hasIndex(fields):
                self.dropIndex(fields, verbose=verbose)
                self.addIndex(fields, verbose=verbose)
            else:
                if verbose:
                    print 'No index to update on "%s".' % str(fields)

    def _buildIndex(self, fields, verbose=True):
        """Build index given an iterable of fields.

        :param fields:  the iterable of fields
        :param verbose: toggle verbosity
        :returns:       the dictionary of { values : list of matching keys }

        >>> geo_o._buildIndex('iata_code', verbose=False)['MRS']
        ['MRS', 'MRS@1']
        >>> geo_o._buildIndex(('iata_code',), verbose=False)[('MRS',)]
        ['MRS', 'MRS@1']
        >>> geo_o._buildIndex(['iata_code', 'country_code'])[('MRS', 'FR')]
        Built index for fields ['iata_code', 'country_code']
        ['MRS', 'MRS@1']
        """
        if isinstance(fields, str):
            compute_val = lambda k: self.get(k, fields)

        elif isinstance(fields, (list, tuple, set)):
            compute_val = lambda k: tuple(self.get(k, f) for f in fields)

        else:
            raise ValueError('Wrong fields "%s" for index' % str(fields))

        # Mapping for every possible value to matching keys
        index = {}

        for key in self:
            try:
                val = compute_val(key)
            except KeyError:
                # Here we have some fields that failed
                # This can happen if incomplete key information
                # has been supplied after loading
                if verbose:
                    print '/!\ Could not compute values for key "%s" and fields %s' % \
                            (key, str(fields))
                continue

            if val not in index:
                index[val] = []
            index[val].append(key)

        if verbose:
            print 'Built index for fields %s' % str(fields)

        return index


    @staticmethod
    def _buildKeyer(key_fields, headers, verbose=True):
        """Define the function that builds a line key.
        """
        # If key_fields is None we index with the line number
        if key_fields is None:
            if verbose:
                print '/!\ key_fields was None, keys will be created from line numbers.'

            return lambda row, lno: str(lno)

        # It is possible to have a key_fields which is a list
        # In this case we build the key as the concatenation between
        # the different fields
        try:
            pos = tuple(headers.index(k) for k in key_fields)
        except ValueError:
            raise ValueError("Inconsistent: headers = %s with key_fields = %s" % \
                             (headers, key_fields))
        else:
            keyer = lambda row, lno: '+'.join(row[p] for p in pos)

        return keyer


    @staticmethod
    def _emptyData(key, lno):
        """Generate empty data for a key.
        """
        return {
            '__key__' : key,  # special field for key
            '__dup__' : [],   # special field for duplicates
            '__par__' : [],   # special field for parent
            '__lno__' : lno,  # special field for line number
            '__gar__' : [],   # special field for garbage
        }


    def _buildRowData(self, row, headers, subdelimiters, key, lno):
        """Building all data associated to this row.
        """
        # Erase everything, except duplicates counter
        data = self._emptyData(key, lno=lno)

        # headers represents the meaning of each column.
        # Using izip_longest here will replace missing fields
        # with empty strings ''
        for h, v in izip_longest(headers, row, fillvalue=None):
            # if h is None, it means either:
            # 1) the conf file explicitly specified not to load the column
            # 2) there was more data than the headers said
            # Either way, we store it in the __gar__ special field
            if h is None:
                data['__gar__'].append(v)
            else:
                if h not in subdelimiters:
                    data[h] = v
                else:
                    data['%s@raw' % h] = v
                    data[h] = recursive_split(v, subdelimiters[h])

        return data


    @staticmethod
    def _buildReader(verbose, **csv_opt):
        """Manually configure the reader, to bypass the limitations
        of csv.reader.
        """
        #quotechar = csv_opt['quotechar']
        delimiter = csv_opt['delimiter']

        if len(delimiter) == 1:
            return lambda source_fl : csv.reader(source_fl, **csv_opt)

        if len(delimiter) == 0:
            if verbose:
                print '/!\ Delimiter was empty.'
                print '/!\ Fallback on splitting-every-char, but quoting is disabled.'

            def _reader(source_fl):
                """Custom reader splitting every char.
                """
                for row in source_fl:
                    yield list(row.rstrip('\r\n'))

            return _reader

        if verbose:
            print '/!\ Delimiter "%s" was not 1-character.' % delimiter
            print '/!\ Fallback on custom reader, but quoting is disabled.'

        def _m_reader(source_fl):
            """Custom reader supporting multiple characters split.
            """
            for row in source_fl:
                yield row.rstrip('\r\n').split(delimiter)

        return _m_reader


    def _buildDuplicatedKey(self, key, nb_dups):
        """
        When the key is already in base and we do not want to discard the
        row, we have to compute a new key for this row.
        We iterate until we find an available key.
        """
        for n in count(nb_dups):
            dup_key = '%s@%s' % (key, n)

            if dup_key not in self:
                return dup_key


    @staticmethod
    def _buildLnoEvents(skip, limit, verbose):
        """
        Build lambda functions handling events
        related to the line number count.
        """
        # Limit handling
        if skip is None:
            in_skipped_zone = lambda n : False
        else:
            in_skipped_zone = lambda n : n <= skip

        if limit is None:
            is_over_limit = lambda n : False
        else:
            is_over_limit = lambda n : n > limit

        # Verbose counter
        if verbose:
            show_load_info = lambda n : n % NB_LINES_STEP == 0
        else:
            show_load_info = lambda n : False

        return in_skipped_zone, is_over_limit, show_load_info


    def _load(self, source_fl, verbose=True):
        """Load the file and feed the main structure.

        :param source_fl: file-like input
        :param verbose:   toggle verbosity during data loading
        """
        # We cache all variables used in the main loop
        headers       = self._headers
        key_fields    = self._key_fields
        delimiter     = self._delimiter
        subdelimiters = self._subdelimiters
        quotechar     = self._quotechar
        limit         = self._limit
        skip          = self._skip
        discard_dups  = self._discard_dups

        keyer = self._buildKeyer(key_fields, headers, verbose)

        # Line number events
        in_skipped_zone, is_over_limit, show_load_info = \
                self._buildLnoEvents(skip, limit, verbose)

        # csv reader options
        csv_opt = {
            'delimiter' : delimiter,
            'quotechar' : quotechar
        }

        _reader = self._buildReader(verbose, **csv_opt)

        for lno, row in enumerate(_reader(source_fl), start=1):

            if show_load_info(lno):
                print '%-10s lines loaded so far' % lno

            # Skip comments and empty lines
            # Comments must *start* with #, otherwise they will not be stripped
            if not row or row[0].startswith('#'):
                continue

            if in_skipped_zone(lno):
                if verbose:
                    print 'In skipped zone, dropping line %s: "%s...".' % \
                            (lno, row[0])
                continue

            if is_over_limit(lno):
                if verbose:
                    print 'Over limit %s for loaded lines, stopping.' % limit
                break

            try:
                key = keyer(row, lno)
            except IndexError:
                if verbose:
                    print '/!\ Could not compute key with headers %s, key_fields %s for line %s: %s' % \
                            (headers, key_fields, lno, row)
                continue

            data = self._buildRowData(row, headers, subdelimiters, key, lno)

            # No duplicates ever, we will erase all data after if it is
            if key not in self:
                self._createFromDict(key, data)

            else:
                if discard_dups is False:
                    # We compute a new key for the duplicate
                    nb_dups = 1 + len(self.get(key, '__dup__'))
                    dup_key = self._buildDuplicatedKey(key, nb_dups)

                    # We update the data with this info
                    data['__key__'] = dup_key
                    data['__dup__'] = self.get(key, '__dup__')
                    data['__par__'] = [key]

                    # We add the dup_key as a new duplicate,
                    # store the duplicate in the main structure
                    self.get(key, '__dup__').append(dup_key)
                    self._createFromDict(dup_key, data)

                    if verbose:
                        print "/!\ [lno %s] %s is duplicated #%s, first found lno %s: creation of %s..." % \
                                (lno, key, nb_dups, self.get(key, '__lno__'), dup_key)
                else:
                    if verbose:
                        print "/!\ [lno %s] %s is duplicated, first found lno %s: dropping line..." % \
                                (lno, key, self.get(key, '__lno__'))

        # We remove None headers, which are not-loaded-columns
        self.fields = ['__key__', '__dup__', '__par__', '__lno__']

        for h in headers:
            if h in subdelimiters:
                self.fields.append('%s@raw' % h)
            if h is not None:
                self.fields.append(h)

        self.fields.append('__gar__')

    def hasGeoSupport(self, key=None):
        """Check if data type has geocoding support.

        If a key parameter is given, check the geocode support
        of this specific key.

        :param key: if key parameter is not ``None``,
            we check the geocode support for this specific key,
            not for the general data with ``fields`` attribute
        :returns:   boolean for geocoding support

        >>> geo_t.hasGeoSupport()
        True
        >>> geo_f.hasGeoSupport()
        False

        For a specific key.

        >>> geo_o.hasGeoSupport('ORY')
        True
        >>> geo_o.set('EMPTY')
        >>> geo_o.hasGeoSupport('EMPTY')
        False
        >>> geo_o.delete('EMPTY') # avoid messing other tests
        """
        if key is None:
            fields = set(self.fields)
        else:
            fields = set(self.get(key).keys())

        for required in GEO_FIELDS:
            if required not in fields:
                return False

        return True

    def hasGrid(self):
        """Tells if the base has a geographical grid.

        :returns: a boolean

        >>> geo_t.hasGrid()
        False
        """
        return self._ggrid is not None

    def get(self, key, field=None, **kwargs):
        """Simple get on the base.

        Get data on ``key`` for ``field`` information. For example
        you can get data on ``CDG`` for its ``city_code``.
        You can use ``None`` as ``field`` value to get all
        information in a dictionary.
        You can give an additional keyword argument
        ``default``, to avoid ``KeyError`` on the ``key`` parameter.

        :param key:    the key of the thing (like ``'SFO'``)
        :param field:  the field (like ``'name'`` or ``'iata_code'``)
        :param kwargs: other named arguments, use 'default' to avoid \
                ``KeyError`` on ``key`` (not ``KeyError`` on ``field``). \
                Use 'ext_field' to fetch data from the join base.
        :raises:       ``KeyError`` if the key is not in the base
        :returns:      the needed information

        >>> geo_a.get('CDG', 'city_code')
        'PAR'
        >>> geo_t.get('frnic', 'name')
        'Nice-Ville'
        >>> geo_t.get('frnic')
        {'info': 'Desserte Voyageur-Infrastructure', 'code': 'frnic', ...}

        Cases of unknown key.

        >>> geo_t.get('frmoron', 'name', default='There')
        'There'
        >>> geo_t.get('frmoron', 'name')
        Traceback (most recent call last):
        KeyError: 'Thing not found: frmoron'
        >>> geo_t.get('frmoron', 'name', default=None)
        >>> geo_t.get('frmoron', default='There')
        'There'

        Cases of unknown field, this is a bug and always fails.

        >>> geo_t.get('frnic', 'not_a_field', default='There')
        Traceback (most recent call last):
        KeyError: "Field 'not_a_field' [for key 'frnic'] not in ['__dup__', ...
        """
        if key not in self:
            # Unless default is set, we raise an Exception
            if 'default' in kwargs:
                return kwargs['default']

            raise KeyError("Thing not found: %s" % str(key))

        if 'ext_field' in kwargs:
            return self._joinGet(key, field, kwargs['ext_field'])

        # Key is in geobase here
        if field is None:
            return self._things[key]

        try:
            res = self._things[key][field]
        except KeyError:
            raise KeyError("Field '%s' [for key '%s'] not in %s" % \
                           (field, key, sorted(self._things[key])))
        else:
            return res

    def getJoinBase(self, fields, verbose=True):
        """Get the joined base from fields that have a join clause.

        :param fields:  the iterable of fields
        :param verbose: boolean, toggle verbosity
        :returns:       a GeoBase object or ``None`` if fields are not joined

        >>> geo_o.getJoinBase('iata_code')
        Fields "('iata_code',)" do not have join, cannot retrieve external base.
        >>> geo_o.getJoinBase('country_code') # doctest: +SKIP
        <GeoBases.GeoBaseModule.GeoBase object at 0x...>
        """
        fields = tuplify(fields)

        if not self.hasJoin(fields):
            if verbose:
                print 'Fields "%s" do not have join, cannot retrieve external base.' % str(fields)
            return

        # This is the data type of the joined base
        join_base = self._join[fields][0]

        return self._ext_bases[join_base]

    def hasJoin(self, fields=None):
        """Tells if an iterable of fields has join information.

        Default value is ``None`` for fields, this will test the
        presence of any join information.

        :param fields:  the iterable of fields
        :returns:       a boolean

        >>> geo_o.hasJoin('iata_code')
        False
        >>> geo_o.hasJoin('tvl_por_list')
        True
        >>> geo_o.hasJoin()
        True
        """
        if fields is None:
            return not not self._join

        return tuplify(fields) in self._join

    def _joinGet(self, key, fields=None, ext_field=None):
        """Get that performs a join with external bases.

        :param key:       the key of the thing (like ``'SFO'``)
        :param fields:    the iterable of fields (like ``'name'`` or \
                ``'iata_code'``)
        :param ext_field: the external field we want in the external \
                base
        :raises:          ``KeyError`` if the key is not in the base
        :raises:          ``ValueError`` if ``fields`` has no join information
        :returns:         the needed information

        >>> geo_o._joinGet('CDG', 'country_code', '__key__')
        ('FR',)
        >>> geo_o._joinGet('CDG', 'country_code', 'name')
        ('France',)
        >>> geo_o._joinGet('CDG', 'name')
        Traceback (most recent call last):
        ValueError: Fields "('name',)" has no join information, available: ...
        """
        # We only work with tuple of fields for joining
        fields = tuplify(fields)

        if not self.hasJoin(fields):
            raise ValueError('Fields "%s" has no join information, available: %s' % \
                             (str(fields), self._join.keys()))

        join_base, join_fields = self._join[fields]
        ext_b = self._ext_bases[join_base]

        values = tuple(self.get(key, f) for f in fields)

        if ext_field == '__loc__':
            ext_get = ext_b.getLocation
        else:
            ext_get = lambda k : ext_b.get(k, ext_field)

        if any(f in self._subdelimiters for f in fields):
            # This is the cartesian product of all possible combinations
            # of sub-delimited values
            # *iter_over_subdel* is here to create the lists from values which
            # are not embedded in a container, before giving them to *product*
            comb = product(*(iter_over_subdel(v, deep=False) for v in values))

            return tuple(tuple(ext_get(k) for _, k in
                               ext_b.findWith(zip(join_fields, c)))
                         for c in comb)
        else:
            return tuple(ext_get(k) for _, k in
                         ext_b.findWith(zip(join_fields, values)))

    def getLocation(self, key, **kwargs):
        """Returns geocode as (float, float) or None.

        :param key:    the key of the thing (like ``'SFO'``)
        :param kwargs: other named arguments, use 'default' to avoid \
                ``KeyError`` on ``key`` (not ``None`` on wrong value).
        :returns:      the location, a tuple of floats like ``(lat, lng)``, or \
                ``None`` if any problem happened during execution

        >>> geo_o.getLocation('AGN')
        (57.5..., -134...)
        >>> geo_o.getLocation('WPS') # no usable geocode => None

        Behavior on unknown key.

        >>> geo_o.getLocation('UNKNOWN')
        Traceback (most recent call last):
        KeyError: 'Thing not found: UNKNOWN'
        >>> geo_o.getLocation('UNKNOWN', default=(0, 0))
        (0, 0)
        """
        if key not in self:
            # Unless default is set, we raise an Exception
            if 'default' in kwargs:
                return kwargs['default']

            raise KeyError("Thing not found: %s" % str(key))

        try:
            loc = tuple(float(self.get(key, f)) for f in GEO_FIELDS)

        except (ValueError, TypeError, KeyError):
            # Decode geocode, if error, returns None
            # TypeError : input type is not a string, probably None
            # ValueError: could not convert to float
            # KeyError  : could not find lat or lng 'fields'
            return
        else:
            return loc

    def hasParents(self, key):
        """Tell if a key has parents.

        :param key: the key of the thing (like ``'SFO'``)
        :returns:   the number of parents

        >>> geo_o.hasParents('MRS')
        0
        >>> geo_o.hasParents('MRS@1')
        1
        >>> geo_o.hasParents('PAR')
        0
        """
        return len(self.get(key, '__par__'))

    def hasDuplicates(self, key):
        """Tell if a key has duplicates.

        :param key: the key of the thing (like ``'SFO'``)
        :returns:   the number of duplicates

        >>> geo_o.hasDuplicates('MRS')
        1
        >>> geo_o.hasDuplicates('MRS@1')
        1
        >>> geo_o.hasDuplicates('PAR')
        0
        """
        return len(self.get(key, '__dup__'))

    def getFromAllDuplicates(self, key, field=None, **kwargs):
        """Get all duplicates data, parent key included.

        :param key:    the key of the thing (like ``'SFO'``)
        :param field:  the field (like ``'name'`` or ``'iata_code'``)
        :param kwargs: other named arguments, use 'default' to avoid \
                key failure
        :returns:      the list of values for the given field iterated \
                on all duplicates for the key, including the key itself

        >>> geo_o.getFromAllDuplicates('ORY', 'name')
        ['Paris-Orly']
        >>> geo_o.getFromAllDuplicates('THA', 'name')
        ['Tullahoma Regional Airport/William Northern Field', 'Tullahoma']

        One parent, one duplicate example.

        >>> geo_o.get('THA@1', '__par__')
        ['THA']
        >>> geo_o.get('THA', '__dup__')
        ['THA@1']

        Using getFromAllDuplicates on master or duplicates
        gives the same results.

        >>> geo_o.getFromAllDuplicates('THA', '__key__')
        ['THA', 'THA@1']
        >>> geo_o.getFromAllDuplicates('THA@1', '__key__')
        ['THA@1', 'THA']

        Corner cases are handled in the same way as the ``get`` method.

        >>> geo_o.getFromAllDuplicates('nnnnnnoooo', default='that')
        'that'
        >>> it = geo_o.getFromAllDuplicates('THA', field=None)
        >>> [e['__key__'] for e in it]
        ['THA', 'THA@1']
        """
        if key not in self:
            # Unless default is set, we raise an Exception
            if 'default' in kwargs:
                return kwargs['default']

            raise KeyError("Thing not found: %s" % str(key))

        # Building the list of all duplicates
        keys = [key]
        for k in self.get(key, '__dup__') + self.get(key, '__par__'):
            if k not in keys:
                keys.append(k)

        # Key is in geobase here
        if field is None:
            return [self.get(k) for k in keys]

        try:
            res = [self.get(k, field) for k in keys]
        except KeyError:
            raise KeyError("Field '%s' [for key '%s'] not in %s" % \
                           (field, key, self.get(key).keys()))
        else:
            return res

    def _findWithUsingSingleIndex(self, fields, values):
        """Perform findWith using one index.
        """
        if values not in self._indexed[fields]:
            # No key matched these values for the fields
            raise StopIteration

        m = len(fields)

        for key in self._indexed[fields][values]:
            yield m, key


    def _checkIndexUsability(self, conditions, mode):
        """Check if indexes are usable for a given iterable of fields.
        """
        fields = tuple(f for f, _ in conditions)

        if self.hasIndex(fields) and mode == 'and':
            return True

        if all(self.hasIndex(f) for f in fields):
            return True

        return False


    def _findWithUsingMultipleIndex(self, conditions, from_keys, mode, verbose=False):
        """Perform findWith using several indexes.
        """
        # In case conditions is an iterator
        conditions = list(conditions)

        fields = tuple(f for f, _ in conditions)
        values = tuple(v for _, v in conditions)

        if self.hasIndex(fields) and mode == 'and':
            if verbose:
                print 'Using index for %s: value(s) %s' % (str(fields), str(values))

            # Here we use directly the multiple index to have the matching keys
            from_keys = set(from_keys)

            for m, key in self._findWithUsingSingleIndex(fields, values):
                if key in from_keys:
                    yield m, key

        elif all(self.hasIndex(f) for f in fields):
            if verbose:
                print 'Using index for %s: value(s) %s' % \
                        (' and '.join(str((f,)) for f in set(fields)),
                         '; '.join(str((v,)) for v in values))

            if mode == 'or':
                # Here we use each index to check the condition on one field
                # and we return the keys matching *any* condition
                candidates = set()
                for f, v in conditions:
                    candidates = candidates | set(k for _, k in
                                                  self._findWithUsingSingleIndex((f,), (v,)))

                for key in candidates & set(from_keys):
                    m = sum(self.get(key, f) == v for f, v in conditions)
                    yield m, key

            elif mode == 'and':
                # Here we use each index to check the condition on one field
                # and we keep only the keys matching *all* conditions
                candidates = set(from_keys)
                for f, v in conditions:
                    candidates = candidates & set(k for _, k in
                                                  self._findWithUsingSingleIndex((f,), (v,)))

                m = len(fields)
                for key in candidates:
                    yield m, key

    def findWith(self, conditions, from_keys=None, reverse=False,
                 force_str=False, mode='and', index=True, verbose=False):
        """Get an iterator of all keys matching conditions on fields.

        For example, if you want to know all airports in Paris.

        :param conditions: a list of ``('field', 'value')`` conditions
        :param reverse:    we look for keys where the field is *not* the \
                particular value. Note that this negation is done at \
                the lower level, before combining conditions. So if you \
                have two conditions with ``mode='and'``, expect \
                results matching not condition 1 *and* not condition 2.
        :param force_str:  force the ``str()`` method before every test
        :param mode:       either ``'or'`` or ``'and'``, how to handle \
                several conditions
        :param from_keys:  if given, we will look for results from this \
                iterable of keys
        :param index:      boolean to disable index when searching
        :param verbose:    toggle verbosity during search
        :returns:          an iterable of ``(v, key)`` where ``v`` is the \
                number of matched conditions

        >>> list(geo_a.findWith([('city_code', 'PAR')]))
        [(1, 'ORY'), (1, 'TNF'), (1, 'CDG'), (1, 'BVA')]
        >>> list(geo_o.findWith([('comment', '')], reverse=True))
        []
        >>> list(geo_o.findWith([('__dup__', '[]')]))
        []
        >>> len(list(geo_o.findWith([('__dup__', [])]))) # 7013 exactly
        69...
        >>> len(list(geo_o.findWith([('__dup__', '[]')], force_str=True)))
        69...
        >>> # Counting duplicated keys
        >>> len(list(geo_o.findWith([('__par__', [])], reverse=True)))
        44...

        Testing indexes.

        >>> list(geo_o.findWith([('iata_code', 'MRS')], mode='and', verbose=True))
        Using index for ('iata_code',): value(s) ('MRS',)
        [(1, 'MRS'), (1, 'MRS@1')]
        >>> geo_o.addIndex('iata_code', force=True)
        /!\ Index on ('iata_code',) already built, overriding...
        Built index for fields ('iata_code',)
        >>> geo_o.addIndex('location_type')
        Built index for fields ('location_type',)

        Now querying with simple indexes (dropping multiple index if it exists).

        >>> geo_o.dropIndex(('iata_code', 'location_type'), verbose=False)
        >>> list(geo_o.findWith([('iata_code', 'NCE'), ('location_type', 'A')],
        ...                     mode='and',
        ...                     verbose=True))
        Using index for ('iata_code',) and ('location_type',): value(s) ('NCE',); ('A',)
        [(2, 'NCE')]

        Multiple index.

        >>> geo_o.addIndex(('iata_code', 'location_type'), verbose=False)
        >>> list(geo_o.findWith([('iata_code', 'NCE'), ('location_type', 'A')],
        ...                     mode='and',
        ...                     verbose=True))
        Using index for ('iata_code', 'location_type'): value(s) ('NCE', 'A')
        [(2, 'NCE')]

        Mode "or" with index.

        >>> geo_o.addIndex('city_code')
        Built index for fields ('city_code',)
        >>> list(geo_o.findWith([('iata_code', 'NCE'), ('city_code', 'NCE')],
        ...                     mode='or',
        ...                     verbose=True))
        Using index for ('iata_code',) and ('city_code',): value(s) ('NCE',); ('NCE',)
        [(2, 'NCE@1'), (2, 'NCE')]
        >>> list(geo_o.findWith([('iata_code', 'NCE'), ('city_code', 'NCE')],
        ...                     mode='or',
        ...                     index=False,
        ...                     verbose=True))
        [(2, 'NCE'), (2, 'NCE@1')]

        Testing several conditions.

        >>> c_1 = [('city_code', 'PAR')]
        >>> c_2 = [('location_type', 'H')]
        >>> len(list(geo_o.findWith(c_1)))
        18
        >>> len(list(geo_o.findWith(c_2)))
        93
        >>> len(list(geo_o.findWith(c_1 + c_2, mode='and')))
        2
        >>> len(list(geo_o.findWith(c_1 + c_2, mode='or')))
        109
        """
        if from_keys is None:
            from_keys = iter(self)

        # In case conditions is an iterator
        conditions = list(conditions)

        # We check here the fields in conditions
        # because KeyErrors are caught next
        for field, _ in conditions:
            if field not in self.fields:
                raise ValueError('Conditions %s include unknown field "%s"' % \
                                 (conditions, field))

        # If indexed
        if index and not force_str and not reverse:
            # If this condition is not met, we do not raise StopIteration,
            # we will proceed with non-indexed code after
            if self._checkIndexUsability(conditions, mode):

                for t in self._findWithUsingMultipleIndex(conditions,
                                                          from_keys=from_keys,
                                                          mode=mode,
                                                          verbose=verbose):
                    yield t
                raise StopIteration

        # We set the lambda function now to avoid testing
        # force_str and reverse at each key later
        if not force_str and not reverse:
            pass_one = lambda a, b: a == b
        elif not force_str and reverse:
            pass_one = lambda a, b: a != b
        elif force_str and not reverse:
            pass_one = lambda a, b: str(a) == str(b)
        else:
            pass_one = lambda a, b: str(a) != str(b)

        # Handle and/or cases when multiple conditions
        if mode == 'and':
            pass_all = all
        elif mode == 'or':
            pass_all = any
        else:
            raise ValueError('"mode" argument must be in %s, was %s' % \
                             (str(['and', 'or']), mode))

        for key in from_keys:
            if key not in self:
                # This means the from_keys parameter contained unknown keys
                if verbose:
                    print 'Key %-10s and conditions %s failed in findWith, moving on...' % \
                            (key, conditions)
                continue

            matches = [pass_one(self.get(key, f), v) for f, v in conditions]

            if pass_all(matches):
                yield sum(matches), key

    def __iter__(self):
        """Returns iterator of all keys in the base.

        :returns: the iterator of all keys

        >>> list(a for a in geo_a)
        ['AGN', 'AGM', 'AGJ', 'AGH', ...
        """
        return self._things.iterkeys()


    def __contains__(self, key):
        """Test if a thing is in the base.

        :param key: the key of the thing to be tested
        :returns:   a boolean

        >>> 'AN' in geo_a
        False
        >>> 'AGN' in geo_a
        True
        """
        if key in self._things:
            return True

        return False


    def __nonzero__(self):
        """Testing emptiness of structure.

        :returns: a boolean

        >>> if not geo_o: print('empty')
        >>> if geo_o:     print('not empty')
        not empty

        This geo_f is actually empty.

        >>> if not geo_f: print('empty')
        empty
        >>> if geo_f:     print('not empty')
        """
        if self._things:
            return True

        return False

    def keys(self):
        """Returns a list of all keys in the base.

        :returns: the list of all keys

        >>> geo_a.keys()
        ['AGN', 'AGM', 'AGJ', 'AGH', ...
        """
        return self._things.keys()

    def distance(self, key0, key1):
        """Compute distance between two elements.

        This is just a wrapper around the haversine function,
        but it is probably one of the most used features :)

        :param key0: the first key
        :param key1: the second key
        :returns:    the distance (km)

        >>> geo_t.distance('frnic', 'frpaz')
        683.526...
        """
        return haversine(self.getLocation(key0), self.getLocation(key1))

    def _buildDistances(self, lat_lng_ref, keys):
        """
        Compute the iterable of ``(dist, keys)`` of a reference
        ``lat_lng`` and a list of keys. Keys which do not have valid
        geocodes will not appear in the results.

        >>> list(geo_a._buildDistances((0, 0), ['ORY', 'CDG']))
        [(5422.74..., 'ORY'), (5455.45..., 'CDG')]
        """
        if lat_lng_ref is None:
            raise StopIteration

        for key in keys:
            # Do not fail on unknown keys
            if key not in self:
                continue

            lat_lng = self.getLocation(key)

            if lat_lng is not None:
                yield haversine(lat_lng_ref, lat_lng), key

    def findNearPoint(self, lat_lng, radius=RADIUS, from_keys=None,
                      grid=True, double_check=True):
        """
        Returns a list of nearby things from a point (given latitude
        and longitude), and a radius for the search.
        Note that the haversine function, which computes distances
        at the surface of a sphere, here returns kilometers,
        so the radius should be in kms.

        :param lat_lng:   the lat_lng of the point (a tuple ``(lat, lng)``)
        :param radius:    the radius of the search (kilometers)
        :param from_keys: if ``None``, it takes all keys into consideration, \
            else takes ``from_keys`` iterable of keys to perform search.
        :param grid:      boolean, use grid or not
        :param double_check: when using grid, perform an additional check on \
            results distance, this is useful because the grid is approximate, \
            so the results are only as accurate as the grid size
        :returns:         an iterable of ``(distance, key)`` like \
            ``[(3.2, 'SFO'), (4.5, 'LAX')]``

        >>> # Paris, airports <= 20km
        >>> [geo_a.get(k, 'name') for d, k in
        ...  sorted(geo_a.findNearPoint((48.84, 2.367), 20))]
        ['Paris-Orly', 'Paris-Le Bourget']
        >>>
        >>> # Nice, stations <= 3km
        >>> [geo_t.get(k, 'name') for d, k in
        ...  sorted(geo_t.findNearPoint((43.70, 7.26), 3))]
        ['Nice-Ville', 'Nice-Riquier', 'Nice-St-Roch']
        >>>
        >>> # Wrong geocode
        >>> sorted(geo_t.findNearPoint(None, 5))
        []

        No grid mode.

        >>> # Paris, airports <= 20km
        >>> [geo_a.get(k, 'name') for d, k in
        ...  sorted(geo_a.findNearPoint((48.84, 2.367), 20, grid=False))]
        ['Paris-Orly', 'Paris-Le Bourget']
        >>>
        >>> # Nice, stations <= 3km
        >>> [geo_t.get(k, 'name') for d, k in
        ...  sorted(geo_t.findNearPoint((43.70, 7.26), 3, grid=False))]
        ['Nice-Ville', 'Nice-Riquier', 'Nice-St-Roch']
        >>>
        >>> # Paris, airports <= 50km with from_keys input list
        >>> sorted(geo_a.findNearPoint((48.84, 2.367), 50,
        ...                            from_keys=['ORY', 'CDG', 'BVE'],
        ...                            grid=False))
        [(12.76..., 'ORY'), (23.40..., 'CDG')]
        """
        if from_keys is None:
            from_keys = iter(self)

        for dist, thing in self._buildDistances(lat_lng, from_keys):
            if dist <= radius:
                yield dist, thing

    def findNearKey(self, key, radius=RADIUS, from_keys=None,
                    grid=True, double_check=True):
        """
        Same as ``findNearPoint``, except the point is given
        not by a ``(lat, lng)``, but with its key, like ``'ORY'`` or
        ``'SFO'``. We just look up in the base to retrieve latitude and
        longitude, then call ``findNearPoint``.

        :param key:       the key of the thing (like ``'SFO'``)
        :param radius:    the radius of the search (kilometers)
        :param from_keys: if ``None``, it takes all keys into consideration, \
            else takes ``from_keys`` iterable of keys to perform search.
        :param grid:      boolean, use grid or not
        :param double_check: when using grid, perform an additional check on \
            results distance, this is useful because the grid is approximate, \
            so the results are only as accurate as the grid size
        :returns:         an iterable of ``(distance, key)`` like \
            ``[(3.2, 'SFO'), (4.5, 'LAX')]``

        >>> sorted(geo_o.findNearKey('ORY', 10)) # Orly, por <= 10km
        [(0.0, 'ORY'), (1.82..., 'JDP'), (8.06..., 'XJY'), (9.95..., 'QFC')]
        >>> sorted(geo_a.findNearKey('ORY', 50)) # Orly, airports <= 50km
        [(0.0, 'ORY'), (18.8..., 'TNF'), (27.8..., 'LBG'), (34.8..., 'CDG')]
        >>> sorted(geo_t.findNearKey('frnic', 3)) # Nice station, stations <= 3km
        [(0.0, 'frnic'), (2.2..., 'fr4342'), (2.3..., 'fr5737')]

        No grid.

        >>> # Orly, airports <= 50km
        >>> sorted(geo_a.findNearKey('ORY', 50, grid=False))
        [(0.0, 'ORY'), (18.8..., 'TNF'), (27.8..., 'LBG'), (34.8..., 'CDG')]
        >>>
        >>> # Nice station, stations <= 3km
        >>> sorted(geo_t.findNearKey('frnic', 3, grid=False))
        [(0.0, 'frnic'), (2.2..., 'fr4342'), (2.3..., 'fr5737')]
        >>>
        >>> keys = ['ORY', 'CDG', 'SFO']
        >>> sorted(geo_a.findNearKey('ORY', 50, grid=False, from_keys=keys))
        [(0.0, 'ORY'), (34.8..., 'CDG')]
        """
        if from_keys is None:
            from_keys = iter(self)

        if key not in self:
            raise StopIteration

        for dist, thing in self.findNearPoint(lat_lng=self.getLocation(key),
                                              radius=radius,
                                              from_keys=from_keys,
                                              grid=grid,
                                              double_check=double_check):
            yield dist, thing

    def findClosestFromPoint(self, lat_lng, N=NB_CLOSEST, from_keys=None,
                             grid=True, double_check=True):
        """
        Concept close to ``findNearPoint``, but here we do not
        look for the things radius-close to a point,
        we look for the closest thing from this point, given by
        latitude/longitude.

        :param lat_lng:   the lat_lng of the point (a tuple ``(lat, lng)``)
        :param N:         the N closest results wanted
        :param from_keys: if ``None``, it takes all keys into consideration, \
            else takes ``from_keys`` iterable of keys to perform \
            ``findClosestFromPoint``. This is useful when we have names and \
            have to perform a matching based on name and location \
            (see ``fuzzyFindNearPoint``).
        :param grid:      boolean, use grid or not
        :param double_check: when using grid, perform an additional check on \
            results distance, this is useful because the grid is approximate, \
            so the results are only as accurate as the grid size
        :returns:         an iterable of ``(distance, key)`` like \
            ``[(3.2, 'SFO'), (4.5, 'LAX')]``

        >>> point = (43.70, 7.26) # Nice
        >>> list(geo_a.findClosestFromPoint(point))
        [(5.82..., 'NCE')]
        >>> list(geo_a.findClosestFromPoint(point, N=3))
        [(5.82..., 'NCE'), (30.28..., 'CEQ'), (79.71..., 'ALL')]
        >>> list(geo_t.findClosestFromPoint(point, N=1))
        [(0.56..., 'frnic')]
        >>> # Corner case, from_keys empty is not used
        >>> list(geo_t.findClosestFromPoint(point, N=2, from_keys=()))
        []
        >>> list(geo_t.findClosestFromPoint(None, N=2))
        []

        No grid.

        >>> list(geo_o.findClosestFromPoint(point, grid=False))
        [(0.60..., 'NCE@1')]
        >>> list(geo_a.findClosestFromPoint(point, grid=False))
        [(5.82..., 'NCE')]
        >>> list(geo_a.findClosestFromPoint(point, N=3, grid=False))
        [(5.82..., 'NCE'), (30.28..., 'CEQ'), (79.71..., 'ALL')]
        >>> list(geo_t.findClosestFromPoint(point, N=1, grid=False))
        [(0.56..., 'frnic')]

        Custom keys as search domain.

        >>> keys = ('frpaz', 'frply', 'frbve')
        >>> list(geo_t.findClosestFromPoint(point,
        ...                                 N=2,
        ...                                 grid=False,
        ...                                 from_keys=keys))
        [(482.84..., 'frbve'), (683.89..., 'frpaz')]
        """
        if from_keys is None:
            from_keys = iter(self)

        iterable = self._buildDistances(lat_lng, from_keys)

        for dist, thing in heapq.nsmallest(N, iterable):
            yield dist, thing

    def findClosestFromKey(self, key, N=NB_CLOSEST, from_keys=None,
                           grid=True, double_check=True):
        """
        Same as ``findClosestFromPoint``, except the point is given
        not by a ``(lat, lng)``, but with its key, like ``'ORY'`` or
        ``'SFO'``. We just look up in the base to retrieve latitude and
        longitude, then call ``findClosestFromPoint``.

        :param key:       the key of the thing (like ``'SFO'``)
        :param N:         the N closest results wanted
        :param from_keys: if ``None``, it takes all keys into consideration, \
            else takes ``from_keys`` iterable of keys to perform \
            ``findClosestFromKey``. This is useful when we have names and \
            have to perform a matching based on name and location \
            (see ``fuzzyFindNearPoint``).
        :param grid:      boolean, use grid or not
        :param double_check: when using grid, perform an additional check on \
            results distance, this is useful because the grid is approximate, \
            so the results are only as accurate as the grid size
        :returns:         an iterable of ``(distance, key)`` like \
            ``[(3.2, 'SFO'), (4.5, 'LAX')]``

        >>> list(geo_a.findClosestFromKey('ORY')) # Orly
        [(0.0, 'ORY')]
        >>> list(geo_a.findClosestFromKey('ORY', N=3))
        [(0.0, 'ORY'), (18.80..., 'TNF'), (27.80..., 'LBG')]
        >>> # Corner case, from_keys empty is not used
        >>> list(geo_t.findClosestFromKey('ORY', N=2, from_keys=()))
        []
        >>> list(geo_t.findClosestFromKey(None, N=2))
        []

        No grid.

        >>> list(geo_o.findClosestFromKey('ORY', grid=False))
        [(0.0, 'ORY')]
        >>> list(geo_a.findClosestFromKey('ORY', N=3, grid=False))
        [(0.0, 'ORY'), (18.80..., 'TNF'), (27.80..., 'LBG')]
        >>> list(geo_t.findClosestFromKey('frnic', N=1, grid=False))
        [(0.0, 'frnic')]

        Custom keys as search domain.

        >>> keys = ('frpaz', 'frply', 'frbve')
        >>> list(geo_t.findClosestFromKey('frnic',
        ...                               N=2,
        ...                               grid=False,
        ...                               from_keys=keys))
        [(482.79..., 'frbve'), (683.52..., 'frpaz')]
        """
        if from_keys is None:
            from_keys = iter(self)

        if key not in self:
            raise StopIteration

        for dist, thing in self.findClosestFromPoint(lat_lng=self.getLocation(key),
                                                     N=N,
                                                     from_keys=from_keys,
                                                     grid=grid,
                                                     double_check=double_check):
            yield dist, thing

    @staticmethod
    def fuzzyClean(value):
        """Cleaning from LevenshteinUtils.

        >>> GeoBase.fuzzyClean('antibes ville 2')
        'antibes+ville+2'
        """
        return '+'.join(clean(value))

    def _buildFuzzyRatios(self, fuzzy_value, field, min_match, keys):
        """
        Compute the iterable of (dist, keys) of a reference
        fuzzy_value and a list of keys.

        >>> list(geo_a._buildFuzzyRatios(fuzzy_value='marseille',
        ...                              field='name',
        ...                              min_match=0.60,
        ...                              keys=['ORY', 'MRS', 'CDG']))
        [(0.66..., 'MRS')]
        """
        for key in keys:
            # Do not fail on unknown keys
            if key not in self:
                continue

            r = mod_leven(fuzzy_value, self.get(key, field))

            if r >= min_match:
                yield r, key

    def fuzzyFind(self, fuzzy_value, field, max_results=None,
                  min_match=MIN_MATCH, from_keys=None):
        """
        Fuzzy searches are used to retrieve information on a thing
        when we do not know its code. We compare the value
        ``fuzzy_value``, which is supposed to be a field
        (e.g. a city or a name), to all things we have in the base,
        and we output the best match.
        Matching is performed using the Levenshtein module, with a
        modified version of the Levenshtein ratio, adapted to the
        type of data.

        Example: we look up ``'Marseille Saint Ch.'`` in our base
        and we find the corresponding code by comparing all station
        names with ``'Marseille Saint Ch.'``.

        :param fuzzy_value: the value, like ``'Marseille'``
        :param field:       the field we look into, like ``'name'``
        :param max_results: max number of results, None means all results
        :param min_match:   filter out matches under this threshold
        :param from_keys:   if ``None``, it takes all keys into \
            consideration, else takes ``from_keys`` iterable of keys to \
            perform ``fuzzyFind``. This is useful when we have geocodes \
            and have to perform a matching based on name and location \
            (see ``fuzzyFindNearPoint``).
        :returns:           an iterable of ``(distance, key)`` like \
            ``[(0.97, 'SFO'), (0.55, 'LAX')]``

        >>> geo_t.fuzzyFind('Marseille Charles', 'name')[0]
        (0.91..., 'frmsc')
        >>> geo_a.fuzzyFind('paris de gaulle', 'name')[0]
        (0.78..., 'CDG')
        >>> geo_a.fuzzyFind('paris de gaulle',
        ...                 field='name',
        ...                 max_results=3,
        ...                 min_match=0.55)
        [(0.78..., 'CDG'), (0.64..., 'LBG'), (0.60..., 'HUX')]

        Some corner cases.

        >>> geo_a.fuzzyFind('paris de gaulle', 'name', max_results=None)[0]
        (0.78..., 'CDG')
        >>> geo_a.fuzzyFind('paris de gaulle', 'name',
        ...                 max_results=1, from_keys=[])
        []
        """
        if from_keys is None:
            from_keys = iter(self)

        # All 'intelligence' is performed in the Levenshtein
        # module just here. All we do is minimize this distance
        iterable = self._buildFuzzyRatios(fuzzy_value, field, min_match, from_keys)

        if max_results is None:
            return sorted(iterable, reverse=True)
        else:
            return heapq.nlargest(max_results, iterable)

    def fuzzyFindNearPoint(self, lat_lng, radius, fuzzy_value, field,
                           max_results=None, min_match=MIN_MATCH,
                           from_keys=None, grid=True, double_check=True):
        """
        Same as ``fuzzyFind`` but we search only within a radius
        from a geocode.

        :param lat_lng:     the lat_lng of the point (a tuple ``(lat, lng)``)
        :param radius:      the radius of the search (kilometers)
        :param fuzzy_value: the value, like ``'Marseille'``
        :param field:       the field we look into, like ``'name'``
        :param max_results: if ``None``, returns all, if an int, only \
            returns the first ones
        :param min_match:   filter out matches under this threshold
        :param from_keys:   if ``None``, it takes all keys into consideration, \
            else takes a from_keys iterable of keys to perform search.
        :param grid:        boolean, use grid or not
        :param double_check: when using grid, perform an additional check on \
            results distance, this is useful because the grid is approximate, \
            so the results are only as accurate as the grid size
        :returns:           an iterable of ``(distance, key)`` like \
            ``[(0.97, 'SFO'), (0.55, 'LAX')]``

        >>> geo_a.fuzzyFind('Brussels', 'name', min_match=0.50)[0]
        (0.58..., 'EFC')
        >>> geo_a.get('BQT', 'name')  # Brussels just matched on Brest!!
        'Brest'
        >>> geo_a.get('BRU', 'name')  # We wanted BRU for 'Bruxelles'
        'Bruxelles National'
        >>>
        >>> # Now a request limited to a circle of 20km around BRU gives BRU
        >>> point = (50.9013, 4.4844)
        >>> geo_a.fuzzyFindNearPoint(point,
        ...                          radius=20,
        ...                          fuzzy_value='Brussels',
        ...                          field='name',
        ...                          min_match=0.40)[0]
        (0.46..., 'BRU')
        >>>
        >>> # Now a request limited to some input keys
        >>> geo_a.fuzzyFindNearPoint(point,
        ...                          radius=2000,
        ...                          fuzzy_value='Brussels',
        ...                          field='name',
        ...                          max_results=1,
        ...                          min_match=0.30,
        ...                          from_keys=['ORY', 'CDG'])
        [(0.33..., 'ORY')]
        """
        if from_keys is None:
            from_keys = iter(self)

        nearest = (k for _, k in self.findNearPoint(lat_lng, radius, from_keys,
                                                    grid, double_check))

        return self.fuzzyFind(fuzzy_value, field,
                              max_results, min_match,
                              from_keys=nearest)

    def fuzzyFindCached(self, fuzzy_value, field,
                        max_results=None, min_match=MIN_MATCH,
                        from_keys=None, verbose=False, d_range=None):
        """
        Same as ``fuzzyFind`` but with a caching and bias system.

        :param fuzzy_value: the value, like ``'Marseille'``
        :param field:       the field we look into, like ``'name'``
        :param max_results: max number of results, None means all results
        :param min_match:   filter out matches under this threshold
        :param from_keys:   if ``None``, it takes all keys into consideration, \
            else takes ``from_keys`` iterable of keys to perform fuzzyFind. \
            This is useful when we have geocodes and have to perform a \
            matching based on name and location (see ``fuzzyFindNearPoint``).
        :param verbose:     display information on caching for a certain \
            range of similarity
        :param d_range:     the range of similarity
        :returns:           an iterable of ``(distance, key)`` like \
            ``[(0.97, 'SFO'), (0.55, 'LAX')]``

        >>> geo_t.fuzzyFindCached('Marseille Saint Ch.', 'name')[0]
        (0.76..., 'frmsc')
        >>> geo_a.fuzzyFindCached('paris de gaulle',
        ...                       field='name',
        ...                       verbose=True,
        ...                       d_range=(0, 1))[0]
        [0.79]           paris+de+gaulle ->   paris+charles+de+gaulle (  CDG)
        (0.78..., 'CDG')
        >>> geo_a.fuzzyFindCached('paris de gaulle',
        ...                       field='name',
        ...                       min_match=0.60,
        ...                       max_results=2,
        ...                       verbose=True,
        ...                       d_range=(0, 1))
        [0.79]           paris+de+gaulle ->   paris+charles+de+gaulle (  CDG)
        [0.65]           paris+de+gaulle ->          paris+le+bourget (  LBG)
        [(0.78..., 'CDG'), (0.64..., 'LBG')]

        Some biasing:

        >>> geo_a.biasFuzzyCache('paris de gaulle',
        ...                      field='name',
        ...                      biased_result=[(0.5, 'Biased result')])
        >>> geo_a.fuzzyFindCached('paris de gaulle',
        ...                       field='name',
        ...                       max_results=None,
        ...                       verbose=True,
        ...                       d_range=(0, 1))
        Using bias: ('paris+de+gaulle', 'name', None, 0.75, None)
        [(0.5, 'Biased result')]
        >>> geo_a.clearFuzzyBiasCache()
        >>> geo_a.fuzzyFindCached('paris de gaulle',
        ...                       field='name',
        ...                       max_results=None,
        ...                       verbose=True,
        ...                       min_match=0.75)
        [(0.78..., 'CDG')]
        """
        if d_range is None:
            d_range = (min_match, 1.0)

        # Cleaning is for keeping only useful data
        entry = build_cache_key(self.fuzzyClean(fuzzy_value), field,
                                max_results, min_match, from_keys)

        if entry in self._fuzzy_bias_cache:
            # If the entry is stored in our bias
            # cache, we do not perform the fuzzy search
            if verbose:
                print 'Using bias: %s' % str(entry)

            return self._fuzzy_bias_cache[entry]

        if entry not in self._fuzzy_cache:

            matches = self.fuzzyFind(*entry)

            self._fuzzy_cache[entry] = matches

            # We display information every time a value is added to the cache
            if verbose:
                self._showFuzzyMatches(matches, fuzzy_value, field, d_range)

        return self._fuzzy_cache[entry]

[docs] def biasFuzzyCache(self, fuzzy_value, field, max_results=None, min_match=MIN_MATCH, from_keys=None, biased_result=()): """ If algorithms for fuzzy searches are failing on a single example, it is possible to use a first cache which will block the research and force the result. :param fuzzy_value: the value, like ``'Marseille'`` :param field: the field we look into, like ``'name'`` :param max_results: if ``None``, returns all, if an int, only \ returns the first ones :param min_match: filter out matches under this threshold :param from_keys: if ``None``, it takes all keys into \ consideration, else takes ``from_keys`` iterable of keys \ as search domain :param biased_result: the expected result :returns: ``None`` >>> geo_t.fuzzyFindCached('Marseille Saint Ch.', 'name')[0] (0.76..., 'frmsc') >>> geo_t.biasFuzzyCache('Marseille Saint Ch.', ... field='name', ... biased_result=[(1.0, 'Me!')]) >>> geo_t.fuzzyFindCached('Marseille Saint Ch.', 'name')[0] (1.0, 'Me!') """ # Cleaning is for keeping only useful data entry = build_cache_key(self.fuzzyClean(fuzzy_value), field, max_results, min_match, from_keys) self._fuzzy_bias_cache[entry] = biased_result
[docs] def clearFuzzyCache(self): """Clear cache for fuzzy searches. >>> geo_t.clearFuzzyCache() """ self._fuzzy_cache = {}
[docs] def clearFuzzyBiasCache(self): """Clear biasing cache for fuzzy searches. >>> geo_t.clearFuzzyBiasCache() """ self._fuzzy_bias_cache = {}
def _showFuzzyMatches(self, matches, fuzzy_value, field, d_range): """Some debugging. """ for d, key in matches: if d >= d_range[0] and d < d_range[1]: print "[%.2f] %25s -> %25s (%5s)" % \ (d, self.fuzzyClean(fuzzy_value), self.fuzzyClean(self.get(key, field)), key) @staticmethod
[docs] def phonemes(value, method='dmetaphone'): """Compute phonemes for any value. :param value: the input value :param method: change the phonetic method used :returns: the phonemes >>> GeoBase.phonemes('sheekago') ['S220', None] >>> GeoBase.phonemes('sheekago', 'nysiis') 'S220' """ get_phonemes, _ = build_get_phonemes(method) return get_phonemes(value)
[docs] def phoneticFind(self, value, field, method='dmetaphone', from_keys=None, verbose=False): """Phonetic search. :param value: the value for which we look for a match :param field: the field, like ``'name'`` :param method: change the phonetic method used :param from_keys: if ``None``, it takes all keys in consideration, \ else takes ``from_keys`` iterable of keys to perform search. :param verbose: toggle verbosity :returns: an iterable of (phonemes, key) matching >>> list(geo_o.get(k, 'name') for _, k in ... geo_o.phoneticFind(value='chicago', ... field='name', ... method='dmetaphone', ... verbose=True)) Looking for phonemes like ['C220', None] (for "chicago") ['Chickasha', 'Cayo Coco', 'Chicago', 'Casigua', 'Caucasia'] >>> list(geo_o.get(k, 'name') for _, k in ... geo_o.phoneticFind('chicago', 'name', 'nysiis')) ['Chickasha', 'Cayo Coco', 'Chicago', 'Casigua', 'Caucasia'] Alternate methods. >>> list(geo_o.phoneticFind('chicago', 'name', 'dmetaphone'))[0:2] [(['C220', None], 'CHK@1'), (['C220', None], 'CCC')] >>> list(geo_o.phoneticFind('chicago', 'name', 'metaphone'))[0:2] [('C220', 'CHK@1'), ('C220', 'CCC')] >>> list(geo_o.phoneticFind('chicago', 'name', 'nysiis'))[0:2] [('C220', 'CHK@1'), ('C220', 'CCC')] """ get_phonemes, matcher = build_get_phonemes(method) if from_keys is None: from_keys = iter(self) exp_phonemes = get_phonemes(value) if verbose: print 'Looking for phonemes like %s (for "%s")' % \ (str(exp_phonemes), value) for key in from_keys: # Do not fail on unkown keys if key not in self: continue phonemes = get_phonemes(self.get(key, field)) if matcher(phonemes, exp_phonemes): yield phonemes, key
def _updateFields(self, field): """Update fields list. :param field: the field to add :returns: ``None`` """ if field not in self.fields: self.fields.append(field)
[docs] def set(self, key, field=None, value=None, update_fields=False): """Method to manually change a value in the base. :param key: the key we want to change a value of :param field: the concerned field, like ``'name'`` :param value: the new value :param update_fields: boolean to toggle general fields updating \ or not after data update :returns: ``None`` >>> geo_t.get('frnic', 'name') 'Nice-Ville' >>> geo_t.set('frnic', 'name', 'Nice Gare SNCF') >>> geo_t.get('frnic', 'name') 'Nice Gare SNCF' >>> geo_t.set('frnic', 'name', 'Nice-Ville') # tearDown We may even add new fields. >>> geo_t.set('frnic', 'new_field', 'some_value') >>> geo_t.get('frnic', 'new_field') 'some_value' We can create just the key. >>> geo_t.set('NEW_KEY_1') >>> geo_t.get('NEW_KEY_1') {'__gar__': [], ..., '__lno__': 0, '__key__': 'NEW_KEY_1'} >>> geo_t.delete('NEW_KEY_1') # tearDown """ # If the key is not in the base, we add it if key not in self: self._things[key] = self._emptyData(key, lno=0) if field is not None: # field cannot be None, None is used to get all fields self._things[key][field] = value if update_fields: # If the field was not in the headers we add it self._updateFields(field)
def _createFromDict(self, key, dictionary): """Create key entry from dict. This method is hidden, because there if no check on fields types, and no check on dict formatting, which may lack the special fields like __key__ or __lno__. """ self._things[key] = dictionary
[docs] def setFromDict(self, key, dictionary, update_fields=False): """ Same as ``set`` method, except we perform the input with a whole dictionary. :param key: the key we want to change a value of :param dictionary: the dict containing the new data :param update_fields: boolean to toggle general fields updating \ or not after data update :returns: ``None`` Let's take an empty base. >>> geo_f.keys() [] Set a new key with a dict, then get the data back. >>> d = { ... 'code' : 'frnic', ... 'name' : 'Nice', ... } >>> geo_f.setFromDict('frnic', d) >>> geo_f.keys() ['frnic'] >>> geo_f.get('frnic', 'name') 'Nice' Here the base fields did not change. >>> geo_f.fields ['__key__', '__dup__', '__par__', '__lno__', '__gar__'] How to automatically update the base fields when setting data. >>> geo_f.setFromDict('frnic', d, update_fields=True) >>> geo_f.fields ['__key__', '__dup__', '__par__', '__lno__', '__gar__', 'code', 'name'] """ # If the key is not in the base, we add it if key not in self: self._things[key] = self._emptyData(key, lno=0) if None in dictionary: raise ValueError('None is not accepted as field (in %s).' % dictionary) self._things[key].update(dictionary) if update_fields: for field in dictionary: self._updateFields(field)
[docs] def delete(self, key, field=None): """Method to manually remove a value in the base. :param key: the key we want to delete :returns: ``None`` >>> data = geo_t.get('frxrn') # Output all data in one dict >>> geo_t.delete('frxrn') >>> geo_t.get('frxrn', 'name') Traceback (most recent call last): KeyError: 'Thing not found: frxrn' How to reverse the delete if data has been stored: >>> geo_t.setFromDict('frxrn', data) >>> geo_t.get('frxrn', 'name') 'Redon' We can delete just a field. >>> geo_t.delete('frxrn', 'lat') >>> geo_t.get('frxrn', 'lat') Traceback (most recent call last): KeyError: "Field 'lat' [for key 'frxrn'] not in ... >>> geo_t.get('frxrn', 'name') 'Redon' And put it back again. >>> geo_t.set('frxrn', 'lat', '47.65179') >>> geo_t.get('frxrn', 'lat') '47.65179' """ if field is None: del self._things[key] else: del self._things[key][field]
@staticmethod
[docs] def hasTrepSupport(): """Check if module has OpenTrep support. """ return HAS_TREP_SUPPORT
@staticmethod
[docs] def trepSearch(fuzzy_value, trep_format='S', from_keys=None, verbose=False): """OpenTrep integration. If not hasTrepSupport(), main_trep is not defined and trepSearch will raise an exception if called. :param fuzzy_value: the fuzzy value :param trep_format: the format given to OpenTrep :param from_keys: if ``None``, it takes all keys in consideration, \ else takes ``from_keys`` iterable of keys to perform search. :param verbose: toggle verbosity :returns: an iterable of ``(distance, key)`` like \ ``[(0.97, 'SFO'), (0.55, 'LAX')]`` >>> if GeoBase.hasTrepSupport(): ... print geo_t.trepSearch('sna francisco los agneles') # doctest: +SKIP [(31.5192, 'SFO'), (46.284, 'LAX')] >>> if GeoBase.hasTrepSupport(): ... print geo_t.trepSearch('sna francisco', verbose=True) # doctest: +SKIP -> Raw result: SFO/31.5192 -> Fmt result: ([(31.5192, 'SFO')], '') [(31.5192, 'SFO')] """ r = main_trep(searchString=fuzzy_value, outputFormat=trep_format, verbose=verbose) if trep_format == 'S': # Only this outputFormat is handled by upper layers if from_keys is None: return r[0] else: from_keys = set(from_keys) return [(k, e) for k, e in r[0] if e in from_keys] # For all other formats we return an empty # list to avoid failures return []
[docs] def buildGraphData(self, graph_fields, graph_weight=None, with_types=False, directed=False, from_keys=None): """Build graph data. :param graph_fields: iterable of fields used to define the nodes. \ Nodes are the values of these fields. Edges represent the \ data. :param graph_weight: field used to define the weight of nodes and \ edges. If ``None``, the weight is ``1`` for each key. :param with_types: boolean to consider values from different fields \ of the same "type" or not, meaning we will create only one \ node if the same value is found accross different fields, if \ there are no types. Otherwise we create different nodes. \ Default is ``False``, meaning untyped graphs. :param directed: boolean, if the graph is directed or not, \ default is ``False``. :param from_keys: only display this iterable of keys if not None :returns: the nodes data >>> nodes = geo_o.buildGraphData( ... graph_fields=['continent_name', 'country_code'], ... graph_weight='page_rank' ... ) >>> edges = nodes['Antarctica']['edges'].values() >>> sorted(edges[0].items()) [('from', 'Antarctica'), ('to', 'AQ'), ('weight', 0)] """ if from_keys is None: from_keys = iter(self) for field in graph_fields: if field not in self.fields: raise ValueError('graph_fields "%s" not in fields %s.' % \ (field, self.fields)) if graph_weight is not None and graph_weight not in self.fields: raise ValueError('graph_weight "%s" not in fields %s.' % \ (graph_weight, self.fields)) if graph_weight is None: get_weight = lambda k: 1 else: get_weight = lambda k: self.get(k, graph_weight) def _empty_node(type_, name): """Make an empty node. """ return { 'types' : set([type_]), 'name' : name, 'edges' : {}, 'weight' : 0 } def _empty_edge(ori_id, des_id): """Make an empty edge. """ return { 'from' : ori_id, 'to' : des_id, 'weight' : 0 } nodes = {} nb_edges = len(graph_fields) - 1 for key in from_keys: values = tuple(self.get(key, f) for f in graph_fields) try: weight = float(get_weight(key)) except ValueError: weight = 0 for i in xrange(nb_edges): ori_type = graph_fields[i] des_type = graph_fields[i + 1] ori_val = values[i] des_val = values[i + 1] if with_types: # We include the type in the key # We do not create tuples because json requires string as keys # A bit "moisi" here... 
ori_id = '%s/%s' % (ori_type, ori_val) des_id = '%s/%s' % (des_type, des_val) else: # Here the key is just the value, no type ori_id = ori_val des_id = des_val # Adding nodes if do not exist already if ori_id not in nodes: nodes[ori_id] = _empty_node(ori_type, ori_val) if des_id not in nodes: nodes[des_id] = _empty_node(des_type, des_val) # Updating types and weight ori_node = nodes[ori_id] des_node = nodes[des_id] ori_node['types'].add(ori_type) des_node['types'].add(des_type) ori_node['weight'] += weight des_node['weight'] += weight # Updating edges edge_id = '%s/%s' % (ori_id, des_id) if edge_id not in ori_node['edges']: ori_node['edges'][edge_id] = _empty_edge(ori_id, des_id) edge = ori_node['edges'][edge_id] edge['weight'] += weight if not directed: # If not directed we create the "mirror" edge edge_id = '%s/%s' % (des_id, ori_id) if edge_id not in des_node['edges']: des_node['edges'][edge_id] = _empty_edge(des_id, ori_id) edge = des_node['edges'][edge_id] edge['weight'] += weight # In this case we did not iterate through the previous loop # Note that if graph_fields is [], nb_edges is -1 so # we do not go here either if nb_edges == 0: _type = graph_fields[0] _val = values[0] if with_types: _id = '%s/%s' % (_type, _val) else: _id = _val if _id not in nodes: nodes[_id] = _empty_node(_type, _val) _node = nodes[_id] _node['types'].add(_type) _node['weight'] += weight # Getting rid of sets because not JSON serializable # And fixing order with sorted to make sure # we do not get different colors in frontend for node in nodes.itervalues(): node['types'] = sorted(node['types']) return nodes
[docs] def graphVisualize(self, graph_fields, graph_weight=None, with_types=False, from_keys=None, output='example', verbose=True): """Graph display. :param graph_fields: iterable of fields used to define the nodes. \ Nodes are the values of these fields. Edges represent the \ data. :param graph_weight: field used to define the weight of nodes and \ edges. If ``None``, the weight is ``1`` for each key. :param with_types: boolean to consider values from different fields \ of the same "type" or not, meaning we will create only one \ node if the same value is found accross different fields, if \ there are no types. Otherwise we create different nodes. \ Default is ``False``, meaning untyped graphs. :param from_keys: only display this iterable of keys if not None :param output: set the name of the rendered files :param verbose: toggle verbosity :returns: this is the tuple of (names of templates \ rendered, (list of html templates, list of static files)) """ graph_fields = tuplify(graph_fields) nodes = self.buildGraphData(graph_fields=graph_fields, graph_weight=graph_weight, with_types=with_types, directed=False, from_keys=from_keys) # Dump the json geocodes json_name = '%s_graph.json' % output with open(json_name, 'w') as out: out.write(json.dumps({ 'nodes' : nodes, 'meta' : { 'graph_fields' : graph_fields, 'graph_weight' : graph_weight, 'with_types' : with_types, }, })) return ['graph'], render_templates(['graph'], output, json_name, verbose=verbose)
[docs] def visualize(self, output='example', icon_label=None, icon_weight=None, icon_color=None, icon_type='auto', from_keys=None, add_lines=None, add_anonymous_icons=None, add_anonymous_lines=None, link_duplicates=True, draw_join_fields=False, catalog=None, line_colors=None, verbose=True): """Creates map and other visualizations. :param output: set the name of the rendered files :param icon_label: set the field which will appear as map icons title :param icon_weight: set the field defining the map icons circle \ surface :param icon_color: set the field defining the map icons colors :param icon_type: set the icon size, either ``'B'``, ``'S'``, \ ``'auto'`` or ``None`` for no-icons mode :param from_keys: only display this iterable of keys if not None :param add_lines: list of ``(key1, key2, ..., keyN)`` to draw \ additional lines :param add_anonymous_icons: list of geocodes, like \ ``[(lat1, lng1), (lat2, lng2), ..., (latN, lngN)]``, \ to draw additional icons from geocodes not in the data :param add_anonymous_icons: list of list of geocodes, like \ ``[[(lat1, lng1), (lat2, lng2), ..., (latN, lngN)], ...]``, \ to draw additional lines from geocodes not in the data :param link_duplicates: boolean toggling lines between duplicated \ keys, default ``True`` :param draw_join_fields: boolean toggling drawing of join fields \ containing geocode information, default ``False`` :param catalog: dictionary of ``{'value': 'color'}`` to have \ specific colors for some categories, which is computed with \ the ``icon_color`` field :param line_colors: tuple of 4 colors to change the default lines \ color, the three values are for the three line types: those \ computed with ``link_duplicates``, those given with \ ``add_lines``, those given with ``add_anonymous_lines``, \ those computed with ``draw_join_fields`` :param verbose: toggle verbosity :returns: this is the tuple of (names of templates \ rendered, (list of html templates, list of static files)) """ if not self.hasGeoSupport(): if verbose: print print '/!\ Could not find fields %s in headers %s.' % \ (' and '.join(GEO_FIELDS), self.fields) print '/!\ Setting draw_join_fields to True.' draw_join_fields = True if icon_label is not None and icon_label not in self.fields: raise ValueError('icon_label "%s" not in fields %s.' % (icon_label, self.fields)) if icon_weight is not None and icon_weight not in self.fields: raise ValueError('icon_weight "%s" not in fields %s.' % (icon_weight, self.fields)) if icon_color is not None and icon_color not in self.fields: raise ValueError('icon_color "%s" not in fields %s.' 
% (icon_color, self.fields)) # Optional function which gives points weight if icon_label is None: get_label = lambda key: key else: get_label = lambda key: self.get(key, icon_label) # Optional function which gives points weight if icon_weight is None: get_weight = lambda key: 0 else: get_weight = lambda key: self.get(key, icon_weight) # Optional function which gives points category if icon_color is None: get_category = lambda key: None else: get_category = lambda key: self.get(key, icon_color) # from_keys lets you have a set of keys to visualize if from_keys is None: from_keys = iter(self) # Additional stuff if add_lines is None: add_lines = [] if add_anonymous_icons is None: add_anonymous_icons = [] if add_anonymous_lines is None: add_anonymous_lines = [] # catalog is a user defined color scheme if catalog is None: # Default diff-friendly catalog catalog = { ' ' : 'blue', '+' : 'green', 'Y' : 'green', '-' : 'red', 'N' : 'red', '@' : 'yellow', } if line_colors is None: line_colors = 'blue', 'orange', 'yellow', 'purple' if len(line_colors) != 4: raise ValueError('line_colors must a tuple of 4 colors, was %s.' % \ str(line_colors)) # Storing json data data = [ self._buildIconData(key, get_label, get_weight, get_category) for key in from_keys if key in self ] + [ self._buildAnonymousIconData(lat_lng) for lat_lng in add_anonymous_icons ] # Join data join_icons, join_lines = [], [] if draw_join_fields: # Finding out which external base has geocode support # We start goin over the self.fields to preserve fields order # then we look for potential join on multiple fields # in self._join.keys() geo_join_fields_list = [] for fields in self.fields + self._join.keys(): fields = tuplify(fields) if fields in geo_join_fields_list: continue if self.hasJoin(fields): if self.getJoinBase(fields).hasGeoSupport(): geo_join_fields_list.append(fields) if verbose: print '* Detected geocode support in join fields %s [%s].' % \ (str(fields), str(self._join[fields])) if not geo_join_fields_list: if verbose: print '* Could not detect geocode support in join fields.' 
else: join_icons, join_lines = self._buildJoinLinesData(geo_join_fields_list, data, 'Join line', line_colors[3], get_label, get_weight, get_category, verbose) if verbose: print '* Added icons for join fields, total %s' % len(join_icons) print '* Added lines for join fields, total %s' % len(join_lines) # Adding join icons on already computed data data = data + join_icons # Duplicates data dup_lines = [] if link_duplicates: dup_lines = self._buildLinksForDuplicates(data) if verbose: print '* Added lines for duplicates linking, total %s' % len(dup_lines) # Gathering data for lines data_lines = [ self._buildLineData(l, get_label, 'Duplicates', line_colors[0]) for l in dup_lines ] + [ self._buildLineData(l, get_label, 'Line', line_colors[1]) for l in add_lines ] + [ self._buildAnonymousLineData(l, 'Anonymous line', line_colors[2]) for l in add_anonymous_lines ] + \ join_lines # Icon type has_many = len(data) >= 100 base_icon = compute_base_icon(icon_type, has_many) # Building categories with_icons = icon_type is not None with_circles = icon_weight is not None categories = build_categories(data, with_icons, with_circles, catalog, verbose) # Finally, we write the colors as an element attribute for elem in data: elem['__col__'] = categories[elem['__cat__']]['color'] # Dump the json geocodes json_name = '%s_map.json' % output with open(json_name, 'w') as out: out.write(json.dumps({ 'meta' : { 'icon_label' : icon_label, 'icon_weight' : icon_weight, 'icon_color' : icon_color, 'icon_type' : icon_type, 'base_icon' : base_icon, 'link_duplicates' : link_duplicates, 'toggle_lines' : True if (add_lines or \ add_anonymous_lines or \ draw_join_fields) else False, }, 'points' : data, 'lines' : data_lines, 'categories' : sorted(categories.items(), key=lambda x: x[1]['volume'], reverse=True) })) # We do not render the map template if nothing to see nb_geocoded_points = 0 for elem in data: if (elem['lat'], elem['lng']) != ('?', '?'): nb_geocoded_points += 1 if nb_geocoded_points > 0 or data_lines: rendered = ['map', 'table'] else: rendered = ['table'] return rendered, render_templates(rendered, output, json_name, verbose=verbose)
def _buildIconData(self, key, get_label, get_weight, get_category): """Build data for key display. """ lat_lng = self.getLocation(key) if lat_lng is None: lat_lng = '?', '?' elem = { '__key__' : key, '__lab__' : get_label(key), '__wei__' : get_weight(key), '__cat__' : get_category(key), 'lat' : lat_lng[0], 'lng' : lat_lng[1] } for field in self.fields: # Keeping only important fields if not str(field).startswith('__') and \ not str(field).endswith('@raw') and \ field not in elem: elem[field] = str(self.get(key, field)) return elem @staticmethod def _buildAnonymousIconData(lat_lng): """Build data for anonymous point display. """ if lat_lng is None: lat_lng = '?', '?' return { '__key__' : '(%s, %s)' % lat_lng, '__lab__' : 'Anonymous', '__wei__' : 0, '__cat__' : '@', 'lat' : lat_lng[0], 'lng' : lat_lng[1] } def _buildLineData(self, line, get_label, title, color): """Build data for line display. """ data_line = [] for l_key in line: if l_key not in self: continue lat_lng = self.getLocation(l_key) if lat_lng is None: lat_lng = '?', '?' data_line.append({ '__key__' : l_key, '__lab__' : get_label(l_key), 'lat' : lat_lng[0], 'lng' : lat_lng[1], }) return { '__lab__' : title, '__col__' : color, 'path' : data_line, } @staticmethod def _buildAnonymousLineData(line, title, color): """Build data for anonymous line display. """ data_line = [] for lat_lng in line: if lat_lng is None: lat_lng = '?', '?' data_line.append({ '__key__' : '(%s, %s)' % lat_lng, '__lab__' : 'Anonymous', 'lat' : lat_lng[0], 'lng' : lat_lng[1], }) return { '__lab__' : title, '__col__' : color, 'path' : data_line, } def _buildLinksForDuplicates(self, data): """Build lines data between duplicated keys. """ dup_lines = [] # We add to dup_lines all list of duplicates # We keep a set of already processed "master" keys to avoid # putting several identical lists in the json done_keys = set() for elem in data: key = elem['__key__'] if key not in self: # Possible for anonymous keys added for display continue if not self.hasParents(key): mkey = set([key]) else: mkey = set(self.get(key, '__par__')) if self.hasDuplicates(key) and not mkey.issubset(done_keys): # mkey have some keys which are not in done_keys dup_lines.append(self.getFromAllDuplicates(key, '__key__')) done_keys = done_keys | mkey return dup_lines def _buildJoinLinesData(self, geo_join_fields_list, data, title, line_color, get_label, get_weight, get_category, verbose=True): """Build lines data for join fields """ # Precaution on fields type geo_join_fields_list = [ tuplify(fields) for fields in geo_join_fields_list ] join_lines = [] join_icons = {} for elem in data: key = elem['__key__'] key_lat_lng = self.getLocation(key) if key not in self: # Possible for anonymous keys added for display continue joined_values = [ self.get(key, fields, ext_field='__key__') for fields in geo_join_fields_list ] # Cartesian product is made on non-empty join results if verbose: for v, fields in zip(joined_values, geo_join_fields_list): if not v: values = [str(self.get(key, f)) for f in fields] print 'Could not retrieve data from join on "%s" for "%s", key "%s".' 
% \ ('/'.join(fields), '/'.join(values), key) comb = product(*[v for v in joined_values if v]) for c in comb: #print c if not c: # Case where there is no fields in self._join continue data_line = [] if key_lat_lng is not None: # We add the geocode at the beginning of the line data_line.append({ '__key__' : key, '__lab__' : get_label(key), 'lat' : key_lat_lng[0], 'lng' : key_lat_lng[1], }) for jkeys, fields in zip(c, geo_join_fields_list): # Is a tuple if we had some subdelimiters jkeys = tuplify(jkeys) for jkey in jkeys: lat_lng = self.getJoinBase(fields).getLocation(jkey) if lat_lng is None: lat_lng = '?', '?' values = [str(self.get(key, f)) for f in fields] join_icons[jkey] = { '__key__' : jkey, '__lab__' : '%-6s [line %s, join on field(s) %s for value(s) %s]' % \ (jkey, key, '/'.join(fields), '/'.join(values)), '__wei__' : get_weight(key), # *key*, not *jkey* '__cat__' : get_category(key), # *key*, not *jkey* 'lat' : lat_lng[0], 'lng' : lat_lng[1] } data_line.append({ '__key__' : jkey, '__lab__' : '%-6s [line %s, join on field(s) %s for value(s) %s]' % \ (jkey, key, '/'.join(fields), '/'.join(values)), 'lat' : lat_lng[0], 'lng' : lat_lng[1], }) join_lines.append({ '__lab__' : title, '__col__' : line_color, 'path' : data_line, }) return join_icons.values(), join_lines
def compute_base_icon(icon_type, has_many): """Compute icon. """ if icon_type is None: return '' if icon_type == 'auto': return 'point.png' if has_many else 'marker.png' if icon_type == 'S': return 'point.png' if icon_type == 'B': return 'marker.png' raise ValueError('icon_type "%s" not in %s.' % \ (icon_type, ('auto', 'S', 'B', None))) def build_categories(data, with_icons, with_circles, catalog, verbose): """Build categories from data and catalog """ # Count the categories for coloring categories = {} for elem in data: if not with_icons: # Here we are in no-icon mode, categories # will be based on the entries who will have a circle try: c = float(elem['__wei__']) except ValueError: c = 0 else: c = 1 cat = elem['__cat__'] if cat not in categories: categories[cat] = 0 if c > 0: categories[cat] += c # Color repartition given biggest categories colors = ('red', 'orange', 'yellow', 'green', 'cyan', 'purple') col_num = 0 if not categories: step = 1 else: # c > 0 makes sure we do not create a category # for stuff that will not be displayed nb_non_empty_cat = len([c for c in categories.values() if c > 0]) if nb_non_empty_cat > 0: step = max(1, len(colors) / nb_non_empty_cat) else: # All categories may be empty if not icons + not circles step = 1 for cat, vol in sorted(categories.items(), key=lambda x: x[1], reverse=True): categories[cat] = { 'volume' : vol } if cat is None: # None is also the default category, when icon_color is None categories[cat]['color'] = 'blue' elif col_num < len(colors): # We affect the next color available categories[cat]['color'] = colors[col_num] col_num += step else: # After all colors are used, remaining categories are black categories[cat]['color'] = 'black' if verbose: if with_icons: field_vol = 'volume' elif with_circles: field_vol = 'weight' else: field_vol = '(not used)' print '> Affecting category %-8s to color %-7s | %s %s' % \ (cat, categories[cat]['color'], field_vol, vol) for cat in catalog: if cat in categories: old_color = categories[cat]['color'] new_color = catalog[cat] categories[cat]['color'] = new_color if verbose: print '> Overrides category %-8s to color %-7s (from %-7s)' % \ (cat, new_color, old_color) # We test other categories to avoid duplicates in coloring for ocat in categories: if ocat == cat: continue ocat_color = categories[ocat]['color'] if ocat_color == new_color: categories[ocat]['color'] = old_color if verbose: print '> Switching category %-8s to color %-7s (from %-7s)' % \ (ocat, old_color, ocat_color) return categories # Assets for map and table ASSETS = { 'map' : { 'template' : { # source : v_target relative('MapAssets/template.html') : '%s_map.html', }, 'static' : { # source : target relative('MapAssets/map.js') : 'map.js', relative('MapAssets/point.png') : 'point.png', relative('MapAssets/marker.png') : 'marker.png', relative('MapAssets/red_point.png') : 'red_point.png', relative('MapAssets/red_marker.png') : 'red_marker.png', relative('MapAssets/orange_point.png') : 'orange_point.png', relative('MapAssets/orange_marker.png') : 'orange_marker.png', relative('MapAssets/yellow_point.png') : 'yellow_point.png', relative('MapAssets/yellow_marker.png') : 'yellow_marker.png', relative('MapAssets/green_point.png') : 'green_point.png', relative('MapAssets/green_marker.png') : 'green_marker.png', relative('MapAssets/cyan_point.png') : 'cyan_point.png', relative('MapAssets/cyan_marker.png') : 'cyan_marker.png', relative('MapAssets/blue_point.png') : 'blue_point.png', relative('MapAssets/blue_marker.png') : 'blue_marker.png', 
relative('MapAssets/purple_point.png') : 'purple_point.png', relative('MapAssets/purple_marker.png') : 'purple_marker.png', relative('MapAssets/black_point.png') : 'black_point.png', relative('MapAssets/black_marker.png') : 'black_marker.png', } }, 'table' : { 'template' : { # source : v_target relative('TableAssets/template.html') : '%s_table.html', }, 'static' : { # source : target relative('TableAssets/table.js') : 'table.js', } }, 'graph' : { 'template' : { # source : v_target relative('GraphAssets/template.html') : '%s_graph.html', }, 'static' : { # source : target relative('GraphAssets/graph.js') : 'graph.js', relative('GraphAssets/jit.js') : 'jit.js', relative('GraphAssets/jit-yc.js') : 'jit-yc.js', } } } def render_templates(names, output, json_name, verbose): """Render HTML templates. """ tmp_template = [] tmp_static = [json_name] for name in names: if name not in ASSETS: raise ValueError('Unknown asset name %s' % name) assets = ASSETS[name] for template, v_target in assets['template'].iteritems(): target = v_target % output with open(template) as temp: with open(target, 'w') as out: for row in temp: row = row.replace('{{file_name}}', output) row = row.replace('{{json_file}}', json_name) out.write(row) tmp_template.append(target) for source, target in assets['static'].iteritems(): copy(source, target) tmp_static.append(target) if verbose: print print '* Now you may use your browser to visualize:' print ' '.join(tmp_template) print print '* If you want to clean the temporary files:' print 'rm %s' % ' '.join(tmp_static + tmp_template) print return tmp_template, tmp_static def ext_split(value, split): """Extended split function handling None and '' splitter. :param value: the value to be split :param split: the splitter :returns: the split value >>> ext_split('', ',') () >>> ext_split('PAR', 'A') ('P', 'R') >>> ext_split('PAR', '') ('P', 'A', 'R') >>> ext_split('PAR', None) 'PAR' """ if split is None: return value if split == '': # Here we convert a string like 'CA' into ('C', 'A') return tuple(value) # Python split function has ''.split(';') -> [''] # But in this case we prefer having [] as a result if not value: return () return tuple(value.split(split)) def recursive_split(value, splits): """Recursive extended split. :param value: the value to be split :param splits: the list of splitters :returns: the split value >>> recursive_split('PAR^Paris/Parys', ['^', '/']) (('PAR',), ('Paris', 'Parys')) >>> recursive_split('|PAR|=', ['=', '|']) (('', 'PAR', ''),) Multiple splits on empty string should return empty tuple. >>> recursive_split('', ['^']) () >>> recursive_split('', ['^', '/']) () >>> recursive_split('', ['^', '/', ':']) () """ # Case where no splits if not splits: return value if len(splits) == 1: return ext_split(value, splits[0]) if len(splits) == 2: return tuple(ext_split(v, splits[1]) for v in ext_split(value, splits[0]) if v) if len(splits) == 3: return tuple(tuple(ext_split(sv, splits[2]) for sv in ext_split(v, splits[1]) if sv) for v in ext_split(value, splits[0]) if v) raise ValueError('Sub delimiter "%s" not supported.' % str(splits)) def iter_over_subdel(value, deep=False): """Iterator over recursive_split values. We iter over the sub elements of the structure. 
>>> list(iter_over_subdel(())) [] >>> list(iter_over_subdel('T0')) ['T0'] >>> list(iter_over_subdel(['T1', 'T1'])) ['T1', 'T1'] >>> list(iter_over_subdel([('T2', 'T2'), 'T1'])) [('T2', 'T2'), 'T1'] >>> list(iter_over_subdel([('T2', 'T2'), 'T1'], deep=True)) ['T2', 'T2', 'T1'] """ if isinstance(value, (list, tuple, set)): for e in value: if not deep: yield e else: for ee in iter_over_subdel(e): yield ee else: yield value def tuplify(s): """ Convert iterable into tuple, if string just put in in a tuple. >>> tuplify('test') ('test',) >>> tuplify(['test', 'titi']) ('test', 'titi') """ if isinstance(s, str): return (s,) else: return tuple(s) def build_get_phonemes(method): """Compute phonemes method and matching phonemes method. """ if method == 'metaphone': get_phonemes = lambda s: dmeta(s)[0] matcher = lambda s1, s2: s1 == s2 elif method == 'dmetaphone-strict': get_phonemes = dmeta matcher = lambda s1, s2: s1 == s2 elif method == 'dmetaphone': get_phonemes = dmeta matcher = lambda s1, s2: set(s1) & set(s2) - set([None]) elif method == 'nysiis': get_phonemes = nysiis matcher = lambda s1, s2: s1 == s2 else: raise ValueError('Accepted methods are %s' % \ ['metaphone', 'dmetaphone-strict', 'dmetaphone', 'nysiis']) return get_phonemes, matcher def build_cache_key(*args, **kwargs): """Build key for the cache of fuzzyFind, based on parameters. >>> build_cache_key(GeoBase.fuzzyClean('paris de gaulle'), ... 'name', ... max_results=None, ... min_match=0, ... from_keys=None) ('paris+de+gaulle', 'name', None, None, 0) >>> build_cache_key(GeoBase.fuzzyClean('Antibes SNCF 2'), ... 'name', ... max_results=3, ... min_match=0, ... from_keys=None) ('antibes+sncf+2', 'name', None, 3, 0) """ # We handle the fact that dictionary are not sorted, but this # will build the smae key for parameters return tuple(args) + tuple(kwargs[k] for k in sorted(kwargs)) def _test(): """When called directly, launching doctests. """ import doctest extraglobs = { 'geo_o': GeoBase(data='ori_por', verbose=False), 'geo_a': GeoBase(data='airports', verbose=False), 'geo_t': GeoBase(data='stations', verbose=False), 'geo_f': GeoBase(data='feed', verbose=False) } opt = (doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE) #doctest.REPORT_ONLY_FIRST_FAILURE) #doctest.IGNORE_EXCEPTION_DETAIL) doctest.testmod(extraglobs=extraglobs, optionflags=opt) if __name__ == '__main__': _test()