'''
Everything is string type, even if this is inefficient.
Class.loads always takes str input, and Class.dumps always
returns str output.
'''
import logging
import os
import io
import datetime
import base64
import re
from collections import OrderedDict, Hashable
from xml.etree import ElementTree as E
from functools import wraps
from abc import ABCMeta
from pprint import pformat
from . import exceptions
# Just for checking types
import decimal, inspect
logger = logging.getLogger(__name__)
UNDOCUMENTED = 'Undocumented'
_default_true = 'bool arguments must have default of False; consider changing param %(name)s to no%(name)s'
class AnnotationFactory(object):
def __init__(self, Class, *args, **kwargs):
self._Class = Class
self._args = args
self._kwargs = kwargs
def __call__(self):
try:
return self._Class(*self._args, **self._kwargs)
except ValueError:
raise exceptions.CouldNotParse
class Annotation(object):
allow_get = True
def __init__(self, *args, **kwargs):
raise NotImplementedError
def loads(self, loader, data):
return {
'raw': self._loads_raw,
'wsgi': self._loads_wsgi,
'urwid': self._loads_urwid,
}[loader](data)
def dumps(self, dumper, value):
return {
'raw': self._dumps_raw,
'wsgi': self._dumps_wsgi,
'urwid': self._dumps_urwid,
}[dumper](value)
def _label(self, tree, lines='one'):
parent = E.Element('div', **{'class': 'list-box-row %sline' % lines})
label = E.Element('label', **{
'for': self.name,
'title': self.description,
})
label.text = self.name
parent.append(label)
parent.append(tree)
return parent
#: Default documentation string
description = UNDOCUMENTED
default = inspect.Parameter.empty
@classmethod
def factory(Class, *args, **kwargs):
return AnnotationFactory(Class, *args, **kwargs)
@classmethod
def bind(Class, param, _annotation=None):
'''
Bind an annotation to a parameter.
:param inspect.Parameter param: Parameter from the function signature
:param _annotation: Annotation for the resulting object instead
of param.annotation, used only internally for recursion
:ivar bool is_list_type: Whether the parameter takes multiple arguments
:ivar default: Default value for keyword arguments (used for
generating default configuration files)
'''
if _annotation:
a = _annotation
else:
a = param.annotation
if (a == bool and param.default != False) or \
(a == param.empty and param.default==True):
raise ValueError(_default_true % {'name': param.name})
elif isinstance(a, Class):
raise TypeError('Annotations must be Annotation subclasses, not instances.')
elif isinstance(a, type) and issubclass(a, Annotation):
obj = a() # This works, of course, for only Annotations that accept no arguments.
elif isinstance(a, AnnotationFactory):
obj = a()
elif a == inspect.Parameter.empty and \
isinstance(param.default, bool) and param.default == False:
obj = Boolean()
elif isinstance(a, list) and len(a) == 1:
if isinstance(a[0], list) and len(a[0]) == 1:
raise ValueError('You can\'t nest list annotations.')
obj = Class.bind(param, _annotation=a[0])
elif _is_hashable(a) and a in annotations_map:
obj = annotations_map[a]()
elif hasattr(a, 'loads') and hasattr(a, 'dumps') \
or hasattr(a, 'load') and hasattr(a, 'dump'):
obj = Encoder(a)
elif hasattr(a, '__call__'):
obj = String(a)
elif hasattr(a, 'items'):
obj = FactorMapping(a)
elif isinstance(a, tuple):
obj = Factor(a)
else:
raise ValueError('Bad input annotation: %s' % x)
if hasattr(obj, '__name__'):
pass
elif hasattr(a, '__name__'):
obj.__name__ = a.__name__
else:
obj.__name__ = obj.__class__.__name__
obj.name = param.name
obj.default = param.default
obj.is_list_type = (param.kind == param.VAR_POSITIONAL) or \
(isinstance(a, list) and len(a) == 1)
return obj
def _loads_raw(self, data):
raise AnnotationNotImplemented
def _dumps_raw(self, value):
raise AnnotationNotImplemented
def _loads_wsgi(self, data):
return self._loads_raw(data)
def _dumps_wsgi(self, value):
raise AnnotationNotImplemented
def _loads_urwid(self, data):
return self._loads_raw(data)
def _dumps_urwid(self, value):
raise AnnotationNotImplemented
def __str__(self):
return '<%s: %s>' % (self.name, self.__name__)
YES = 'yes'
NO = 'no'
def _is_hashable(x):
'''
I implemented this because ``isinstance({}.__getitem__, collections.Hashable)``
does not do what I want.
'''
try:
hash(x)
except TypeError:
return False
else:
return True
def _d(f):
def wrapper(x):
if x != inspect.Parameter.empty:
try:
return f('raw', x)
except Exception:
logger.exception('Error in dumping to web form')
return ''
return wrapper
[docs]class Encoder(Annotation, metaclass=ABCMeta):
def __init__(self, x):
if hasattr(x, 'loads') and hasattr(x, 'dumps'):
self._loads_raw = x.loads
self._dumps_raw = x.dumps
elif hasattr(x, 'load') and hasattr(x, 'dump'):
IO = {str: io.StringIO, bytes: io.BytesIO}[self._type]
def _loads_raw(text):
with IO(text) as fp:
obj = load(fp)
return obj
self._loads_raw = _loads_raw
def _dumps_raw(obj):
with IO() as fp:
dump(obj, fp)
text = fp.getvalue()
return text
self._dumps_raw = _dumps_raw
else:
raise TypeError('Not an encoder: %s' % x)
self._x = x # just for repr
def __repr__(self):
return '%s(%s)' % (self.__class__.__name__, self._x)
def _dumps_wsgi(self, value):
'''
:param param: The parameter
:param value: The default value
'''
tree = E.Element('textarea', name=self.name)
if value:
tree.text = _d(self.dumps)(value)
return self._label(tree, lines='multi')
# Flags
[docs]class Flag(Annotation):
def __init__(self, loads):
self._loads_raw = loads
def __repr__(self):
return '%s(%s)' % (self.__class__.__name__, self._loads_raw)
def _loads_raw(self, yes=True):
if yes in {self.YES, True}:
self._loads()
return True
else:
return False
def _dumps_raw(self, yes):
return self.YES if yes else self.NO
def _dumps_wsgi(self, yes):
attr = {'type': 'checkbox', 'name': self.name}
if yes:
attr['checked'] = self.YES
input_ = E.Element('input', **attr)
return self._label(input_)
@Flag.factory
[docs]def Boolean(_=None):
return True
[docs]class Help(Flag):
NAME = 'help'
__name__ = 'Help'
def __init__(self):
pass
def __repr__(self):
return '%s()' % self.__class__.__name__
description = 'Display this help.'
default = False
def _loads_raw(self, _):
raise exceptions.ShowHelp
# String types
[docs]class Text(Annotation):
def _dumps_wsgi(self, value):
attr = {'type': 'text', 'name': self.name}
if value:
attr['value'] = _d(self.dumps)(value)
input_ = E.Element('input', **attr)
return self._label(input_)
[docs]class Identity(Text):
def __init__(self, type=str):
self._type = type
def __repr__(self):
return '%s(type=%s)' % (self.__class__.__name__, self._type)
def _check_type(self, x):
if self._type and not isinstance(x, self._type):
p = (self._type, type(x), repr(x))
raise TypeError('Value must be %s, not %s: %s' % p)
else:
self._assign_type(x)
def _assign_type(self, x):
if not self._type:
self._type = type(x)
def _loads_raw(self, data):
self._check_type(data)
return data
def _dumps_raw(self, value):
self._check_type(value)
return value
[docs]class Config(Identity):
def __init__(self, filename):
self._default_filename = filename
super(Config, self).__init__(type=str)
def __repr__(self):
return '%s(%r)' % (self.__class__.__name__, self.default_filename)
def _loads_raw(self, _):
raise exceptions.Config(self._default_filename)
def _loads_wsgi(self, data):
raise ValueError('Do not use the Config annotation for websites.')
def _dumps_wsgi(self, obj):
raise ValueError('Do not use the Config annotation for websites.')
[docs]class String(Text):
def __init__(self, loads=lambda x: x):
self.__name__ = loads.__name__
self._loads_raw = loads
self._dumps_raw = str
def __repr__(self):
return '%s(%s)' % (self.__class__.__name__, self._loads_raw.__name__)
[docs]class Regex(Text):
def __init__(self, expr):
def loads(x):
if re.match(expr, x):
return x
else:
raise ValueError('Does not match regular expression: %s' % expr)
self._expr = re.compile(expr)
self.__name__ = '<Regex %s>' % self._expr
self._loads_raw = loads
self._dumps_raw = str
def __repr__(self):
return '%s(%s)' % (self.__class__.__name__, self._expr)
[docs]class Bytes(Text):
def __init__(self, loads=lambda x: x, dumps=lambda x: x, encoding='ascii'):
decode, encode = _coders(encoding)
self._loads_raw = lambda x: loads(encode(x))
self._dumps_raw = lambda x: decode(dumps(x))
# Just for repr
self._loads = loads
self._dumps = dumps
self._encoding = encoding
def __repr__(self):
return '%s(loads=%s, dumps=%s, encoding=%r)' % \
(self.__class__.__name__, self._loads, self._dumps, self._encoding)
def _coders(encoding):
if isinstance(encoding, str):
loads = lambda x: bytes.decode(x, encoding)
dumps = lambda x: str.encode(x, encoding)
elif encoding == None:
loads = bytes.decode
dumps = str.encode
else:
raise TypeError('Must be None or str, not %s' % type(encoding))
return loads, dumps
@String.factory
@String.factory
[docs]def OutputDirectory(x):
os.makedirs(x, exist_ok=True)
return x
@String.factory
[docs]def Port(x):
y = int(x)
if 1 <= y <= 65535:
return y
else:
raise ValueError('Not a valid port number: %d' % y)
Decimal = String.factory(decimal.Decimal)
Integer = String.factory(int)
Float = String.factory(float)
[docs]class Range(String):
def __init__(self, minimum, maximum, loads=float, dumps=str, step='any'):
self._loads_raw = loads
self._dumps_raw = dumps
self._min = minimum
self._max = maximum
self._step = step
def __repr__(self):
return '%s(%r, %r, loads=%s, dumps=%s, step=%r)' % \
(
self.__class__.name, self._min, self._max,
self._loads_raw, self._dumps_raw, self._step,
)
def _dumps_wsgi(self, name, default):
'''
:param name: The parameter name
:param default: The default value, ignored
'''
attr = {
'type': 'range', 'name': name,
'min': _d(self.dumps)(self._min), 'max': _d(self.dumps)(self._min),
'step': self._step if isinstance(self._step, str) else _d(self.dumps)(self._step),
}
if default:
attr['value'] = _d(self.dumps)(default)
input_ = E.Element('input', **attr)
return self._label(input_)
# Factor types
[docs]class FactorMapping(String):
def __init__(self, options):
if not hasattr(options, 'items'):
raise ValueError('Options must be dict-like')
if not all(isinstance(option, str) for option in options):
raise ValueError('All options must be of str type.')
self._options = options
def _reverse_options(self, value):
xs = list(k for (k,v) in self._options.items() if v == value)
if 0 == len(xs):
raise ValueError('Must be one of %s' % repr(tuple(xs)))
else:
if 1 < len(xs):
logger.warning('Multiple keys for %s, using the first' % repr(y))
return next(xs)
else:
return xs[0]
def _loads_raw(self, key):
if key in self._options:
return self._options[key]
else:
raise ValueError('Must be one of %s' % repr(tuple(self._options)))
def _dumps_raw(self, value):
return self._reverse_options(value)
def _dumps_wsgi(self, selected):
select = E.Element('select', name=self.name)
if self.is_list_type:
select.set('multiple', self.YES)
else:
selected = {selected}
for value, text in self._options.items():
option = E.Element('option', value=value)
option.text = text
if value in selected:
option.set('selected', self.YES)
select.append(option)
return self._label(select)
[docs]class Factor(FactorMapping):
def __init__(self, options):
super(Factor, self).__init__(OrderedDict([(o,o) for o in options]))
# File pointer types
[docs]class File(Annotation):
allow_get = False
def __init__(self, mode, encoding=None, **kwargs):
self._mode = mode
self._decode, _encode = _coders(encoding)
self._kwargs=kwargs
def _loads_raw(self, filename):
if 'r' in self._mode and not os.path.isfile(filename):
raise ValueError('No such file: %s' % filename)
return open(filename, mode=self._mode, encoding=self._encoding, **self._kwargs)
def _dumps_raw(self):
return self.fp.name
def _loads_wsgi(self, data):
if 'b' in self._mode:
return data.file
else:
return io.StringIO(self._decode(data.file.read()))
def _dumps_wsgi(self, default):
'''
:param name: The parameter name
:param default: The default value, ignored
'''
attr = {'type': 'file', 'name': self.name, 'size': '40'}
input_ = E.Element('input', **attr)
return self._label(input_)
InputFile = File.factory('r')
InputBinaryFile = File.factory('rb')
OutputFile = File.factory('w')
OutputBinaryFile = File.factory('wb')
annotations_map = {
bool: Boolean,
bytes: Bytes,
decimal.Decimal: Decimal,
float: Float,
inspect.Parameter.empty: Identity,
int: Integer,
str: Identity,
}