Source code for invenio_sse.ext

# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2016 CERN.
#
# Invenio is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# Invenio is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Invenio; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307, USA.
#
# In applying this license, CERN does not
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.

"""Invenio module for HTML5 server-sent events support."""

from __future__ import absolute_import, print_function

import json

from redis import StrictRedis
from werkzeug.utils import cached_property

from . import config
from .utils import format_sse_event


[docs]class _SSEState(object): """SSE state accessible via ``proxies.current_sse``.""" def __init__(self, app): """Initialize state.""" self.app = app self._redis = StrictRedis.from_url(app.config['SSE_REDIS_URL']) @cached_property def _pubsub(self): """A redis pubsub instance.""" return self._redis.pubsub()
[docs] def publish(self, data, type_=None, id_=None, retry=None, channel='sse'): """Publish data as a server-sent event. :param data: Event data, any object serialize to JSON. :param type_: Optional type of the event. :param id_: Optional event ID to set the EventSource object's last event ID value. :param retry: Optional reconnection time to use when attempting to send the event. :param channel: Optional channel to direct events to different clients, by defaul ``sse``. """ msg = {"data": data, "event": type_, "id": id_, "retry": retry} self._redis.publish(channel, json.dumps(msg))
[docs] def messages(self, channel='sse'): """Message generator from the given channel.""" self._pubsub.subscribe(channel) for message in self._pubsub.listen(): if message['type'] == 'message': yield format_sse_event( json.loads(message['data'].decode('utf-8')))
[docs]class InvenioSSE(object): """Invenio-SSE extension.""" def __init__(self, app=None): """Extension initialization.""" if app: self.init_app(app)
[docs] def init_app(self, app): """Flask application initialization.""" self.init_config(app) app.extensions['invenio-sse'] = _SSEState(app)
[docs] def init_config(self, app): """Initialize configuration.""" for k in dir(config): if k.startswith('SSE_'): app.config.setdefault(k, getattr(config, k))