Source code for ginsfsm.protocols.http.server.c_http_clisrv

# -*- encoding: utf-8 -*-
"""
GObj :class:`GHttpCliSrv`
=========================

Http Channel.

.. autoclass:: GHttpCliSrv
    :members:

"""
from collections import deque
import traceback

from ginsfsm.c_timer import GTimer
from ginsfsm.gobj import GObj
from ginsfsm.gconfig import GConfig
from ginsfsm.protocols.http.common.parser import HTTPRequestParser
from ginsfsm.protocols.http.common.utilities import InternalServerError
from ginsfsm.protocols.http.common.response import (
    HttpResponse,
    HttpErrorResponse,
)


class ResponseInterrupt(Exception):
    """ To use when the response is asynchronous
        or infinite until top level wants to finish it.
    """


def ac_rx_data(self, event):
    """ Receiving data from the partner clisrv gsock.
        Can be one or more requests.
    """
    self.start_inactivity_timer()
    #gsock = event.source[-1]
    gsock = self.gsock  # must be same as event.source[-1]
    data = event.data
    if not data:
        return
    new_request = self.parsing_request
    while data:
        if new_request is None:
            new_request = HTTPRequestParser(self)
        n = new_request.received(data)
        if new_request.expect_continue and new_request.headers_finished:
            # guaranteed by parser to be a 1.1 new_request
            new_request.expect_continue = False

            if not self.sent_continue:
                self.send_event(
                    gsock,
                    'EV_SEND_DATA',
                    data=b'HTTP/1.1 100 Continue\r\n\r\n'
                )
                self.sent_continue = True
                new_request.completed = False

        if new_request.completed:
            # The new_request (with the body) is ready to use.
            self.parsing_request = None
            if not new_request.empty:
                self.enqueue_request(new_request)
            new_request = None
        else:
            self.parsing_request = new_request
        if n >= len(data):
            break
        data = data[n:]

    if len(self.dl_requests):
        self.send_event(self, 'EV_DEQUEUE_REQUEST')


def ac_dequeue_request(self, event):
    if self.responding_request:
        self.logger.exception('Internal ERROR!!!: '
                              'responding_request MUST be None')
    if len(self.dl_requests):
        self.responding_request = self.dl_requests.popleft()
        self.send_event(self, 'EV_HTTP_REQUEST')
    else:
        self.start_inactivity_timer()


def ac_http_request(self, event):
    """ Internal event.
        The request is saved in self.responding_request.
    """
    self.stop_inactivity_timer()

    if self.responding_request.error:
        response = HttpErrorResponse(self.responding_request)
        response.execute()
        self.finish(response)
        return

    self.start_responseless_timer()
    self.set_new_state('ST_WAIT_RESPONSE')
    # TODO: in stratus environment, we need to inform of who srvcli is.
    self.broadcast_event(
        'EV_HTTP_REQUEST',
        request=self.responding_request,
        channel=self,
    )


def ac_http_response(self, event):
    response = event.response
    if not isinstance(response, HttpResponse):
        self.logger and self.logger.error(
            "ERROR response doesn't mach HttpResponse %s" %
            response.request.path)

    if response.request != self.responding_request:
        self.logger and self.logger.error(
            "ERROR response doesn't mach responding request %s" %
            response.request.path)

    if self.responding_response:
        self.logger and self.logger.error(
            "ERROR responding_response is BUSY, of %s" %
            response.request.path)
    self.responding_response = response

    response.start()
    try:
        response.execute()
    except ResponseInterrupt:
        """ Response is asynchronous or infinite.
            Don't clear the current responding_request.
        """
        self.stop_responseless_timer()  # TODO: do some ping-alive
        return

    except:
        self.logger and self.logger.exception(
            'Exception when serving %s' % response.request.path)
        if not response.wrote_header:
            if self.parent.config.expose_tracebacks:
                body = traceback.format_exc()
            else:
                body = ('The server encountered an unexpected '
                        'internal server error')

            request = HTTPRequestParser(self)
            request.error = InternalServerError(body)
            response = HttpErrorResponse(request)
            response.execute()
        else:
            response.close_on_finish = True

    self.finish(response)


def ac_transmit_ready(self, event):
    pass


def ac_inactivity_timeout(self, event):
    """ Close the channel by inactivity.
    """
    if self.gsock:
        self.send_event(self.gsock, 'EV_DROP')


