Source code for wsme.types

import base64
import datetime
import decimal
import inspect
import logging
import netaddr
import re
import six
import sys
import uuid
import weakref

from wsme import exc

log = logging.getLogger(__name__)

#: The 'str' (python 2) or 'bytes' (python 3) type.
#: Its use should be restricted to
#: pure ascii strings as the protocols will generally not be
#: be able to send non-unicode strings.
#: To transmit binary strings, use the :class:`binary` type
bytes = six.binary_type

#: Unicode string.
text = six.text_type


class ArrayType(object):
    def __init__(self, item_type):
        if iscomplex(item_type):
            self._item_type = weakref.ref(item_type)
        else:
            self._item_type = item_type

    def __hash__(self):
        return hash(self.item_type)

    def __eq__(self, other):
        return isinstance(other, ArrayType) \
            and self.item_type == other.item_type

    def sample(self):
        return [getattr(self.item_type, 'sample', self.item_type)()]

    @property
    def item_type(self):
        if isinstance(self._item_type, weakref.ref):
            return self._item_type()
        else:
            return self._item_type

    def validate(self, value):
        if value is None:
            return
        if not isinstance(value, list):
            raise ValueError("Wrong type. Expected '[%s]', got '%s'" % (
                self.item_type, type(value)
            ))
        return [
            validate_value(self.item_type, item)
            for item in value
        ]


class DictType(object):
    def __init__(self, key_type, value_type):
        if key_type not in pod_types:
            raise ValueError("Dictionaries key can only be a pod type")
        self.key_type = key_type
        if iscomplex(value_type):
            self._value_type = weakref.ref(value_type)
        else:
            self._value_type = value_type

    def __hash__(self):
        return hash((self.key_type, self.value_type))

    def sample(self):
        key = getattr(self.key_type, 'sample', self.key_type)()
        value = getattr(self.value_type, 'sample', self.value_type)()
        return {key: value}

    @property
    def value_type(self):
        if isinstance(self._value_type, weakref.ref):
            return self._value_type()
        else:
            return self._value_type

    def validate(self, value):
        if not isinstance(value, dict):
            raise ValueError("Wrong type. Expected '{%s: %s}', got '%s'" % (
                self.key_type, self.value_type, type(value)
            ))
        return dict((
            (
                validate_value(self.key_type, key),
                validate_value(self.value_type, v)
            ) for key, v in value.items()
        ))


class UserType(object):
    basetype = None
    name = None

    def validate(self, value):
        return value

    def tobasetype(self, value):
        return value

    def frombasetype(self, value):
        return value


def isusertype(class_):
    return isinstance(class_, UserType)


