runner

Thread management for running jobs.

class fbf.lib.runner.BotEventRunner(name='runner', doready=True)

Bases: fbf.lib.runner.Runner

handle(job)

schedule a bot command.

class fbf.lib.runner.LongRunner(name='runner', doready=True)

Bases: fbf.lib.runner.Runner

handle(job)

schedule a bot command.

class fbf.lib.runner.Runner(name='runner', doready=True)

Bases: fbf.lib.threadloop.RunnerLoop

A runner is a thread with a queue onto which jobs can be pushed. Scheduled jobs should not take too long, since a Runner can execute only one job at a time.

done(event)
handle(job)

schedule a job.

class fbf.lib.runner.Runners(name, max=100, runnertype=<class 'fbf.lib.runner.Runner'>, doready=True)

Bases: builtins.object

runners is a collection of runner objects.

cleanup(dojoin=False)

clean up idle runners.

makenew()

create a new runner.

names()
put(*data, **kwargs)

put a job on a free runner.

runnersizes()

return sizes of runner objects.

running()

return list of running jobs.

size()
start()

overload this if needed.

status(filter=None)
stop(name=None, dojoin=False)

stop runners.

fbf.lib.runner.runnercleanup(bot, event)
fbf.lib.runner.size()

CODE

# fbf/lib/runner.py
#
#

""" thread management for running jobs. """

## fbf imports

from fbf.lib.threads import getname, start_new_thread, start_bot_command
from fbf.utils.exception import handle_exception
from fbf.utils.locking import locked, lockdec
from fbf.utils.lockmanager import rlockmanager, lockmanager
from fbf.utils.generic import waitevents
from fbf.utils.trace import callstack, whichmodule
from fbf.utils.statdict import StatDict
from fbf.lib.threadloop import RunnerLoop
from fbf.lib.callbacks import callbacks
from fbf.lib.errors import URLNotEnabled
from .threads import getmod
from fbf.utils.lazydict import LazyDict

## basic imports

import queue
import time
import _thread
import random
import logging
import sys

## defines

# module-level StatDict; Runner.handle() calls upitem() on it with the
# runner's `nowrunning` and `type` values after each job.
stats = StatDict()

## Runner class

class Runner(RunnerLoop):

    """
        a runner is a thread with a queue on which jobs can be pushed.
        jobs scheduled should not take too long since only one job can
        be executed in a Runner at the same time.

    """

    def __init__(self, name="runner", doready=True):
        """ initialise the underlying RunnerLoop under `name` and reset the bookkeeping attributes. """
        RunnerLoop.__init__(self, name)
        self.starttime = time.time()
        self.elapsed = self.starttime
        self.finished = time.time()
        self.doready = doready
        # names of handlers observed to be slow / fast; filled in by the subclasses.
        self.longrunning = []
        self.shortrunning = []

    def handle(self, job):
        """
            schedule a job.

            job.args is either (speed, descr, func, *args) or the short form
            (speed, func). in the long form func is called with the extra
            positional args and job.kwargs; in the short form it is called
            without any arguments. returns whatever func returns.

            raises ValueError when job.args holds fewer than two items.
        """
        # time this job only; self.starttime is set once in __init__ here, so
        # measuring against it would report the runner's age, not the job's duration.
        started = time.time()
        kwargs = job.kwargs
        try:
            speed, descr, func, *args = job.args
        except ValueError:
            # fewer than three items: only the short (speed, func) form is left.
            speed, func = job.args
            descr = "none"
            args = ()
            kwargs = {}
        res = func(*args, **kwargs)
        elapsed = time.time() - started
        if elapsed > 5:
            logging.debug('ALERT %s %s job taking too long: %s seconds' % (descr, str(func), elapsed))
        stats.upitem(self.nowrunning)
        stats.upitem(self.type)
        return res

    def done(self, event):
        """ log a "done" warning for events whose cbtype is neither numeric nor one of the routine types. """
        try:
            int(event.cbtype)
        except ValueError:
            # non-numeric cbtype: stay quiet for the high-frequency ones.
            if event.cbtype not in ['TICK', 'TICK10', 'PING', 'NOTICE', 'TICK60']:
                logging.warning("done %s" % str(event.cbtype))

## BotEventRunner class

class BotEventRunner(Runner):

    """ runner that executes bot event handlers and forwards known-slow ones to longrunner. """

    def handle(self, job):
        """
            schedule a bot command.

            job.args must be (speed, descr, func, bot, ievent). returns the
            handler's result, or None when the job was forwarded to longrunner
            or the handler raised.
        """
        speed, descr, func, bot, ievent = job.args
        self.starttime = time.time()
        self.finished = 0
        self.elapsed = 0
        if not ievent.nolog:
            logging.debug("event handler is %s" % str(func))
        if self.nowrunning in self.longrunning:
            # this handler took more than 5 seconds on this runner before.
            logging.warning("putting %s on longrunner" % self.nowrunning)
            longrunner.put(job)
            return
        try:
            # NOTE(review): called with job.event while LongRunner passes ievent
            # from job.args - confirm both refer to the same event object.
            res = func(bot, job.event)
        except URLNotEnabled:
            logging.warning("urls fetching is disabled (%s)" % ievent.usercmnd)
            return
        except Exception:
            handle_exception(ievent)
            return
        self.finished = time.time()
        self.elapsed = self.finished - self.starttime
        if self.elapsed > 5:
            # remember the slow handler so the next occurrence is forwarded above.
            if self.nowrunning not in self.longrunning:
                self.longrunning.append(self.nowrunning)
            if not ievent.nolog:
                logging.debug('ALERT %s %s job taking too long: %s seconds' % (descr, str(func), self.elapsed))
        return res

