Source code for botlib.launcher

# mad/launch.py
#
#

""" a launch launches threads (or Tasks in this case). """

from botlib.utils import name

from .object import Object
from .task import Task

import threading
import logging

[docs]class Launcher(Object): cc = "!" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._status = "init" self._tasks = []
[docs] def waiter(self, thrs, timeout=None): result = [] for thr in thrs: if not thr: continue logging.debug("# waiting for %s" % str(thr)) try: res = thr.join(timeout) except KeyboardInterrupt: break result.append(res) return result
[docs] def launch(self, *args, **kwargs): self._counter.launched += 1 t = Task() t._name = kwargs.get("name", name(args[0])) t.put(*args) return t
[docs] def kill(self, thrname=""): thrs = [] for thr in self.running(thrname): self._counter.killed += len(thrs) logging.info("! killed %s" % str(thr)) if "exit" in dir(thr): thr.exit() if "stop" in dir(thr): thr.stop() if "cancel" in dir(thr): thr.cancel() thrs.append(thr) return thrs
[docs] def running(self, name=""): n = str(name).upper() for thr in threading.enumerate(): thrname = str(thr).upper() if thrname.startswith("<_"): continue if n not in thrname: continue yield thr
[docs] def start(self, *args, **kwargs): """ virtual start method. """ pass