'''Networking'''
# This file is part of Bytestag.
# Copyright © 2012 Christopher Foo <chris.foo@gmail.com>.
# Licensed under GNU GPLv3. See COPYING.txt for details.
from bytestag.events import (EventReactorMixin, EventReactor, EventScheduler,
    Task, EventID, WrappedThreadPoolExecutor)
from bytestag.keys import bytes_to_b64
from socketserver import BaseRequestHandler
from threading import Thread
import base64
import binascii
import collections
import errno
import io
import json
import logging
import os
import queue
import select
import socket
import socketserver
import tempfile
import threading
import time
import zlib
__docformat__ = 'restructuredtext en'
_logger = logging.getLogger(__name__)
class UDP_INBOUND_EVENT(object):
    '''Marker type used as the event id for inbound UDP datagrams.'''
class UDPRequestHandler(BaseRequestHandler):
    '''Request handler that forwards each datagram into the event reactor.'''

    def handle(self):
        '''Publish the received datagram as a ``UDP_INBOUND_EVENT``.'''
        _logger.debug('Handler')
        # For a UDP server, self.request is (datagram_bytes, socket).
        datagram = self.request[0]
        self.server.event_reactor.put(
            UDP_INBOUND_EVENT, self.client_address, datagram)
class DataPacket(collections.namedtuple(
        'DataPacket', 'address dict_obj sequence_id')):
    '''An immutable record describing one JSON data packet.

    :var address: a tuple of (host, port_number)
    :var dict_obj: a :obj:`dict` containing the payload
    :var sequence_id: the sequence id
    '''
    # No per-instance __dict__; instances stay as light as plain tuples.
    __slots__ = ()
class UDPServer(EventReactorMixin, Thread, socketserver.UDPServer):
    '''Threaded UDP server that feeds inbound datagrams to the event reactor.

    Runs ``serve_forever`` on its own daemon thread and shuts itself down
    when the event reactor fires ``EventReactor.STOP_ID``.
    '''

    def __init__(self, event_reactor, address=('127.0.0.1', 0)):
        EventReactorMixin.__init__(self, event_reactor)
        Thread.__init__(self)
        self.name = 'network-udp-server'
        # Daemon thread: does not keep the interpreter alive on exit.
        self.daemon = True
        socketserver.UDPServer.__init__(self, address, UDPRequestHandler)
        self.event_reactor.register_handler(EventReactor.STOP_ID,
            self._stop_cb)
        self._running = True

    def run(self):
        '''Serve requests until a stop is requested.'''
        while self._running:
            _logger.debug('Network udp server started')
            try:
                self.serve_forever()
            except select.error as e:
                # The internal select() call can be interrupted by a signal
                # (EINTR), e.g. when a GUI toolkit such as Qt installs
                # signal handlers. Restart the serve loop in that case.
                if e.args[0] == errno.EINTR:
                    _logger.exception('Possible issue with Qt. Restarting')
                else:
                    # Bare raise preserves the original traceback.
                    raise
        _logger.debug('Network udp server stopped')

    def _stop_cb(self, event_id):
        '''Event-reactor callback: shut the server down.

        shutdown() blocks until serve_forever() exits, so it must run on a
        separate thread to avoid deadlocking the reactor thread.
        '''
        self._running = False
        Thread(target=self.shutdown).start()
        _logger.debug('Network udp server stop requested')
class UDPClient(object):
    '''Thin wrapper around a UDP socket for sending datagrams.'''

    def __init__(self, socket_obj=None):
        # Reuse the caller's socket when given (e.g. the server's socket,
        # so peers can reply to the server's port); otherwise make our own.
        self.socket = socket_obj if socket_obj else socket.socket(
            socket.AF_INET, socket.SOCK_DGRAM)

    def send(self, address, data):
        '''Send ``data`` (bytes) as a single datagram to ``address``.'''
        self.socket.sendto(data, address)
