Source code for point.plugs.rss

# p/plugs/rss.py
#
#

""" rss module. """

## IMPORTS

from point.utils import parse_urls, to_time, do_url, error
from point import Object, kernel

## basic imports

import threading
import logging
import time

## RSS class

[docs]class RSS(Object): def __init__(p, time_sleep, *args, **kwargs): Object.__init__(p, *args, **kwargs) p.time_sleep = int(time_sleep) p.time_in = time.time()
[docs] def start(p, *args, **kwargs): p.do_one()
[docs] def do_one(p, *args, **kwargs): global kernel p.time_in = time.time() p.timer = threading.Timer(p.time_sleep, p.poll) p.timer.start() kernel.run.RSS = p
[docs] def poll(p, *args, **kwargs): import point.feedparser as fp p.time_in = time.time() for obj in kernel.objects(): if "rss" not in obj: continue logging.warn("poll rss %s" % obj.rss) data = do_url("GET", obj.rss).read() try: data = fp.parse(data) except Exception as ex: error() ; continue for entry in data["entries"]: if kernel.has_obj("link", entry["link"]): logging.info("skip %s" % entry["link"]) ; continue result = "%s -=- %s -=- %s" % (to_time(time.ctime(time.time())), entry["title"], entry["link"]) feed = Object() feed.feed = "rss" feed.link = entry.link feed.save() for p in kernel.fleet: p.announce(result) p.do_one()
[docs]def init(): global kernel kernel.run.RSS = RSS(60.0) ; kernel.run.RSS.start()
[docs]def do_poll(event): kernel.run.RSS.poll()
kernel.cmnds.register("poll", do_poll)