Source code for botlib.launcher
# mad/launch.py
#
#
""" a launch launches threads (or Tasks in this case). """
from botlib.utils import name
from .object import Object
from .task import Task
import threading
import logging
[docs]class Launcher(Object):
cc = "!"
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._status = "init"
self._tasks = []
[docs] def waiter(self, thrs, timeout=None):
result = []
for thr in thrs:
if not thr:
continue
logging.debug("# waiting for %s" % str(thr))
try:
res = thr.join(timeout)
except KeyboardInterrupt:
break
result.append(res)
return result
[docs] def launch(self, *args, **kwargs):
self._counter.launched += 1
t = Task()
t._name = kwargs.get("name", name(args[0]))
t.put(*args)
return t
[docs] def kill(self, thrname=""):
thrs = []
for thr in self.running(thrname):
self._counter.killed += len(thrs)
logging.info("! killed %s" % str(thr))
if "exit" in dir(thr):
thr.exit()
if "stop" in dir(thr):
thr.stop()
if "cancel" in dir(thr):
thr.cancel()
thrs.append(thr)
return thrs
[docs] def running(self, name=""):
n = str(name).upper()
for thr in threading.enumerate():
thrname = str(thr).upper()
if thrname.startswith("<_"): continue
if n not in thrname: continue
yield thr
[docs] def start(self, *args, **kwargs):
""" virtual start method. """
pass