Source code for findig.tools.validator

"""
The :mod:`findig.tools.validator` module exposes the :class:`Validator`
which can be used to validate an application or request's input data.

Validators work by specifying a converter for each field in the 
input data to be validated::

    validator = Validator(app)
    
    @validator.enforce(id=int)
    @app.route("/test")
    def resource():
        pass

    @resource.model("write")
    def write_resource(data):
        assert isinstance(data['id'], int)

If the converter fails to convert the field's value, then a 
``400 BAD REQUEST`` error is sent back.

Converters don't have to be functions; they can be a singleton list
containing another converter, indicating that the field is expected to
be a list of items for which that converter works::

    @validator.enforce(ids=[int])
    @app.route("/test2")
    def resource2():
        pass

    @resource2.model("write")
    def write_resource(data):
        for id in data['ids']:
            assert isinstance(id, int)

Converters can also be string specifications corresponding to a 
pre-registered converter and its arguments. All of
werkzeug's 
`builtin converters and their arguments`__ 
and their arguments are pre-registered and thus usable::

    @validator.enforce(foo='any(bar,baz)', cid='string(length=3)')
    @app.route("/test3")
    def resource3():
        pass

    @resource3.model("write")
    def write_resource(data):
        assert data['foo'] in ('bar', 'baz')
        assert len(data['cid']) == 3

__ http://werkzeug.pocoo.org/docs/routing/#builtin-converters

"""

from collections import namedtuple
from collections.abc import Callable, Mapping, Sequence
from datetime import datetime
from functools import partial
from inspect import getmembers
import re

from werkzeug.datastructures import MultiDict
from werkzeug.exceptions import BadRequest
from werkzeug.routing import parse_converter_args, BaseConverter

from findig.context import ctx
from findig.resource import AbstractResource, Collection
from findig.utils import DataPipe, tryeach


_converter_re = re.compile(r'''
        (?P<name>[a-zA-Z_][a-zA-Z0-9_]*)   # converter name
        (?:\((?P<args>.*?)\))?                  # converter args
''', re.VERBOSE | re.UNICODE)

class converter_spec(namedtuple('converter_spec', 'name args')):
    def __repr__(self):
        if self.args is None:
            return repr(self.name)
        else:
            return repr("{}({})".format(self.name, self.args))

