Source code for bytestag.storage_test

from bytestag.keys import KeyBytes
from bytestag.storage import (MemoryKVPTable, DatabaseKVPTable,
    SharedFilesKVPTable)
from bytestag.tables import KVPID
import bytestag.storage
import hashlib
import logging
import os.path
import random
import tempfile
import time
import unittest


_logger = logging.getLogger(__name__)


class TestFunctions(unittest.TestCase):
[docs] SIZE = 1024 def test_part_to_byte_number(self):
[docs] '''It should map part numbers to offsets in files''' self.assertEqual(bytestag.storage.part_to_byte_number(0, self.SIZE), 0) self.assertEqual(bytestag.storage.part_to_byte_number(1, self.SIZE), 1024) self.assertEqual(bytestag.storage.part_to_byte_number(12, self.SIZE), 12288) def test_byte_to_part_number(self):
[docs] '''It should map bytes to part numbers''' self.assertEqual(bytestag.storage.byte_to_part_number(0, self.SIZE), 0) self.assertEqual(bytestag.storage.byte_to_part_number(1, self.SIZE), 0) self.assertEqual(bytestag.storage.byte_to_part_number(1023, self.SIZE), 0) self.assertEqual(bytestag.storage.byte_to_part_number(1024, self.SIZE), 1) self.assertEqual(bytestag.storage.byte_to_part_number(1025, self.SIZE), 1) def test_total_parts(self):
[docs] '''It should give the upperbound of parts needed''' self.assertEqual(bytestag.storage.total_parts(0, self.SIZE), 0) self.assertEqual(bytestag.storage.total_parts(1, self.SIZE), 1) self.assertEqual(bytestag.storage.total_parts(1023, self.SIZE), 1) self.assertEqual(bytestag.storage.total_parts(1024, self.SIZE), 1) self.assertEqual(bytestag.storage.total_parts(1025, self.SIZE), 2) class TableMixin(object):
[docs] def table_store_get(self, data, kvp_table):
[docs] kvpid = KVPID(KeyBytes(), KeyBytes.new_hash(data)) self.assertFalse(kvpid in kvp_table) kvp_table[kvpid] = data self.assertTrue(kvpid in kvp_table) self.assertTrue(kvp_table.indices(kvpid.key)) record = kvp_table.record(kvpid) self.assertEqual(len(data), record.size) del kvp_table[kvpid] self.assertFalse(kvpid in kvp_table) class TestMemoryKVPTable(unittest.TestCase, TableMixin):
[docs] def test_store_get(self):
[docs] '''It should store and get''' data = b'kitteh' * 100 kvp_table = MemoryKVPTable() self.table_store_get(data, kvp_table) class TestDatabaseKVPTable(unittest.TestCase, TableMixin):
[docs] def test_store_get(self):
[docs] '''It should store and get''' temp_dir = tempfile.TemporaryDirectory() path = os.path.join(temp_dir.name, 'test.db') data = b'kitteh' * 100 kvp_table = DatabaseKVPTable(path) self.table_store_get(data, kvp_table) class TestSharedFilesKVPTable(unittest.TestCase, TableMixin):
[docs] def create_file(self, path):
[docs] with open(path, 'wb') as f: f.write(os.urandom(random.randint(0, 10000))) def test_hash(self):
[docs] '''It should hash the files in each directory''' shared_dir = tempfile.TemporaryDirectory() data1 = os.urandom(4) + b'\x00' * 1000 data2 = os.urandom(4) + b'\x00' * 1000 hash1 = hashlib.sha1(data1).digest() hash2 = hashlib.sha1(data2).digest() with open(os.path.join(shared_dir.name, 'a.txt'), 'wb') as f: f.write(data1) with open(os.path.join(shared_dir.name, 'b.txt'), 'wb') as f: f.write(data2) temp_dir = tempfile.TemporaryDirectory() path = os.path.join(temp_dir.name, 'test.db') kvp_table = SharedFilesKVPTable(path) kvp_table.shared_directories.append(shared_dir.name) task = kvp_table.hash_directories() task.result() self.assertIn(KVPID(KeyBytes(hash1), KeyBytes(hash1)), kvp_table) self.assertIn(KVPID(KeyBytes(hash2), KeyBytes(hash2)), kvp_table) data2 = os.urandom(4) + b'\x00' * 1000 data3 = os.urandom(4) + b'\x00' * 1000 # XXX: Delay for integer filesystem timestamps time.sleep(1) os.remove(os.path.join(shared_dir.name, 'a.txt')) with open(os.path.join(shared_dir.name, 'b.txt'), 'wb') as f: f.write(data2) with open(os.path.join(shared_dir.name, 'c.txt'), 'wb') as f: f.write(data3) hash2 = hashlib.sha1(data2).digest() hash3 = hashlib.sha1(data3).digest() task = kvp_table.hash_directories() task.result() # # Row factory is None due to python bug #15545 # with kvp_table._connection(row_factory=None) as con: # for line in con.iterdump(): # print(line) # # raise Exception('test') self.assertNotIn(KVPID(KeyBytes(hash1), KeyBytes(hash1)), kvp_table) self.assertIn(KVPID(KeyBytes(hash2), KeyBytes(hash2)), kvp_table) self.assertIn(KVPID(KeyBytes(hash3), KeyBytes(hash3)), kvp_table)