Source code for bytestag.network_test

from bytestag.events import EventReactor, EventScheduler
from bytestag.network import UDPServer, UDPClient, Network, ReplyTable
import bytestag.network
import hashlib
import io
import logging
import threading
import unittest

_logger = logging.getLogger(__name__)


[docs]class TestUDP(unittest.TestCase):
[docs] def test_udp(self): '''It should be able to send itself a udp packet through events''' event_reactor = EventReactor() server = UDPServer(event_reactor) client = UDPClient() server.start() reactor_thread = threading.Thread(target=event_reactor.start) reactor_thread.daemon = True reactor_thread.start() data = None def my_callback(event_id, address, data_): nonlocal data data = data_ # @UnusedVariable event_reactor.put(EventReactor.STOP_ID) event_reactor.register_handler( bytestag.network.UDP_INBOUND_EVENT, my_callback) client.send(server.server_address, b'hello') reactor_thread.join(1) event_reactor.put(EventReactor.STOP_ID) self.assertEqual(data, b'hello')
[docs]class TestNetworkControllerComponents(unittest.TestCase):
[docs] def test_udp_packing(self): '''It should pack and unpack the data symmetrically''' event_reactor = EventReactor() nc = Network(event_reactor) d = {'my_key': 123} self.assertEqual(d, nc._unpack_udp_data(nc._pack_udp_data(d)))
[docs] def test_faulty_udp_unpacking_bad_json(self): '''It should return None if bad json parsing''' event_reactor = EventReactor() nc = Network(event_reactor) self.assertFalse(nc._unpack_udp_data(b'{"hello:}'))
[docs]class TestNetworkControllerMultiNode(unittest.TestCase): TIMEOUT = 5
[docs] def setup_nodes(self, count=2): _logger.debug('Network setup---') self.er = [] self.er_thread = [] self.nc = [] self.timer = [] for i in range(count): self.er.append(EventReactor()) er_thread = threading.Thread(target=self.er[i].start) er_thread.daemon = True er_thread.name = 'event-reactor-thread-%d' % i er_thread.start() self.er_thread.append(er_thread) self.nc.append(Network(self.er[i])) timer = EventScheduler(self.er[i]) timer.add_one_shot(5, EventReactor.STOP_ID) self.timer.append(timer) _logger.debug('Server %d at %s', i, self.nc[i].server_address) self.stuff = {}
[docs] def stop_event_reactors(self): for er in self.er: er.put(EventReactor.STOP_ID)
[docs] def join_event_reactors(self): for er_thread in self.er_thread: er_thread.join()
[docs] def test_incoming_send_packet(self): '''Server 0 should send a packet to server 1''' self.setup_nodes() def my_cb(data_packet): self.stuff = data_packet.dict_obj self.stop_event_reactors() self.nc[1].receive_callback = my_cb self.nc[0].send(self.nc[1].server_address, {'hello': True}) self.join_event_reactors() self.assertEqual(self.stuff, {'hello': True})
[docs] def test_expect_reply(self): '''It should send a packet and the other server replies''' self.setup_nodes() def other_server_cb(data_packet): self.stuff['1st_server_msg'] = data_packet.dict_obj self.nc[1].send_answer_reply(data_packet, {'kittehs': 3}) self.nc[1].receive_callback = other_server_cb future = self.nc[0].send(self.nc[1].server_address, {'hello': True}, timeout=self.TIMEOUT) data_packet = future.result(self.TIMEOUT) self.stuff['2nd_server_msg'] = data_packet.dict_obj self.stop_event_reactors() self.join_event_reactors() self.assertEqual(self.stuff['1st_server_msg']['hello'], True) self.assertEqual(self.stuff['2nd_server_msg']['kittehs'], 3)
[docs] def test_expect_reply_failure(self): '''It should send a packet and it times-out''' self.setup_nodes() def other_server_cb(data_packet): # Simulate lost packet pass self.nc[1].receive_callback = other_server_cb future = self.nc[0].send(self.nc[1].server_address, {'hello': True}, timeout=0.0) # self.nc[0]._process_unreplied_cb(None) data_packet = future.result(self.TIMEOUT) self.stuff['2nd_server_msg'] = data_packet self.stop_event_reactors() self.join_event_reactors() self.assertEqual(self.stuff['2nd_server_msg'], None)
[docs] def test_send_file(self): '''It should transfer a file''' transfer_id = '123' f = io.BytesIO() data = b'\x0F\xF0' * 10000 hasher = hashlib.sha1(data) f.write(data) f.seek(0) self.setup_nodes(2) read_transfer_task = self.nc[1].expect_incoming_transfer(transfer_id, timeout=self.TIMEOUT) future = self.nc[0].send_file(self.nc[1].server_address, transfer_id, f, timeout=self.TIMEOUT) bytes_sent = future.result() f_other = read_transfer_task.result() self.stop_event_reactors() self.assertEqual(bytes_sent, len(data)) self.join_event_reactors() f_other.seek(0) test_hasher = hashlib.sha1(f_other.read()) self.assertEqual(len(data), f_other.tell()) self.assertEqual(test_hasher.digest(), hasher.digest())
[docs]class TestReplyTable(unittest.TestCase):
[docs] def test_add_remove_out(self): '''It should add and remove''' table = ReplyTable() table.add_out_entry(0, 0, None) self.assertEqual(table.get_out_entry(0, 0), None) table.remove_out_entry(0, 0) self.assertFalse(table.get_out_entry(0, 0))
[docs] def test_add_remove_in(self): '''It should add and remove''' table = ReplyTable() table.add_in_entry(0, 0, None) self.assertEqual(table.get_in_entry(0, 0), None) table.remove_in_entry(0, 0) self.assertFalse(table.get_in_entry(0, 0))