class JSONKeys(object):
    '''Well-known key names used in the JSON packet dicts.'''

    # Application payload of a plain packet.
    PAYLOAD = 'payload'
    # Sequence id attached to a packet that expects a reply.
    SEQUENCE_ID = 'seq_id'
    # Sequence id echoed back in a reply packet.
    REPLY_SEQUENCE_ID = 'reply_id'
    # Identifier of a bytes/file transfer session.
    TRANSFER_ID = 'xfer_id'
    # Base64-encoded chunk of transfer data; null terminates the transfer.
    TRANSFER_DATA = 'xfer_data'
    # Declared size of the transfer.
    TRANSFER_SIZE = 'xfer_size'
class ReplyTable(object):
    '''Matches sequence IDs of replies to guard against forged UDP replies.

    Two maps are kept, both keyed by ``(sequence_id, address)``:
    ``out_table`` holds :class:`threading.Event` instances that senders wait
    on, and ``in_table`` holds the reply :class:`DataPacket` instances for
    the woken senders to collect.
    '''

    def __init__(self):
        self.out_table = {}
        self.in_table = {}

    def add_out_entry(self, sequence_id, address, event):
        '''Register an Event to be set when the matching reply arrives.

        :Parameters:
            sequence_id
                The id of the packet sent out
            address
                The destination of the packet
            event: :class:`threading.Event`
                The :class:`threading.Event` instance to wait on
        '''
        self.out_table[(sequence_id, address)] = event

    def get_out_entry(self, sequence_id, address):
        '''Return the waiting Event, or ``None`` if unknown.

        :rtype: :class:`threading.Event`, ``None``
        '''
        key = (sequence_id, address)
        return self.out_table.get(key)

    def remove_out_entry(self, sequence_id, address):
        '''Forget the waiting Event (raises ``KeyError`` if absent).'''
        self.out_table.pop((sequence_id, address))

    def add_in_entry(self, sequence_id, address, data_packet):
        '''Store a reply packet for the woken sender thread to retrieve.'''
        self.in_table[(sequence_id, address)] = data_packet

    def get_in_entry(self, sequence_id, address):
        '''Return the stored reply packet, or ``None`` if absent.

        :rtype: :class:`DataPacket`, ``None``
        '''
        key = (sequence_id, address)
        return self.in_table.get(key)

    def remove_in_entry(self, sequence_id, address):
        '''Delete the stored reply packet (raises ``KeyError`` if absent).'''
        self.in_table.pop((sequence_id, address))
