# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
# Copyright (C) 2013-2015 Samuel Damashek, Peter Foley, James Forcier, Srijay Kasturi, Reed Koser, Christopher Reffett, and Fox Wilson
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
# USA.
import functools
import re
import threading
from . import backtrace, modutils
[docs]class HookData(object):
def __init__(self):
self.known_hooks = {}
self.disabled_hooks = set()
[docs] def scan_for_hooks(self):
"""
Scans for hooks
:rtype: list
:return: A list of modules that failed to reload
"""
self.known_hooks.clear()
self.disabled_hooks = modutils.get_disabled("hooks")
errors = modutils.scan_and_reimport("hooks")
return errors
[docs] def get_known_hooks(self):
return self.known_hooks
[docs] def get_hook_objects(self):
return [self.known_hooks[x] for x in self.known_hooks if x not in self.disabled_hooks]
[docs] def get_enabled_hooks(self):
return [x for x in self.known_hooks if x not in self.disabled_hooks]
[docs] def get_disabled_hooks(self):
return [x for x in self.known_hooks if x in self.disabled_hooks]
[docs] def is_disabled(self, hook):
return hook in self.disabled_hooks
[docs] def disable_hook(self, hook):
"""Adds a hook to the disabled hooks list."""
if hook not in self.known_hooks:
return "%s is not a loaded hook" % hook
if hook not in self.disabled_hooks:
self.disabled_hooks.add(hook)
return "Disabled hook %s" % hook
else:
return "That hook is already disabled!"
[docs] def enable_hook(self, hook):
"""Removes a command from the disabled hooks list."""
if hook == "all":
self.disabled_hooks.clear()
return "Enabled all hooks."
elif hook in self.disabled_hooks:
self.disabled_hooks.remove(hook)
return "Enabled hook %s" % hook
elif hook in self.known_hooks:
return "That hook isn't disabled!"
else:
return "%s is not a loaded hook" % hook
[docs] def register(self, hook):
self.known_hooks[hook.name] = hook
registry = HookData()
[docs]class Hook():
def __init__(self, name, types, args=[]):
self.name = name
self.types = [types] if isinstance(types, str) else types
self.args = args
registry.register(self)
def __call__(self, func):
@functools.wraps(func)
def wrapper(send, msg, msgtype, args):
if msgtype in self.types:
try:
thread = threading.current_thread()
thread_id = re.match(r'Thread-\d+', thread.name)
thread_id = "Unknown" if thread_id is None else thread_id.group(0)
thread.name = "%s running %s" % (thread_id, func.__module__)
with self.handler.db.session_scope() as args['db']:
func(send, msg, args)
except Exception as ex:
backtrace.handle_traceback(ex, self.handler.connection, self.target, self.handler.config, func.__module__)
finally:
thread.name = "%s idle, last ran %s" % (thread_id, func.__module__)
self.exe = wrapper
return wrapper
def __str__(self):
return self.name
def __repr__(self):
return self.name
[docs] def run(self, send, msg, msgtype, handler, target, args):
if registry.is_disabled(self.name):
return
self.handler = handler
self.target = target
handler.workers.start_thread(self.exe, send, msg, msgtype, args)