Source code for botlib.task
# mad/task.py
#
#
""" adapted thread to add extra functionality to threads. """
from .utils import name, sname
from .event import Event
from .object import Default, Object
from .trace import get_exception
import threading
import logging
import queue
import time
[docs]class Task(threading.Thread):
""" Task are adapted Threads. """
_counter = Default(default=0)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.setDaemon(kwargs.get("daemon", False))
self._error = Default(default="")
self._counter = Default(default=0)
self._queue = queue.Queue()
self._ready = threading.Event()
self._result = Object()
self._running = False
self._state = Default()
self._time = Default(default=0)
self.args = args
self.kwargs = kwargs
self.once = False
if self.args:
self._event = args[0]
self._name = self._event.txt
else:
self._event = None
self._name = kwargs.get("name", self._name)
if self._name:
self.setName(self._name)
def __iter__(self):
""" return self as an iterator. """
return self
def __next__(self):
""" yield next value. """
for k in dir(self):
yield k
[docs] def put(self, func, *args):
""" put a new function to execute. """
if not self._running:
self._running = True
self.start()
self._queue.put_nowait((func, args))
return self
[docs] def run(self):
""" run the threads botloop. """
self._time.start = time.time()
_func, args = self._queue.get()
self._result= Object()
self._state.status = "running"
if self.args:
self._event = args[0]
self._state.origin = self._event.origin
logging.warn(self._event)
self._time.latest = time.time()
try:
self._result = _func(*args)
except:
logging.error(get_exception())
try:
args[0].ready()
except:
pass
self._state.status = "stopped"
return self._result
[docs] def stop(self):
""" stop the task. """
self._state.status = ""
self.put(None, None)
[docs] def join(self, *args, **kwargs):
""" join the thread and signal ready. """
super().join()
#self.ready()
return self._result
[docs] def isSet(self):
""" see if the object ready flag is set. """
return self._ready.isSet()
[docs] def ready(self):
""" signal the event as being ready. """
self._ready.set()
[docs] def clear(self):
""" clear the ready flag. """
self._ready.clear()
[docs] def wait(self, sec=180.0):
""" wait for the task to be ready. """
self._ready.wait()