def ac_responseless_timeout(self, event):
    """ Close the channel by responseless of top level.
    """
    if not self.gsock:
        return

    body = (
        'Response Timeout. The server is busy. '
        'Please re-try your request in a few moments.'
    )
    request = HTTPRequestParser(self)
    request.error = InternalServerError(body)
    response = HttpErrorResponse(request)
    response.execute()
    self.finish(response)


def ac_disconnected(self, event):
    """ Gsock closed.
    """
    self.broadcast_event('EV_HTTP_CHANNEL_CLOSED', channel=self)


GHTTPCLISRV_FSM = {
    'event_list': (
        'EV_SET_TIMER: bottom output',
        'EV_RESPONSELESS_TIMEOUT: bottom input',
        'EV_INACTIVITY_TIMEOUT: bottom input',
        'EV_DISCONNECTED: bottom input',
        'EV_RX_DATA: bottom input',
        'EV_DEQUEUE_REQUEST',
        'EV_TRANSMIT_READY: bottom input',
        'EV_SEND_DATA: bottom output',
        'EV_FLUSH_OUTPUT_DATA: bottom output',
        'EV_WRITE_OUTPUT_DATA: bottom output',
        'EV_HTTP_CHANNEL_OPENED: top output',
        'EV_HTTP_CHANNEL_CLOSED: top output',
        'EV_HTTP_REQUEST: top output',
        'EV_HTTP_RESPONSE: top input',
    ),
    'state_list': (
        'ST_IDLE',
        'ST_WAIT_RESPONSE',
    ),
    'machine': {
        'ST_IDLE':
        (
            ('EV_DISCONNECTED',         ac_disconnected,            None),
            ('EV_INACTIVITY_TIMEOUT',   ac_inactivity_timeout,      None),
            ('EV_RX_DATA',              ac_rx_data,                 None),
            ('EV_DEQUEUE_REQUEST',      ac_dequeue_request,         None),
            ('EV_HTTP_REQUEST',         ac_http_request,            None),
        ),
        'ST_WAIT_RESPONSE':
        (
            ('EV_DISCONNECTED',         ac_disconnected,            None),
            ('EV_RESPONSELESS_TIMEOUT', ac_responseless_timeout,    'ST_IDLE'),
            ('EV_RX_DATA',              ac_rx_data,                 None),
            ('EV_DEQUEUE_REQUEST',      None,                       None),
            ('EV_HTTP_RESPONSE',        ac_http_response,           'ST_IDLE'),

            ('EV_TRANSMIT_READY',       ac_transmit_ready,          None),
        ),
    }
}

GHTTPCLISRV_GCONFIG = {
    'subscriber': [
        None, None, 0, None,
        "subcriber of all output-events."
    ],
    'gsock': [
        None, None, GConfig.FLAG_DIRECT_ATTR, None, "partner gsock."
    ],
    'maximum_simultaneous_requests': [
        int, 0, 0, None,
        "maximum simultaneous requests."
    ],
    'inactivity_timeout': [
        int, 5 * 60 * 60, 0, None,
        "Inactivity timeout in seconds."
    ],
    'responseless_timeout': [
        int, 5 * 60 * 60, 0, None,
        "'Without response' timeout in seconds."
    ],

    # Used in parser:
    'url_scheme': [str, 'http', 0, None, "default ``http`` value"],
    # A tempfile should be created if the pending input is larger than
    # inbuf_overflow, which is measured in bytes. The default is 512K.  This
    # is conservative.
    'inbuf_overflow': [int, 524288, 0, None, ""],
    # maximum number of bytes of all request headers combined (256K default)
    'max_request_header_size': [int, 262144, 0, None, ""],
    # maximum number of bytes in request body (1GB default)
    'max_request_body_size': [int, 1073741824, 0, None, ""],
}


