Source code for flask.ext.alchemyview

# vim: set fileencoding=utf-8 :

Flask view for SQL-Alchemy declarative models.

:copyright: (c) 2013 by Daniel Holmström.
:licence: MIT, see LICENCE for more information.

from __future__ import absolute_import, division

import re
import os
import json
import datetime
import decimal
import logging
import traceback
import colander
from sqlalchemy.exc import IntegrityError
from flask import (Response,
from flask.ext.classy import FlaskView
from werkzeug.exceptions import HTTPException
from jinja2.exceptions import TemplateNotFound

def _gettext(msg, *args, **kwargs):
    """Dummy translation method used if Flask-Babel isn't installed

    :returns: Formatted string
    return re.sub(r'%\(([a-z0-9_]+)\)', r'{\1}', msg).format(*args,

    from flask.ext.babel import gettext
    _ = gettext
except ImportError:
    _ = _gettext

_logger = logging.getLogger('flask.ext.alchemyview')
"""The logger that is used. It uses the 'flask.ext.alchemyview' name."""

def _remove_colander_null(result):
    """Removes colaner.null values from a dict or list

    Works recursively.

    :param result: dict or list of values
    :raises: Exception if `result` isn't a dict or list

    :returns: a copy of result with all colander.null values removed
    if isinstance(result, dict):
        rc = {}
        for (k, v) in result.iteritems():
            if isinstance(v, dict) or isinstance(v, list):
                rc[k] = _remove_colander_null(v)
                if v is not colander.null:
                    rc[k] = v
        return rc
    elif isinstance(result, list):
        return [v for v in result if v is not colander.null]
        raise Exception("Argument 'result' is not dict or list(%r)" %

def _exception_to_dict(error):
    """Get a dict from an Exception

    Currently the following exceptions are supported:
        * :class:`sqlalchemy.exc.IntegrityError`
        * :class:`colander.Invalid`

    :param error: An Exception

    :returns: Dict with errors. Contains 'message' and 'errors', \
            where 'errors' is a dict containing {'key': unicode message}
    if isinstance(error, IntegrityError):
        m ='(Key) \((\w+)\)=\(([^)]*)\) already exists',
        if m:
            return {u'message': _(u"'%(key)' already exists",,
                    'errors': { _(u'Already exists')}}
    elif isinstance(error, colander.Invalid):
        return {u'errors': error.asdict(),
                u"message": _("Invalid Data")}

                  'Got unhandled error: %r:%s\nTraceback: %s' %
                  (error, str(error),
    return {u'message': _(u'Unknown error'), u'errors': {}}

class _JSONEncoder(json.JSONEncoder):
    """JSON Encoder class that handles conversion for a number of types not
    supported by the default json library

    - datetime.* objects will be converted with their isoformat() function.
    - Decimal will be converted to a unicode string
    - Any object with an asdict() method will be converted to a dict that is

    :returns: object that can be converted to json

    def default(self, obj):
        if isinstance(obj, (datetime.datetime,, datetime.time)):
            return obj.isoformat()
        elif isinstance(obj, (decimal.Decimal)):
            return unicode(obj)
        elif hasattr(obj, 'asdict') and callable(getattr(obj, 'asdict')):
            return obj.asdict()
            return json.JSONEncoder.default(self, obj)

[docs]class BadRequest(HTTPException): """HTTPException class that also contains error data This will be raised when a request is invalid, use `Flask.errorhandler()` to handle these for HTML responses. :ivar data: Dict explaining the error, contains the keys 'message' and \ 'errors' """ def __init__(self, code, data): """Create a BadRequest If data is an exception it will be converted to a dict, otherwise a dict is assumed. The description will be set to data['message'] :param code: HTTP Status code :param data: Dict or Exception """ if isinstance(data, Exception): = _exception_to_dict(data) else: = (data[u'message'] if u'message' in data else _(u'Unknown error')) = data self.code = code super(BadRequest, self).__init__([u'message'])
[docs]class AlchemyView(FlaskView): """View for SQLAlchemy dictable models The pre-defined methods will always return JSON, with the mimetype set to application/json. """ JSONEncoder = _JSONEncoder """The JSON Encoder that should be used to load/dump json""" session = None """The SQLAlchemy session If not set the session will be taken from the Flask-SQLAlchemy extension. If that is missing the view will not work. """ model = None """SQLAlchemy declarative model""" schema = None """Colander schema that will be used as both update_schema and create_schema if they aren't set""" update_schema = None """Update schema""" create_schema = None """Create schema""" dict_params = None """Will be used instead of :attr:`AlchemyView.asdict_params` and :attr:`AlchemyView.from_params` if they're not set""" asdict_params = None """Parameters that will be used when getting an item The parameters will be used in :meth:`dictalchemy.utils.asdict`. """ fromdict_params = None """Parameters that will be used when updating an item The parameters will be used in :meth:`dictalchemy.utils.fromdict`. """ max_page_limit = 50 """Max page limit""" page_limit = 10 """Default page limit""" sortby = None """Default sortby column If not set no sortby will be applied by default in :func:`AlchemyView.index`. In order for sortby to have any effect it also needs to be set in :attr:`AlchemyView.sortby_map` """ sort_direction = 'asc' """Default sort direction""" sortby_map = None """Map of string=>column for sortby The values can be anything that will work together with the query returned by :meth:`AlchemyView._base_query`. So if there is a join in the base query that column, or name of that colum can be mapped to a key in the sortby_map. """ template_suffixes = {'text/html': 'jinja2'} """Suffixes for response types, currently 'text/html' is the only one supported"""
[docs] def _json_dumps(self, obj, ensure_ascii=False, **kwargs): """Load object from json string Uses :attr:`AlchemyView.JSONEncoder` to dump the data. :param obj: Object that should be dumped :returns: JSON string """ kwargs['ensure_ascii'] = ensure_ascii kwargs['cls'] = self.JSONEncoder return json.dumps(obj, **kwargs)
[docs] def _json_loads(self, string, **kwargs): """Load json""" return json.loads(string, **kwargs)
[docs] def _json_response(self, obj, status=200): """Get a json response :param obj: Exception OR something that can used by :meth:`AlchemyView._json_dumps` If this is an exception the status will be set to 400 if status is less than 400. """ if isinstance(obj, Exception): if status < 400: status = 400 obj = _exception_to_dict(obj) return Response(self._json_dumps(obj), status=status, mimetype='application/json')
[docs] def _base_query(self): """Get the base query that should be used For example add joins here. Default implementation returns `self._get_session().query(self.model)`. :returns: Sqlalchemy query """ return self._get_session().query(self.model)
[docs] def _item_url(self, item): """Get the url to read an item :raises: Exception if the items primary key is a composite key """ if len(self.model.__table__.primary_key) != 1: raise Exception("AlchemyView doesn't handle models with " "composite primary key") primary_key = [(, column.type.python_type) for column in self.model.__table__.primary_key] primary_key_name = primary_key[0][0] return url_for(self.build_route_name('get'), id=getattr(item, primary_key_name))
[docs] def _get_item(self, id): """Get item based on id Can handle models with int and string primary keys. :raises: Exception if the primary key is a composite or not \ int or string :returns: An item, calls flask.abort(400) if the item isn't found """ primary_key = [(, column.type.python_type) for column in self.model.__table__.primary_key] if len(primary_key) != 1: raise Exception("AlchemyView doesn't handle models with " "composite primary key") primary_key_type = primary_key[0][1] primary_key_name = primary_key[0][0] if primary_key_type not in (int, str, unicode): raise Exception("AlchemyView can only handle int and string " "primary keys not %r" % primary_key_type) try: if type(id) != primary_key_type: id = primary_key_type(id) except: abort(404) item = self._base_query().filter( getattr(self.model, primary_key_name) == id).limit(1).first() if not item: abort(404) return item
[docs] def _get_session(self): """Get SQLAlchemy session :raises: An exception if self.session isn't set and \ Flask-SQLAlchemy isn't used :returns: SQLAlchemy session """ return self.session or current_app.extensions['sqlalchemy'].db.session
[docs] def _get_schema(self, data): """Get basic colander schema This schema is used if create_schema or update_schema isn't set. :returns: Colander schema for create or update schema """ return self.schema()
[docs] def _get_create_schema(self, data): """Get colander schema for create :returns: Colander schema for create data """ if getattr(self, 'create_schema', None): return self.create_schema() else: return self._get_schema(data)
[docs] def _get_update_schema(self, data): """Get colander update schema :returns: Colander schema for create data """ if getattr(self, 'update_schema', None): return self.update_schema() else: return self._get_schema(data)
[docs] def _get_response_mimetype(self): """Get response type from response :returns: 'application/json' or 'text/html' """ best = request.accept_mimetypes.best_match(['application/json', 'text/html']) if best == 'application/json' and \ request.accept_mimetypes[best] > \ request.accept_mimetypes['text/html']: return 'application/json' else: return 'text/html'
[docs] def _get_template_name(self, name, mimetype): """Get template name for a specific mimetype The template name is by default get_route_base()/name.suffix, the suffixes are stored in :attr:`AlchemyView.template_suffixes`. :returns: string """ return os.path.join(self.get_route_base(), '%s.%s' % (name, self.template_suffixes[mimetype]))
[docs] def _response(self, data, template, status=200): """Get a response If the response will be rendered with a template and the method '_TEMPLATE_template_vars' is set that method will be called and the returnvalue will be added to the template parameters. :raises: If status is beteen 400 and 500 OR data is an exception a \ :class:`BadRequest` will be raised. :returns: A json or html response, based on the request accept headers """ mimetype = self._get_response_mimetype() if mimetype == 'application/json': return self._json_response(data, status) else: if isinstance(data, Exception): if status < 400: status = 400 if status >= 400: raise BadRequest(status, data) else: fn_name = 'before_%s_render' % template if hasattr(self, fn_name) and callable(getattr(self, fn_name)): kwargs = getattr(self, fn_name)(data) or {} else: kwargs = {} try: return render_template(self._get_template_name(template, mimetype), data=data, **kwargs) except TemplateNotFound: raise BadRequest(406, {'message': _('Not a valid Accept-Header')})
[docs] def get(self, id): """Handles GET requests""" return self._response(self._get_item(id). asdict(**(getattr(self, 'asdict_params', self.dict_params or None) or {})), 'get')
[docs] def post(self): """Handles POST This method will create a model with request data if the data was valid. It validates the data with :meth:`AlchemyView._get_create_schema`. If everything was successful it will return a 303 redirect to the newly created item. If any error except validation errors are encountered a 500 will be returned. :returns: A response :rtype: :class:`flask.Response` """ session = self._get_session() try: result = _remove_colander_null(self._get_create_schema( request.json).deserialize(request.json)) except Exception, e: session.rollback() return self._response(e, 'post', 400) else: try: item = self.model(**result) session.add(item) except Exception, e: session.rollback() return self._response(e, 'post', 500) else: try: session.commit() except: return self._response(e, 'post', 500) return redirect(self._item_url(item), 303)
[docs] def put(self, id): """Handles PUT If any error except validation errors are encountered a 500 will be returned. """ item = self._get_item(id) session = self._get_session() try: result = _remove_colander_null(self._get_update_schema( request.json).deserialize(request.json)) item.fromdict(result, **(getattr(self, 'fromdict_params', self.dict_params or None) or {})) session.add(item) session.commit() except colander.Invalid, e: return self._response(e, 'put', 400) except Exception, e: return self._response(e, 'put', 500) else: return redirect(self._item_url(item), 303)
[docs] def _delete(self, id): """Delete an item""" item = self._get_item(id) session = self._get_session() session.delete(item) try: session.commit() except Exception, e: return self._response(e, 'delete', 400) # TODO: What should a delete return? return self._response({}, 'delete', 200)
delete = _delete """Delete an item This is just an alias for :meth:`AlchemyView._delete`. """
[docs] def index(self): """Returns a list The response look like this:: items: [...] count: Integer limit: Integer offset: Integer """ try: limit = min(int(request.args.get('limit', self.page_limit)), self.max_page_limit) except: return self._response({u'message': _(u'Invalid limit')}, 'index', 400) if limit > 100: limit = 10 try: offset = int(request.args.get('offset', 0)) except: return self._response({u'message': _(u'Invalid offset')}, 'index', 400) try: sortby = request.args.get('sortby', None) if sortby: sortby = str(sortby) except: return self._response({u'message': _(u'Invalid sortby')}, 'index', 400) try: direction = str(request.args.get('direction', self.sort_direction)) except: return self._response({u'message': _(u'Invalid direction')}, 'index', 400) if direction not in ('asc', 'desc'): return self._response({u'message': _(u'Invalid direction')}, 'index', 400) query = self._base_query() # Add sortby if sortby and self.sortby_map and sortby in self.sortby_map: query = query.order_by(getattr(self.sortby_map[sortby], direction)()) return self._response({ 'items': [p.asdict(**(getattr(self, 'asdict_params', self.dict_params or None) or {})) for p in query.limit(limit).offset(offset).all()], 'count': query.count(), 'limit': limit, 'offset': offset}, 'index')

