# -*- coding: utf-8 -*-
# Copyright (C) 2013-2015 Samuel Damashek, Peter Foley, James Forcier, Srijay Kasturi, Reed Koser, Christopher Reffett, and Fox Wilson
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
# USA.
import functools
import re
import threading
from datetime import datetime, timedelta
from inspect import getdoc
from . import backtrace, modutils
from .orm import Commands, Log
class CommandData(object):
    """Bookkeeping for loaded commands and for the ones currently disabled."""

    def __init__(self):
        # Maps a command name to the object passed to register().
        self.known_commands = {}
        # Names of the commands that are switched off.
        self.disabled_commands = set()

    def scan_for_commands(self):
        """
        Scans for commands

        Forgets every known command, refreshes the disabled list from
        modutils and re-imports the "commands" modules.

        :rtype: list
        :return: A list of modules that failed to reload
        """
        self.known_commands.clear()
        self.disabled_commands = modutils.get_disabled("commands")
        return modutils.scan_and_reimport("commands")

    def get_known_commands(self):
        """Return the name -> command mapping itself (not a copy)."""
        return self.known_commands

    def get_enabled_commands(self):
        """Return a list of the known command names that are not disabled."""
        enabled = []
        for name in self.known_commands:
            if name not in self.disabled_commands:
                enabled.append(name)
        return enabled

    def get_disabled_commands(self):
        """Return a list of the known command names that are disabled."""
        disabled = []
        for name in self.known_commands:
            if name in self.disabled_commands:
                disabled.append(name)
        return disabled

    def is_disabled(self, command):
        """Tell whether the given name is in the disabled list."""
        return command in self.disabled_commands

    def is_registered(self, command_name):
        """Tell whether a command has been registered under the given name."""
        return command_name in self.known_commands

    def get_command(self, command_name):
        """Look up a registered command; raises KeyError for unknown names."""
        return self.known_commands[command_name]

    def disable_command(self, command):
        """Adds a command to the disabled commands list.

        :return: A human-readable message describing the outcome
        """
        if command not in self.known_commands:
            return "%s is not a loaded command" % command
        if command in self.disabled_commands:
            return "That command is already disabled!"
        self.disabled_commands.add(command)
        return "Disabled command %s" % command

    def enable_command(self, command):
        """Removes a command from the disabled commands list.

        The special name "all" empties the disabled list.

        :return: A human-readable message describing the outcome
        """
        if command == "all":
            self.disabled_commands.clear()
            return "Enabled all commands."
        if command in self.disabled_commands:
            self.disabled_commands.remove(command)
            return "Enabled command %s" % command
        if command in self.known_commands:
            return "That command isn't disabled!"
        return "%s is not a loaded command" % command

    def register(self, name, command):
        """Store the command under the given name, replacing any previous entry."""
        self.known_commands[name] = command
# Module-level CommandData instance; Command consults it to register its
# names and to check whether a command is disabled.
registry = CommandData()
def record_command(cursor, nick, command, channel):
    """Add a Commands row for this invocation to the given session.

    :param cursor: object whose ``add`` method receives the new row
    :param nick: stored as the row's ``nick``
    :param command: stored as the row's ``command``
    :param channel: stored as the row's ``channel``
    """
    cursor.add(Commands(nick=nick, command=command, channel=channel))
def check_command(cursor, nick, msg, target):
    """Check whether the preceding message repeats ``msg`` from another sender.

    Looks at 'pubmsg' log rows for ``target`` from the last 10 seconds and
    inspects the second most recent one.

    :return: True when that row has the same text as ``msg`` and a source
        other than ``nick``; False when there is no such row.
    """
    # Anything older than ten seconds is ignored.
    cutoff = datetime.now() - timedelta(seconds=10)
    recent = cursor.query(Log).filter(Log.target == target, Log.type == 'pubmsg', Log.time >= cutoff)
    # The newest row is the command being executed right now, so skip it
    # and take the one before.
    previous = recent.order_by(Log.time.desc()).offset(1).first()
    if not previous:
        return False
    return previous.msg == msg and previous.source != nick
