Source code for eventsource.listener

.. module:: listener 
:platform: Unix
:synopsis: This module provides an eventsource listener based on tornado

.. moduleauthor:: Bernard Pratz <>

.. resources::

import sys
import time
import argparse
import logging
log = logging.getLogger("eventsource.listener")

import json
import httplib
import tornado.web
import tornado.ioloop

"""Event base"""

[docs]class Event(object): """ Class that defines an event, its behaviour and the matching actions LISTEN is the GET event that will open an event source communication FINISH is the POST event that will end an event source communication started by LISTEN RETRY is the POST event that defines reconnection timeouts for the client ACTIONS contains the list of acceptable POST targets. target is the token that matches an event source channel action contains the name of the action (which shall be in ACTIONS) value contains a list of every lines of the value to be parsed """ content_type = "text/plain" LISTEN = "poll" FINISH = "close" RETRY = "retry" ACTIONS=[FINISH] """Property to encapsulate processing on value"""
[docs] def get_value(self): return self._value
[docs] def set_value(self, v): self._value = v
value = property(get_value,set_value) id = None def __init__(self, target, action, value=None): """ Creates a new Event object with :param target: a string matching an open channel :param action: a string matching an action in the ACTIONS list :param value: a value to be embedded """ = target self.action = action self.set_value(value)
[docs]class EventId(object): cnt = 0
[docs] def get_id(self): if self.cnt == EventId.cnt: self.cnt = EventId.cnt EventId.cnt+=1 return self.cnt
id = property(get_id)
""" Reusable events """
[docs]class StringEvent(Event): ACTIONS=["ping",Event.FINISH] """Property to enable multiline output of the value"""
[docs] def get_value(self): return [line for line in self._value.split('\n')]
value = property(get_value,Event.set_value)
[docs]class JSONEvent(Event): content_type = "application/json" LISTEN = "poll" FINISH = "close" ACTIONS=["ping",FINISH] """Property to enable JSON checking of the value"""
[docs] def get_value(self): return [json.dumps(self._value)]
[docs] def set_value(self, v): self._value = json.loads(v)
value = property(get_value,set_value)
[docs]class StringIdEvent(StringEvent,EventId): ACTIONS=["ping",Event.RETRY,Event.FINISH] id = property(EventId.get_id)
[docs]class JSONIdEvent(JSONEvent,EventId): content_type = JSONEvent.content_type ACTIONS=["ping",Event.RETRY,Event.FINISH] id = property(EventId.get_id) ##
"""EventSource mechanism"""
[docs]class EventSourceHandler(tornado.web.RequestHandler): _connected = {} _events = {}
[docs] def initialize(self, event_class=StringEvent,keepalive=0): """ Takes an Event based class to define the event's handling """ self._event_class = event_class self._retry = None if keepalive is not 0: self._keepalive = tornado.ioloop.PeriodicCallback(self.push_keepalive, keepalive);
"""Tools""" @tornado.web.asynchronous
[docs] def push_keepalive(self): log.debug("push_keepalive()") self.write(": keepalive %s\r\n\r\n" % (unicode(time.time()))) self.flush()
[docs] def push(self, event): """ For a given event, write event-source outputs on current handler :param event: Event based incoming event """ log.debug("push(%s,%s,%s)" % (,event.action,event.value)) if hasattr(event, "id"): self.write("id: %s\r\n" % (unicode( if self._retry is not None: self.write("retry: %s\r\n" % (unicode(self._retry))) self._retry = None self.write("event: %s\r\n" % (unicode(event.action))) for line in event.value: self.write("data: %s\r\n" % (unicode(line),)) self.write("\r\n") self.flush()
[docs] def buffer_event(self, target, action, value=None): """ creates and store an event for the target :param target: string identifying current target :param action: string matching one of Event.ACTIONS :param value: string containing a value """ self._events[target].append(self._event_class(target, action, value))
[docs] def is_connected(self, target): """ :param target: string identifying a given target @return true if target is connected """ return target in self._connected.values()
[docs] def set_connected(self, target): """ registers target as being connected :param target: string identifying a given target this method will add target to the connected list, and create an empty event buffer """ log.debug("set_connected(%s)" % (target,)) self._connected[self] = target self._events[target] = []
[docs] def set_disconnected(self): """ unregisters current handler as being connected this method will remove target from the connected list, and delete the event buffer """ try: target = self._connected[self] log.debug("set_disconnected(%s)" % (target,)) self._keepalive.stop() del(self._events[target]) del(self._connected[self]) except Exception, err: log.error("set_disconnected(%s,%s): %s", str(self), target, err)
[docs] def write_error(self, status_code, **kwargs): """ overloads the write_error() method of RequestHandler, to support more explicit messages than only the ones from httplib. """ if self.settings.get("debug") and "exc_info" in kwargs: # in debug mode, try to send a traceback self.set_header('Content-Type', 'text/plain') for line in traceback.format_exception(*kwargs["exc_info"]): self.write(line) self.finish() else: if 'mesg' in kwargs: self.finish("<html><title>%(code)d: %(message)s</title>" "<body>%(code)d: %(mesg)s</body></html>\n" % { "code": status_code, "message": httplib.responses[status_code], "mesg": kwargs["mesg"], }) else: self.finish("<html><title>%(code)d: %(message)s</title>" "<body>%(code)d: %(message)s</body></html>\n" % { "code": status_code, "message": httplib.responses[status_code], })
"""Synchronous actions"""
[docs] def post(self,action,target): """ Triggers an event :param action: string defining the type of event :param target: string defining the target handler to send it to this method will look for the request body to get post's data. """ log.debug("post(%s,%s)" % (target,action)) self.set_header("Accept", self._event_class.content_type) if target not in self._connected.values(): self.send_error(404,mesg="Target is not connected") elif action not in self._event_class.ACTIONS: self.send_error(404,mesg="Unknown action requested") else: try: self.buffer_event(target,action,self.request.body) except ValueError, ve: self.send_error(400,mesg="JSON data is not properly formatted: <br />%s" % (ve,))
"""Asynchronous actions""" def _event_generator(self,target): """ parses all events buffered for target and yield them """ for ev in self._events[target]: self._events[target].remove(ev) yield ev def _event_loop(self): """ for target matching current handler, gets and forwards all events until Event.FINISH is reached, and then closes the channel. """ if self.is_connected( for event in self._event_generator( if self._event_class.RETRY in self._event_class.ACTIONS: if event.action == self._event_class.RETRY: try: self._retry = int(event.value[0]) continue except ValueError: log.error("incorrect retry value: %s" % (event.value,)) if event.action == self._event_class.FINISH: self.set_disconnected() self.finish() return self.push(event) tornado.ioloop.IOLoop.instance().add_callback(self._event_loop) @tornado.web.asynchronous
[docs] def get(self,action,target): """ Opens a new event_source connection and wait for events to come Returns error 423 if the target token already exists Redirects to / if action is not matching Event.LISTEN. """ log.debug("get(%s,%s)" % (target, action)) if action == self._event_class.LISTEN: self.set_header("Content-Type", "text/event-stream") self.set_header("Cache-Control", "no-cache") = target if self.is_connected(target): self.send_error(423,mesg="Target is already connected") return self.set_connected(target) tornado.ioloop.IOLoop.instance().add_callback(self._event_loop) self._keepalive.start() else: self.redirect("/",permanent=True)
[docs] def on_connection_close(self): """ overloads RequestHandler's on_connection_close to disconnect currents handler on client's socket disconnection. """ log.debug("on_connection_close()") self.set_disconnected() ###
[docs]def start(): parser = argparse.ArgumentParser(prog=sys.argv[0], description="Event Source Listener") parser.add_argument("-H", "--host", dest="host", default='', help='Host to bind on') # PORT ARGUMENT parser.add_argument("-P", "--port", dest="port", default='8888', help='Port to bind on') parser.add_argument("-d", "--debug", dest="debug", action="store_true", help='enables debug output') parser.add_argument("-j", "--json", dest="json", action="store_true", help='to enable JSON Event') parser.add_argument("-k", "--keepalive", dest="keepalive", default="0", help='Keepalive timeout') parser.add_argument("-i", "--id", dest="id", action="store_true", help='to generate identifiers') args = parser.parse_args(sys.argv[1:]) if args.debug: logging.basicConfig(level=logging.DEBUG) else: logging.basicConfig(level=logging.INFO) if args.json: if chosen_event = JSONIdEvent else: chosen_event = JSONEvent else: if chosen_event = StringIdEvent else: chosen_event = StringEvent try: args.keepalive = int(args.keepalive) except ValueError: log.error("keepalive takes a numerical value") sys.exit(1) ### try: application = tornado.web.Application([ (r"/(.*)/(.*)", EventSourceHandler, dict(event_class=chosen_event,keepalive=args.keepalive)), ]) application.listen(int(args.port)) tornado.ioloop.IOLoop.instance().start() except ValueError: log.error("The port '%d' shall be a numerical value." % (args.port,)) sys.exit(1) ###
if __name__ == "__main__": start()