Source code for botlib.rss

# botlib/rss.py
#
#

""" rss module. """

from .db import Db
from .launcher import Launcher
from .object import Default, Object
from .register import Register
from .clock import Repeater
from .space import kernel, runtime
from .url import get_url

import feedparser
import logging
import urllib
import time

[docs]def get_feed(url): """ fetch a feed. """ result = [] if not url or not "http" in url: logging.debug("%s is not an url." % url) return result try: result = feedparser.parse(get_url(url).data) except (ImportError, ConnectionError, urllib.error.URLError) as ex: logging.warn("# %s %s" % (url, str(ex))) return result if "entries" in result: for entry in result["entries"]: yield Object(entry)
[docs]class Feed(Object): """ Feed typed object. """ pass
[docs]class RSS(Register, Launcher): """ RSS class for fetching rss feeds. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._type = str(type(self))
[docs] def start(self, *args, **kwargs): from .space import runtime repeater = Repeater(600, self.fetcher) runtime.register("RSS", self) return self.launch(repeater.start)
[docs] def stop(self): self.kill("RSS")
[docs] def fetcher(self): from .space import kernel, seen thrs = [] nr = len(seen.urls) for object in kernel.find("rss"): if "rss" not in object: continue if not object.rss: continue thr = kernel.launch(self.fetch, object) thrs.append(thr) return thrs
[docs] def synchronize(self): from .space import kernel, runtime, seen nr = 0 for object in kernel.find("rss"): if not object.get("rss", None): continue for o in get_feed(object.rss): if o.link in seen.urls: continue seen.urls.append(o.link) nr += 1 logging.warn("# %s urls" % nr) seen.sync() return seen
[docs] def fetch(self, object): from .utils import file_time, to_time from .space import fleet, kernel, seen nr = 0 for o in list(get_feed(object.rss))[::-1]: if o.link in seen.urls: continue seen.urls.append(o.link) feed = Feed(o) feed.services = "rss" for f in self.cfg.save_list: if f in feed.link: if "published" in feed: try: date = file_time(to_time(feed.published)) feed.save(stime=date) except bot.error.ENODATE as ex: logging.warn("ENODATE %s" % str(ex)) else: feed.save() kernel.announce(self.display(feed)) nr += 1 seen.sync() return nr
[docs] def display(self, object): result = "" for check in self.cfg.descriptions: if check in object.link: summary = object.get("summary", None) if summary: result += "%s - " % summary for key in self.cfg.display_list: data = object.get(key, None) if data: result += "%s - " % data.rstrip() if result: return result[:-3].rstrip()
[docs]def init(event): rss = RSS() rss.start() return rss
[docs]def shutdown(event): rss = runtime.get("RSS", []) for item in rss: item.stop()