class Command():
    """Decorator that registers a function as a command under one or more names.

    Creating the instance registers it in the module-level ``registry``;
    applying it to a function builds the wrapper that ``run`` later hands to
    a worker thread.
    """

    def __init__(self, names, args=None, limit=0, admin=False):
        """Register this command under every name in ``names``.

        :param names: a single name, or an iterable of names (aliases)
        :param args: stored as ``self.args``; defaults to a new empty list
        :param limit: stored as ``self.limit``; non-zero means limited
        :param admin: stored as ``self.admin``
        :raises ValueError: if any name is already registered or is repeated
            in ``names``. Nothing is registered in that case.
        """
        self.names = [names] if isinstance(names, str) else names
        # A fresh list per instance: a literal ``[]`` default would be one
        # object shared by every command that omits ``args``.
        self.args = [] if args is None else args
        self.limit = limit
        self.admin = admin
        # Filled in by __call__; defined here so get_doc() works even before
        # the instance has decorated a function.
        self.doc = None
        # Validate every alias before registering any of them, so a clash on
        # a later alias cannot leave earlier aliases pointing at this object.
        seen = set()
        for name in self.names:
            if name in seen or registry.is_registered(name):
                raise ValueError("There is already a command registered with the name %s" % name)
            seen.add(name)
        for name in self.names:
            registry.register(name, self)

    def __call__(self, func):
        """Wrap ``func`` and remember the wrapper as ``self.exe``.

        The wrapper renames the current thread while the command runs, opens
        a session via ``self.handler.db.session_scope()`` and stores it in
        ``args['db']``, and forwards any exception to
        ``backtrace.handle_traceback``.

        :param func: callable taking ``(send, msg, args)``
        :return: the wrapper function
        """
        @functools.wraps(func)
        def wrapper(send, msg, args):
            # Resolved before the try block so the finally clause can always
            # rely on both names being bound.
            thread = threading.current_thread()
            match = re.match(r'Thread-\d+', thread.name)
            thread_id = "Unknown" if match is None else match.group(0)
            thread.name = "%s running command.%s" % (thread_id, self.names[0])
            try:
                # NOTE(review): handler and target live on the shared Command
                # instance and are set by run(); if two runs of the same
                # command can overlap they may see each other's values --
                # confirm against the worker implementation.
                with self.handler.db.session_scope() as args['db']:
                    func(send, msg, args)
            except Exception as ex:
                backtrace.handle_traceback(ex, self.handler.connection, self.target, self.handler.config, "commands.%s" % self.names[0])
            finally:
                thread.name = "%s idle, last ran command.%s" % (thread_id, self.names[0])
        self.doc = getdoc(func)
        if self.doc is None or len(self.doc) < 5:
            print("Warning:", self.names[0], "has no or very little documentation")
        self.exe = wrapper
        return wrapper

    def run(self, send, msg, args, command, nick, target, handler):
        """Record the invocation and start the command on a worker thread.

        If any of this command's names is disabled, only an apology is sent
        through ``send`` and nothing else happens.

        :param send: callable used to reply; also passed to the command
        :param msg: passed through to the command
        :param args: dict passed through to the command
        :param command: the command value stored by ``record_command``
        :param nick: the nick stored by ``record_command``
        :param target: kept as ``self.target`` for error reporting
        :param handler: kept as ``self.handler``; supplies ``db`` and ``workers``
        """
        if any(registry.is_disabled(name) for name in self.names):
            send("Sorry, that command is disabled.")
        else:
            self.target = target
            self.handler = handler
            with handler.db.session_scope() as session:
                record_command(session, nick, command, target)
            handler.workers.start_thread(self.exe, send, msg, args)

    def get_doc(self):
        """Return the wrapped function's docstring, or None if there is none."""
        return self.doc

    def is_limited(self):
        """Return True when a non-zero limit was configured."""
        return self.limit != 0

    def requires_admin(self):
        """Return the ``admin`` flag given at construction."""
        return self.admin