Source code for eleve.leveldb

""":mod:`eleve.leveldb`
======================

Provide a Storage (:class:`eleve.leveldb.LeveldbStorage`) and a Trie
(:class:`eleve.leveldb.LeveldbTrie`) that use LevelDB as disk backend.
The implementation over LevelDB is done in python by using :mod:`plyvel`.
"""
import os
import struct
import math
import collections
import logging
import os

import plyvel

from eleve.memory import MemoryTrie, MemoryStorage

NaN = float('nan')

NORMALIZATION_KEY_PREFIX = b'\xff'
NORMALIZATION_PACKER = struct.Struct('<ff')

PACKER = struct.Struct('<Lf')
SEPARATOR = b'\x00' # before every word that is inserted we put that byte, so that they are separated.
SEPARATOR_PLUS_ONE = bytes((SEPARATOR[0]+1,))

[docs]def to_bytes(o): """ Encode the object as a bytes object: - if it's already a bytes object, don't do nothing - else, take its string representation and encode it as a bytes """ return o if type(o) == bytes else str(o).encode()
[docs]def ngram_to_key(ngram): """ Convert a ngram to a leveldb key (a bytes object). The first byte is the length of the ngram, then we have SEPARATOR and the bytes representation of the token, for each token. """ assert len(ngram) < 256 return bytes([len(ngram)]) + b''.join([SEPARATOR + to_bytes(i) for i in ngram])
[docs]class Node: """ Represents a node of the trie in Leveldb. Loaded by its key. Can update its entropy, and save it in leveldb. Can list its childs. """
[docs] def __init__(self, db, key, data=None): """ :param db: the leveldb object (used to retrieve/save the nodes) :param key (bytes): the key of the node in the database :param data: should be generally kept as a None. if you have the data, you can pass them as a bytes object. if you pass False, we won't try to retrieve them and assume the node doesn't exists. """ self.db = db self.key = key if data is None: data = db.get(key) self.count, self.entropy = PACKER.unpack(data) if data else (0, NaN)
[docs] def iter_childs(self): """ :returns: the childs of the node as other Node objects. """ start = bytes([self.key[0] + 1]) + self.key[1:] + SEPARATOR stop = start[:-1] + SEPARATOR_PLUS_ONE for key, value in self.db.iterator(start=start, stop=stop): yield Node(self.db, key, value)
[docs] def save(self, db=None): """ Save the node in the database. :param db: You can optionally pass a database if you want to save it here instead of the default database. """ value = PACKER.pack(self.count, self.entropy) (db or self.db).put(self.key, value)
[docs] def update_entropy(self, terminals): """ Update the entropy of the node (and save it if it changed). :param terminals: a set of bytes. If a token is inside that set, it will count as N different tokens instead of a token with count N. """ entropy = 0 sum_counts = 0 for child in self.iter_childs(): if child.count == 0: continue sum_counts += child.count if child.key.split(SEPARATOR)[-1] in terminals: entropy += (child.count / self.count) * math.log(self.count, 2) else: entropy -= (child.count / self.count) * math.log(child.count / self.count, 2) assert entropy >= 0 if not sum_counts: entropy = NaN else: assert sum_counts == self.count if self.entropy != entropy and not(math.isnan(self.entropy) and math.isnan(entropy)): self.entropy = entropy self.save()
[docs]class LeveldbTrie(MemoryTrie):
[docs] def __init__(self, path, terminals=[]): """ Create or opent a Trie using leveldb as backend. """ self.path = path self.terminals = set(to_bytes(i) for i in terminals) self.db = plyvel.DB(path, create_if_missing=True, write_buffer_size=32*1024**2, #block_size=16*1024, #lru_cache_size=512*1024**2, #bloom_filter_bits=8, ) # retrieve the normalization constants from leveldb self.normalization = [] depth_level = 0 while True: ndata = self.db.get(NORMALIZATION_KEY_PREFIX + bytes((depth_level,))) if ndata is None: break self.normalization.append(NORMALIZATION_PACKER.unpack(ndata)) depth_level += 1 # if no normalization vector founds self.dirty = len(self.normalization) == 0
@property def root(self): """ Returns root node """ return Node(self.db, b'\x00')
[docs] def close(self): self.db.close()
[docs] def clear(self): """ Delete the trie that's in the database. """ for key in self.db.iterator(include_value=False): self.db.delete(key) self.dirty = True
[docs] def update_stats(self): super(LeveldbTrie, self).update_stats() # store normalization vector in DB for pseudo_depth, (mean, stdev) in enumerate(self.normalization): self.db.put(NORMALIZATION_KEY_PREFIX + bytes((pseudo_depth,)), NORMALIZATION_PACKER.pack(mean, stdev)) self.db.compact_range()
def _check_dirty(self): if self.dirty: self.update_stats()
[docs] def node(self, ngram): return Node(self.db, ngram_to_key(ngram))
[docs] def add_ngram(self, ngram, freq=1): if freq <= 0: raise ValueError("freq should be larger or equal to 1") if len(ngram) == 0: logging.warning("Adding empty ngram just do nothing.") return if not self.dirty: self.dirty = True self.db.delete(b'\xff\x00') b = bytearray(b'\x00') w = self.db.write_batch() # shortcut : if we encounter a node with a counter to zero, we will # set the node data to False and avoid doing queries for the following nodes. create = False node = self.root node.count += freq node.save(w) for i in range(1, len(ngram) + 1): b[0] = i b.extend(SEPARATOR + str(ngram[i - 1]).encode()) node = Node(self.db, bytes(b), data=(False if create else None)) if node.count == 0: create = True node.count += freq node.save(w) w.write()
[docs] def query_count(self, ngram): return self.node(ngram).count
[docs] def query_entropy(self, ngram): self._check_dirty() return self.node(ngram).entropy
[docs] def query_ev(self, ngram): self._check_dirty() if not ngram: return NaN node = self.node(ngram) if math.isnan(node.entropy): return NaN parent = self.node(ngram[:-1]) if node.entropy != 0 or parent.entropy != 0: return node.entropy - parent.entropy return NaN
[docs] def query_autonomy(self, ngram): self._check_dirty() ev = self.query_ev(ngram) if math.isnan(ev): return NaN try: mean, stdev = self.normalization[len(ngram)-1] return (ev - mean) / stdev except (ZeroDivisionError, IndexError): return NaN
[docs]class LeveldbStorage(MemoryStorage):
[docs] def __init__(self, path, default_ngram_length=None): """ Initialize the model. :param path: Path to the database where to load and store the model. If the path is not existing an empty model will be created. :param default_ngram_length: the default maximum length of n-gram beeing stored. It will equals 5 for a newly created storage. Note that it may be overriden in :func:`add_sentence`. """ self.path = path # store the path, in RAM, usefull at least for test if not os.path.isdir(path): os.makedirs(path) config_path = path + "/config" new_storage = not os.path.isdir(config_path) # create/open Storage config/metadata DB self.config = plyvel.DB(config_path, create_if_missing=True, write_buffer_size=32*1024**2, ) if new_storage: if default_ngram_length is None: default_ngram_length = 5 assert isinstance(default_ngram_length, int) and default_ngram_length > 0 self.config.put(b"default_ngram_length", str(default_ngram_length).encode()) self._default_ngram_length = int(self.config.get(b"default_ngram_length")) # create/open both trie terminals = [self.sentence_start, self.sentence_end] self.bwd = LeveldbTrie(path=(path + '/bwd'), terminals=terminals) self.fwd = LeveldbTrie(path=(path + '/fwd'), terminals=terminals) #TODO: if loading (path exist?) then read the default_ngram_length from DD
@property def default_ngram_length(self): return self._default_ngram_length
[docs] def close(self): self.config.close() self.bwd.close() self.fwd.close()