Source code for bytestag.dht.network_test

from bytestag.dht.network import DHTNetwork, FindValueFromNodeResult
from bytestag.events import EventReactor, EventScheduler
from bytestag.keys import KeyBytes
from bytestag.storage import MemoryKVPTable
from bytestag.tables import KVPID
import hashlib
import logging
import threading
import time
import unittest

_logger = logging.getLogger(__name__)


[docs]class TestNetworkControllerMultiNode(unittest.TestCase): TIMEOUT = 5
[docs] def setup_nodes(self, count=2): _logger.debug('DHTNetwork setup---') self.er = [] self.er_thread = [] self.nc = [] self.timer = [] for i in range(count): self.er.append(EventReactor()) er_thread = threading.Thread(target=self.er[i].start) er_thread.daemon = True er_thread.name = 'event-reactor-thread-%d' % i er_thread.start() self.er_thread.append(er_thread) self.nc.append(DHTNetwork(self.er[i], MemoryKVPTable())) timer = EventScheduler(self.er[i]) timer.add_one_shot(self.TIMEOUT, EventReactor.STOP_ID) self.timer.append(timer) _logger.debug('Server %d at %s', i, self.nc[i].address) self.stuff = {}
[docs] def stop_event_reactors(self): for er in self.er: er.put(EventReactor.STOP_ID)
[docs] def join_event_reactors(self): for er_thread in self.er_thread: er_thread.join()
[docs] def test_ping_address(self): '''It should send a ping and receive a response''' self.setup_nodes(2) future = self.nc[0].ping_address(self.nc[1].address) ping_result = future.result() self.stuff['ping'] = ping_result self.stop_event_reactors() self.join_event_reactors() self.assertTrue(self.stuff['ping'])
[docs] def test_ping_rpc(self): '''It should send a PING and reply with a PONG and both contacts are added to routing table''' self.setup_nodes(2) contact_0 = self.nc[0].node contact_1 = self.nc[1].node future = self.nc[0].ping_node(contact_1) ping_result = future.result() self.stuff['server_0'] = ping_result self.stop_event_reactors() self.join_event_reactors() self.assertTrue(self.stuff['server_0']) self.assertTrue(contact_0 in self.nc[1].routing_table) self.assertTrue(contact_1 in self.nc[0].routing_table)
[docs] def test_find_node_rpc(self): '''It should get a list of nodes''' num_nodes = 10 self.setup_nodes(num_nodes) # Add the address of the other nodes for i in range(1, num_nodes): future = self.nc[0].join_network(self.nc[i].address) self.assertTrue(future.result()) logging.debug('SEND FIND NODE') future = self.nc[1].find_nodes_from_node(self.nc[0].node, KeyBytes()) contacts = future.result() logging.debug('GOT CONTACTS') self.stuff['contacts'] = contacts logging.debug('SHUTDOWN') self.stop_event_reactors() self.join_event_reactors() print(self.nc[0].node) print(self.nc[0].routing_table) print(self.nc[1].node) print(self.nc[1].routing_table) print(list(map(str, self.stuff['contacts']))) self.assertTrue(self.stuff['contacts']) self.assertGreaterEqual(len(self.stuff['contacts']), num_nodes / 2)
[docs] def test_find_binary_value_size_from_node(self): '''It should get the size of the data from the node''' self.setup_nodes(2) data = b'\x00\x01\x03' * 500 key = KeyBytes(hashlib.sha1(data).digest()) kvp_table = MemoryKVPTable() self.nc[1]._kvp_table = kvp_table kvpid = KVPID(key, key) kvp_table[kvpid] = data self.assertIn(kvpid, kvp_table) future = self.nc[0].join_network(self.nc[1].address) self.assertTrue(future.result()) future = self.nc[0].find_value_from_node(self.nc[1].node, key) find_value_result = future.result() self.assertIsInstance(find_value_result, FindValueFromNodeResult) self.assertEqual(len(data), find_value_result.kvp_info_list[0].size)
[docs] def test_get_value_from_other_node(self): '''It should download the value from the other node''' self.setup_nodes(2) data = b'\x00\x01\x03' * 500 key = KeyBytes(hashlib.sha1(data).digest()) kvp_table = MemoryKVPTable() self.nc[1]._kvp_table = kvp_table kvpid = KVPID(key, key) kvp_table[kvpid] = data self.assertIn(kvpid, kvp_table) future = self.nc[0].join_network(self.nc[1].address) self.assertTrue(future.result()) read_transfer_task = self.nc[0].get_value_from_node(self.nc[1].node, key) f = read_transfer_task.result() test_data = f.read() self.stop_event_reactors() self.join_event_reactors() self.assertTrue(data, test_data)
[docs] def test_store_to_node(self): '''It should store the data to another node''' self.setup_nodes(2) data = b'\x00\x01\x03' * 500 key = KeyBytes(hashlib.sha1(data).digest()) kvp_table = MemoryKVPTable() self.nc[1]._kvp_table = kvp_table kvpid = KVPID(key, key) future = self.nc[0].join_network(self.nc[1].address) self.assertTrue(future.result()) store_to_node_task = self.nc[0].store_to_node(self.nc[1].node, key, key, data, 12345678) self.assertEqual(len(data), store_to_node_task.result()) # FIXME: once download status mechanism exists, fix sleep time.sleep(0.1) self.assertIn(kvpid, kvp_table) store_to_node_task = self.nc[0].store_to_node(self.nc[1].node, key, key, data, 12345678) self.assertEqual(0, store_to_node_task.result()) self.stop_event_reactors() self.join_event_reactors()