# Source code for invenio_sse.ext
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2016 CERN.
#
# Invenio is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# Invenio is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Invenio; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307, USA.
#
# In applying this license, CERN does not
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.
"""Invenio module for HTML5 server-sent events support."""
from __future__ import absolute_import, print_function
import json
from redis import StrictRedis
from werkzeug.utils import cached_property
from . import config
from .utils import format_sse_event
class _SSEState(object):
    """SSE state accessible via ``proxies.current_sse``.

    Holds the Redis client created from ``SSE_REDIS_URL`` and offers helpers
    to publish events and to stream them back formatted as server-sent
    events.
    """

    def __init__(self, app):
        """Initialize state.

        :param app: Application object; its ``config['SSE_REDIS_URL']`` is
            used to create the Redis client.
        """
        self.app = app
        self._redis = StrictRedis.from_url(app.config['SSE_REDIS_URL'])

    @cached_property
    def _pubsub(self):
        """A redis pubsub instance.

        Kept for backward compatibility. :meth:`messages` creates its own
        pubsub object per call and does not use this shared one.
        """
        return self._redis.pubsub()

    def publish(self, data, type_=None, id_=None, retry=None, channel='sse'):
        """Publish data as a server-sent event.

        :param data: Event data, any JSON-serializable object.
        :param type_: Optional type of the event.
        :param id_: Optional event ID to set the EventSource object's last
            event ID value.
        :param retry: Optional reconnection time to use when attempting to
            send the event.
        :param channel: Optional channel to direct events to different
            clients, by default ``sse``.
        """
        msg = {"data": data, "event": type_, "id": id_, "retry": retry}
        self._redis.publish(channel, json.dumps(msg))

    def messages(self, channel='sse'):
        """Message generator from the given channel.

        Subscribes to ``channel`` and yields every published message passed
        through ``format_sse_event``. Subscription confirmations and other
        non-``message`` entries are skipped.

        :param channel: Channel to listen on, by default ``sse``.
        """
        # Use a dedicated pubsub object per generator. Sharing one between
        # several consumers makes them compete for the same stream of
        # messages and mixes the channels each of them subscribed to.
        pubsub = self._redis.pubsub()
        try:
            pubsub.subscribe(channel)
            for message in pubsub.listen():
                if message['type'] != 'message':
                    continue
                payload = message['data']
                # The client returns bytes unless it was configured to decode
                # responses, in which case the payload is already text.
                if isinstance(payload, bytes):
                    payload = payload.decode('utf-8')
                yield format_sse_event(json.loads(payload))
        finally:
            # Runs when the consumer stops iterating (generator closed) or
            # on error, so the subscription and its connection are released.
            pubsub.close()
class InvenioSSE(object):
    """Invenio-SSE extension."""

    def __init__(self, app=None):
        """Extension initialization.

        :param app: Optional application object. When given, ``init_app``
            is called with it immediately.
        """
        # Compare with None explicitly so that an application object which
        # happens to evaluate as falsy is still initialized.
        if app is not None:
            self.init_app(app)

    def init_app(self, app):
        """Flask application initialization.

        Loads the default configuration and stores the SSE state object
        under ``app.extensions['invenio-sse']``.

        :param app: Application object to initialize.
        """
        self.init_config(app)
        app.extensions['invenio-sse'] = _SSEState(app)

    def init_config(self, app):
        """Initialize configuration.

        Copies every ``SSE_``-prefixed attribute of the ``config`` module
        into ``app.config``, keeping values that are already set.

        :param app: Application object whose ``config`` is populated.
        """
        for k in dir(config):
            if k.startswith('SSE_'):
                app.config.setdefault(k, getattr(config, k))