Source code for bytestag.events

'''Event handling'''
# This file is part of Bytestag.
# Copyright © 2012 Christopher Foo <chris.foo@gmail.com>.
# Licensed under GNU GPLv3. See COPYING.txt for details.
from concurrent.futures.thread import ThreadPoolExecutor
from queue import Queue
from threading import Lock
from weakref import WeakValueDictionary
import functools
import heapq
import inspect
import itertools
import logging
import queue
import threading
import time

__docformat__ = 'restructuredtext en'
_logger = logging.getLogger(__name__)


[docs]class EventID(object): '''An event ID. This class' comparison equality depends on the arguments given. ''' def __init__(self, arg, *args): self._args = tuple(itertools.chain((arg,), args)) for arg in self._args: hash(arg) @property
[docs] def args(self): return self._args
def __str__(self): return '<EventID {}>'.format(self._args) def __hash__(self): if len(self._args) == 1: return hash(self._args[0]) return functools.reduce(lambda x, y: hash(x) ^ hash(y), self._args) def __eq__(self, other): return self._args == other def __ne__(self, other): return self._args != other
[docs]class EventReactor(object): '''A reactor that demultiplexs events from other threads'''
[docs] class STOP_ID(object): '''The identifier that stops all event reactors''' pass
def __init__(self, max_queue_size=100): self._queue = Queue(max_queue_size) self._callback_table = {} self._callback_table_lock = Lock() self._max_queue_size = max_queue_size @property
[docs] def queue_size(self): '''The current size of the queue.''' return self._queue.qsize()
@property
[docs] def max_queue_size(self): '''The maximum size of the queue.''' return self._max_queue_size
[docs] def put(self, event_id, *event_data): '''Add an event to be dispatched :Parameters: event_id Any value that can be used as an index event_data Data to be passed to the callback function ''' cur_queue_size = self._queue.qsize() _logger.debug('Event put %s queue_size=%d', event_id, cur_queue_size) if cur_queue_size > self._max_queue_size * 0.90: _logger.warning('Event queue is approaching limits: ' 'current=%d, max=%d', cur_queue_size, self._max_queue_size) try: self._queue.put((event_id, event_data), block=False) except queue.Full as e: _logger.exception('Event queue full') raise e
[docs] def register_handler(self, event_id, handler_callback): '''Add a callback function to handle events :Parameters: event_id Any value that can be used as an index handler_callback A callable object such as a function or an instance with the ``__call__`` member ''' with self._callback_table_lock: if event_id not in self._callback_table: self._callback_table[event_id] = [] self._callback_table[event_id].append(handler_callback) _logger.debug('Registered handler %s=%s', event_id, handler_callback)
[docs] def start(self): '''Start the event reactor''' self._run()
def _run(self): '''Run the main loop''' _logger.debug('Event reactor started') while True: event_id, event_data = self._queue.get() if event_id in self._callback_table: for handler_callback in self._callback_table[event_id]: try: _logger.debug('Call handler=%s event_id=%s', handler_callback, event_id) handler_callback(event_id, *event_data) _logger.debug('Call finished handler=%s event_id=%s', handler_callback, event_id) except Exception as e: try: _logger.exception( 'Handler exception at callback %s', inspect.getsourcelines(handler_callback)) except IOError: pass raise e if event_id == EventReactor.STOP_ID: break _logger.debug('Event reactor finished')
[docs]class EventReactorMixin(object): '''A mix in to provide an ``event_reactor`` property''' def __init__(self, event_reactor): self._event_reactor = event_reactor @property
[docs] def event_reactor(self): '''Return the event reactor :rtype: :class:`EventReactor` ''' return self._event_reactor
@functools.total_ordering
[docs]class EventSchedulerEntry(object): '''An event scheduler entry.''' def __init__(self, abs_time, event_id, *event_data, periodic_interval=None): self.abs_time = abs_time self.event_id = event_id self.event_data = event_data self.periodic_interval = periodic_interval def __str__(self): return '<EventSchedulerEntry t={} event={} perodic={}>'.format( self.abs_time, self.event_id, self.periodic_interval) def __lt__(self, other): return self.abs_time < other.abs_time def __eq__(self, other): return self.abs_time == other.abs_time
[docs]class EventScheduler(threading.Thread, EventReactorMixin): '''Schedules events to be added to event reactors''' def __init__(self, event_reactor): threading.Thread.__init__(self) EventReactorMixin.__init__(self, event_reactor) self.name = 'Scheduler' self.daemon = True self._lock = threading.Lock() self._heap = [] self._event = threading.Event() self._reschedule_id = EventID(self, 'EventReschedule') self.event_reactor.register_handler(self._reschedule_id, self._resched_callback) self.start() def _resched_callback(self, event_id, seconds, target_event_id, event_data): self.add_periodic(seconds, target_event_id, *event_data)
[docs] def add_absolute(self, time, event_id, *event_data): '''Add an event to be scheduled at given time :Parameters: time: ``int``, ``float`` The timestamp in the future event_id The indexable value to be used as an event ID event_data Any extra data to be passed ''' _logger.debug('Add absolute %s %s', time, event_id) with self._lock: entry = EventSchedulerEntry(time, event_id, *event_data) heapq.heappush(self._heap, entry) self._event.set()
[docs] def add_periodic(self, seconds, event_id, *event_data): '''Add an event to be scheduled periodically. :Parameters: seconds: ``int``, ``float`` The interval in seconds event_id The indexable value to be used as an event ID event_data Any extra data to be passed ''' _logger.debug('Add periodic %s %s', seconds, event_id) with self._lock: entry = EventSchedulerEntry(time.time() + seconds, event_id, *event_data, periodic_interval=seconds) heapq.heappush(self._heap, entry) self._event.set()
[docs] def add_one_shot(self, seconds, event_id, *event_data): '''Add an event to be scheduled once. :Parameters: seconds: ``int``, ``float`` The interval in seconds event_id The indexable value to be used as an event ID event_data Any extra data to be passed ''' _logger.debug('Add one shot %s %s', seconds, event_id) with self._lock: entry = EventSchedulerEntry(time.time() + seconds, event_id, *event_data) heapq.heappush(self._heap, entry) self._event.set()
[docs] def run(self): wait_time = None wait_time_skew = 0.1 while True: _logger.debug('Sched wait') self._event.wait(timeout=wait_time) self._event.clear() _logger.debug('Sched run') entry = None with self._lock: if self._heap and self._heap[0].abs_time <= time.time(): entry = heapq.heappop(self._heap) if self._heap: wait_time = self._heap[0].abs_time - time.time() wait_time += wait_time_skew else: wait_time = None if entry: # if self._attempt_put(entry): # wait_time = 1 self.event_reactor.put(entry.event_id, *entry.event_data) if entry.periodic_interval: self.event_reactor.put(self._reschedule_id, entry.periodic_interval, entry.event_id, entry.event_data) _logger.debug('Sched fin')
[docs]class Observer(object): '''A callback manager. Example usage:: >>> def my_function(some_arg): ... print(some_arg) >>> observer = Observer() >>> observer.register(my_function) >>> observer('Observer activated!') 'Observer activated!' ''' def __init__(self, callback_fn=None, one_shot=False): ''' :param callback_fn: An optional callback function. :param one_shot: If `True`, after the first activation, subsequent registers will automatically be called. Otherwise, the observer can activate unlimited times. ''' self._one_shot = one_shot self._fired = False self._one_shot_args = None if callback_fn: self._fns = [callback_fn] else: self._fns = [] def __call__(self, *args, **kwargs): '''Execute all registered callback functions''' self._fired = True if self._one_shot: self._one_shot_args = (args, kwargs) for fn in self._fns: fn(*args, **kwargs)
[docs] def register(self, callback_fn): '''Register a callback function''' self._fns.append(callback_fn) if self._one_shot and self._fired: callback_fn(*self._one_shot_args[0], **self._one_shot_args[1])
[docs]class Task(object): '''An enhanced future. Pass an instance of this task to :class:`concurrent.futures.Executor`. Instead of using the future provided by the executor, use this instance. ''' def __init__(self, *args, **kwargs): self._progress = None self._result = None self._observer = Observer(one_shot=True) self._is_running = False self._is_finished = False self._event = threading.Event() self._result = None self._hooked_tasks = [] self._parent_tasks = [] self._args = args self._kwargs = kwargs @property def progress(self): '''Return the progress made so far''' return self._progress @progress.setter
[docs] def progress(self, o): self._progress = o for parent_task in self._parent_tasks: parent_task.progress = o
@property
[docs] def is_running(self): '''Return whether the task is running :rtype: ``bool`` ''' return self._is_running
@property
[docs] def is_finished(self): return self._is_finished
@property
[docs] def observer(self): '''Return the observer The observer will callback when the task is finished. :rtype: `Observer` ''' return self._observer
@property def result_(self): '''Return the result. Result may be ``None`` if the task is not finished. :see: :func:`result` ''' return self._result @result_.setter
[docs] def result_(self, o): self._result = o
[docs] def hook_task(self, task): '''Hook another task into this task. This function should be called within the task so that `stop` will be propagated to the given task. Once the task finishes, it is automatically unhooked. As well, the task will update the progress. ''' self._hooked_tasks.append(task) task._hooked(self)
def _hooked(self, parent_task): '''A task has hooked us''' self._parent_tasks.append(parent_task) def unhook(*args): self._parent_tasks.remove(parent_task) parent_task._unhook_task(self) self.observer.register(unhook) def _unhook_task(self, task): '''Unhook a task''' self._hooked_tasks.remove(task)
[docs] def stop(self): '''Request the task to stop''' self._is_running = False for task in self._hooked_tasks: task.stop()
def _run_all(self): self._is_running = True try: self._result = self.run(*self._args, **self._kwargs) except Exception as e: _logger.exception('Error during task') raise e finally: self._is_finished = True self._event.set() self._observer(self._result) self._is_running = False return self._result
[docs] def result(self, timeout=None): '''Wait and return the result''' self._event.wait(timeout) return self._result
[docs] def run(self, *args, **kwargs): '''The task's main body. Implementors should override this function. This function should periodically check `is_running` and update `progress`. The function should return a value which is the result. ''' raise NotImplementedError()
def __call__(self): return self._run_all()
[docs]class FnTaskSlot(threading.Thread): '''Limit task execution''' def __init__(self, max_size=3): threading.Thread.__init__(self) self.daemon = True self.name = FnTaskSlot.__name__ self._queue = queue.Queue(max_size) self._current_tasks = set() self._running = False self._max_size = max_size self._observer = Observer() self.start()
[docs] def add(self, fn, *args, **kwargs): '''Executes function with given arguments. This function blocks until the slot is not full. :rtype: :class:`Task` :returns: The Task that given ``fn`` returns. ''' event = threading.Event() self._queue.put((event, fn, args, kwargs)) event.wait() return event.task
[docs] def add_no_block(self, fn, *args, **kwargs): self._queue.put_nowait((fn, args, kwargs))
@property
[docs] def queue(self): return self._queue
@property
[docs] def current_tasks(self): return self._current_tasks
@property
[docs] def observer(self): '''An observer that fires when a task is added or removed. The observer callback arguments are: 1. :obj:`bool` - If `True`, then the task is added. Otherwise, the task was removed. 2. :class:`Task` - The task added or removed. ''' return self._observer
[docs] def run(self): self._running = True while self._running: if len(self._current_tasks) < self._max_size: if self._current_tasks: timeout = 1 else: timeout = None try: event, fn, args, kwargs = self._queue.get(timeout=timeout) except queue.Empty: pass else: _logger.debug('Fn task slot execute') task = fn(*args, **kwargs) event.task = task event.set() self._current_tasks.add(task) self._observer(True, task) for task in frozenset(self._current_tasks): task.result(timeout=1) if task.is_finished: self._current_tasks.remove(task) self._observer(False, task)
[docs] def stop(self): self._running = False for task in self._current_tasks: task.stop()
[docs]class WrappedThreadPoolExecutor(ThreadPoolExecutor, EventReactorMixin): '''Wraps a :class:`.ThreadPoolExecutor` that listens to a stop event''' def __init__(self, max_workers, event_reactor): ThreadPoolExecutor.__init__(self, max_workers) EventReactorMixin.__init__(self, event_reactor) event_reactor.register_handler(EventReactor.STOP_ID, self._stop_cb) self._task_map = WeakValueDictionary() def _stop_cb(self, event_id): _logger.debug('WrappedThreadPoolExecutor stopping everything') for key in self._task_map.keys(): self._task_map[key].stop() self.shutdown(wait=False)
[docs] def submit(self, fn, *args, **kwargs): if isinstance(fn, Task): self._task_map[id(fn)] = fn return ThreadPoolExecutor.submit(self, fn, *args, **kwargs)
[docs]def asynchronous(daemon=True, name=None): '''Wrap a function to run in a separate thread''' def decorator(func): def wrapper(*args, **kwargs): @functools.wraps(func) def logged_func(*args, **kwargs): _logger.debug('Start async call %s', threading.current_thread()) try: func(*args, **kwargs) except Exception as e: _logger.exception('Error in async call') raise e _logger.debug('Stop async call %s', threading.current_thread()) thread = threading.Thread(target=logged_func, args=args, kwargs=kwargs) thread.daemon = daemon thread.name = name thread.start() return thread return wrapper return decorator