class Network(EventReactorMixin):
    '''Network controller.

    Ties together the UDP server, the UDP client, the reply table, and the
    bookkeeping for uploads and downloads.

    :CVariables:
        MAX_UDP_PACKET_SIZE
            The maximum UDP packet size allowed
        DEFAULT_TIMEOUT
            The time in seconds before a reply is timed out
        STREAM_DATA_SIZE
            The size in bytes of the parts of the file transmitted
    '''
    MAX_UDP_PACKET_SIZE = 65507  # bytes
    DEFAULT_TIMEOUT = 10  # seconds
    STREAM_DATA_SIZE = 1024  # bytes
    SEQUENCE_ID_SIZE = 20  # bytes (of entropy per sequence id)
    DEFAULT_POOL_SIZE = 20  # worker threads in the pool executor

    def __init__(self, event_reactor, address=('127.0.0.1', 0)):
        EventReactorMixin.__init__(self, event_reactor)
        self._server = UDPServer(event_reactor, address=address)
        # By passing in the same socket object to the client, this method
        # allows other nodes to reply to our server's port.
        self._client = UDPClient(socket_obj=self._server.socket)
        self._reply_table = ReplyTable()
        # Maps transfer_id -> DownloadTask for expected incoming transfers.
        self._downloads = {}
        self._pool_executor = WrappedThreadPoolExecutor(
            Network.DEFAULT_POOL_SIZE, event_reactor)
        self._event_scheduler = EventScheduler(event_reactor)
        self._transfer_timer_id = EventID(self, 'Clean transfers')
        self._running = True
        self._register_handlers()
        self._server.start()

    @property
    def server_address(self):
        '''The address of the server'''
        return self._server.server_address

    def _register_handlers(self):
        '''Register the event callbacks'''
        self.event_reactor.register_handler(UDP_INBOUND_EVENT,
            self._udp_incoming_callback)
        self.event_reactor.register_handler(EventReactor.STOP_ID,
            self._stop_callback)
        self.event_reactor.register_handler(self._transfer_timer_id,
            self._clean_download)

    def _stop_callback(self, event_id):
        '''Stop and expire everything'''
        self._running = False
        # Abort every pending download by feeding it a terminating None.
        for transfer_id in list(self._downloads.keys()):
            download_task = self._downloads[transfer_id]
            del self._downloads[transfer_id]
            download_task.transfer(None)
        # Release every thread blocked waiting on a reply Event.
        for key in list(self._reply_table.out_table.keys()):
            event = self._reply_table.out_table[key]
            event.set()

    def _clean_download(self, event_id, transfer_id):
        '''Remove timed out file download'''
        download_task = self._downloads[transfer_id]
        last_modified = download_task.last_modified
        timeout = download_task.timeout
        if last_modified + timeout < time.time():
            # Idle too long: drop it and tell the task to stop waiting.
            _logger.debug('Cleaned out download %s', transfer_id)
            del self._downloads[transfer_id]
            download_task.transfer(None)
        else:
            # Still active: re-check after another timeout interval.
            _logger.debug('Still alive download %s', transfer_id)
            self._event_scheduler.add_one_shot(timeout,
                self._transfer_timer_id, transfer_id)

    def _udp_incoming_callback(self, event_id, address, data):
        '''Decode an inbound datagram and dispatch it by packet type.'''
        if not self._running:
            return
        _logger.debug('UDP %s←%s %s', self.server_address, address,
            data[:160])
        packet_dict = self._unpack_udp_data(data)
        if not packet_dict:
            # Malformed or non-dict payload; already logged by unpacker.
            return
        data_packet = DataPacket(address, packet_dict,
            packet_dict.get(JSONKeys.SEQUENCE_ID) \
            or packet_dict.get(JSONKeys.REPLY_SEQUENCE_ID))
        # Dispatch order: replies first, then transfer chunks, then plain
        # packets for the application-level receive_callback.
        if JSONKeys.REPLY_SEQUENCE_ID in packet_dict:
            self._accept_reply(data_packet)
        elif JSONKeys.TRANSFER_ID in packet_dict:
            self._accept_transfer(data_packet)
        else:
            self._accept_packet(data_packet)

    def _accept_packet(self, data_packet):
        # Hand a non-reply, non-transfer packet to the subclass hook.
        self.receive_callback(data_packet)

    def receive_callback(self, data_packet):
        '''The function called when a data packet arrives.

        :Parameters:
            data_packet: :class:`DataPacket`
                The incoming data packet

        This function is called for packets that are not replies. Implementors
        of this class should override this method.
        '''
        raise NotImplementedError()

    def expect_incoming_transfer(self, transfer_id, timeout=DEFAULT_TIMEOUT,
    download_task_class=None, max_size=None):
        '''Allow a transfer for download.

        :Parameters:
            transfer_id: ``str``
                A transfer id that the other client use for transferring data.
            timeout: ``int`` ``float``
                Time in seconds before the transfer times out.
            max_size: ``int`` ``None``
                The maximum file size.

        :rtype: :class:`DownloadTask`
        :return: A future that returns a file object that may have been
            interrupted. The progress is the number of bytes downloaded.
        '''
        download_task_class = download_task_class or DownloadTask
        download_task = download_task_class(max_size=max_size)
        self._downloads[transfer_id] = download_task
        # Schedule the first idle check; _clean_download reschedules itself
        # while the transfer stays alive.
        self._event_scheduler.add_one_shot(timeout, self._transfer_timer_id,
            transfer_id)
        self._pool_executor.submit(download_task)
        return download_task

    def _accept_reply(self, data_packet):
        '''Process a reply and allow a future to resume'''
        sequence_id = data_packet.sequence_id
        address = data_packet.address
        # Only (sequence_id, address) pairs we sent to are accepted; this is
        # the defense against forged reply packets.
        event = self._reply_table.get_out_entry(sequence_id,
            address)
        if not event:
            _logger.debug('Unknown seq id %s, packet discarded', sequence_id)
            return
        # Stash the reply, then wake the waiting SendPacketTask.
        self._reply_table.remove_out_entry(sequence_id, address)
        self._reply_table.add_in_entry(sequence_id, address, data_packet)
        event.set()

    def _accept_transfer(self, data_packet):
        '''Process a file download'''
        transfer_id = data_packet.dict_obj[JSONKeys.TRANSFER_ID]
        # Only transfers previously allowed via expect_incoming_transfer
        # are accepted; anything else is dropped.
        if transfer_id in self._downloads:
            if JSONKeys.TRANSFER_DATA in data_packet.dict_obj:
                self._read_download(data_packet, transfer_id)
                return
        _logger.debug('Transfer discarded')

    def _read_download(self, data_packet, transfer_id):
        '''Feed one received chunk into the matching DownloadTask.'''
        _logger.debug('Read download')
        download_task = self._downloads[transfer_id]
        data_str = data_packet.dict_obj[JSONKeys.TRANSFER_DATA]
        download_task.address = data_packet.address
        if data_str is None:
            # A null data field marks the end of the transfer.
            download_task.transfer(None)
            _logger.debug('Read download finished')
            return
        else:
            try:
                data = base64.b64decode(data_str.encode())
            except binascii.Error as e:
                # Corrupt chunk: ignore it; the sender will time out.
                _logger.debug('Decode error %s', e)
                return
        download_task.transfer(data)
        _logger.debug('Read download len=%d', len(data))
        if download_task.is_running:
            # Acknowledge the chunk so the uploader counts it as sent.
            d = {
                JSONKeys.TRANSFER_ID: transfer_id
            }
            self.send_answer_reply(data_packet, d)
        else:
            _logger.debug('Download aborted')

    def _pack_udp_data(self, packet_dict):
        '''Pack the dict into a format suitable for transmission.

        The format currently is JSON, zlib-compressed.
        '''
        data = zlib.compress(json.dumps(packet_dict).encode())
        if len(data) < Network.MAX_UDP_PACKET_SIZE:
            _logger.debug('Packed data %s', data[:20])
            return data
        else:
            raise Exception('data size too large')

    def _unpack_udp_data(self, data):
        '''Convert the data into a dict.

        Returns ``None`` when the data cannot be decompressed/parsed or does
        not decode to a dict.
        '''
        try:
            dict_obj = json.loads(zlib.decompress(data).decode())
        except Exception as e:
            # Deliberately broad: any malformed datagram is simply dropped.
            _logger.debug('Failed json parsing %s', e)
            return
        if not isinstance(dict_obj, dict):
            _logger.debug('Not a dict')
            return
        return dict_obj

    def send(self, address, dict_obj, timeout=None):
        '''Send the ``dict`` to address

        :Parameters:
            address: ``tuple``
                A 2-tuple with the host and port number.
            dict_obj: ``dict``
                The ``dict`` that will be converted to JSON format.
            timeout: ``None``, ``int``, ``float``, ``True``
                If `timeout` is a number, the class will attempt to ensure
                delivery and wait for a reply. A future will be returned.
                If ``True``, the default timeout will be used.

        :rtype: ``None``, :class:`SendPacketTask`
        :return: Returns a :class:`SendPacketTask` if timeout is given.
            The result is either :class:`DataPacket` or ``None``.
        '''
        if timeout is None:
            self._send_plain(address, dict_obj)
        else:
            if timeout is True:
                timeout = Network.DEFAULT_TIMEOUT
            return self._send_expect_reply(address, dict_obj, timeout)

    def _send_plain(self, address, dict_obj):
        '''Send the data as a single UDP packet'''
        _logger.debug('Dict %s→%s', self.server_address, address)
        self._client.send(address, self._pack_udp_data(dict_obj))

    def _send_expect_reply(self, address, dict_obj, timeout=DEFAULT_TIMEOUT):
        '''Send the data and wait for a reply

        :rtype: :class:`SendPacketTask`
        '''
        _logger.debug('Dict %s→%s timeout=%d', self.server_address,
            address, timeout)
        sequence_id = self.new_sequence_id()
        event = threading.Event()
        # Register the event before sending so a fast reply cannot race us.
        self._reply_table.add_out_entry(sequence_id, address, event)
        # Copy so the caller's dict is not mutated by the added sequence id.
        packet_dict = dict_obj.copy()
        packet_dict[JSONKeys.SEQUENCE_ID] = sequence_id

        def send_fn():
            self._client.send(address, self._pack_udp_data(packet_dict))

        send_packet_task = SendPacketTask(send_fn, sequence_id, address,
            self._reply_table, event, timeout)
        self._pool_executor.submit(send_packet_task)
        return send_packet_task

    def send_answer_reply(self, source_data_packet, dict_obj):
        '''Send ``dict`` that is a response to a incoming data packet

        :Parameters:
            source_data_packet: :class:`DataPacket`
                The original incoming data packet to respond to.
            dict_obj: ``dict``
                The data to send back

        Use this function to reply to packets that expect a response. This
        function automatically adds sequence IDs the reply packet.
        '''
        address = source_data_packet.address
        sequence_id = source_data_packet.sequence_id
        _logger.debug('Dict reply %s→%s seq_id=%s', self.server_address,
            address, sequence_id)
        packet_dict = dict_obj.copy()
        packet_dict[JSONKeys.REPLY_SEQUENCE_ID] = sequence_id
        self._client.send(address, self._pack_udp_data(packet_dict))

    def send_bytes(self, address, transfer_id, bytes_,
    timeout=DEFAULT_TIMEOUT):
        '''Transfer data to another client.

        :Parameters:
            address: ``tuple``
                A 2-tuple with host and port number.
            bytes_: ``bytes``
                The data to send.
            timeout: ``int``, ``float``
                The time in seconds before the transfer times out.
            transfer_id: ``str``, ``None``
                The transfer ID to be used. If ``None``, an ID will be
                created automatically.

        :see: :func:`send_file`
        :rtype: :class:`UploadTask`
        '''
        f = io.BytesIO(bytes_)
        return self.send_file(address, transfer_id, f, timeout)

    def send_file(self, address, transfer_id, file_, timeout=DEFAULT_TIMEOUT):
        '''Transfer data to another client.

        :Parameters:
            address: ``tuple``
                A 2-tuple with host and port number.
            file_: ``str``, ``object``
                A filename or a file-like object which has ``read``.
            timeout: ``int``, ``float``
                The time in seconds before the transfer times out.
            transfer_id: ``str``, ``None``
                The transfer ID to be used. If ``None``, an ID will be
                created automatically.

        :rtype: :class:`UploadTask`
        :return: A future that returns an ``int`` that is the number of bytes
            sent.
        '''
        if hasattr(file_, 'read'):
            source_file = file_
        else:
            # NOTE(review): the opened file is never explicitly closed here
            # or by UploadTask — relies on garbage collection; confirm.
            source_file = open(file_, 'rb')
        transfer_id = transfer_id or self.new_sequence_id()
        _logger.debug('Send file %s→%s', self.server_address, address)
        upload_task = UploadTask(self, address, source_file,
            transfer_id, timeout)
        self._pool_executor.submit(upload_task)
        return upload_task

    def new_sequence_id(self):
        '''Generate a new sequence ID.

        :rtype: ``str``
        '''
        return bytes_to_b64(os.urandom(Network.SEQUENCE_ID_SIZE))