class LongRunner(Runner):

    """ runner variant that records which handlers finish in under a second. """

    def handle(self, job):
        """ schedule a bot command. """
        # job.args must hold exactly five items; the first two are not used here.
        _speed, _descr, handler, bot, ievent = job.args
        self.starttime = time.time()
        if not ievent.nolog:
            logging.debug("long event handler is %s" % str(handler))
        result = handler(bot, ievent)
        self.elapsed = time.time() - self.starttime
        finished_quickly = self.elapsed < 1
        if finished_quickly and self.nowrunning not in self.shortrunning:
            self.shortrunning.append(self.nowrunning)
        return result

## Runners class

class Runners(object):

    """ runners is a collection of runner objects. """

    def __init__(self, name, max=100, runnertype=Runner, doready=True):
        """ create an empty pool named `name` that grows up to `max` runners of class `runnertype`. """
        self.name = name
        self.max = max
        self.runners = []
        self.runnertype = runnertype
        # NOTE(review): stored but not forwarded to the runners created in makenew().
        self.doready = doready

    def status(self, filter=None):
        """ return the tojson() dump of a LazyDict mapping runner name to runner.status(filter). """
        res = LazyDict()
        for runner in self.runners:
            res[runner.name] = runner.status(filter)
        return res.tojson()

    def names(self):
        """ return getname() of every runner's name. """
        return [getname(runner.name) for runner in self.runners]

    def size(self):
        """ return "<list of queue sizes>/<number of runners>" as a string. """
        qsize = [runner.queue.qsize() for runner in self.runners]
        return "%s/%s" % (qsize, len(self.runners))

    def runnersizes(self):
        """ return sizes of runner objects. """
        return ["%s/%s" % (runner.name, runner.queue.qsize()) for runner in self.runners]

    def stop(self, name=None, dojoin=False):
        """ stop runners. when `name` is given only runners whose name contains it are stopped. """
        for runner in self.runners:
            if name and name not in runner.name:
                continue
            runner.stop()
            if dojoin:
                runner.join(3.0)

    def start(self):
        """ override this if needed. """
        pass

    def put(self, *data, **kwargs):
        """ put a job on a free runner. """
        for runner in self.runners:
            if runner.queue.empty() and not runner.working:
                return runner.put(*data, **kwargs)
        # no idle runner available: makenew() creates one or picks an existing one.
        runner = self.makenew()
        return runner.put(*data, **kwargs)

    def running(self):
        """ return list of running jobs. """
        return [runner.nowrunning for runner in self.runners if runner.running]

    def makenew(self):
        """ create a new runner. once `max` runners exist a random existing one is returned instead. """
        if len(self.runners) < self.max:
            runner = self.runnertype(self.name + "-" + str(len(self.runners)))
            runner.start()
            self.runners.append(runner)
        else:
            runner = random.choice(self.runners)
        return runner

    def cleanup(self, dojoin=False):
        """
            clean up idle runners.

            a runner is removed when it is not running, its queue is empty or
            it is not working. with `dojoin` each stopped runner is joined for
            up to 3 seconds. returns self.size() after the cleanup, or None
            when there was nothing to clean.
        """
        idle = [
            runner for runner in self.runners
            if not runner.running or runner.queue.empty() or not runner.working
        ]
        if not idle:
            return
        for runner in idle:
            # same stop-then-join sequence as stop() uses.
            runner.stop()
            if dojoin:
                runner.join(3.0)
        for runner in idle:
            try:
                self.runners.remove(runner)
            except ValueError:
                pass
        logging.debug("%s - cleaned %s" %  (self.name, [item.name for item in idle]))
        logging.debug("%s - now running: %s" % (self.name, self.size()))
        return self.size()

## global runners

# one pool per kind of work: Runners(name, max, runnertype).
cmndrunner = Runners("cmnd", max=50, runnertype=BotEventRunner)
defaultrunner = cmndrunner  # alias for the same pool object
longrunner = Runners("long", max=50, runnertype=LongRunner)
callbackrunner = Runners("callback", max=30, runnertype=BotEventRunner)
waitrunner = Runners("wait", max=20, runnertype=BotEventRunner)
apirunner = Runners("api", max=10, runnertype=BotEventRunner)
threadrunner = Runners("thread", max=100, runnertype=Runner)
urlrunner = Runners("url", max=50, runnertype=Runner)

# every pool defined above, in one list.
allrunners = [
    cmndrunner,
    longrunner,
    callbackrunner,
    waitrunner,
    apirunner,
    threadrunner,
    urlrunner,
]

## cleanup

def runnercleanup(bot, event):
    """ call cleanup() on each of the module-level runner pools; `bot` and `event` are not used. """
    pools = (
        cmndrunner,
        longrunner,
        callbackrunner,
        waitrunner,
        apirunner,
        threadrunner,
        urlrunner,
    )
    for pool in pools:
        pool.cleanup()

# register runnercleanup as a handler for the "TICK10" callback type
# (presumably a periodic tick - confirm against the callbacks module).
callbacks.add("TICK10", runnercleanup)

def size():
    """ return a one-line "label: size" summary of every module-level runner pool, joined by " - ". """
    # urlrunner comes last so the existing part of the summary keeps its order.
    pools = (
        ("cmnd", cmndrunner),
        ("callback", callbackrunner),
        ("wait", waitrunner),
        ("long", longrunner),
        ("api", apirunner),
        ("thread", threadrunner),
        ("url", urlrunner),
    )
    return " - ".join("%s: %s" % (label, pool.size()) for label, pool in pools)

Table Of Contents

Previous topic

reboot

Next topic

tasks

This Page