Source code for botlib.clock

# clock.py
#
#

""" timer, repeater and other clock based classes. """

from .event import Event
from .object import Default, Config, Object
from .utils import day, elapsed, get_day, get_hour, now, to_day, to_time
from .utils import name, sname
from .error import ENODATE
from .space import cfg

import threading
import logging
import time
import os

start = 0

[docs]class Timer(Object): """ call a function as x seconds of sleep. """ def __init__(self, sleep, func, *args, **kwargs): super().__init__(**kwargs) self._state = Default(default="") self._counter = Default(default=0) self._time = Default(default=0) self._time.start = time.time() self._time.latest = time.time() self._state.status = "waiting" self._name = kwargs.get("name", name(func)) self.sleep = sleep self.func = func self.args = args try: self._event = self.args[0] self._state.origin = self._event.origin except: self._event = Event() self.kwargs = kwargs
[docs] def start(self): """ start the timer. """ timer = threading.Timer(self.sleep, self.run) timer.setDaemon(True) timer.setName(self._name) timer.sleep = self.sleep timer._state = self._state timer._name = self._name timer._time = self._time timer._counter = self._counter self._counter.run += 1 self._time.latest = time.time() timer.start() return timer
[docs] def run(self, *args, **kwargs): """ run the registered function. """ self._time.latest = time.time() self.func(*self.args, **self.kwargs)
[docs] def exit(self): """ cancel the timer. """ self._status = "" self.cancel()
[docs]class Repeater(Timer): """ Repeat an funcion every x seconds. >>> def func(event): event.reply("yo!") >>> from bot.clock import Repeater >>> from bot.time import now >>> repeater = Repeater(now(), func) >>> repeater.start()[B """ def __init__(self, sleep, func, *args, **kwargs): super().__init__(sleep, func, *args, **kwargs) logging.warn("# start %s" % kwargs.get(name, sname(func)))
[docs] def run(self, *args, **kwargs): self._time.start = time.time() self._time.run += 1 try: self.func(*self.args, **self.kwargs) except: logging.error(get_exception()) self.start()
[docs]def init(event): from .space import cfg, db, kernel cfg = Config(default=0).load(os.path.join(cfg.workdir, "runtime", "timer")) cfg.template("timer") timers = [] for e in db.sequence("timer", cfg.latest): if e.done: continue if "time" not in e: continue if time.time() < int(e.time): timer = Timer(int(e.time), e.direct, e.txt) t = kernel.launch(timer.start) timers.append(t) else: cfg.last = int(e.time) cfg.save() e.done = True e.sync() return timers