class DownloadTask(Task):
    '''Downloads data from a contact and returns a file object.

    :ivar timeout: seconds of inactivity before Network discards the download
    :ivar last_modified: timestamp of the most recent :meth:`transfer` call
    :ivar address: address of the sending peer (set by Network)
    :ivar max_size: optional cap, in bytes, on the downloaded data
    '''

    def __init__(self, timeout=Network.DEFAULT_TIMEOUT, max_size=None):
        Task.__init__(self)
        # Buffer in memory, spilling to disk past 1 MiB.
        self._file = tempfile.SpooledTemporaryFile(max_size=1048576)
        # Size-1 queue gives back-pressure between the reactor thread
        # (transfer) and the worker thread (run).
        self._bytes_queue = queue.Queue(1)
        self.timeout = timeout
        self.last_modified = time.time()
        self.address = None
        self.max_size = max_size

    def transfer(self, bytes_):
        '''Hand a chunk of data to the download; ``None`` ends it.'''
        self.last_modified = time.time()
        self._bytes_queue.put(bytes_)

    def run(self):
        '''Consume queued chunks and return the assembled file object.

        The file is rewound to offset 0 before being returned. ``progress``
        reports the cumulative number of bytes downloaded.
        '''
        self.progress = 0
        while self.is_running:
            try:
                bytes_ = self._bytes_queue.get(timeout=2)
            except queue.Empty:
                # Wake up periodically so a stopped task can exit the loop.
                continue
            if not bytes_:
                # None (or empty) signals end-of-transfer or a cleanup abort.
                break
            # Fix: accumulate rather than overwrite, so progress matches the
            # documented "number of bytes downloaded" (and UploadTask).
            self.progress += len(bytes_)
            self._file.write(bytes_)
            if self.max_size and self._file.tell() >= self.max_size:
                # Size cap reached: stop accepting further chunks.
                break
        self._file.seek(0)
        return self._file
