Source code for cslbot.helpers.workers

# -*- coding: utf-8 -*-
# Copyright (C) 2013-2015 Samuel Damashek, Peter Foley, James Forcier, Srijay Kasturi, Reed Koser, Christopher Reffett, and Fox Wilson
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.


import multiprocessing
import re
import signal
import threading
from collections import namedtuple
from datetime import datetime, timedelta

import concurrent.futures

from sqlalchemy import or_

from . import babble, backtrace, control, tokens
from .orm import Babble_last, Log

worker_lock = threading.Lock()
executor_lock = threading.Lock()

Event = namedtuple('Event', ['event', 'run_on_cancel'])


[docs]def pool_init(): """We ignore Ctrl-C in the poll workers, so that we can clean things up properly.""" signal.signal(signal.SIGINT, signal.SIG_IGN)
[docs]class Workers(): def __init__(self, handler): with worker_lock: self.pool = multiprocessing.Pool(initializer=pool_init) self.events = {} with executor_lock: self.executor = concurrent.futures.ThreadPoolExecutor(4) self.handler = handler def send(msg, target=handler.config['core']['ctrlchan']): handler.send(target, handler.config['core']['nick'], msg, 'privmsg') self.defer(60, False, self.update_tokens, handler) self.defer(3600, False, self.handle_pending, handler, send) self.defer(3600, False, self.check_babble, handler, send) self.defer(3600, False, self.check_active, handler, send)
[docs] def start_thread(self, func, *args, **kwargs): with executor_lock: self.executor.submit(func, *args, **kwargs)
[docs] def run_pool(self, func, args): with worker_lock: result = self.pool.apply_async(func, args) return result
[docs] def restart_pool(self): with worker_lock: self.pool.terminate() self.pool.join() self.pool = multiprocessing.Pool(initializer=pool_init)
[docs] def run_action(self, func, args): try: thread = threading.current_thread() thread_id = re.match(r'Thread-\d+', thread.name).group(0) thread.name = '%s running %s' % (thread_id, func.__name__) func(*args) except Exception as ex: ctrlchan = self.handler.config['core']['ctrlchan'] backtrace.handle_traceback(ex, self.handler.connection, ctrlchan, self.handler.config)
[docs] def defer(self, t, run_on_cancel, func, *args): event = threading.Timer(t, self.run_action, kwargs={'func': func, 'args': args}) event.name = '%s deferring %s' % (event.name, func.__name__) event.start() with worker_lock: self.events[event.ident] = Event(event, run_on_cancel) return event.ident
[docs] def cancel(self, eventid): with worker_lock: self.events[eventid].event.cancel() if self.events[eventid].run_on_cancel: self.events[eventid].event.function(**self.events[eventid].event.kwargs) del self.events[eventid]
[docs] def stop_workers(self, clean): """ Stop workers and deferred events """ with executor_lock: self.executor.shutdown(clean) del self.executor with worker_lock: if clean: self.pool.close() else: self.pool.terminate() self.pool.join() del self.pool for x in self.events.values(): x.event.cancel() self.events.clear()
[docs] def handle_pending(self, handler, send): # Re-schedule handle_pending self.defer(3600, False, self.handle_pending, handler, send) admins = ": ".join(handler.admins) with handler.db.session_scope() as session: control.show_pending(session, admins, send, True)
[docs] def update_tokens(self, handler): # Re-schedule update_tokens self.defer(600, False, self.update_tokens, handler) tokens.update_all_tokens(handler.config)
[docs] def check_active(self, handler, send): # Re-schedule check_active self.defer(3600, False, self.check_active, handler, send) if not self.handler.config.getboolean('feature', 'voiceactive'): return # Mark inactive after 24 hours. active_time = datetime.now() - timedelta(hours=24) with handler.db.session_scope() as session: with handler.data_lock: for name in handler.channels.keys(): for nick, voiced in handler.voiced[name].items(): if voiced and session.query(Log).filter(Log.source == nick, Log.time >= active_time, or_(Log.type == 'pubmsg', Log.type == 'action')).count() == 0: handler.rate_limited_send('mode', name, '-v %s' % nick)
[docs] def check_babble(self, handler, send): # Re-schedule check_babble self.defer(3600, False, self.check_babble, handler, send) cmdchar = handler.config['core']['cmdchar'] ctrlchan = handler.config['core']['ctrlchan'] with handler.db.session_scope() as session: # If we don't actually update anything, don't bother checking the last row. if not babble.update_markov(session, handler.config): return last = session.query(Babble_last).first() row = session.query(Log).filter(or_(Log.type == 'pubmsg', Log.type == 'privmsg'), ~Log.msg.startswith(cmdchar), Log.target != ctrlchan).order_by(Log.id.desc()).first() if last is None or row is None: return if abs(last.last - row.id) > 1: raise Exception("Last row in babble cache (%d) does not match last row in log (%d)." % (last.last, row.id))