Source code for bytestag.storage

'''Storage management and implementations of KVPTables'''
# This file is part of Bytestag.
# Copyright © 2012 Christopher Foo <chris.foo@gmail.com>.
# Licensed under GNU GPLv3. See COPYING.txt for details.
from bytestag.dht.models import FileInfo
from bytestag.events import Task
from bytestag.keys import KeyBytes
from bytestag.tables import KVPTable, KVPRecord, KVPID
import collections
import contextlib
import hashlib
import itertools
import logging
import math
import os
import sqlite3
import threading

__docformat__ = 'restructuredtext en'
_logger = logging.getLogger(__name__)


def part_to_byte_number(part_number, part_size):
    '''Converts a file segment number to the byte offset.

    :rtype: :obj:`int`
    '''
    return part_number * part_size


def byte_to_part_number(byte_number, part_size):
    '''Converts a byte offset to a file segment number.

    :rtype: :obj:`int`
    '''
    return byte_number // part_size


def total_parts(total_byte_size, part_size):
    '''Returns the total number of segments of a file.

    :rtype: :obj:`int`
    '''
    return math.ceil(total_byte_size / part_size)
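
# Illustrative doctest for the segment math above, using the 256 KiB part
# size that SharedFilesHashTask defaults to further down:
#
#     >>> part_size = 2 ** 18
#     >>> byte_to_part_number(300000, part_size)
#     1
#     >>> part_to_byte_number(1, part_size)
#     262144
#     >>> total_parts(2 ** 20, part_size)
#     4
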
class MemoryKVPTable(KVPTable):
    '''A quick and dirty implementation of :class:`.KVPTable`

    .. note::

        This class is generally used for unit tests.
    '''

    def __init__(self):
        KVPTable.__init__(self)
        self._table = collections.defaultdict(
            lambda: collections.defaultdict(dict))

    def _contains(self, kvpid):
        return kvpid.index in self._table[kvpid.key]

    def indices(self, key):
        return list(self._table[key].keys())

    def _getitem(self, kvpid):
        return self._table[kvpid.key][kvpid.index]['value']

    def _setitem(self, kvpid, value):
        self._table[kvpid.key][kvpid.index]['value'] = value

    def _delitem(self, kvpid):
        del self._table[kvpid.key][kvpid.index]

    def keys(self):
        for key in self._table:
            for index in self._table[key]:
                yield KVPID(key, index)

    def record(self, kvpid):
        return MemoryKVPRecord(kvpid, self._table[kvpid.key][kvpid.index])
    def is_acceptable(self, kvpid, size, timestamp):
        if kvpid not in self:
            return True

        if self.record(kvpid).timestamp != timestamp:
            return True

        return False
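
# A minimal sketch of MemoryKVPTable in use. It assumes KVPTable (defined in
# bytestag.tables) provides the usual mapping protocol on top of _getitem,
# _setitem and _contains, and that KeyBytes wraps a hash digest:
#
#     >>> table = MemoryKVPTable()
#     >>> key = KeyBytes(hashlib.sha1(b'my key').digest())
#     >>> index = KeyBytes(hashlib.sha1(b'my value').digest())
#     >>> table[KVPID(key, index)] = b'my value'
#     >>> table[KVPID(key, index)]
#     b'my value'
#     >>> table.record(KVPID(key, index)).size
#     8
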
class MemoryKVPRecord(KVPRecord):
    '''The record associated with :class:`MemoryKVPTable`'''

    def __init__(self, kvpid, d):
        self._kvpid = kvpid
        self._d = d

    @property
    def key(self):
        return self._kvpid.key

    @property
    def index(self):
        return self._kvpid.index

    @property
    def size(self):
        return len(self._d['value'])

    @property
    def value(self):
        return self._d['value']

    @property
    def timestamp(self):
        return self._d.get('timestamp')

    @timestamp.setter
    def timestamp(self, seconds):
        self._d['timestamp'] = seconds

    @property
    def time_to_live(self):
        return self._d.get('time_to_live')

    @time_to_live.setter
    def time_to_live(self, seconds):
        self._d['time_to_live'] = seconds

    @property
    def is_original(self):
        return self._d.get('is_original')

    @is_original.setter
    def is_original(self, b):
        self._d['is_original'] = b

    @property
    def last_update(self):
        return self._d.get('last_update')

    @last_update.setter
    def last_update(self, seconds):
        self._d['last_update'] = seconds
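
# Continuing the sketch above: the record is a live view onto the table's
# backing dict, so metadata set through it is visible on later lookups
# (values hypothetical):
#
#     >>> record = table.record(KVPID(key, index))
#     >>> record.timestamp = 1350000000
#     >>> record.time_to_live = 86400
#     >>> table.record(KVPID(key, index)).timestamp
#     1350000000
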
class SQLite3Mixin(object):
    '''A SQLite 3 mixin class to provide connection management'''

    @contextlib.contextmanager
    def connection(self):
        '''Return a connection context manager'''
        if not hasattr(self, '_num_connections'):
            self._num_connections = 0

        # if self._num_connections:
        #     _logger.warning('There are %d connections already',
        #         self._num_connections)

        con = sqlite3.connect(self._path, isolation_level='DEFERRED',
            detect_types=sqlite3.PARSE_DECLTYPES)
        con.row_factory = sqlite3.Row
        con.execute('PRAGMA synchronous=NORMAL')
        con.execute('PRAGMA journal_mode=WAL')
        con.execute('PRAGMA foreign_keys = ON')
        self._num_connections += 1

        _logger.debug('Begin transaction current=%d', self._num_connections)

        try:
            with con:
                yield con
        finally:
            self._num_connections -= 1
            _logger.debug('End transaction current=%d', self._num_connections)

    @property
    def database_size(self):
        '''The size of the database.

        :rtype: :obj:`int`
        '''
        with self.connection() as con:
            cur = con.execute('PRAGMA page_count')
            page_count = cur.fetchone()[0]
            cur = con.execute('PRAGMA page_size')
            page_size = cur.fetchone()[0]

        return page_count * page_size
    def iter_query(self, query, params=(), limit=1000):
        '''Return rows that are fetched in blocks and stored in memory.

        This function is useful for iterating the entire database without
        blocking other connections.
        '''
        offset = 0
        deque = collections.deque()

        while True:
            deque.clear()

            with self.connection() as con:
                cur = con.execute(query.format(limit, offset), params)

                for row in cur:
                    deque.append(row)

            if not deque:
                break

            while True:
                try:
                    yield deque.popleft()
                except IndexError:
                    break

            offset += limit
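
# Note iter_query's paging contract, relied on by the tables below: the
# query string must carry 'LIMIT {} OFFSET {}' placeholders, which are
# filled in by str.format for each block of rows. Illustrative sketch
# (path hypothetical):
#
#     >>> table = DatabaseKVPTable('/tmp/kvps.db')  # defined below
#     >>> for row in table.iter_query(
#     ...         'SELECT key_id FROM kvps LIMIT {} OFFSET {}'):
#     ...     print(row['key_id'])
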
class DatabaseKVPTable(KVPTable, SQLite3Mixin):
    '''A KVPTable stored as a SQLite database'''

    def __init__(self, path, max_size=2 ** 36):
        '''
        :param path: A filename of the database.
        :param max_size: The maximum size, in bytes, to which the table may
            grow.
        '''
        KVPTable.__init__(self)
        self._max_size = max_size
        self._path = path
        self._create_tables()

    @property
    def max_size(self):
        '''The maximum size the table will grow.'''
        return self._max_size

    @max_size.setter
    def max_size(self, s):
        self._max_size = s

    def _create_tables(self):
        with self.connection() as con:
            con.execute('CREATE TABLE IF NOT EXISTS kvps ('
                'key_id BLOB NOT NULL, index_id BLOB NOT NULL,'
                'timestamp INTEGER,'
                'time_to_live INTEGER,'
                'is_original INTEGER,'
                'value BLOB,'
                'last_update INTEGER DEFAULT 0,'
                'PRIMARY KEY (key_id, index_id))')

    def _getitem(self, kvpid):
        with self.connection() as con:
            cur = con.execute('SELECT value FROM kvps '
                'WHERE key_id = ? AND index_id = ? '
                'LIMIT 1', (kvpid.key, kvpid.index))

            for row in cur:
                return row['value']

    def _contains(self, kvpid):
        with self.connection() as con:
            cur = con.execute('SELECT 1 FROM kvps '
                'WHERE key_id = ? AND index_id = ? LIMIT 1',
                (kvpid.key, kvpid.index))

            return True if cur.fetchone() else False

    def _setitem(self, kvpid, value):
        with self.connection() as con:
            params = (value, kvpid.key, kvpid.index)

            try:
                con.execute('INSERT INTO kvps '
                    '(value, key_id, index_id) VALUES (?, ?, ?)', params)
            except sqlite3.IntegrityError:
                con.execute('UPDATE kvps SET value = ? '
                    'WHERE key_id = ? AND index_id = ?', params)

    def keys(self):
        query = 'SELECT key_id, index_id FROM kvps LIMIT {} OFFSET {}'

        for row in self.iter_query(query):
            yield KVPID(KeyBytes(row['key_id']), KeyBytes(row['index_id']))

    def indices(self, key):
        for row in self.iter_query('SELECT index_id FROM kvps WHERE '
                'key_id = ? LIMIT {} OFFSET {}', (key,)):
            yield KeyBytes(row['index_id'])

    def _delitem(self, kvpid):
        with self.connection() as con:
            con.execute('DELETE FROM kvps WHERE '
                'key_id = ? AND index_id = ?', (kvpid.key, kvpid.index))

    def is_acceptable(self, kvpid, size, timestamp):
        if kvpid in self and self.record(kvpid).timestamp == timestamp:
            return False

        if self.database_size + size > self._max_size:
            return False

        return True

    def record(self, kvpid):
        return DatabaseKVPRecord(self, kvpid)

    def clean(self):
        '''Remove expired key-value pairs.'''
        _logger.debug('Clean database')

        with self.connection() as con:
            con.execute('''DELETE FROM kvps WHERE '''
                '''timestamp + time_to_live < strftime('%s', 'now')''')
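
# Illustrative sketch of DatabaseKVPTable (path and values hypothetical;
# the mapping protocol is assumed to come from KVPTable):
#
#     >>> table = DatabaseKVPTable('/tmp/kvps.db', max_size=2 ** 30)
#     >>> kvpid = KVPID(KeyBytes(hashlib.sha1(b'k').digest()),
#     ...               KeyBytes(hashlib.sha1(b'v').digest()))
#     >>> if table.is_acceptable(kvpid, size=1, timestamp=1350000000):
#     ...     table[kvpid] = b'v'
#     ...     table.record(kvpid).timestamp = 1350000000
#     ...     table.record(kvpid).time_to_live = 86400
#     >>> table.clean()  # drops pairs whose timestamp + time_to_live passed
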
class DatabaseKVPRecord(KVPRecord):
    '''The record associated with :class:`DatabaseKVPTable`.'''

    __slots__ = ('_table', '_kvpid')

    def __init__(self, table, kvpid):
        self._table = table
        self._kvpid = kvpid

    def _get_field(self, name):
        with self._table.connection() as con:
            cur = con.execute('SELECT {} FROM kvps '
                'WHERE key_id = ? AND index_id = ?'.format(name),
                (self._kvpid.key, self._kvpid.index))

            for row in cur:
                return row[0]

    def _save_field(self, name, value):
        with self._table.connection() as con:
            con.execute('UPDATE kvps SET {} = ? '
                'WHERE key_id = ? AND index_id = ?'.format(name),
                (value, self._kvpid.key, self._kvpid.index))

    @property
    def key(self):
        return self._kvpid.key

    @property
    def index(self):
        return self._kvpid.index

    @property
    def value(self):
        return self._table[self._kvpid]

    @property
    def size(self):
        return len(self.value)

    @property
    def timestamp(self):
        return self._get_field('timestamp')

    @timestamp.setter
    def timestamp(self, seconds):
        self._save_field('timestamp', seconds)

    @property
    def time_to_live(self):
        return self._get_field('time_to_live')

    @time_to_live.setter
    def time_to_live(self, seconds):
        self._save_field('time_to_live', seconds)

    @property
    def is_original(self):
        return self._get_field('is_original')

    @is_original.setter
    def is_original(self, b):
        self._save_field('is_original', b)

    @property
    def last_update(self):
        return self._get_field('last_update')

    @last_update.setter
    def last_update(self, seconds):
        self._save_field('last_update', seconds)
class ReadOnlyTableError(Exception):
    '''This error is raised when the table does not support storing
    values.'''
    pass
class CollectionInfoTypes(object):
    '''Constants for the types of CollectionInfo files'''

    DUMMY, BYTESTAG, BITTORRENT = range(3)
    BYTESTAG_COOKIE = b'{"!":"BytestagCollectionInfo"'
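
# The cookie is matched against the first bytes of a file to detect a
# Bytestag collection; _get_collection_type below performs exactly this
# check:
#
#     >>> data = b'{"!":"BytestagCollectionInfo", "files": []}'
#     >>> data.startswith(CollectionInfoTypes.BYTESTAG_COOKIE)
#     True
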
class SharedFilesKVPTable(KVPTable, SQLite3Mixin):
    '''Provides a KVPTable interface to shared files split into pieces.'''

    def __init__(self, path):
        '''
        :param path: The filename of the database.
        '''
        KVPTable.__init__(self)
        self._path = path
        self._shared_directories = []
        self._create_tables()

    def _create_tables(self):
        with self.connection() as con:
            con.execute('CREATE TABLE IF NOT EXISTS files ('
                'id INTEGER PRIMARY KEY,'
                'filename TEXT NOT NULL UNIQUE,'
                'key BLOB NOT NULL,'
                '`index` BLOB NOT NULL,'
                'size INTEGER NOT NULL,'
                'mtime INTEGER NOT NULL,'
                'part_size INTEGER NOT NULL,'
                'last_update INTEGER DEFAULT 0,'
                'file_hash_info BLOB NOT NULL)'
            )
            con.execute('CREATE TABLE IF NOT EXISTS parts ('
                'hash_id BLOB PRIMARY KEY,'
                'file_id INTEGER NOT NULL,'
                'file_offset INTEGER NOT NULL,'
                'last_update INTEGER DEFAULT 0,'
                'FOREIGN KEY (file_id) REFERENCES files (id)'
                'ON DELETE CASCADE'
                ')')
            con.execute('CREATE TABLE IF NOT EXISTS collections ('
                'file_id INTEGER PRIMARY KEY,'
                'type INTEGER NOT NULL,'
                'FOREIGN KEY (file_id) REFERENCES files (id)'
                'ON DELETE CASCADE'
                ')')
            con.execute('CREATE INDEX IF NOT EXISTS key ON files (key)')

    @property
    def shared_directories(self):
        '''A list of directories to be shared.

        Modify the list at will, but be sure to call
        :func:`hash_directories` as file monitoring is not yet supported.
        '''
        return self._shared_directories
    def is_acceptable(self, kvpid, size, timestamp):
        return False

    def indices(self, key):
        if self._contains_part(key):
            yield key

        for i in self._file_hash_index(key):
            yield i

    def _file_hash_index(self, key):
        for row in self.iter_query('SELECT `index` FROM files '
                'WHERE key = ?', (key,)):
            yield KeyBytes(row['index'])

    def _contains(self, kvpid):
        if kvpid.key == kvpid.index:
            return self._contains_part(kvpid.key)

        return self._contains_file_hash_info(kvpid)

    def _contains_part(self, key):
        with self.connection() as con:
            cur = con.execute('SELECT 1 FROM parts WHERE '
                'hash_id = ? ', (key,))
            row = cur.fetchone()

            if row:
                return True

    def _contains_file_hash_info(self, kvpid):
        with self.connection() as con:
            cur = con.execute('SELECT 1 FROM files WHERE '
                'key = ? AND `index` = ? ', (kvpid.key, kvpid.index))
            row = cur.fetchone()

            if row:
                return True

    def keys(self):
        return itertools.chain(self._parts_keys(), self._files_keys())

    def _parts_keys(self):
        query = 'SELECT hash_id FROM parts LIMIT {} OFFSET {}'

        for row in self.iter_query(query):
            yield KVPID(KeyBytes(row[0]), KeyBytes(row[0]))

    def _files_keys(self):
        query = 'SELECT key, `index` FROM files LIMIT {} OFFSET {}'

        for row in self.iter_query(query):
            yield KVPID(KeyBytes(row[0]), KeyBytes(row[1]))

    def _getitem(self, kvpid):
        if kvpid.key == kvpid.index:
            return self._get_part(kvpid.key)
        else:
            return self._get_file_hash_info(kvpid)

    def _get_part(self, key):
        with self.connection() as con:
            cur = con.execute('SELECT files.filename,'
                'parts.file_offset, files.part_size '
                'FROM parts JOIN files '
                'ON parts.file_id = files.id '
                'WHERE hash_id = ?', (key,))

            filename, offset, part_size = cur.fetchone()

            with open(filename, 'rb') as f:
                f.seek(offset)

                return f.read(part_size)

    def file_hash_info(self, kvpid):
        return FileInfo.from_bytes(self._get_file_hash_info(kvpid))

    def _get_file_hash_info(self, kvpid):
        with self.connection() as con:
            cur = con.execute('SELECT file_hash_info FROM files '
                'WHERE key = ? AND `index` = ? LIMIT 1',
                (kvpid.key, kvpid.index))

            for row in cur:
                return row['file_hash_info']

        raise IndexError('Not found')

    def _delitem(self, kvpid):
        raise ReadOnlyTableError()

    def _setitem(self, kvpid, value):
        raise ReadOnlyTableError()

    def record(self, kvpid):
        if kvpid.key == kvpid.index:
            return SharedFilesRecord(self, kvpid)
        else:
            return SharedFileHashRecord(self, kvpid)
    def hash_directories(self):
        '''Hash the directories and populate the table with file info.

        :rtype: :class:`SharedFilesHashTask`
        '''
        task = SharedFilesHashTask(self)
        thread = threading.Thread(target=task)
        thread.daemon = True
        thread.name = 'SharedFilesHashTask'
        thread.start()

        return task
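
# Illustrative sketch (paths hypothetical). hash_directories returns the
# running task immediately; its progress attribute, documented on
# SharedFilesHashTask below, can be polled from another thread:
#
#     >>> table = SharedFilesKVPTable('/tmp/shared.db')
#     >>> table.shared_directories.append('/home/user/shared')
#     >>> task = table.hash_directories()
#     >>> task.progress  # (filename, bytes read) while hashing
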
    @property
    def num_files(self):
        with self.connection() as con:
            cur = con.execute('SELECT COUNT(1) FROM files')

            return cur.fetchone()[0]

    @property
    def num_collections(self):
        with self.connection() as con:
            cur = con.execute('SELECT COUNT(1) FROM collections')

            return cur.fetchone()[0]

    @property
    def total_disk_size(self):
        with self.connection() as con:
            cur = con.execute('SELECT SUM(size) FROM files')

            return cur.fetchone()[0]
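
# Continuing the sketch above, the statistics properties summarize the
# hashed state (values hypothetical):
#
#     >>> table.num_files, table.num_collections, table.total_disk_size
#     (42, 1, 1073741824)
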
class SharedFilesRecord(KVPRecord):
    '''The record associated with :class:`SharedFilesKVPTable`.

    This record describes a single piece of a file on the filesystem; its
    fields are backed by the ``parts`` table.

    :see: :class:`SharedFileHashRecord`
    '''

    __slots__ = ('_table', '_kvpid')

    def __init__(self, table, kvpid):
        self._table = table
        self._kvpid = kvpid

    def _get_field(self, name):
        with self._table.connection() as con:
            cur = con.execute('SELECT {} FROM parts '
                'WHERE hash_id = ?'.format(name), (self._kvpid.key,))

            for row in cur:
                return row[0]

    def _save_field(self, name, value):
        with self._table.connection() as con:
            con.execute('UPDATE parts SET {} = ? '
                'WHERE hash_id = ?'.format(name),
                (value, self._kvpid.key))

    @property
    def key(self):
        return self._kvpid.key

    @property
    def index(self):
        return self._kvpid.index

    @property
    def value(self):
        return self._table[self._kvpid]

    @property
    def size(self):
        return len(self.value)

    @property
    def timestamp(self):
        return self.last_update

    @timestamp.setter
    def timestamp(self, seconds):
        raise ReadOnlyTableError()

    @property
    def time_to_live(self):
        return None

    @time_to_live.setter
    def time_to_live(self, seconds):
        raise ReadOnlyTableError()

    @property
    def is_original(self):
        return True

    @is_original.setter
    def is_original(self, b):
        raise ReadOnlyTableError()

    @property
    def last_update(self):
        return self._get_field('last_update')

    @last_update.setter
    def last_update(self, seconds):
        self._save_field('last_update', seconds)
class SharedFileHashRecord(KVPRecord):
    '''The record associated with :class:`SharedFilesKVPTable`.

    This record describes the hash information of a whole file on the
    filesystem; its fields are backed by the ``files`` table.

    :see: :class:`SharedFilesRecord`
    '''

    __slots__ = ('_table', '_kvpid')

    def __init__(self, table, kvpid):
        self._table = table
        self._kvpid = kvpid

    def _get_field(self, name):
        with self._table.connection() as con:
            cur = con.execute('SELECT {} FROM files '
                'WHERE key = ? AND `index` = ?'.format(name),
                (self._kvpid.key, self._kvpid.index))

            for row in cur:
                return row[0]

    def _save_field(self, name, value):
        with self._table.connection() as con:
            con.execute('UPDATE files SET {} = ? '
                'WHERE key = ? AND `index` = ?'.format(name),
                (value, self._kvpid.key, self._kvpid.index))

    @property
    def key(self):
        return self._kvpid.key

    @property
    def index(self):
        return self._kvpid.index

    @property
    def value(self):
        return self._table[self._kvpid]

    @property
    def size(self):
        return len(self.value)

    @property
    def timestamp(self):
        return None

    @timestamp.setter
    def timestamp(self, seconds):
        raise ReadOnlyTableError()

    @property
    def time_to_live(self):
        return None

    @time_to_live.setter
    def time_to_live(self, seconds):
        raise ReadOnlyTableError()

    @property
    def is_original(self):
        return True

    @is_original.setter
    def is_original(self, b):
        raise ReadOnlyTableError()

    @property
    def last_update(self):
        return self._get_field('last_update')

    @last_update.setter
    def last_update(self, seconds):
        self._save_field('last_update', seconds)

    @property
    def file_hash_info(self):
        return self._table.file_hash_info(self._kvpid)
class SharedFilesHashTask(Task):
    '''A task that hashes and populates a shared files table.

    :ivar progress: a tuple (`str`, `int`) describing the filename and
        bytes read.
    '''

    def _walk_dir(self, path):
        '''Walk a directory in sorted order and yield path, size and mtime'''
        # TODO: may run into recursion
        for dirpath, dirnames, filenames in os.walk(path, followlinks=True):
            dirnames.sort()

            for filename in sorted(filenames):
                file_path = os.path.join(dirpath, filename)
                size = os.path.getsize(file_path)
                mtime = int(os.path.getmtime(file_path))

                yield file_path, size, mtime
    def run(self, table, part_size=2 ** 18):
        self._table = table
        self._part_size = part_size

        for directory in table.shared_directories:
            if not self.is_running:
                return

            self._hash_directory(directory)

        if not table.shared_directories:
            _logger.info('No directories to hash')

        self._clean_database()
        self._table.value_changed_observer(None)
    def _hash_directory(self, directory):
        _logger.info('Hashing directory %s', directory)

        for file_path, size, mtime in self._walk_dir(directory):
            if not self.is_running:
                return

            if os.path.isfile(file_path):
                self._hash_file(file_path, size, mtime)

    def _hash_file(self, path, size, mtime):
        self.progress = (path, 0)

        with self._table.connection() as con:
            cur = con.execute('SELECT id, size, mtime '
                'FROM files WHERE '
                'filename = ? LIMIT 1', (path,))

            for row in cur:
                id_, result_size, result_mtime = row

                if result_size == size and result_mtime == mtime:
                    return

                con.execute('PRAGMA foreign_keys = ON')
                con.execute('DELETE FROM files WHERE id = ?', (id_,))

        self._hash_parts(path, size, mtime)

    def _hash_parts(self, path, size, mtime):
        _logger.info('Hashing file %s', path)

        whole_file_hasher = hashlib.sha1()
        hashes = []

        with open(path, 'rb') as f:
            while True:
                if not self.is_running:
                    return

                data = f.read(self._part_size)

                if not data:
                    break

                self.progress = (path, f.tell())
                whole_file_hasher.update(data)
                part_hasher = hashlib.sha1(data)
                hashes.append(part_hasher.digest())

        file_hash = whole_file_hasher.digest()
        file_hash_info = FileInfo(file_hash, hashes)
        index = hashlib.sha1(file_hash_info.to_bytes()).digest()

        with self._table.connection() as con:
            cur = con.execute('INSERT INTO files '
                '(key, `index`, size, mtime, part_size, filename,'
                'file_hash_info) '
                'VALUES (?, ?, ?, ?, ?, ?, ?)',
                (file_hash, index, size, mtime, self._part_size, path,
                    file_hash_info.to_bytes()))
            row_id = cur.lastrowid

            for i in range(len(hashes)):
                offset = i * self._part_size
                hash_bytes = hashes[i]
                self.progress = (path, offset)

                try:
                    con.execute('INSERT INTO parts '
                        '(hash_id, file_id, file_offset) VALUES '
                        '(?, ?, ?)', (hash_bytes, row_id, offset))
                except sqlite3.IntegrityError:
                    _logger.exception('Possible duplicate')

            collection_type = self._get_collection_type(path)

            if collection_type:
                con.execute('INSERT INTO collections '
                    '(file_id, type) VALUES '
                    '(?, ?)', (row_id, collection_type))

    def _get_collection_type(self, path):
        cookie_len = len(CollectionInfoTypes.BYTESTAG_COOKIE)

        with open(path, 'rb') as f:
            data = f.read(cookie_len)

            if data.startswith(CollectionInfoTypes.BYTESTAG_COOKIE):
                return CollectionInfoTypes.BYTESTAG

            if path.endswith('.torrent'):
                f.seek(0)

                if self._check_bittorrent_file_contents(f):
                    return CollectionInfoTypes.BITTORRENT

    def _check_bittorrent_file_contents(self, f):
        data = f.read(1024)

        if b'info' in data and b'pieces' in data:
            return True

    def _clean_database(self):
        _logger.info('Cleaning database')

        delete_params = []

        with self._table.connection() as con:
            cur = con.execute('SELECT rowid, filename FROM files')

            for row in cur:
                rowid, filename = row

                if (not os.path.exists(filename)
                        or not self._is_in_shared_directory(filename)):
                    delete_params.append((rowid,))

        with self._table.connection() as con:
            con.execute('PRAGMA foreign_keys = ON')
            cur = con.executemany('DELETE FROM files WHERE rowid = ?',
                delete_params)

    def _is_in_shared_directory(self, path):
        for shared_dir in self._table._shared_directories:
            common_prefix = os.path.commonprefix([shared_dir, path])

            if common_prefix in self._table._shared_directories:
                return True
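
# The hashing scheme of _hash_parts, restated as a small sketch: each part's
# hash_id is the SHA-1 of that part's bytes, the file key is the SHA-1 of
# the whole file, and the `index` is the SHA-1 of the serialized FileInfo
# (whose wire format lives in bytestag.dht.models). With a hypothetical
# file one byte longer than a single 256 KiB part:
#
#     >>> data = b'x' * (2 ** 18 + 1)
#     >>> parts = [data[i:i + 2 ** 18]
#     ...          for i in range(0, len(data), 2 ** 18)]
#     >>> part_ids = [hashlib.sha1(p).digest() for p in parts]
#     >>> file_key = hashlib.sha1(data).digest()
#     >>> len(part_ids) == total_parts(len(data), 2 ** 18) == 2
#     True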