Source code for ophelia.wsgi

# Copyright (c) 2007-2012 Thomas Lotze
# See also LICENSE.txt

"""A WSGI application running Ophelia, and an optional wsgiref server running
this application.

import ConfigParser
import logging
import ophelia.request
import ophelia.util
import os.path
import sys
import wsgiref.simple_server
import xsendfile
import zope.exceptions.exceptionformatter

logger = logging.getLogger('ophelia')

[docs]class Request(ophelia.request.Request): @ophelia.request.push_request def __call__(self): try: return super(Request, self).__call__() except ophelia.request.NotFound: env = self.env document_root = env.get('document_root') if not document_root: raise path = env['PATH_INFO'] parts = path.split('/') index_name = env.get('index_name', 'index.html') if (env.get('redirect_index') and parts[-1] == index_name): raise ophelia.request.Redirect(path=path[:-len(index_name)]) fs_path = os.path.join(document_root, *parts) if os.path.isdir(fs_path): if not path.endswith('/'): raise ophelia.request.Redirect(path=path + '/') path = '%s/%s' % (path.rstrip('/'), index_name) raise ophelia.request.NotFound(path)
[docs]class Application(object): """Ophelia's WSGI application. """ def __init__(self, options=None): self.options = options or {} @classmethod
[docs] def paste_app_factory(cls, global_conf, **local_conf): options = global_conf.copy() options.update(local_conf) options = dict((key.replace('-', '_'), value) for key, value in options.items()) return cls(options)
def __call__(self, env, start_response): env = ophelia.util.Namespace(self.options, **env) path = env["PATH_INFO"].lstrip('/') context = env.get("ophelia.context", {}) request = Request( path, env.pop("template_root"), env.pop("site"), **env) response_headers = {"Content-Type": "text/html"} body = None exc_info = None try: try: response_headers, body = request(**context) except ophelia.request.NotFound, e: env['PATH_INFO'] = e.args[0] return self.sendfile(env, start_response) except ophelia.request.Redirect, e: status = "301 Moved permanently" text = ('The resource you were trying to access ' 'has moved permanently to <a href="%(uri)s">%(uri)s</a>' % dict(uri=e.uri)) response_headers["location"] = e.uri except Exception, e: status = "500 Internal server error" exc_info = sys.exc_info() msg = "".join(zope.exceptions.exceptionformatter.format_exception( with_filenames=True, *exc_info)) if isinstance(msg, unicode): msg = msg.encode('utf-8') logger.error(msg) if boolean(env.get('debug', False)): text = '<pre>\n%s\n</pre>' % msg else: text = 'Something went wrong with the server software.' else: status = "200 OK" response_headers = [(key, str(value)) for key, value in response_headers.iteritems()] start_response(status, response_headers, exc_info) if env["REQUEST_METHOD"] == "GET": if body is None: body = self.error_body % {"status": status, "text": text} return [body] else: return []
[docs] def sendfile(self, env, start_response): xsendfile_app = xsendfile.XSendfileApplication( env['document_root'], env.get('xsendfile', 'serve')) return xsendfile_app(env, start_response)
error_body = """\ <html> <head> <title>%(status)s</title> </head> <body> <h1>%(status)s</h1> <p> %(text)s </p> </body> </html> """.replace(" ", "")
[docs]def boolean(value): if isinstance(value, basestring): return value.lower() in ("on", "true", "yes") else: return bool(value)
[docs]def paste_app_factory(global_conf, **local_conf): options = global_conf.copy() options.update(local_conf) options = dict((key.replace('-', '_'), value) for key, value in options.items()) return Application(options)
[docs]def wsgiref_server(): config_file = sys.argv[1] config = ConfigParser.ConfigParser() options = dict((key.replace('-', '_'), value) for key, value in config.items('DEFAULT')) configured_app = Application(options) httpd = wsgiref.simple_server.make_server( options.pop("host"), int(options.pop("port")), configured_app) httpd.serve_forever()