[docs]class Validator: """ A higher-level tool to be used to validate request input data. :param app: The Findig application that the validator is attached to. :type app: :class:`findig.App` :param include_collections: If ``True``, any validation rules set on any resource will also be used for any :class:`~findig.resource.Collection` that collects it. Even when this argument is set, inherited rules can still be overridden by declaring rules specifically for the collection. Validators are only capable of validating request input data (i.e., data received as part of the request body). To validate URL fragments, consider using *converters* in your URL rules. See `werkzeug's routing reference <http://werkzeug.pocoo.org/docs/0.10/routing/#rule-format>`_. Validators work by specifying converters for request input fields. If a converter is specified, the validator will use it to convert the field and replace it with the converted value. See :meth:`enforce` for more about converters. """ def __init__(self, app=None, include_collections=True): self.include_collections = include_collections self.validation_specs = {} self.restriction_specs = {} self.strip_extras = {} if app is not None: self.attach_to(app)
[docs] def attach_to(self, app): """ Hook the validator into a Findig application. Doing so allows the validator to inspect and replace incoming input data. This is called automatically for an app passed to the validator's constructor, but can be called for additional app instances. This function should only be called once per application. :param app: The Findig application that the validator is attached to. :type app: :class:`findig.App` """ # Hook the validator into the dispatcher's data pre processor # so that we can look at incoming request data and complain # if the request data doesn't match what we're looking for app.pre_processor = DataPipe(app.pre_processor, self.validate) app.startup_hook(partial(self.__prepare_converters, app))
@staticmethod
[docs] def regex(pattern, flags=0, template=None): r""" Create a function that validates strings against a regular expression. :: >>> func = Validator.regex("boy") >>> func("boy") 'boy' >>> func("That boy") Traceback (most recent call last): ... ValueError: That boy >>> func("boy, that's handy.") Traceback (most recent call last): ... ValueError: boy, that's handy. If you supply a template, it is used to construct a return value by doing backslash substitution:: >>> func = Validator.regex("(male|female)", template=r"Gender: \1") >>> func("male") 'Gender: male' >>> func("alien") Traceback (most recent call last): ... ValueError: alien """ regexp = re.compile(pattern, flags) def match_string(s): m = regexp.fullmatch(s) if m is None: raise ValueError(s) elif template is not None: return m.expand(template) else: return m.string return match_string
@staticmethod
[docs] def date(format, *alternatives): """ date(format[, format[, ...]]) Create a function that validates a date field. :param format: A date/time format according to :meth:`datetime.datetime.strptime`. If more than one formats are passed in, the generated function will try each format in order until one of them works on the field (or until there are no formats left to try). Example:: >>> func = Validator.date("%Y-%m-%d %H:%M:%S%z") >>> func("2015-07-17 09:00:00+0400") datetime.datetime(2015, 7, 17, 9, 0, tzinfo=datetime.timezone(datetime.timedelta(0, 14400))) >>> func("not-a-date") Traceback (most recent call last): ... ValueError: time data 'not-a-date' does not match format '%Y-%m-%d %H:%M:%S%z' >>> func = Validator.date("%Y-%m-%d %H:%M:%S%z", "%Y-%m-%d") >>> func("2015-07-17") datetime.datetime(2015, 7, 17, 0, 0) """ formats = [format]; formats.extend(alternatives) funcs = [lambda s: datetime.strptime(s, fmt) for fmt in formats] return partial(tryeach, funcs)
[docs] def restrict(self, *args, strip_extra=False): """ restrict([field[, field[, ...]],] strip_extra=True) Restrict the input data to the given fields :param field: A field name that should be allowed. An asterisk at the start of the field name indicates a required field (asterisks at the start of field names can be escaped with another asterisk character). This parameter can be used multiple times to indicate different fields. :param strip_extra: Controls the behavior upon encountering a field not contained in the list, during validation. If ``True``, the field will be removed. Otherwise, a :class:`UnexpectedFields` is raised. Once this method is called, any field names that do not appear in the list are disallowed. """ FIELD_REGEX = re.compile(r"^(\**)\1(\*?)") def conv_field(field_name): """conv_field("*field") -> ("field", FIELD-IS-REQUIRED)""" m = FIELD_REGEX.match(field_name) if m: return (FIELD_REGEX.sub(r"\1", field_name), m.group(2) == "*") else: return field_name, False def restrict_to(resource, fields=None): fields = list(map(conv_field, [] if fields is None else fields)) self.restriction_specs.setdefault(resource.name, {}).update(fields) self.strip_extras[resource.name] = strip_extra return resource if len(args) == 0: return restrict_to elif isinstance(args[0], AbstractResource): return restrict_to(args[0], args[1:]) else: return partial(restrict_to, fields=args)
[docs] def enforce(self, *args, **validator_spec): """ enforce(resource, **validation_spec) Register a validation specification for a resource. The validation specification is a set of ``field=converter`` arguments linking an input field name to a converter that should be used to validate the field. A converter can be any of the following: * :class:`collections.abc.Callable` (including functions) -- This can be a simple type such as :class:`int` or :class:`uuid.UUID`, but any function or callable can work. It should take a field value and convert it to a value of the desired type. If it throws an error, then findig will raise a :class:`~werkzeug.exceptions.BadRequest` exception. Example:: # Converts an int from a valid string base 10 representation: validator.enforce(resource, game_id=int) # Converts to a float from a valid string validator.enforce(resource, duration=float) * :class:`str` -- If a string is given, then it is interpreted as a converter specification. A converter specification includes the converter name and optionally arguments for pre-registered converters. The following converters are pre-registered by default (you may notice that they correspond to the URL rule converters available for werkzeug): .. function:: string(minlength=1, length=None, maxlength=None) :noindex: This converter will accept a string. :param length: If given, it will indicate a fixed length field. :param minlength: The minimum allowed length for the field. :param maxlength: The maximum allowed length for the field. .. function:: any(*items) :noindex: This converter will accept only values from the variable list of options passed as the converter arguments. It's useful for limiting a field's value to a small set of possible options. .. function:: int(fixed_digits=0, min=None, max=None) :noindex: This converter will accept a string representation of a non-negative integer. :param fixed_digits: The number of fixed digits in the field. For example, set this to **3** to convert ``'001'`` but not ``'1'``. The default is a variable number of digits. :param min: The minimum allowed value for the field. :param max: The maximum allowed value for the field. .. function:: float(min=None, max=None) :noindex: This converter will accept a string representation of a non-negative floating point number. :param min: The minimum allowed value for the field. :param max: The maximum allowed value for the field. .. function:: uuid() :noindex: This converter will accept a string representation of a uuid and convert it to a :class:`uuid.UUID`. Converters that do not need arguments can omit the parentheses in the converter specification. Examples:: # Converts a 4 character string validator.enforce(resource, student_id='string(length=10)') # Converts any of these string values: 'foo', 1000, True validator.enforce(resource, field='any(foo, 1000, True)') # Converts any non-negative integer validator.enforce(resource, game_id='int') # and any float <1000 validator.enforce(resource, duration='float(max=1000)') .. important:: Converter specifications in this form **cannot** match strings that contain forward slashes. For example, *'string(length=2)'* will fail to match *'/e'* and *'any(application/json,html)'* will fail to match *'application/json'*. * or, :class:`list` -- This must be a singleton list containing a converter. When this is given, the validator will treat the field like a list and use the converter to convert each item. Example:: # Converts a list of integers validator.enforce(resource, games=[int]) # Converts a list of uuids validator.enforce(resource, components=['uuid']) # Converts a list of fixed length strings validator.enforce(resource, students=['string(length=10)']) This method can be used as a decorator factory for resources:: @validator.enforce(uid=int, friends=[int]) @app.route("/") def res(): return {} Converter specifications given here are only checked when a field is present; see :meth:`restrict` for specifying required fields. .. warning:: Because of the way validators are hooked up, registering new specifications after the first request has run might cause unexpected behavior (and even internal server errors). """ def decorator(resource): self.__register_spec(resource.name, validator_spec) return resource if len(args) == 0: return decorator elif len(args) == 1: return decorator(args[0]) else: raise TypeError
[docs] def enforce_all(self, **validator_spec): """ enforce_all(**validation_spec) Register a global validation specification. This function works like :meth:`enforce`, except that the validation specification is registered for all resources instead of a single one. Global validation specifications have lower precedence than resource specific ones. """ self.__register_spec(None, validator_spec)
def __register_spec(self, key, spec): def validate_item_spec(item_spec): if isinstance(item_spec, str): m = _converter_re.fullmatch(item_spec) if m is not None: return converter_spec(m.group('name'), m.group('args')) else: return False elif isinstance(item_spec, list): if len(item_spec) != 1: return False else: return [validate_item_spec(item_spec[0])] elif not isinstance(item_spec, Callable): return False else: return item_spec for field, item_spec in spec.items(): new_item_spec = validate_item_spec(item_spec) if item_spec is False: raise InvalidSpecificationError(item_spec) else: spec[field] = new_item_spec else: self.validation_specs.setdefault(key, {}) self.validation_specs[key].update(spec) def __prepare_converters(self, app): def fix_spec(item_spec): if isinstance(item_spec, converter_spec): cname, args = item_spec args, kwargs = ((), {}) if args is None \ else parse_converter_args(args) if cname not in app.url_map.converters: return None ccls = app.url_map.converters[cname] converter = ccls(app.url_map, *args, **kwargs) compiled_re = re.compile(converter.regex) return compiled_re, converter elif isinstance(item_spec, list): return [fix_spec(item_spec[0])] else: return item_spec for resource in self.validation_specs: for field, spec in self.validation_specs[resource].items(): new_spec = fix_spec(spec) if new_spec is not None: self.validation_specs[resource][field] = new_spec else: raise InvalidSpecificationError( "\"{}={!r}\"".format(field, spec) ) def __check_item(self, data, key, item_spec): # '89', int -> pass # ['58', '84', '58'], [int] -> pass # ['89', 'foo', '59'], [int] -> fail # ['89', 'foo', '59'], [str] -> pass if isinstance(item_spec, Callable): # Easiest case: call the callable on the item data # to get the converted answer: return item_spec(data[key]) elif isinstance(item_spec, list): unconverted_items = _ContainerWrapper(data.getlist(key)) converted_items = [] child_spec = item_spec[0] for i, _ in enumerate(unconverted_items): converted_items.append( self.__check_item(unconverted_items, i, child_spec) ) return converted_items elif isinstance(item_spec, tuple): regexp, converter = item_spec val = data[key] if not regexp.fullmatch(val): raise ValueError(val) else: return converter.to_python(val) else: raise InvalidSpecificationError(item_spec) def __handle_restrictions(self, data): strip_extras = self.strip_extras.get(ctx.resource.name, False) restrictions = self.restriction_specs.get(ctx.resource.name, None) if restrictions is None \ and self.include_collections \ and isinstance(ctx.resource, Collection): restrictions = self.restriction_specs.get( ctx.resource.collects.resource.name, {} ) strip_extras = self.strip_extras.get( ctx.resource.collects.resource.name, False ) if restrictions is not None: # Handle extra fields extras = [field for field in data if field not in restrictions] if extras and strip_extras: for field in extras: del data[field] elif extras: raise UnexpectedFields(extras, self) # Check for required fields missing = [field for field,required in restrictions.items() if required and field not in data] if missing: raise MissingFields(missing, self) return data
[docs] def validate(self, data): """ Validate the data with the validation specifications that have been collected. This function must be called within an active request context in order to work. :param data: Input data :type data: mapping, or object with gettable/settable fields :raises: :class:`ValidationFailed` if one or more fields could not be validated. **This is an internal method.** """ spec = {} spec.update(self.validation_specs.get(None, {})) if self.include_collections and isinstance(ctx.resource, Collection): spec.update( self.validation_specs.get( ctx.resource.collects.resource.name, {} ) ) spec.update(self.validation_specs.get(ctx.resource.name, {})) wrapped = self.__handle_restrictions(_ContainerWrapper(data)) conversion_errs = [] # Transform the data according to the conversion spec for field, field_spec in spec.items(): # Ignore the field if it isn't in the specification if field not in wrapped: continue try: converted = self.__check_item(wrapped, field, field_spec) except InvalidSpecificationError: raise except: import traceback traceback.print_exc() conversion_errs.append(field) else: wrapped[field] = converted if conversion_errs: raise InvalidFields(conversion_errs, self) else: return wrapped.unwrap()
class _ContainerWrapper: def __init__(self, container): self._direct = False self._orig = container self._is_multidict = False if isinstance(container, Sequence): self._c = list(container) elif isinstance(container, MultiDict): self._is_multidict = True self._list_fields = set() self._c = container.copy() elif isinstance(container, Mapping): self._c = dict(container) else: self._c = container self._direct = True def __getitem__(self, key): if self._direct: return getattr(self._c, key) else: return self._c[key] def getlist(self, key): if self._is_multidict: l = self._c.getlist(key) self._list_fields.add(key) return l else: l = self[key] if not isinstance(l, Sequence): raise ValueError(l) else: return l def __setitem__(self, key, value): if self._direct: setattr(self._c, key, value) else: self._c[key] = value def __delitem__(self, key): if self._direct: delattr(self._c, key) else: del self._c[key] def __contains__(self, key): if self._direct: return hasattr(self._c, key) else: return key in self._c def unwrap(self): if isinstance(self._c, MultiDict): for field in self._list_fields: items = self._c.get(field, []) self._c.setlist(field, items) return self._c def __iter__(self): if self._direct: yield from getmembers(self._c) else: yield from self._c class InvalidSpecificationError(ValueError): """Raised when an invalid specification is used."""
[docs]class ValidationFailed(BadRequest): """ Raised whenever a :class:`Validator` fails to validate one or more fields. This exception is a subclass of :class:`werkzeug.exceptions.BadRequest`, so if allowed to bubble up, findig will send a ``400 BAD REQUEST`` response automatically. Applications can, however, customize the way this exception is handled:: from werkzeug.wrappers import Response # This assumes that the app was not supplied a custom error_handler # function as an argument. # If a custom error_handler function is being used, then # do a test for this exception type inside the function body # and replicate the logic @app.error_handler.register(ValidationFailed) def on_validation_failed(e): # Construct a response based on the error received msg = "Failed to convert input data for the following fields: " msg += str(e.fields) return Response(msg, status=e.status) """ def __init__(self, fields, validator): super().__init__() #: A list of field names for which validation has failed. This will #: always be a complete list of failed fields. self.fields = fields #: The :class:`Validator` instance that raised the exception. self.validator = validator
[docs]class UnexpectedFields(ValidationFailed): """ Raised whenever a resource receives an unexpected input field. """
[docs]class MissingFields(ValidationFailed): """ Raised when a resource does not receive a required field in its input. """
[docs]class InvalidFields(ValidationFailed): """ Raised when a resource receives a field that the validator can't convert. """
__all__ = ['Validator', 'InvalidSpecificationError', 'ValidationFailed', 'UnexpectedFields', 'MissingFields', 'InvalidFields']