# -*- coding: utf-8 -*-
# Copyright (C) 2013-2015 Samuel Damashek, Peter Foley, James Forcier, Srijay Kasturi, Reed Koser, Christopher Reffett, and Fox Wilson
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
# USA.
import logging
import os
import re
import subprocess
from datetime import datetime, timedelta
from os.path import exists, join
from random import choice, random
import pkg_resources
from . import orm
[docs]def get_users(args):
with args['handler'].data_lock:
users = list(args['handler'].channels[args['target']].users()) if args['target'] != 'private' else ['you']
return users
[docs]def parse_time(time):
time, unit = time[:-1], time[-1].lower()
if time.isdigit():
time = int(time)
return None
conv = {'s': 1, 'm': 60, 'h': timedelta(hours=1).total_seconds(), 'd': timedelta(days=1).total_seconds(),
'w': timedelta(weeks=1).total_seconds(), 'y': timedelta(weeks=52).total_seconds()}
if unit in conv.keys():
return time * conv[unit]
return None if unit else time
[docs]def do_pull(srcdir=None, repo=None):
if repo is None:
return subprocess.check_output(['git', 'pull'], cwd=srcdir, stderr=subprocess.STDOUT).decode().splitlines()[-1]
return subprocess.check_output(['pip', 'install', '--no-deps', '-U', 'git+git://github.com/%s' % repo],
stderr=subprocess.STDOUT, env=os.environ.copy()).decode().splitlines()[-1]
except subprocess.CalledProcessError as e:
for line in e.output.decode().splitlines():
raise e
[docs]def do_nuke(c, nick, target, channel):
c.privmsg(channel, "Please Stand By, Nuking " + target)
c.privmsg_many([nick, target], " ____________________ ")
c.privmsg_many([nick, target], " :-' , '; ., ) '-: ")
c.privmsg_many([nick, target], " / ( / / \\ ")
c.privmsg_many([nick, target], " / ;' \\ , . / ) \\ ")
c.privmsg_many([nick, target], " ( ( . ., ; ; ' ; ) ")
c.privmsg_many([nick, target], " \\ ,---:----------:---, / ")
c.privmsg_many([nick, target], " '--' \\ \\ / / '--' ")
c.privmsg_many([nick, target], " \\ \\ / / ")
c.privmsg_many([nick, target], " \\ / ")
c.privmsg_many([nick, target], " | . | ")
c.privmsg_many([nick, target], " |, '; | ")
c.privmsg_many([nick, target], " | ,. | ")
c.privmsg_many([nick, target], " | ., ;| ")
c.privmsg_many([nick, target], " |:; ; | ")
c.privmsg_many([nick, target], " ________/;';,.',\\ ________ ")
c.privmsg_many([nick, target], " ( ;' . ;';,.;', ; '; ; ) ")
[docs]def ping(ping_map, c, e, pongtime):
if e.arguments[1] == 'No such nick/channel':
nick = e.arguments[0]
if nick not in ping_map:
target = ping_map.pop(nick)
c.privmsg(target, "%s: %s" % (e.arguments[1], e.arguments[0]))
nick = e.source.split('!')[0]
response = e.arguments[1].replace(' ', '.')
pingtime = float(response)
delta = pongtime - datetime.fromtimestamp(pingtime)
elapsed = "%s.%s seconds" % (delta.seconds, delta.microseconds)
except (ValueError, OverflowError):
elapsed = response
target = ping_map.pop(nick) if nick in ping_map else nick
c.privmsg(target, "CTCP reply from %s: %s" % (nick, elapsed))
[docs]def get_channels(chanlist, nick):
channels = []
for name, channel in chanlist.items():
if nick in channel.users():
return channels
[docs]def get_cmdchar(config, connection, msg, msgtype):
cmdchar = config['core']['cmdchar']
botnick = '%s: ' % connection.real_nickname
if msg.startswith(botnick):
msg = msg.replace(botnick, cmdchar, 1)
altchars = [x.strip() for x in config['core']['altcmdchars'].split(',')]
if altchars and altchars[0] != '':
for i in altchars:
if msg.startswith(i):
msg = msg.replace(i, cmdchar, 1)
# Don't require cmdchar in PMs.
if msgtype == 'privmsg' and not msg.startswith(cmdchar):
msg = cmdchar + msg
return msg
[docs]def list_fortunes(offensive=False):
cmd = ['fortune', '-f']
if offensive:
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode()
output = re.sub(r'[0-9]{1,2}\.[0-9]{2}%', '', output)
fortunes = [x.strip() for x in output.splitlines()[1:]]
if offensive:
fortunes = ['off/%s' % x for x in fortunes]
return sorted(fortunes)
[docs]def get_fortune(msg, name='fortune'):
fortunes = list_fortunes() + list_fortunes(True)
cmd = ['fortune', '-s']
match = re.match('(-[ao])( .+|$)', msg)
if match:
msg = match.group(2).strip()
if 'bofh' in name or 'excuse' in name:
if random() < 0.05:
return "BOFH Excuse #1337:\nYou don't exist, go away!"
elif msg in fortunes:
elif msg:
return "%s is not a valid fortune module" % msg
return subprocess.check_output(cmd).decode()
[docs]def ignore(session, nick):
row = session.query(orm.Ignore).filter(orm.Ignore.nick == nick).first()
if row is None:
# FIXME: support expiration times for ignores
session.add(orm.Ignore(nick=nick, expire=datetime.min))
return "Now ignoring %s" % nick
return "%s is already ignored." % nick
[docs]def get_version(srcdir):
gitdir = join(srcdir, ".git")
if not exists(gitdir):
return None, pkg_resources.get_distribution('CslBot').version
commit = subprocess.check_output(['git', '--git-dir=%s' % gitdir, 'rev-parse', 'HEAD']).decode().splitlines()[0]
version = subprocess.check_output(['git', '--git-dir=%s' % gitdir, 'describe', '--tags']).decode().splitlines()[0]
return commit, version
except subprocess.CalledProcessError:
return None, None
[docs]def split_msg(msgs, max_len):
"""Splits as close to the end as possible."""
msg = ""
while len(msg.encode()) < max_len:
if len(msg.encode()) + len(msgs[0]) > max_len:
return msg, msgs
char = msgs.pop(0).decode()
# If we have a space within 15 chars of the length limit, split there to avoid words being broken up.
if char == ' ' and len(msg.encode()) > max_len - 15:
return msg, msgs
msg += char
return msg, msgs
[docs]def truncate_msg(msg, max_len):
if len(msg.encode()) > max_len:
msg = [x.encode() for x in msg]
msg, _ = split_msg(msg, max_len - 3)
return msg + "..."
return msg