[docs]class GHttpCliSrv(GObj): """ Http clisrv (client of server) class. This gobj is create by GHttpServer when it receives an EV_CONNECTED event from a new gsock gobj. This class will subscribe all the events of the partner :class:`ginsfsm.c_sock.GSock` gobj, to implement the http protocol. .. ginsfsm:: :fsm: GHTTPCLISRV_FSM :gconfig: GHTTPCLISRV_GCONFIG *Top Output-Events:* * :attr:`'EV_HTTP_CHANNEL_OPENED'`: new http client. Event attributes: * ``channel``: http channel. * :attr:`'EV_HTTP_CHANNEL_CLOSED'`: http client closed. Event attributes: * ``channel``: http channel. * :attr:`'EV_HTTP_REQUEST'`: new http request. Event attributes: * ``channel``: http channel. * ``request``: http request. *Top Input-Events:* * :attr:`'EV_HTTP_RESPONSE'`: response to the current request. Event attributes: * ``response``: http response. *Bottom Input-Events:* * :attr:`'EV_DISCONNECTED'`: socket disconnected. The clisrv `gobj` will be destroyed. * :attr:`'EV_TRANSMIT_READY'`: socket ready to transmit more data. * :attr:`'EV_RX_DATA'`: data received. Process http protocol. *Bottom Output-Events:* * :attr:`'EV_SEND_DATA'`: transmit data socket. * :attr:`'EV_WRITE_OUTPUT_DATA'`: write data to socket output buffer. * :attr:`'EV_FLUSH_OUTPUT_DATA'`: flush data of socket output buffer. """ def __init__(self): GObj.__init__(self, GHTTPCLISRV_FSM, GHTTPCLISRV_GCONFIG) self.parsing_request = None # A request parser instance self.responding_request = None # request waiting a top response self.responding_response = None # current response being responding self.dl_requests = deque() # requests queue self.sent_continue = False # used as a latch after sending 100continue
[docs] def go_out(self): """ Finish zone. """
def start_up(self): if self.config.subscriber is None: self.config.subscriber = self.parent # gsock MUST be passed at creating gobj # Canalize the flow of messages # bottom events for me self.gsock.subscribe_event(None, self) # we need to know disconnected event for deleting gobjs self.gsock.subscribe_event( 'EV_DISCONNECTED', self, __hard_subscription__=True, ) # top events for subscriber self.subscribe_event(None, self.config.subscriber) self.broadcast_event('EV_HTTP_CHANNEL_OPENED', channel=self) # Setup the timers self.inactivity_timer = self.create_gobj( None, GTimer, self, timeout_event_name='EV_INACTIVITY_TIMEOUT' ) self.start_inactivity_timer() self.responseless_timer = self.create_gobj( None, GTimer, self, timeout_event_name='EV_RESPONSELESS_TIMEOUT' ) def enqueue_request(self, new_request): self.dl_requests.append(new_request) max_req = self.config.maximum_simultaneous_requests if max_req > 0 and len(self.dl_requests) > max_req: # Close the channel by maximum simultaneous requests reached. body = 'Please change your behavior.' \ ' You have reached the maximum simultaneous requests (%d).' % ( max_req) request = HTTPRequestParser(self) request.error = InternalServerError(body) response = HttpErrorResponse(request) response.service() self.clear_request_queue() def clear_request_queue(self): for request in self.dl_requests: request._close() self.dl_requests.clear() def start_inactivity_timer(self): self.send_event( self.inactivity_timer, 'EV_SET_TIMER', seconds=self.config.inactivity_timeout ) def stop_inactivity_timer(self): self.send_event( self.inactivity_timer, 'EV_SET_TIMER', seconds=-1 ) def start_responseless_timer(self): self.send_event( self.responseless_timer, 'EV_SET_TIMER', seconds=self.config.responseless_timeout ) def stop_responseless_timer(self): self.send_event( self.responseless_timer, 'EV_SET_TIMER', seconds=-1 )
[docs] def write(self, data): """ Write data to output buffer. To supply asynchronous access to high level. """ if not self.responding_response: self.logger and self.logger.error( "ERROR channel.write() with no responding_response") return self.responding_response.write(data)
[docs] def flush(self, callback=None): """ Flush output buffer. To supply asynchronous access to high level. """ if not self.responding_response: self.logger and self.logger.error( "ERROR channel.flush() with no responding_response") return self.responding_response.flush(callback)
[docs] def finish(self, response=None): """ Finishes this response, flushing output buffer, and ending the HTTP request. To supply asynchronous access to high level. """ if response is None: if not self.responding_response: self.logger and self.logger.error( "ERROR channel.finish()" " with no responding_response") return response = self.responding_response response.finish() self.stop_responseless_timer() self.responding_request = None self.responding_response = None if response.close_on_finish: # ignore all enqueued requests. self.clear_request_queue() if self.gsock: self.gsock.mt_drop() return # pull the request queue self.post_event(self, 'EV_DEQUEUE_REQUEST')