[docs]class BinaryType(UserType): """ A user type that use base64 strings to carry binary data. """ basetype = bytes name = 'binary' def tobasetype(self, value): if value is None: return None return base64.encodestring(value) def frombasetype(self, value): if value is None: return None return base64.decodestring(value) #: The binary almost-native type
binary = BinaryType() class IntegerType(UserType): """ A simple integer type. Can validate a value range. :param minimum: Possible minimum value :param maximum: Possible maximum value Example:: Price = IntegerType(minimum=1) """ basetype = int name = "integer" def __init__(self, minimum=None, maximum=None): self.minimum = minimum self.maximum = maximum @staticmethod def frombasetype(value): return int(value) if value is not None else None def validate(self, value): if self.minimum is not None and value < self.minimum: error = 'Value should be greater or equal to %s' % self.minimum raise ValueError(error) if self.maximum is not None and value > self.maximum: error = 'Value should be lower or equal to %s' % self.maximum raise ValueError(error) return value class StringType(UserType): """ A simple string type. Can validate a length and a pattern. :param min_length: Possible minimum length :param max_length: Possible maximum length :param pattern: Possible string pattern Example:: Name = StringType(min_length=1, pattern='^[a-zA-Z ]*$') """ basetype = six.string_types name = "string" def __init__(self, min_length=None, max_length=None, pattern=None): self.min_length = min_length self.max_length = max_length if isinstance(pattern, six.string_types): self.pattern = re.compile(pattern) else: self.pattern = pattern def validate(self, value): if not isinstance(value, self.basetype): error = 'Value should be string' raise ValueError(error) if self.min_length is not None and len(value) < self.min_length: error = 'Value should have a minimum character requirement of %s' \ % self.min_length raise ValueError(error) if self.max_length is not None and len(value) > self.max_length: error = 'Value should have a maximum character requirement of %s' \ % self.max_length raise ValueError(error) if self.pattern is not None and not self.pattern.search(value): error = 'Value should match the pattern %s' % self.pattern.pattern raise ValueError(error) return value class IPv4AddressType(UserType): """ A simple IPv4 type. """ basetype = six.string_types name = "ipv4address" @staticmethod def validate(value): try: netaddr.IPAddress(value, version=4, flags=netaddr.INET_PTON) except netaddr.AddrFormatError: error = 'Value should be IPv4 format' raise ValueError(error) else: return value class IPv6AddressType(UserType): """ A simple IPv6 type. This type represents IPv6 addresses in the short format. """ basetype = six.string_types name = "ipv6address" @staticmethod def validate(value): try: netaddr.IPAddress(value, version=6, flags=netaddr.INET_PTON) except netaddr.AddrFormatError: error = 'Value should be IPv6 format' raise ValueError(error) else: return value class UuidType(UserType): """ A simple UUID type. This type allows not only UUID having dashes but also UUID not having dashes. For example, '6a0a707c-45ef-4758-b533-e55adddba8ce' and '6a0a707c45ef4758b533e55adddba8ce' are distinguished as valid. """ basetype = six.string_types name = "uuid" @staticmethod def validate(value): try: return six.text_type((uuid.UUID(value))) except (TypeError, ValueError, AttributeError): error = 'Value should be UUID format' raise ValueError(error)
[docs]class Enum(UserType): """ A simple enumeration type. Can be based on any non-complex type. :param basetype: The actual data type :param values: A set of possible values If nullable, 'None' should be added the values set. Example:: Gender = Enum(str, 'male', 'female') Specie = Enum(str, 'cat', 'dog') """ def __init__(self, basetype, *values, **kw): self.basetype = basetype self.values = set(values) name = kw.pop('name', None) if name is None: name = "Enum(%s)" % ', '.join((six.text_type(v) for v in values)) self.name = name def validate(self, value): if value not in self.values: raise ValueError("Value should be one of: %s" % ', '.join(map(six.text_type, self.values))) return value def tobasetype(self, value): return value def frombasetype(self, value): return value
class UnsetType(object): if sys.version < '3': def __nonzero__(self): return False else: def __bool__(self): return False def __repr__(self): return 'Unset' Unset = UnsetType() #: A special type that corresponds to the host framework request object. #: It can only be used in the function parameters, and if so the request object #: of the host framework will be passed to the function. HostRequest = object() pod_types = six.integer_types + ( bytes, text, float, bool) dt_types = (datetime.date, datetime.time, datetime.datetime) extra_types = (binary, decimal.Decimal) native_types = pod_types + dt_types + extra_types # The types for which we allow promotion to certain numbers. _promotable_types = six.integer_types + (text, bytes) def iscomplex(datatype): return inspect.isclass(datatype) \ and '_wsme_attributes' in datatype.__dict__ def isarray(datatype): return isinstance(datatype, ArrayType) def isdict(datatype): return isinstance(datatype, DictType) def validate_value(datatype, value): if value in (Unset, None): return value # Try to promote the data type to one of our complex types. if isinstance(datatype, list): datatype = ArrayType(datatype[0]) elif isinstance(datatype, dict): datatype = DictType(*list(datatype.items())[0]) # If the datatype has its own validator, use that. if hasattr(datatype, 'validate'): return datatype.validate(value) # Do type promotion/conversion and data validation for builtin # types. v_type = type(value) if datatype in six.integer_types: if v_type in _promotable_types: try: # Try to turn the value into an int value = datatype(value) except ValueError: # An error is raised at the end of the function # when the types don't match. pass elif datatype is float and v_type in _promotable_types: try: value = float(value) except ValueError: # An error is raised at the end of the function # when the types don't match. pass elif datatype is text and isinstance(value, bytes): value = value.decode() elif datatype is bytes and isinstance(value, text): value = value.encode() if not isinstance(value, datatype): raise ValueError( "Wrong type. Expected '%s', got '%s'" % ( datatype, v_type )) return value
[docs]class wsproperty(property): """ A specialised :class:`property` to define typed-property on complex types. Example:: class MyComplexType(wsme.types.Base): def get_aint(self): return self._aint def set_aint(self, value): assert avalue < 10 # Dummy input validation self._aint = value aint = wsproperty(int, get_aint, set_aint, mandatory=True) """ def __init__(self, datatype, fget, fset=None, mandatory=False, doc=None, name=None): property.__init__(self, fget, fset) #: The property name in the parent python class self.key = None #: The attribute name on the public of the api. #: Defaults to :attr:`key` self.name = name #: property data type self.datatype = datatype #: True if the property is mandatory self.mandatory = mandatory
[docs]class wsattr(object): """ Complex type attribute definition. Example:: class MyComplexType(wsme.types.Base): optionalvalue = int mandatoryvalue = wsattr(int, mandatory=True) named_value = wsattr(int, name='named.value') After inspection, the non-wsattr attributes will be replaced, and the above class will be equivalent to:: class MyComplexType(wsme.types.Base): optionalvalue = wsattr(int) mandatoryvalue = wsattr(int, mandatory=True) """ def __init__(self, datatype, mandatory=False, name=None, default=Unset, readonly=False): #: The attribute name in the parent python class. #: Set by :func:`inspect_class` self.key = None # will be set by class inspection #: The attribute name on the public of the api. #: Defaults to :attr:`key` self.name = name self._datatype = (datatype,) #: True if the attribute is mandatory self.mandatory = mandatory #: Default value. The attribute will return this instead #: of :data:`Unset` if no value has been set. self.default = default #: If True value cannot be set from json/xml input data self.readonly = readonly self.complextype = None def _get_dataholder(self, instance): dataholder = getattr(instance, '_wsme_dataholder', None) if dataholder is None: dataholder = instance._wsme_DataHolderClass() instance._wsme_dataholder = dataholder return dataholder def __get__(self, instance, owner): if instance is None: return self return getattr( self._get_dataholder(instance), self.key, self.default ) def __set__(self, instance, value): try: value = validate_value(self.datatype, value) except ValueError as e: raise exc.InvalidInput(self.name, value, six.text_type(e)) dataholder = self._get_dataholder(instance) if value is Unset: if hasattr(dataholder, self.key): delattr(dataholder, self.key) else: setattr(dataholder, self.key, value) def __delete__(self, instance): self.__set__(instance, Unset) def _get_datatype(self): if isinstance(self._datatype, tuple): self._datatype = \ self.complextype().__registry__.resolve_type(self._datatype[0]) if isinstance(self._datatype, weakref.ref): return self._datatype() if isinstance(self._datatype, list): return [ item() if isinstance(item, weakref.ref) else item for item in self._datatype ] return self._datatype def _set_datatype(self, datatype): self._datatype = datatype #: attribute data type. Can be either an actual type, #: or a type name, in which case the actual type will be #: determined when needed (generally just before scanning the api). datatype = property(_get_datatype, _set_datatype)
def iswsattr(attr): if inspect.isfunction(attr) or inspect.ismethod(attr): return False if isinstance(attr, property) and not isinstance(attr, wsproperty): return False return True def sort_attributes(class_, attributes): """Sort a class attributes list. 3 mechanisms are attempted : #. Look for a _wsme_attr_order attribute on the class_. This allow to define an arbitrary order of the attributes (useful for generated types). #. Access the object source code to find the declaration order. #. Sort by alphabetically""" if not len(attributes): return attrs = dict((a.key, a) for a in attributes) if hasattr(class_, '_wsme_attr_order'): names_order = class_._wsme_attr_order else: names = attrs.keys() names_order = [] try: lines = [] for cls in inspect.getmro(class_): if cls is object: continue lines[len(lines):] = inspect.getsourcelines(cls)[0] for line in lines: line = line.strip().replace(" ", "") if '=' in line: aname = line[:line.index('=')] if aname in names and aname not in names_order: names_order.append(aname) if len(names_order) < len(names): names_order.extend(( name for name in names if name not in names_order)) assert len(names_order) == len(names) except (TypeError, IOError): names_order = list(names) names_order.sort() attributes[:] = [attrs[name] for name in names_order] def inspect_class(class_): """Extract a list of (name, wsattr|wsproperty) for the given class_""" attributes = [] for name, attr in inspect.getmembers(class_, iswsattr): if name.startswith('_'): continue if inspect.isroutine(attr): continue if isinstance(attr, (wsattr, wsproperty)): attrdef = attr else: if attr not in native_types and ( inspect.isclass(attr) or isinstance(attr, (list, dict))): register_type(attr) attrdef = getattr(class_, '__wsattrclass__', wsattr)(attr) attrdef.key = name if attrdef.name is None: attrdef.name = name attrdef.complextype = weakref.ref(class_) attributes.append(attrdef) setattr(class_, name, attrdef) sort_attributes(class_, attributes) return attributes def list_attributes(class_): """ Returns a list of a complex type attributes. """ if not iscomplex(class_): raise TypeError("%s is not a registered type") return class_._wsme_attributes def make_dataholder(class_): # the slots are computed outside the class scope to avoid # 'attr' to pullute the class namespace, which leads to weird # things if one of the slots is named 'attr'. slots = [attr.key for attr in class_._wsme_attributes] class DataHolder(object): __slots__ = slots DataHolder.__name__ = class_.__name__ + 'DataHolder' return DataHolder class Registry(object): def __init__(self): self._complex_types = [] self.array_types = set() self.dict_types = set() @property def complex_types(self): return [t() for t in self._complex_types if t()] def register(self, class_): """ Make sure a type is registered. It is automatically called by :class:`expose() <wsme.expose>` and :class:`validate() <wsme.validate>`. Unless you want to control when the class inspection is done there is no need to call it. """ if class_ is None or \ class_ in native_types or \ isusertype(class_) or iscomplex(class_) or \ isarray(class_) or isdict(class_): return class_ if isinstance(class_, list): if len(class_) != 1: raise ValueError("Cannot register type %s" % repr(class_)) dt = ArrayType(class_[0]) self.register(dt.item_type) self.array_types.add(dt) return dt if isinstance(class_, dict): if len(class_) != 1: raise ValueError("Cannot register type %s" % repr(class_)) dt = DictType(*list(class_.items())[0]) self.register(dt.value_type) self.dict_types.add(dt) return dt class_._wsme_attributes = None class_._wsme_attributes = inspect_class(class_) class_._wsme_DataHolderClass = make_dataholder(class_) class_.__registry__ = self self._complex_types.append(weakref.ref(class_)) return class_ def reregister(self, class_): """Register a type which may already have been registered. """ self._unregister(class_) return self.register(class_) def _unregister(self, class_): """Remove a previously registered type. """ # Clear the existing attribute reference so it is rebuilt if # the class is registered again later. if hasattr(class_, '_wsme_attributes'): del class_._wsme_attributes # FIXME(dhellmann): This method does not recurse through the # types like register() does. Should it? if isinstance(class_, list): at = ArrayType(class_[0]) try: self.array_types.remove(at) except KeyError: pass elif isinstance(class_, dict): key_type, value_type = list(class_.items())[0] self.dict_types = set( dt for dt in self.dict_types if (dt.key_type, dt.value_type) != (key_type, value_type) ) # We can't use remove() here because the items in # _complex_types are weakref objects pointing to the classes, # so we can't compare with them directly. self._complex_types = [ ct for ct in self._complex_types if ct() is not class_ ] def lookup(self, typename): log.debug('Lookup %s' % typename) modname = None if '.' in typename: modname, typename = typename.rsplit('.', 1) for ct in self._complex_types: ct = ct() if ct is not None and typename == ct.__name__ and ( modname is None or modname == ct.__module__): return ct def resolve_type(self, type_): if isinstance(type_, six.string_types): return self.lookup(type_) if isinstance(type_, list): type_ = ArrayType(type_[0]) if isinstance(type_, dict): type_ = DictType(list(type_.keys())[0], list(type_.values())[0]) if isinstance(type_, ArrayType): type_ = ArrayType(self.resolve_type(type_.item_type)) self.array_types.add(type_) elif isinstance(type_, DictType): type_ = DictType( type_.key_type, self.resolve_type(type_.value_type) ) self.dict_types.add(type_) else: type_ = self.register(type_) return type_ # Default type registry registry = Registry() def register_type(class_): return registry.register(class_) class BaseMeta(type): def __new__(cls, name, bases, dct): if bases and bases[0] is not object and '__registry__' not in dct: dct['__registry__'] = registry return type.__new__(cls, name, bases, dct) def __init__(cls, name, bases, dct): if bases and bases[0] is not object and cls.__registry__: cls.__registry__.register(cls)
[docs]class Base(six.with_metaclass(BaseMeta)): """Base type for complex types""" def __init__(self, **kw): for key, value in kw.items(): if hasattr(self, key): setattr(self, key, value)
class File(Base): """A complex type that represents a file. In the particular case of protocol accepting form encoded data as input, File can be loaded from a form file field. """ #: The file name filename = wsattr(text) #: Mime type of the content contenttype = wsattr(text) def _get_content(self): if self._content is None and self._file: self._content = self._file.read() return self._content def _set_content(self, value): self._content = value self._file = None #: File content content = wsproperty(binary, _get_content, _set_content) def __init__(self, filename=None, file=None, content=None, contenttype=None, fieldstorage=None): self.filename = filename self.contenttype = contenttype self._file = file self._content = content if fieldstorage is not None: if fieldstorage.file: self._file = fieldstorage.file self.filename = fieldstorage.filename self.contenttype = text(fieldstorage.type) else: self._content = fieldstorage.value @property def file(self): if self._file is None and self._content: self._file = six.BytesIO(self._content) return self._file class DynamicBase(Base): """Base type for complex types for which all attributes are not defined when the class is constructed. This class is meant to be used as a base for types that have properties added after the main class is created, such as by loading plugins. """ @classmethod def add_attributes(cls, **attrs): """Add more attributes The arguments should be valid Python attribute names associated with a type for the new attribute. """ for n, t in attrs.items(): setattr(cls, n, t) cls.__registry__.reregister(cls)