Source code for eventsource.client

import sys
import time
import argparse
import logging
log = logging.getLogger('eventsource.client')

from tornado.ioloop import IOLoop
from tornado.httpclient import AsyncHTTPClient, HTTPRequest

[docs]class Event(object): """ Defines a received event """ def __init__(self): = None = None = None def __repr__(self): return "Event<%s,%s,%s>" % (str(, str(, str('\n','\\n')))
[docs]class EventSourceClient(object): def __init__(self,url,action,target,callback=None,retry=0): """ Build the event source client :param url: string, the url to connect to :param action: string of the listening action to connect to :param target: string with the listening token :param callback: function with one parameter (Event) that gets called for each received event :param retry: timeout between two reconnections (0 means no reconnection) """ self._url = "http://%s/%s/%s" % (url,action,target) AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient") self.http_client = AsyncHTTPClient() self.http_request = HTTPRequest(url=self._url, method='GET', headers={"content-type":"text/event-stream"}, request_timeout=0, streaming_callback=self.handle_stream) if callback is None: self.cb = lambda e: "received %s" % (e,) ) else: self.cb = callback self.retry_timeout = int(retry)
[docs] def poll(self): """ Function to call to start listening """ if self.retry_timeout == 0: self.http_client.fetch(self.http_request, self.handle_request) IOLoop.instance().start() while self.retry_timeout!=0: self.http_client.fetch(self.http_request, self.handle_request) IOLoop.instance().start() time.sleep(self.retry_timeout/1000)
[docs] def end(self): """ Function to call to end listening """ self.retry_timeout=0 IOLoop.instance().stop()
[docs] def handle_stream(self,message): """ Acts on message reception :param message: string of an incoming message parse all the fields and builds an Event object that is passed to the callback function """ event = Event() for line in message.strip('\r\n').split('\r\n'): (field, value) = line.split(":",1) if field == 'event': = value elif field == 'data': value = value.lstrip() if is None: = value else: = "%s\n%s" % (, value) elif field == 'id': = value elif field == 'retry': try: self.retry_timeout = int(value) "timeout reset: %s" % (value,) ) except ValueError: pass elif field == '': "received comment: %s" % (value,) ) else: raise Exception("Unknown field !") if is not None: self.cb(event)
[docs] def handle_request(self,response): """ Function that gets called on non long-polling actions, on error or on end of polling. """ if response.error: log.error(response.error) else:"disconnection requested") self.retry_timeout=0 IOLoop.instance().stop()
[docs]def start(): parser = argparse.ArgumentParser(prog=sys.argv[0], description="Event Source Client") parser.add_argument("-H", "--host", dest="host", default='', help='Host to connect to') # PORT ARGUMENT parser.add_argument("-P", "--port", dest="port", default='8888', help='Port to be used connection') parser.add_argument("-d", "--debug", dest="debug", action="store_true", help='enables debug output') parser.add_argument("-r", "--retry", dest="retry", default='-1', help='Reconnection timeout') parser.add_argument(dest="token", help='Token to be used for connection') args = parser.parse_args(sys.argv[1:]) if args.debug: logging.basicConfig(level=logging.DEBUG) else: logging.basicConfig(level=logging.INFO) ### def log_events(event): "received %s" % (event,) ) EventSourceClient(url="%(host)s:%(port)s" % args.__dict__, action="poll", target=args.token, retry=args.retry).poll() ###
if __name__ == "__main__": start()