class UploadTask(Task):
    '''Streams a file to another client; the result is the bytes sent.'''

    def run(self, network, address, source_file, transfer_id, timeout):
        '''Send the file in chunks, counting acknowledged bytes.'''
        # Cumulative count of acknowledged payload bytes.
        self.progress = 0
        while self.is_running:
            chunk = source_file.read(Network.STREAM_DATA_SIZE)
            packet = {
                JSONKeys.TRANSFER_ID: transfer_id,
                JSONKeys.TRANSFER_DATA: bytes_to_b64(chunk),
            }
            if not chunk:
                # End of file: signal completion with a null data field.
                packet[JSONKeys.TRANSFER_DATA] = None
                network.send(address, packet)
                break
            reply = network.send(address, packet, timeout).result()
            if reply and reply.dict_obj.get(
            JSONKeys.TRANSFER_ID) == transfer_id:
                self.progress += len(chunk)
            # NOTE(review): an unacknowledged chunk is not resent; the loop
            # moves on to the next chunk regardless — confirm intended.
        return self.progress
class SendPacketTask(Task):
    '''Send a data packet, retrying, and return the response.

    The result returned is either `None` or :class:`DataPacket`.
    '''

    def __init__(self, *args, **kwargs):
        Task.__init__(self, *args, **kwargs)
        # The threading.Event (5th positional arg, matching run's `event`);
        # Network's stop handler sets it to release a waiting task early.
        self.event = args[4]  # used by Network._stop_callback

    def run(self, send_fn, sequence_id, address, reply_table, event, timeout,
    num_attempts=2):
        '''Attempt delivery up to ``num_attempts`` times.

        Each attempt waits an equal share of ``timeout`` for the reply event.
        Returns the reply :class:`DataPacket`, or ``None`` on no reply.
        '''
        # Fix: initialize so the final return cannot raise UnboundLocalError
        # when the task is stopped before the first attempt runs.
        data_packet = None
        for i in range(num_attempts):
            if not self.is_running:
                break
            _logger.debug('SendPacketTask →%s attempt=%d', address, i)
            send_fn()
            event.wait(timeout / num_attempts)
            data_packet = reply_table.get_in_entry(sequence_id, address)
            if data_packet:
                reply_table.remove_in_entry(sequence_id, address)
                _logger.debug('SendPacketTask got confirm →%s attempt=%d',
                    address, i)
                return data_packet
            _logger.debug('SendPacketTask no reply →%s attempt=%d', address, i)
        return data_packet