Open Table Of Contents

Source code for leekspin.util

# -*- coding: utf-8 -*-

"""Common general utilities."""

from __future__ import print_function
from __future__ import absolute_import
from __future__ import unicode_literals

from codecs import open as open

import argparse
import ipaddr
import random
import sys
import time

from leekspin import __version__
from leekspin import __package__


def getArgParser():
    """Build the command-line :class:`~argparse.ArgumentParser` for leekspin.

    The returned parser understands:

    * ``-v/--verbose`` or ``-q/--quiet`` (mutually exclusive),
    * ``--version``,
    * one of ``-r/--relay``, ``-b/--bridge`` or ``-hs/--hidden-service``
      (mutually exclusive; ``bridge`` defaults to ``True``, the other two to
      ``False``),
    * ``-xt/--without-tap`` and ``-xn/--without-ntor`` (both default to
      ``False``),
    * ``-n/--descriptors``, an ``int`` which defaults to ``0``.

    :rtype: :class:`argparse.ArgumentParser`
    :returns: The configured parser; no arguments have been parsed yet.
    """
    version_ = '-'.join([__package__, __version__.rsplit('_', 1)[0]])

    parser = argparse.ArgumentParser(prog=__package__, add_help=True)
    # Otherwise we hit an argparse bug that prints the following cryptic error:
    # `'Namespace' object has no 'version' attribute`
    parser.prog = __package__
    parser.version = version_
    parser.description = ("Generate a signed set of network-status, "
                          "extra-info, and server descriptor documents "
                          "for mock Tor relays or bridges.")

    # The group is created directly on the parser.  Nesting a mutually
    # exclusive group inside another one was never supported by argparse
    # (deprecated in Python 3.11, an error in 3.14).
    infoargs = parser.add_mutually_exclusive_group()
    infoargs.add_argument("-v", "--verbose", action="store_true",
                          help="print information to stdout")
    infoargs.add_argument("-q", "--quiet", action="store_true",
                          help="don't print anything")

    versionargs = parser.add_mutually_exclusive_group()
    versionargs.add_argument("--version", action="store_true",
                             help="print leekspin version and exit")

    descgroup = parser.add_argument_group()
    descgroup.title = "descriptor types"
    # NOTE(review): the explicit set_defaults() call below is what supplies
    # the defaults for the three flags in this group.
    descgroup.argument_default = "--bridge"
    descgroup1 = descgroup.add_mutually_exclusive_group()
    descgroup1.add_argument("-r", "--relay", action="store_true",
                            help="generate relay descriptors")
    descgroup1.add_argument("-b", "--bridge", action="store_true",
                            help="generate bridge descriptors")
    descgroup1.add_argument("-hs", "--hidden-service", action="store_true",
                            help="generate HS rendezvous descriptors")
    descgroup1.set_defaults(relay=False, bridge=True, hidden_service=False)

    descgroup2 = parser.add_argument_group()
    descgroup2.title = "descriptor variations"
    descgroup2.add_argument("-xt", "--without-tap", action="store_true",
                            help=("generate descriptors without TAP support, "
                                  "e.g. without RSA keys"))
    descgroup2.add_argument("-xn", "--without-ntor", action="store_true",
                            help=("generate descriptors without ntor support, "
                                  "e.g. without Ed25519 keys"))
    descgroup2.set_defaults(without_tap=False, without_ntor=False)

    group = parser.add_argument_group()
    group.title = "required arguments"
    group.add_argument("-n", "--descriptors", default=0,
                       help="generate <n> descriptor sets", type=int)

    return parser
def _checkIPValidity(ip):
    """Report whether **ip** lies outside every special-purpose range.

    An address is rejected when it is link-local, loopback, multicast,
    private or unspecified.  IPv6 addresses are additionally rejected when
    site-local, and IPv4 addresses when reserved.

    :type ip: ``ipaddr.IPAddress``
    :param ip: The ip address to check.
    :rtype: bool
    :returns: ``False`` if the address matched any of the categories above,
        ``True`` otherwise.
    """
    # Checks which apply to both address families.
    if (ip.is_link_local
            or ip.is_loopback
            or ip.is_multicast
            or ip.is_private
            or ip.is_unspecified):
        return False

    # Family-specific checks; the attribute is only read for its own family.
    if ip.version == 6 and ip.is_site_local:
        return False
    if ip.version == 4 and ip.is_reserved:
        return False

    return True
def randomIP():
    """Create a random address, either IPv4 or IPv6.

    A single random bit decides the address family: ``1`` yields the result
    of :func:`randomIPv4`, ``0`` the result of :func:`randomIPv6`.

    :returns: Whatever the chosen generator function returned.
    """
    coinFlip = int(random.getrandbits(1))
    if coinFlip:
        return randomIPv4()
    return randomIPv6()
