bytestag Package

Wide Availability Peer-to-Peer File Sharing

Bytestag is a peer-to-peer (P2P) file sharing system that uses a distributed hash table (DHT) to achieve wide availability of files. Unlike the BitTorrent protocol, files are published entirely via DHT and do not require trackers or swarming. Each computer donates disk space and bandwidth for caching of published values.

This software is currently under development.

__main__ Module

basedir Module

Quick and dirty platform-specific base directories.

On Unix environments, the XDG environment variables are used. On Windows, the %LOCALAPPDATA% environment variable is used for building paths. If neither variable exists, the home directory variable is used. If the home variable does not exist either, the current directory is used and a directory in the style of .NAMESPACE is returned.

var cache_dir:

A directory suitable for storing files that you don’t mind losing but wish to keep around for a while.

One of the following:

  1. $XDG_CACHE_HOME/NAMESPACE/
  2. %LOCALAPPDATA%/NAMESPACE/cache/
  3. ~/.cache/NAMESPACE
  4. ./.NAMESPACE/cache/
var data_dir:

A directory suitable for storing program code or plugins.

One of the following:

  1. $XDG_DATA_HOME/NAMESPACE/
  2. %LOCALAPPDATA%/NAMESPACE/data/
  3. ~/.share/NAMESPACE
  4. ./.NAMESPACE/data/
var config_dir:

A directory suitable for storing program configurations.

One of the following:

  1. $XDG_CONFIG_HOME/NAMESPACE/
  2. %LOCALAPPDATA%/NAMESPACE/config/
  3. ~/.config/NAMESPACE
  4. ./.NAMESPACE/config/
var runtime_dir:

A directory suitable for storing program runtime files such as Unix sockets, PID files, or named pipes.

One of the following:

  1. $XDG_RUNTIME_DIR/NAMESPACE/
  2. $TEMP/NAMESPACE/ (via tempfile.gettempdir())
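
A minimal usage sketch, assuming the module is importable as bytestag.basedir and that the variables above are plain path strings computed at import time:

import os

from bytestag import basedir

print(basedir.cache_dir)   # e.g. ~/.cache/NAMESPACE on a typical Linux desktop
print(basedir.config_dir)

# A hypothetical settings file placed inside the config directory:
settings_path = os.path.join(basedir.config_dir, 'settings.conf')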

client Module

Client interfaces

class bytestag.client.Client(cache_dir, address=('0.0.0.0', 0), node_id=None, known_node_address=None, initial_scan=False, config_dir=None)[source]

Bases: threading.Thread

Client interface.

Warning: This class is under development.
cache_table[source]

The DatabaseKVPTable

dht_network[source]
download_slot[source]

Download slot.

See: DHTNetwork.download_slot()
network[source]
run()[source]
shared_files_table[source]

The SharedFilesKVPTable

stop()[source]
upload_slot[source]

The FnTaskSlot which holds StoreValueTask.

events Module

Event handling

class bytestag.events.EventID(arg, *args)[source]

Bases: builtins.object

An event ID.

This class's equality comparison depends on the arguments given.

args[source]
class bytestag.events.EventReactor(max_queue_size=100)[source]

Bases: builtins.object

A reactor that demultiplexes events from other threads

class STOP_ID[source]

Bases: builtins.object

The identifier that stops all event reactors

EventReactor.max_queue_size[source]

The maximum size of the queue.

EventReactor.put(event_id, *event_data)[source]

Add an event to be dispatched

Parameters :
event_id

Any value that can be used as an index

event_data

Data to be passed to the callback function

EventReactor.queue_size[source]

The current size of the queue.

EventReactor.register_handler(event_id, handler_callback)[source]

Add a callback function to handle events

Parameters :
event_id

Any value that can be used as an index

handler_callback

A callable object such as a function or an instance with the __call__ member

EventReactor.start()[source]

Start the event reactor
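
A sketch of the register/put/start cycle described above, assuming put() may be called from any thread, start() runs the dispatch loop in the calling thread, and STOP_ID is the documented stop event:

import threading

from bytestag.events import EventReactor

reactor = EventReactor()

def on_greeting(name):
    print('hello,', name)

reactor.register_handler('greeting', on_greeting)

def producer():
    reactor.put('greeting', 'world')   # dispatched to on_greeting('world')
    reactor.put(EventReactor.STOP_ID)  # the documented identifier that stops event reactors

threading.Thread(target=producer).start()
reactor.start()  # processes queued events until STOP_ID is seen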

class bytestag.events.EventReactorMixin(event_reactor)[source]

Bases: builtins.object

A mix in to provide an event_reactor property

event_reactor[source]

Return the event reactor

Return type:EventReactor
class bytestag.events.EventScheduler(event_reactor)[source]

Bases: threading.Thread, bytestag.events.EventReactorMixin

Schedules events to be added to event reactors

add_absolute(time, event_id, *event_data)[source]

Add an event to be scheduled at given time

Parameters :
time: int, float

The timestamp in the future

event_id

The indexable value to be used as an event ID

event_data

Any extra data to be passed

add_one_shot(seconds, event_id, *event_data)[source]

Add an event to be scheduled once.

Parameters :
seconds: int, float

The interval in seconds

event_id

The indexable value to be used as an event ID

event_data

Any extra data to be passed

add_periodic(seconds, event_id, *event_data)[source]

Add an event to be scheduled periodically.

Parameters :
seconds: int, float

The interval in seconds

event_id

The indexable value to be used as an event ID

event_data

Any extra data to be passed

run()[source]
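
A short sketch combining the scheduler with an EventReactor, assuming the scheduler thread is started like any threading.Thread and that handler callbacks receive the extra event_data as positional arguments:

from bytestag.events import EventReactor, EventScheduler

reactor = EventReactor()
scheduler = EventScheduler(reactor)
scheduler.start()

reactor.register_handler('tick', lambda: print('tick'))
scheduler.add_periodic(60, 'tick')  # enqueue a 'tick' event every 60 seconds
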
class bytestag.events.EventSchedulerEntry(abs_time, event_id, periodic_interval, *event_data)[source]

Bases: builtins.object

An event scheduler entry.

class bytestag.events.FnTaskSlot(max_size=3)[source]

Bases: threading.Thread

Limit task execution

add(fn, *args, **kwargs)[source]

Executes function with given arguments.

This function blocks until the slot is not full.

Return type:Task
Returns:The Task that the given fn returns.
add_no_block(fn, *args, **kwargs)[source]
current_tasks[source]
observer[source]

An observer that fires when a task is added or removed.

The observer callback arguments are:

  1. bool - If True, then the task is added. Otherwise, the task was removed.
  2. Task - The task added or removed.
queue[source]
run()[source]
stop()[source]
class bytestag.events.Observer(callback_fn=None, one_shot=False)[source]

Bases: builtins.object

A callback manager.

Example usage:

>>> def my_function(some_arg):
...     print(some_arg)
>>> observer = Observer()
>>> observer.register(my_function)
>>> observer('Observer activated!')
Observer activated!
register(callback_fn)[source]

Register a callback function

class bytestag.events.Task(*args, **kwargs)[source]

Bases: builtins.object

An enhanced future.

Pass an instance of this task to concurrent.futures.Executor. Instead of using the future provided by the executor, use this instance.

hook_task(task)[source]

Hook another task into this task.

This function should be called within the task so that stop() is propagated to the given task. Once the given task finishes, it is automatically unhooked. The hooked task also updates this task's progress.

is_finished[source]
is_running[source]

Return whether the task is running

Return type:bool
observer[source]

Return the observer

The observer will callback when the task is finished.

Return type:Observer
progress[source]

Return the progress made so far

result(timeout=None)[source]

Wait and return the result

result_[source]

Return the result.

Result may be None if the task is not finished.

See: result()
run(*args, **kwargs)[source]

The task’s main body.

Implementors should override this function. It should periodically check is_running and update progress, and it should return a value, which becomes the result.

stop()[source]

Request the task to stop
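
A sketch of the overriding pattern described under run(): the body checks is_running periodically and updates progress (assumed here to be assignable from within the task), and whatever it returns becomes the result:

from bytestag.events import Task

class CountTask(Task):
    def run(self, limit):
        total = 0
        for i in range(limit):
            if not self.is_running:  # honour stop() requests
                break
            total += i
            self.progress = i        # assumption: progress is writable from within run()
        return total

The instance would then be handed to an executor such as WrappedThreadPoolExecutor, and result() used to wait for the return value.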

class bytestag.events.WrappedThreadPoolExecutor(max_workers, event_reactor)[source]

Bases: concurrent.futures.thread.ThreadPoolExecutor, bytestag.events.EventReactorMixin

Wraps a ThreadPoolExecutor that listens to a stop event

submit(fn, *args, **kwargs)[source]
bytestag.events.asynchronous(daemon=True, name=None)[source]

Wrap a function to run in a separate thread
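
Since the wrapper takes daemon and name arguments, it is presumably applied as a decorator factory; a sketch:

from bytestag.events import asynchronous

@asynchronous(name='worker')
def background_job(path):
    print('processing', path)

background_job('/tmp/example')  # returns immediately; the body runs in its own thread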

events_test Module

class bytestag.events_test.TestAsync(methodName='runTest')[source]

Bases: unittest.case.TestCase

test_async()[source]
test_async_error(*args, **kwargs)[source]
class bytestag.events_test.TestEventReactor(methodName='runTest')[source]

Bases: unittest.case.TestCase

test_reactor()[source]

It should process 1 event and then stop

class bytestag.events_test.TestObserver(methodName='runTest')[source]

Bases: unittest.case.TestCase

test_observer()[source]

It should activate the callback functions

files Module

File manipulation

bytestag.files.file_overwriter(*args, **kwds)[source]

keys Module

Keys

class bytestag.keys.KeyBytes[source]

Bases: builtins.bytes

A fixed-width binary value that represents keys and node IDs

BIT_SIZE = 160
base16[source]

Return the hex representation.

Return type:str
base32[source]

Return the base 32 representation.

Return type:str
base64[source]

Return the base 64 representation.

Return type:str
binary[source]

Return the bytes representation.

Return type:bytes
binary_str[source]

Return the binary representation (zeros and ones).

Return type:str
distance(other)[source]

Return the distance from another Key.

Return type:bytes
distance_int(other)[source]

Return the distance from another Key.

Return type:int
integer[source]

Return the integer representation.

Return type:int
classmethod new_hash(bytes_)[source]
classmethod new_silent(value)[source]

Return a new Key instance if successfully parsed.

Return type:Key, None
validate()[source]

Check if the key is a valid size.

Raises ValueError:
 Invalid bit size
classmethod validate_hash_value(hash_bytes, value)[source]
validate_value(value)[source]
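
A sketch of typical use, assuming the bytes constructor is inherited (so a 20-byte value can be wrapped directly) and that new_hash() derives a key by hashing arbitrary bytes:

import os

from bytestag.keys import KeyBytes

key_a = KeyBytes(os.urandom(20))           # 160 bits, matching BIT_SIZE
key_b = KeyBytes.new_hash(b'hello world')  # key derived from a hash of the data

print(key_a.base16)               # hexadecimal form
print(key_a.distance_int(key_b))  # integer distance between the two keys
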
bytestag.keys.b16_to_bytes(s, ignore_error=False)[source]

Convert hex string to bytes

bytestag.keys.b32_to_bytes(s, ignore_error=False)[source]

Convert base32 string to bytes

bytestag.keys.b64_to_bytes(s, ignore_error=False)[source]

Convert base64 string to bytes

bytestag.keys.bytes_to_b16(b)[source]

Convert bytes to hex string

bytestag.keys.bytes_to_b32(b)[source]

Convert bytes to base32 string

bytestag.keys.bytes_to_b64(b)[source]

Convert bytes to base64 string

bytestag.keys.compute_bucket_number(key_1, key_2)[source]

Compute the bucket number for two keys.

Parameters :
key_1: KeyBytes

The first key

key_2: KeyBytes

The second key

Return type:

int

bytestag.keys.leading_zero_bits(bytes_)[source]

Return the number of leading zero bits in bytes value.

Return type:int
bytestag.keys.random_bucket_key(node_key, bucket_number, bit_size=160)[source]

Return a key that corresponds to a given bucket number
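
A small sketch; the exact bucket numbering is an implementation detail (the tests below relate it to the leading zero bits of the distance between two keys), but leading_zero_bits() itself is straightforward:

from bytestag.keys import KeyBytes, compute_bucket_number, leading_zero_bits

print(leading_zero_bits(b'\x00\xff'))  # 8: the first byte contributes eight zero bits

key_1 = KeyBytes.new_hash(b'node a')
key_2 = KeyBytes.new_hash(b'key x')
print(compute_bucket_number(key_1, key_2))  # bucket index for key_2 relative to key_1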

keys_test Module

KeyBytes value testing

class bytestag.keys_test.TestByteConvertion(methodName='runTest')[source]

Bases: unittest.case.TestCase

test_conversion()[source]
test_silent_conversion()[source]
class bytestag.keys_test.TestFunctions(methodName='runTest')[source]

Bases: unittest.case.TestCase

test_compute_bucket_number()[source]

It should return the bucket number based on the leading zero bits

test_leading_zero_bits()[source]

It should count the leading zero bits

class bytestag.keys_test.TestKeyBytes(methodName='runTest')[source]

Bases: unittest.case.TestCase

test_bad_length()[source]

It should raise ValueError on invalid id

test_equality()[source]

It should be equal if the id is the same

test_random_bucket_key()[source]

It should generate keys that go into the given bucket number

test_serialize()[source]

It should return a hex string

test_valid_id()[source]

It should not raise error on valid id

main Module

Command line application entry point

bytestag.main.main()[source]

network Module

Networking

class bytestag.network.DataPacket[source]

Bases: bytestag.network.DataPacket

A JSON data packet.

Variables:
  • address – a tuple of (host, port_number)
  • dict_obj – a dict containing the payload
  • sequence_id – the sequence id
class bytestag.network.DownloadTask(timeout=10, max_size=None)[source]

Bases: bytestag.events.Task

Downloads data from a contact and returns a file object.

run()[source]
transfer(bytes_)[source]
class bytestag.network.JSONKeys[source]

Bases: builtins.object

The keys used in the JSON data

PAYLOAD = 'payload'
REPLY_SEQUENCE_ID = 'reply_id'
SEQUENCE_ID = 'seq_id'
TRANSFER_DATA = 'xfer_data'
TRANSFER_ID = 'xfer_id'
TRANSFER_SIZE = 'xfer_size'
class bytestag.network.Network(event_reactor, address=('127.0.0.1', 0))[source]

Bases: bytestag.events.EventReactorMixin

Network controller

Class variables:
MAX_UDP_PACKET_SIZE

The maximum UDP packet size allowed

DEFAULT_TIMEOUT

The time in seconds before a reply is timed out

STREAM_DATA_SIZE

The size in bytes of the parts of the file transmitted

DEFAULT_POOL_SIZE = 20
DEFAULT_TIMEOUT = 10
MAX_UDP_PACKET_SIZE = 65507
SEQUENCE_ID_SIZE = 20
STREAM_DATA_SIZE = 1024
expect_incoming_transfer(transfer_id, timeout=10, download_task_class=None, max_size=None)[source]

Allow a transfer for download.

Parameters :
transfer_id: str

A transfer ID that the other client uses for transferring data.

timeout: int, float

Time in seconds before the transfer times out.

max_size: int, None

The maximum file size.

Return type:

DownloadTask

Returns:

A future that returns a file object that may have been interrupted. The progress is the number of bytes downloaded.

new_sequence_id()[source]

Generate a new sequence ID.

Return type:str
receive_callback(data_packet)[source]

The function called when a data packet arrives.

Parameters :
data_packet: DataPacket

The incoming data packet

This function is called for packets that are not replies. Implementors of this class should override this method.

send(address, dict_obj, timeout=None)[source]

Send the dict to address

Parameters :
address: tuple

A 2-tuple with the host and port number.

dict_obj: dict

The dict that will be converted to JSON format.

timeout: None, int, float, True

If timeout is a number, the class will attempt to ensure delivery and wait for a reply. A future will be returned. If True, the default timeout will be used.

Return type:

None, SendPacketTask

Returns:

Returns a SendPacketTask if timeout is given. The result is either DataPacket or None.
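
A sketch of both send modes, assuming a Network constructed as in the class signature above; the peer address is a placeholder and reactor start-up is elided:

from bytestag.events import EventReactor
from bytestag.network import Network

reactor = EventReactor()
network = Network(reactor, ('127.0.0.1', 0))

# Fire-and-forget: no timeout, no reply expected, returns None.
network.send(('203.0.113.5', 38000), {'hello': 'world'})

# Reliable request: returns a SendPacketTask whose result is the reply
# DataPacket, or None if no reply arrived within the default timeout.
task = network.send(('203.0.113.5', 38000), {'ping': True}, timeout=True)
reply = task.result()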

send_answer_reply(source_data_packet, dict_obj)[source]

Send a dict that is a response to an incoming data packet

Parameters :
source_data_packet: DataPacket

The original incoming data packet to respond to.

dict_obj: dict

The data to send back

Use this function to reply to packets that expect a response. This function automatically adds the sequence IDs to the reply packet.
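
The override-and-reply pattern described above might look like this (a sketch; the 'ping'/'pong' payload keys are placeholders):

from bytestag.network import Network

class EchoNetwork(Network):
    def receive_callback(self, data_packet):
        # data_packet.dict_obj carries the decoded JSON payload (see DataPacket)
        if 'ping' in data_packet.dict_obj:
            self.send_answer_reply(data_packet, {'pong': True})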

send_bytes(address, transfer_id, bytes_, timeout=10)[source]

Transfer data to another client.

Parameters :
address: tuple

A 2-tuple with host and port number.

bytes_: bytes

The data to send.

timeout: int, float

The time in seconds before the transfer times out.

transfer_id: str, None

The transfer ID to be used. If None, an ID will be created automatically.

See :

send_file()

Return type:

UploadTask

send_file(address, transfer_id, file_, timeout=10)[source]

Transfer data to another client.

Parameters :
address: tuple

A 2-tuple with host and port number.

file_: str, object

A filename or a file-like object that has a read method.

timeout: int, float

The time in seconds before the transfer times out.

transfer_id: str, None

The transfer ID to be used. If None, an ID will be created automatically.

Return type:

UploadTask

Returns:

A future that returns an int that is the number of bytes sent.
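
A sketch pairing send_file() with expect_incoming_transfer() on the receiving side; the two Network instances, transfer ID, and file path are placeholders, and event reactor wiring is elided:

from bytestag.events import EventReactor
from bytestag.network import Network

reactor = EventReactor()
sender = Network(reactor, ('127.0.0.1', 0))
receiver = Network(reactor, ('127.0.0.1', 0))

# The receiving side registers the transfer ID it is willing to accept.
download_task = receiver.expect_incoming_transfer('transfer-1', max_size=2 ** 20)

# The sending side streams a file (filename or file-like object) to the receiver.
upload_task = sender.send_file(receiver.server_address, 'transfer-1',
                               '/path/to/shared/file')

file_obj = download_task.result()  # file object containing the received data
bytes_sent = upload_task.result()  # int: number of bytes sent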

server_address[source]

The address of the server

class bytestag.network.ReplyTable[source]

Bases: builtins.object

Manages the matching of sequence IDs to prevent forged UDP replies

add_in_entry(sequence_id, address, data_packet)[source]

Store the data packet reply to be retrieved by the woken thread

add_out_entry(sequence_id, address, event)[source]

Add an entry that expects a reply

Parameters :
sequence_id

The ID of the packet sent out

address

The destination of the packet

event: threading.Event

The threading.Event instance to wait on

get_in_entry(sequence_id, address)[source]

Get the stored data packet

Return type:DataPacket, None
get_out_entry(sequence_id, address)[source]

Get the Event instance

Return type:threading.Event, None
remove_in_entry(sequence_id, address)[source]

Delete the stored data packet

remove_out_entry(sequence_id, address)[source]

Remove the entry

class bytestag.network.SendPacketTask(*args, **kwargs)[source]

Bases: bytestag.events.Task

Send a data packet and return the response.

The result returned is either None or DataPacket.

run(send_fn, sequence_id, address, reply_table, event, timeout, num_attempts=2)[source]
class bytestag.network.UDPClient(socket_obj=None)[source]

Bases: builtins.object

UDP Client

send(address, data)[source]

Send bytes to address

class bytestag.network.UDPRequestHandler(request, client_address, server)[source]

Bases: socketserver.BaseRequestHandler

UDP request handler for the UDP server

handle()[source]
class bytestag.network.UDPServer(event_reactor, address=('127.0.0.1', 0))[source]

Bases: bytestag.events.EventReactorMixin, threading.Thread, socketserver.UDPServer

UDP server

run()[source]

Start the server

class bytestag.network.UDP_INBOUND_EVENT[source]

Bases: builtins.object

A UDP inbound event id

class bytestag.network.UploadTask(*args, **kwargs)[source]

Bases: bytestag.events.Task

Returns the number of bytes sent.

run(network, address, source_file, transfer_id, timeout)[source]

network_test Module

class bytestag.network_test.TestNetworkControllerComponents(methodName='runTest')[source]

Bases: unittest.case.TestCase

test_faulty_udp_unpacking_bad_json()[source]

It should return None on bad JSON parsing

test_udp_packing()[source]

It should pack and unpack the data symmetrically

class bytestag.network_test.TestNetworkControllerMultiNode(methodName='runTest')[source]

Bases: unittest.case.TestCase

TIMEOUT = 5
join_event_reactors()[source]
setup_nodes(count=2)[source]
stop_event_reactors()[source]
test_expect_reply()[source]

It should send a packet and the other server replies

test_expect_reply_failure()[source]

It should send a packet and it times out

test_incoming_send_packet()[source]

Server 0 should send a packet to server 1

test_send_file()[source]

It should transfer a file

class bytestag.network_test.TestReplyTable(methodName='runTest')[source]

Bases: unittest.case.TestCase

test_add_remove_in()[source]

It should add and remove

test_add_remove_out()[source]

It should add and remove

class bytestag.network_test.TestUDP(methodName='runTest')[source]

Bases: unittest.case.TestCase

test_udp()[source]

It should be able to send itself a udp packet through events

queue Module

Specialized queues

class bytestag.queue.BigDiskQueue(memory_size=100)[source]

Bases: builtins.object

A queue that spools onto disk when needed.

The core functionality is similar to queue.Queue.

get(block=True, timeout=None)[source]

Get an item from the queue.

get_nowait()[source]

Get an item from the queue without blocking.

put(item, block=None, timeout=None)[source]

Put an item on the queue.

This function is nonblocking. Parameters are provided for compatibility with queue.Queue.

put_nowait(item)[source]

Put an item on the queue.
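
Usage follows queue.Queue, except that put() never blocks and overflow beyond memory_size is spooled to disk; a sketch:

from bytestag.queue import BigDiskQueue

queue = BigDiskQueue(memory_size=100)

for i in range(1000):
    queue.put(i)        # never blocks; items past the memory limit go to disk

for _ in range(1000):
    item = queue.get()  # blocks like queue.Queue.get() until an item is available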

queue_test Module

class bytestag.queue_test.TestBigDiskQueue(methodName='runTest')[source]

Bases: unittest.case.TestCase

test_queue()[source]

It should add n and return n items

test_queue_random()[source]

It should add n and return n items randomly

storage Module

Storage management and implementations of KVPTables

class bytestag.storage.CollectionInfoTypes[source]

Bases: builtins.object

The types of CollectionInfo files

BITTORRENT = 2
BYTESTAG = 1
DUMMY = 0
class bytestag.storage.DatabaseKVPRecord(table, kvpid)[source]

Bases: bytestag.tables.KVPRecord

The record associated with DatabaseKVPTable.

index[source]
is_original[source]
key[source]
last_update[source]
size[source]
time_to_live[source]
timestamp[source]
value[source]
class bytestag.storage.DatabaseKVPTable(path, max_size=68719476736)[source]

Bases: bytestag.tables.KVPTable, bytestag.storage.SQLite3Mixin

A KVPTable stored as a SQLite database

clean()[source]

Remove expired key-value pairs.

indices(key)[source]
is_acceptable(kvpid, size, timestamp)[source]
keys()[source]
max_size[source]

The maximum size the table will grow.

record(kvpid)[source]
class bytestag.storage.MemoryKVPRecord(kvpid, d)[source]

Bases: bytestag.tables.KVPRecord

The record associated with MemoryKVPTable

index[source]
is_original[source]
key[source]
last_update[source]
size[source]
time_to_live[source]
timestamp[source]
value[source]
class bytestag.storage.MemoryKVPTable[source]

Bases: bytestag.tables.KVPTable

A quick and dirty implementation of KVPTable

Note

This class is generally used for unit tests.

indices(key)[source]
is_acceptable(kvpid, size, timestamp)[source]
keys()[source]
record(kvpid)[source]
exception bytestag.storage.ReadOnlyTableError[source]

Bases: builtins.Exception

This error is raised when the table does not support storing values.

class bytestag.storage.SQLite3Mixin[source]

Bases: builtins.object

A SQLite 3 mixin class to provide connection management

connection(*args, **kwds)[source]

Return a connection context manager

database_size[source]

The size of the database.

Return type:int
iter_query(query, params=(), limit=1000)[source]

Return rows that are fetched in blocks and stored in memory.

This function is useful for iterating the entire database without blocking other connections.

class bytestag.storage.SharedFileHashRecord(table, kvpid)[source]

Bases: bytestag.tables.KVPRecord

The record associated with SharedFilesKVPTable.

This record describes a single file on the filesystem.

See: SharedFilesRecord
file_hash_info[source]
index[source]
is_original[source]
key[source]
last_update[source]
size[source]
time_to_live[source]
timestamp[source]
value[source]
class bytestag.storage.SharedFilesHashTask(*args, **kwargs)[source]

Bases: bytestag.events.Task

A task that hashes and populates a shared files table.

Variables: progress – a tuple (str, int) describing the filename and the number of bytes read.
run(table, part_size=262144)[source]
class bytestag.storage.SharedFilesKVPTable(path)[source]

Bases: bytestag.tables.KVPTable, bytestag.storage.SQLite3Mixin

Provides a KVPTable interface to shared files split into pieces.

file_hash_info(kvpid)[source]
hash_directories()[source]

Hash the directories and populate the table with file info.

Return type:SharedFilesHashTask
indices(key)[source]
is_acceptable(kvpid, size, timestamp)[source]
keys()[source]
num_collections[source]
num_files[source]
record(kvpid)[source]
shared_directories[source]

A list of directories to be shared.

Modify the list as you wish, but be sure to call hash_directories() afterwards, as file monitoring is not yet supported.

total_disk_size[source]
class bytestag.storage.SharedFilesRecord(table, kvpid)[source]

Bases: bytestag.tables.KVPRecord

The record associated with SharedFilesKVPTable.

This record describes a single file on the filesystem.

See: SharedFileHashRecord
index[source]
is_original[source]
key[source]
last_update[source]
size[source]
time_to_live[source]
timestamp[source]
value[source]
bytestag.storage.byte_to_part_number(byte_number, part_size)[source]

Converts a byte offset to a file segment number.

Return type:int
bytestag.storage.part_to_byte_number(part_number, part_size)[source]

Converts a file segment number to the byte offset

Return type:int
bytestag.storage.total_parts(total_byte_size, part_size)[source]

Returns the total number of segments of a file

Return type:int
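
A worked example with the 256 KiB part size used by SharedFilesHashTask (part_size=262144); the comments on the first two calls describe the mapping rather than exact values, since part numbering is an implementation detail:

from bytestag.storage import byte_to_part_number, part_to_byte_number, total_parts

PART_SIZE = 262144

print(byte_to_part_number(300000, PART_SIZE))  # the segment containing byte offset 300000
print(part_to_byte_number(1, PART_SIZE))       # the byte offset where part 1 begins
print(total_parts(1000000, PART_SIZE))         # 4: the upper bound, ceil(1000000 / 262144)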

storage_test Module

class bytestag.storage_test.TableMixin[source]

Bases: builtins.object

table_store_get(data, kvp_table)[source]
class bytestag.storage_test.TestDatabaseKVPTable(methodName='runTest')[source]

Bases: unittest.case.TestCase, bytestag.storage_test.TableMixin

test_store_get()[source]

It should store and get

class bytestag.storage_test.TestFunctions(methodName='runTest')[source]

Bases: unittest.case.TestCase

SIZE = 1024
test_byte_to_part_number()[source]

It should map bytes to part numbers

test_part_to_byte_number()[source]

It should map part numbers to offsets in files

test_total_parts()[source]

It should give the upper bound of parts needed

class bytestag.storage_test.TestMemoryKVPTable(methodName='runTest')[source]

Bases: unittest.case.TestCase, bytestag.storage_test.TableMixin

test_store_get()[source]

It should store and get

class bytestag.storage_test.TestSharedFilesKVPTable(methodName='runTest')[source]

Bases: unittest.case.TestCase, bytestag.storage_test.TableMixin

create_file(path)[source]
test_hash()[source]

It should hash the files in each directory

tables Module

Key-value pair management

class bytestag.tables.AggregatedKVPTable(primary_table, tables)[source]

Bases: bytestag.tables.KVPTable

Combines several KVPTable

indices(key)[source]
is_acceptable(kvpid, size, timestamp)[source]
keys()[source]
primary_table[source]
record(kvpid)[source]
records_by_key(key)[source]
tables[source]
class bytestag.tables.KVPID[source]

Bases: bytestag.tables.KVPID

The components of a key.

class bytestag.tables.KVPRecord[source]

Bases: builtins.object

Information about a key-value pair

index[source]

The index of the key-value pair.

Return type:KeyBytes
is_original[source]

Whether this client is the original publisher of the key-value pair.

Return type:bool
key[source]

The key of the key-value pair.

Return type:KeyBytes
last_update[source]

The timestamp of when the value was published or replicated.

Return type:int
size[source]

The length of the value.

Return type:int
time_to_live[source]

The time in seconds the record is kept.

Return type:int
timestamp[source]

The publication timestamp of the key-value pair.

Return type:int
value[source]

The value of the key-value pair.

Return type:bytes
class bytestag.tables.KVPTable[source]

Bases: builtins.object

A base class for key-value tables.

This table supports Python idioms for adding and removing values:

table[kvpid] = b'123'
kvpid in table
del table[kvpid]
indices(key)[source]

Return the indices associated with the key.

Return type:list
Returns:a list of KeyBytes indices
is_acceptable(kvpid, size, timestamp)[source]

Return whether the table accepts adding new keys.

Return type:bool
keys()[source]

Return an iterator of KVPID

record(kvpid)[source]

Return the KVPRecord associated with the KVPID

records_by_key(key)[source]

Return a list of KVPRecord associated with given key.

value_changed_observer[source]

The observer for value changes.

Return type:Observer