Source code for zot.services.rss

# zot/services/rss.py
#
#

""" rss module - fetch RSS feeds and announce their new entries. """

__copyright__ = "Copyright 2015, B.H.J Thate"

## IMPORTS

from zot.utils import parse_urls, short_date, fetch_url, error, strip_html
from zot.object import Object
from zot.looper import LOOPER
from zot.runtime import kernel

import threading
import logging
import socket
import time

## DEFINES

# feed announced by rss_vk (registered as "rss.vk").
url1 = "http://feeds.nos.nl/nosnieuwspolitiek"
# feed announced by rss_who (registered as "rss.who").
url2 = "http://www.who.int/feeds/entity/mediacentre/news/en/rss.xml"

# entry fields sliced out by rss_get and rendered, in this order, by display.
key_list = ["title", "link", "published"]

# NOTE(review): never read or assigned in the code visible here - confirm
# whether anything outside this module still uses it.
on_start = True

## display function

def display(*args, **kwargs):
    """ return the key_list fields of the first argument joined by " - ".

    Only args[0] is used; it must support lookup by each name in key_list.
    Any further positional or keyword arguments are ignored.

    """
    entry = args[0]
    # render each field with %s first, then put the separator between them.
    fields = ("%s" % entry[name] for name in key_list)
    return " - ".join(fields)

## rss_get function
def rss_get(url):
    """ fetch the feed at url and return the entries that were not seen before.

    Entries are identified by their "link" value. Links already present in
    the list of the object returned by kernel.last("result", "rss") are
    skipped; new links are appended to that list, which is synced before
    returning. When no such object exists yet a fresh one is created.

    Every new entry is wrapped in an Object, gets its "summary" (if any)
    stripped of html, is tagged with prefix "rss" and a short date, and is
    synced.

    Returns a list holding the key_list slice of each new entry. The list is
    empty when the feed cannot be fetched or parsed, so callers can always
    iterate over the result.

    """
    import zot.contrib.feedparser as fp
    result = []
    seen = kernel.last("result", "rss")
    if not seen:
        # nothing recorded yet, start a new record of seen links.
        seen = Object()
        seen.result = "rss"
        seen.list = []
    try:
        data = fetch_url("GET", url).read()
        data = fp.parse(data)
    except Exception:
        # best effort: report the failure and return the (empty) result
        # rather than None, which would make iterating callers raise.
        error()
        return result
    for entry in data["entries"]:
        if entry["link"] in seen.list:
            continue
        seen.list.append(entry["link"])
        o = Object(entry)
        # the slice is taken before the stored object is modified below.
        s = o.slice(key_list)
        if "summary" in o:
            o["summary"] = strip_html(o["summary"]).rstrip()
        o.prefix = "rss"
        o.short = short_date(time.ctime())
        o.sync()
        result.append(s)
    seen.sync()
    return result

## rss_fetch function
def rss_fetch(*args, **kwargs):
    """ announce the new entries of every kernel object that carries an rss url.

    Walks kernel.objects(), skips objects without a truthy "rss" value,
    fetches the feed with rss_get and announces each returned entry through
    kernel.announce(display(entry)). Arguments are accepted but not used.

    """
    for obj in kernel.objects():
        if "rss" not in obj or not obj.rss:
            continue
        # guard against a missing (None) result from a failed fetch, so one
        # broken feed does not abort polling of the remaining feeds.
        for o in rss_get(obj.rss) or ():
            kernel.announce(display(o))

kernel.register("rss.fetch", rss_fetch)
def rss_vk(*args, **kwargs):
    """ announce the new entries of the url1 feed, numbered starting at 1.

    Each entry is announced as "<nr>: <display(entry)>". Arguments are
    accepted but not used.

    """
    # a failed fetch can yield None; treat that as "nothing to announce".
    entries = rss_get(url1) or ()
    for nr, obj in enumerate(entries, 1):
        kernel.announce("%s: %s" % (nr, display(obj)))

kernel.register("rss.vk", rss_vk)
def rss_who(*args, **kwargs):
    """ announce the new entries of the url2 feed, numbered starting at 1.

    Each entry is announced as "<nr>: <display(entry)>". Arguments are
    accepted but not used.

    """
    # a failed fetch can yield None; treat that as "nothing to announce".
    entries = rss_get(url2) or ()
    for nr, obj in enumerate(entries, 1):
        kernel.announce("%s: %s" % (nr, display(obj)))

kernel.register("rss.who", rss_who)

## RSS
class RSS(LOOPER):

    """ LOOPER subclass for the rss service, adds no behaviour of its own. """

## INIT
def init(*args, **kwargs):
    """ build an RSS looper around rss_fetch and pass its start method to kernel.put.

    Arguments are accepted but not used.

    """
    # NOTE(review): 600 is presumably the loop interval in seconds - confirm
    # against LOOPER.
    kernel.put(RSS(rss_fetch, 600).start)