def randomIPv4():
    """Create a random IPv4 address.

    Candidates are built from 32 random bits and discarded until one is
    accepted by :func:`_checkIPValidity`.

    :rtype: ``ipaddr.IPv4Address``
    :returns: The first candidate which passed the validity check.
    """
    while True:
        candidate = ipaddr.IPv4Address(random.getrandbits(32))
        if _checkIPValidity(candidate):
            return candidate
def randomIPv6():
    """Create a random IPv6 address.

    Candidates are built from 128 random bits and discarded until one is
    accepted by :func:`_checkIPValidity`.

    :rtype: ``ipaddr.IPv6Address``
    :returns: The first candidate which passed the validity check.
    """
    while True:
        candidate = ipaddr.IPv6Address(random.getrandbits(128))
        if _checkIPValidity(candidate):
            return candidate
def randomPort():
    """Pick a random port number from the inclusive range ``[1026, 65530]``.

    Port 1025 is deliberately excluded: the IPv6 port (in the
    ``or-address``/``a`` lines), if there is one, is the random ORPort minus
    one.  The ports of any pluggable transports in the extrainfo descriptor
    are calculated as the random ORPort plus some offset, which is why the
    range stops short of 65535.

    :rtype: int
    """
    lowest, highest = 1026, 65530
    return random.randint(lowest, highest)
def getHexString(size):
    """Get a random, capitalised hexadecimal string of **size** characters.

    Each character is a single hexadecimal digit drawn independently from
    ``0-9A-F``, so the result is **size** characters long (not **size**
    encoded bytes).

    :param int size: The number of hexadecimal characters to generate.  A
        value of ``0`` or less yields the empty string.
    :rtype: str
    :returns: A hex string.
    """
    # ``range`` works on both Python 2 and 3 (``xrange`` is Python 2 only),
    # and ``join`` avoids rebuilding the string once per character.
    return "".join(random.choice("ABCDEF0123456789") for _ in range(size))
def makeTimeStamp(now=None, fmt=None, variation=False, period=None):
    """Produce a formatted timestamp, optionally randomised into the past.

    :param int now: The time, in seconds since the Epoch, to generate the
        timestamp for.  It is also the latest possible time when
        **variation** is enabled.  Defaults to the current time.
    :param str fmt: A strftime(3) format string for the timestamp.  If not
        given (or empty), ``"%Y-%m-%d %H:%M:%S"`` is used.
    :param bool variation: If True, pick a random time instead of **now**;
        otherwise the timestamp is exactly **now**.
    :type period: int or None
    :param period: Only used when **variation** is enabled.  If given, the
        random time lies between **period** hours before **now** and
        **now**.  If ``None``, it lies anywhere between one second after the
        Unix Epoch and **now**.
    :rtype: str
    :returns: The chosen time, converted with :func:`time.localtime` and
        formatted according to **fmt**.
    """
    moment = int(time.time()) if now is None else int(now)
    layout = fmt or "%Y-%m-%d %H:%M:%S"

    if variation:
        # Lower bound of the random window, in epoch seconds.
        if period is None:
            earliest = 1
        else:
            earliest = moment - int(period) * 3600
        # Rewind the clock to a random point inside the window.
        moment = random.randint(earliest, moment)

    return time.strftime(layout, time.localtime(moment))
def writeDescToFile(filename, descriptors):
    """Open **filename** and write a string containing **descriptors** into it.

    The text is encoded once, using the filesystem encoding (falling back to
    UTF-8 if none is reported) with unencodable characters replaced, and the
    resulting bytes are written to a file opened in binary mode.  An existing
    file is overwritten.

    I/O failures are not raised; a message is printed instead.

    :param str filename: The name of the file to write to.
    :param str descriptors: A giant string containing descriptors, newlines,
        formatting, whatever is necessary to make it look like a file tor
        would generate.  If bytes are passed, they are written unchanged.
    """
    # Local import: write through a plain binary file object regardless of
    # what the module-level name ``open`` is bound to.
    import io

    # ``sys.getfilesystemencoding()`` may return None on some platforms.
    encoding = sys.getfilesystemencoding() or 'utf-8'

    if isinstance(descriptors, bytes):
        payload = descriptors
    else:
        payload = descriptors.encode(encoding, 'replace')

    try:
        # The payload is already encoded, so the file must not encode again.
        with io.open(filename, 'wb') as fh:
            fh.write(payload)
    except (IOError, OSError) as err:
        # Format the exception itself: ``err.message`` is missing on Python 3.
        print("Failure while attempting to write descriptors to file '%s': %s"
              % (filename, err))