""":mod:`eleve.memory`
======================
Provide full-python reference implementation of ``eleve`` storage and Trie.
"""
from __future__ import division
import math
import logging
__all__ = ["MemoryTrie", "MemoryStorage"]
NaN = float('nan')
class MemoryNode(object):
""" Node used by :class:`MemoryTrie`
"""
# to take a little less memory
__slots__ = ['count', 'entropy', 'childs']
def __init__(self, count=0):
self.count = count
self.entropy = float('nan')
self.childs = {}
def update_entropy(self, terminals):
""" Update the entropy of the node.
:param terminals: a set of bytes. If a token is inside that set, it will
count as N different tokens instead of a token with count N.
"""
entropy = 0
sum_counts = 0
for token, child in self.childs.items():
if child.count == 0:
continue
sum_counts += child.count
if token in terminals:
entropy += (child.count / self.count) * math.log(self.count, 2)
else:
entropy -= (child.count / self.count) * math.log(child.count / self.count, 2)
if not sum_counts:
entropy = NaN
else:
assert sum_counts == self.count
if self.entropy != entropy and not(math.isnan(self.entropy) and math.isnan(entropy)):
self.entropy = entropy
def iter_childs(self):
""" Returns an iterator over childs nodes
"""
return self.childs.values()
class MemoryLeaf(object):
__slots__ = ['count']
def __init__(self, count=0):
self.count = count
@property
def entropy(self):
return float('nan')
def to_node(self):
return MemoryNode(count=self.count)
[docs]class MemoryTrie:
""" In-memory tree (made to be simple, no specific optimizations)
"""
[docs] def __init__(self, terminals=[]):
""" Constructor
:param terminals: Tokens that are in "terminals" array are counted as
distinct in the entropy computation. By default, the symbols are for
start and end of sentences.
"""
self.root = MemoryNode()
# normalization params :
# * one for each level
# * on each level : mean, stdev
# * WARNING: self.normalization[0] gives data for depth 1 (depth 0 is root and always NaN, NaN)
self.normalization = []
self.terminals = set(terminals)
self.dirty = True
[docs] def max_depth(self):
""" Returns the maximum depth of the Trie
>>> trie = MemoryTrie()
>>> trie.max_depth()
0
>>> trie.add_ngram(["A", "B", "C"])
>>> trie.max_depth()
3
"""
self._check_dirty()
return len(self.normalization)
[docs] def clear(self):
""" Clear the trie.
"""
self.root = MemoryNode()
self.dirty = True
return self
[docs] def iter_leafs(self):
def _rec(ngram, node):
if node.childs:
for token, child in node.childs.items():
for i in _rec(ngram + [token], child): yield i
elif node is not self.root:
yield ngram
for i in _rec([], self.root): yield i
def _update_stats_rec(self, parent_entropy, depth, node):
""" Recurively update both entropy and normalization vector
"""
# extend normalization vector if needed
while len(self.normalization) < depth:
self.normalization.append((0., 0., 0))
# if MemoryLeaf nothing else should be done
if isinstance(node, MemoryLeaf):
return
node.update_entropy(self.terminals)
# update entropy variation mean and std if possible (not NaN)
if depth > 0 and not math.isnan(node.entropy) and (node.entropy or parent_entropy):
ev = node.entropy - parent_entropy
mean, stdev, count = self.normalization[depth-1]
old_mean = mean
count += 1
mean += (ev - old_mean) / count
stdev += (ev - old_mean) * (ev - mean)
self.normalization[depth-1] = mean, stdev, count
# recurifs calls
for child in node.iter_childs():
self._update_stats_rec(node.entropy, depth + 1, child)
[docs] def update_stats(self):
""" Update the internal statistics (like entropy, and stdev & means)
for the entropy variations.
Called automatically if the trie is modified and we then do queries on it.
"""
if not self.dirty:
return
self.normalization = []
self._update_stats_rec(NaN, 0, self.root)
for pseudo_depth, (mean, _stdev, count) in enumerate(self.normalization):
stdev = math.sqrt(_stdev / (count or 1))
self.normalization[pseudo_depth] = (mean, stdev)
self.dirty = False
def _check_dirty(self):
if self.dirty:
logging.warning("Updating the tree statistics (update_stats method), as we query it while dirty. This is a slow operation.")
self.update_stats()
[docs] def add_ngram(self, ngram, freq=1):
""" Add a ngram to the trie.
:param ngram: A list of tokens.
:param freq: specify the number of times you add (or substract) that ngram.
"""
if freq <= 0:
raise ValueError("freq should be larger or equal to 1")
if len(ngram) == 0:
logging.warning("Adding empty ngram just do nothing.")
return
self.dirty = True
parent = self.root
parent.count += freq
for depth, token in enumerate(ngram):
try:
child = parent.childs[token]
child.count += freq
# transform leaf to node, if we are not at the end
if depth < len(ngram) - 1 and isinstance(child, MemoryLeaf):
child = child.to_node()
parent.childs[token] = child
except KeyError: #node do not exist yet
child = MemoryNode(freq) if depth < len(ngram) - 1 else MemoryLeaf(freq)
parent.childs[token] = child
parent = child
def _lookup(self, ngram):
""" Search for a node.
:returns: a couple with the parent node and the node.
:raises KeyError: if the node doesn't exists.
"""
node = self.root
last_node = node
while ngram:
last_node = node
node = node.childs[ngram[0]]
ngram = ngram[1:]
return (last_node, node)
[docs] def query_count(self, ngram):
""" Query for the number of occurences we have seen the n-gram in the training data.
:param ngram: A list of tokens.
:returns: An integer.
"""
try:
_, node = self._lookup(ngram)
except (KeyError, AttributeError):
return 0.0
return node.count
[docs] def query_entropy(self, ngram):
""" Query for the branching entropy.
:param ngram: A list of tokens.
:returns: A float, that can be NaN if it is not defined.
"""
self._check_dirty()
try:
_, node = self._lookup(ngram)
except (KeyError, AttributeError):
return float('nan')
return node.entropy
[docs] def query_ev(self, ngram):
""" Query for the branching entropy variation.
:param ngram: A list of tokens.
:returns: A float, that can be NaN if it is not defined.
"""
self._check_dirty()
if not ngram:
return float('nan')
try:
last_node, node = self._lookup(ngram)
except (KeyError, AttributeError):
return float('nan')
if not math.isnan(node.entropy) and (node.entropy != 0 or last_node.entropy != 0):
return node.entropy - last_node.entropy
return float('nan')
[docs] def query_autonomy(self, ngram, z_score=True):
""" Query the autonomy (normalized entropy variation) for the n-gram.
:param ngram: A list of tokens.
:param z_score: If True, compute the z_score ((value - mean) / stdev). If False, just substract the mean.
:returns: A float, that can be NaN if it is not defined.
"""
self._check_dirty()
try:
mean, stdev = self.normalization[len(ngram)-1]
except IndexError:
return float('nan')
ev = self.query_ev(ngram)
if math.isnan(ev):
return float('nan')
nev = ev - mean
if z_score:
try:
nev /= stdev
except ZeroDivisionError:
return float('nan')
return nev
[docs]class MemoryStorage:
""" Full-Python in-memory storage.
"""
# Use PRIVATE_USE_AREA codes
sentence_start = "\ue02b" # in utf8 : b"\xee\x80\xab"
# see http://www.fileformat.info/info/unicode/char/e02b/index.htm
sentence_end = "\ue02d" # in utf8 : b"\xee\x80\xad"
# see http://www.fileformat.info/info/unicode/char/e02d/index.htm
[docs] def __init__(self, default_ngram_length=5):
""" Storage constructor.
:param default_ngram_length: the default maximum length of n-gram beeing
stored. May be overriden in :func:`add_sentence`.
"""
assert isinstance(default_ngram_length, int) and default_ngram_length > 0
self._default_ngram_length = default_ngram_length
terminals = [self.sentence_start, self.sentence_end]
self.bwd = MemoryTrie(terminals=terminals)
self.fwd = MemoryTrie(terminals=terminals)
@property
def default_ngram_length(self):
return self._default_ngram_length
[docs] def add_sentence(self, sentence, freq=1, ngram_length=None):
""" Add a sentence to the model.
:param sentence: The sentence to add. Should be a list of tokens.
:param freq: The number of times to add this sentence. One by default. May be negative to "remove" a sentence.
:param ngram_length: The length of n-grams that are stored. If None the
default value setup in __init__ is used.
"""
if freq <= 0:
raise ValueError("freq should be larger or equal to 1")
if not sentence:
return
if ngram_length is None:
ngram_length = self.default_ngram_length
token_list = [self.sentence_start] + sentence + [self.sentence_end]
for i in range(len(token_list) - 1):
self.fwd.add_ngram(token_list[i:i+ngram_length], freq)
token_list = token_list[::-1]
for i in range(len(token_list) - 1):
self.bwd.add_ngram(token_list[i:i+ngram_length], freq)
[docs] def clear(self):
""" Clear the training data in the model, effectively resetting it.
"""
self.bwd.clear()
self.fwd.clear()
[docs] def update_stats(self):
""" Update the entropies and normalization factors. This function is called automatically when you modify the model and then query it.
"""
self.bwd.update_stats()
self.fwd.update_stats()
[docs] def query_autonomy(self, ngram):
""" Query the autonomy for a ngram.
:param ngram: A list of tokens.
:returns: A float, that can be NaN if it is not defined.
"""
result_fwd = self.fwd.query_autonomy(ngram)
result_bwd = self.bwd.query_autonomy(ngram[::-1])
if math.isnan(result_fwd) or math.isnan(result_bwd):
return float('nan')
return (result_fwd + result_bwd) / 2
[docs] def query_ev(self, ngram):
""" Query the entropy variation for a ngram.
:param ngram: A list of tokens.
:returns: A float, that can be NaN if it is not defined.
"""
result_fwd = self.fwd.query_ev(ngram)
result_bwd = self.bwd.query_ev(ngram[::-1])
if math.isnan(result_fwd) or math.isnan(result_bwd):
return float('nan')
return (result_fwd + result_bwd) / 2
[docs] def query_count(self, ngram):
""" Query the count for a ngram (the number of time it appeared in the training corpus).
:param ngram: A list of tokens.
:returns: A float.
"""
return self.fwd.query_count(ngram)
[docs] def query_entropy(self, ngram):
""" Query the branching entropy for a n-gram.
:param ngram: A list of tokens.
:returns: A float, that can be NaN if it is not defined.
"""
entropy_fwd = self.fwd.query_entropy(ngram)
entropy_bwd = self.bwd.query_entropy(ngram[::-1])
if math.isnan(entropy_fwd) or math.isnan(entropy_bwd):
return float('nan')
return (entropy_fwd + entropy_bwd) / 2