"""
.. module:: listener
:platform: Unix
:synopsis: This module provides an eventsource listener based on tornado
.. moduleauthor:: Bernard Pratz <guyzmo@hackable-devices.org>
.. resources::
* http://stackoverflow.com/questions/10665569/websocket-event-source-implementation-to-expose-a-two-way-rpc-to-a-python-dj
* http://stackoverflow.com/questions/8812715/using-a-simple-python-generator-as-a-co-routine-in-a-tornado-async-handler
* http://dev.w3.org/html5/eventsource/#event-stream-interpretation
"""
import sys
import time
import argparse
import logging
log = logging.getLogger("eventsource.listener")
import json
import httplib
import tornado.web
import tornado.ioloop
"""Event base"""
[docs]class Event(object):
"""
Class that defines an event, its behaviour and the matching actions
LISTEN is the GET event that will open an event source communication
FINISH is the POST event that will end an event source communication started by LISTEN
RETRY is the POST event that defines reconnection timeouts for the client
ACTIONS contains the list of acceptable POST targets.
target is the token that matches an event source channel
action contains the name of the action (which shall be in ACTIONS)
value contains a list of every lines of the value to be parsed
"""
content_type = "text/plain"
LISTEN = "poll"
FINISH = "close"
RETRY = "retry"
ACTIONS=[FINISH]
"""Property to encapsulate processing on value"""
[docs] def get_value(self):
return self._value
[docs] def set_value(self, v):
self._value = v
value = property(get_value,set_value)
id = None
def __init__(self, target, action, value=None):
"""
Creates a new Event object with
:param target: a string matching an open channel
:param action: a string matching an action in the ACTIONS list
:param value: a value to be embedded
"""
self.target = target
self.action = action
self.set_value(value)
[docs]class EventId(object):
cnt = 0
[docs] def get_id(self):
if self.cnt == EventId.cnt:
self.cnt = EventId.cnt
EventId.cnt+=1
return self.cnt
id = property(get_id)
""" Reusable events """
[docs]class StringEvent(Event):
ACTIONS=["ping",Event.FINISH]
"""Property to enable multiline output of the value"""
[docs] def get_value(self):
return [line for line in self._value.split('\n')]
value = property(get_value,Event.set_value)
[docs]class JSONEvent(Event):
content_type = "application/json"
LISTEN = "poll"
FINISH = "close"
ACTIONS=["ping",FINISH]
"""Property to enable JSON checking of the value"""
[docs] def get_value(self):
return [json.dumps(self._value)]
[docs] def set_value(self, v):
self._value = json.loads(v)
value = property(get_value,set_value)
[docs]class StringIdEvent(StringEvent,EventId):
ACTIONS=["ping",Event.RETRY,Event.FINISH]
id = property(EventId.get_id)
[docs]class JSONIdEvent(JSONEvent,EventId):
content_type = JSONEvent.content_type
ACTIONS=["ping",Event.RETRY,Event.FINISH]
id = property(EventId.get_id)
##
"""EventSource mechanism"""
[docs]class EventSourceHandler(tornado.web.RequestHandler):
_connected = {}
_events = {}
[docs] def initialize(self, event_class=StringEvent,keepalive=0):
"""
Takes an Event based class to define the event's handling
"""
self._event_class = event_class
self._retry = None
if keepalive is not 0:
self._keepalive = tornado.ioloop.PeriodicCallback(self.push_keepalive, keepalive);
"""Tools"""
@tornado.web.asynchronous
[docs] def push_keepalive(self):
log.debug("push_keepalive()")
self.write(": keepalive %s\r\n\r\n" % (unicode(time.time())))
self.flush()
[docs] def push(self, event):
"""
For a given event, write event-source outputs on current handler
:param event: Event based incoming event
"""
log.debug("push(%s,%s,%s)" % (event.id,event.action,event.value))
if hasattr(event, "id"):
self.write("id: %s\r\n" % (unicode(event.id)))
if self._retry is not None:
self.write("retry: %s\r\n" % (unicode(self._retry)))
self._retry = None
self.write("event: %s\r\n" % (unicode(event.action)))
for line in event.value:
self.write("data: %s\r\n" % (unicode(line),))
self.write("\r\n")
self.flush()
[docs] def buffer_event(self, target, action, value=None):
"""
creates and store an event for the target
:param target: string identifying current target
:param action: string matching one of Event.ACTIONS
:param value: string containing a value
"""
self._events[target].append(self._event_class(target, action, value))
[docs] def is_connected(self, target):
"""
:param target: string identifying a given target
@return true if target is connected
"""
return target in self._connected.values()
[docs] def set_connected(self, target):
"""
registers target as being connected
:param target: string identifying a given target
this method will add target to the connected list,
and create an empty event buffer
"""
log.debug("set_connected(%s)" % (target,))
self._connected[self] = target
self._events[target] = []
[docs] def set_disconnected(self):
"""
unregisters current handler as being connected
this method will remove target from the connected list,
and delete the event buffer
"""
try:
target = self._connected[self]
log.debug("set_disconnected(%s)" % (target,))
self._keepalive.stop()
del(self._events[target])
del(self._connected[self])
except Exception, err:
log.error("set_disconnected(%s,%s): %s", str(self), target, err)
[docs] def write_error(self, status_code, **kwargs):
"""
overloads the write_error() method of RequestHandler, to
support more explicit messages than only the ones from httplib.
"""
if self.settings.get("debug") and "exc_info" in kwargs:
# in debug mode, try to send a traceback
self.set_header('Content-Type', 'text/plain')
for line in traceback.format_exception(*kwargs["exc_info"]):
self.write(line)
self.finish()
else:
if 'mesg' in kwargs:
self.finish("<html><title>%(code)d: %(message)s</title>"
"<body>%(code)d: %(mesg)s</body></html>\n" % {
"code": status_code,
"message": httplib.responses[status_code],
"mesg": kwargs["mesg"],
})
else:
self.finish("<html><title>%(code)d: %(message)s</title>"
"<body>%(code)d: %(message)s</body></html>\n" % {
"code": status_code,
"message": httplib.responses[status_code],
})
"""Synchronous actions"""
[docs] def post(self,action,target):
"""
Triggers an event
:param action: string defining the type of event
:param target: string defining the target handler to send it to
this method will look for the request body to get post's data.
"""
log.debug("post(%s,%s)" % (target,action))
self.set_header("Accept", self._event_class.content_type)
if target not in self._connected.values():
self.send_error(404,mesg="Target is not connected")
elif action not in self._event_class.ACTIONS:
self.send_error(404,mesg="Unknown action requested")
else:
try:
self.buffer_event(target,action,self.request.body)
except ValueError, ve:
self.send_error(400,mesg="JSON data is not properly formatted: <br />%s" % (ve,))
"""Asynchronous actions"""
def _event_generator(self,target):
"""
parses all events buffered for target and yield them
"""
for ev in self._events[target]:
self._events[target].remove(ev)
yield ev
def _event_loop(self):
"""
for target matching current handler, gets and forwards all events
until Event.FINISH is reached, and then closes the channel.
"""
if self.is_connected(self.target):
for event in self._event_generator(self.target):
if self._event_class.RETRY in self._event_class.ACTIONS:
if event.action == self._event_class.RETRY:
try:
self._retry = int(event.value[0])
continue
except ValueError:
log.error("incorrect retry value: %s" % (event.value,))
if event.action == self._event_class.FINISH:
self.set_disconnected()
self.finish()
return
self.push(event)
tornado.ioloop.IOLoop.instance().add_callback(self._event_loop)
@tornado.web.asynchronous
[docs] def get(self,action,target):
"""
Opens a new event_source connection and wait for events to come
Returns error 423 if the target token already exists
Redirects to / if action is not matching Event.LISTEN.
"""
log.debug("get(%s,%s)" % (target, action))
if action == self._event_class.LISTEN:
self.set_header("Content-Type", "text/event-stream")
self.set_header("Cache-Control", "no-cache")
self.target = target
if self.is_connected(target):
self.send_error(423,mesg="Target is already connected")
return
self.set_connected(target)
tornado.ioloop.IOLoop.instance().add_callback(self._event_loop)
self._keepalive.start()
else:
self.redirect("/",permanent=True)
[docs] def on_connection_close(self):
"""
overloads RequestHandler's on_connection_close to disconnect
currents handler on client's socket disconnection.
"""
log.debug("on_connection_close()")
self.set_disconnected()
###
[docs]def start():
parser = argparse.ArgumentParser(prog=sys.argv[0],
description="Event Source Listener")
parser.add_argument("-H",
"--host",
dest="host",
default='0.0.0.0',
help='Host to bind on')
# PORT ARGUMENT
parser.add_argument("-P",
"--port",
dest="port",
default='8888',
help='Port to bind on')
parser.add_argument("-d",
"--debug",
dest="debug",
action="store_true",
help='enables debug output')
parser.add_argument("-j",
"--json",
dest="json",
action="store_true",
help='to enable JSON Event')
parser.add_argument("-k",
"--keepalive",
dest="keepalive",
default="0",
help='Keepalive timeout')
parser.add_argument("-i",
"--id",
dest="id",
action="store_true",
help='to generate identifiers')
args = parser.parse_args(sys.argv[1:])
if args.debug:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
if args.json:
if args.id:
chosen_event = JSONIdEvent
else:
chosen_event = JSONEvent
else:
if args.id:
chosen_event = StringIdEvent
else:
chosen_event = StringEvent
try:
args.keepalive = int(args.keepalive)
except ValueError:
log.error("keepalive takes a numerical value")
sys.exit(1)
###
try:
application = tornado.web.Application([
(r"/(.*)/(.*)", EventSourceHandler, dict(event_class=chosen_event,keepalive=args.keepalive)),
])
application.listen(int(args.port))
tornado.ioloop.IOLoop.instance().start()
except ValueError:
log.error("The port '%d' shall be a numerical value." % (args.port,))
sys.exit(1)
###
if __name__ == "__main__":
start()