generic functions.
checks if the user is fbf.
see if ddir has umode permission and if not set them.
check if resultlist is to be sent to the queues. if so do it!
copy a file with optional sed.
decode a string char by char. strip chars that can’t be decoded.
apply a sedstring to the file.
return filesize of a file.
convert from encoding.
get filename with the highest extension (number).
get nick from bots userhost cache
return a random nick.
get userhost from bots userhost cache
checks if we are on debian.
Bases: builtins.str
strip control characters for jabber transmission.
convert s to a jsonstring.
split output into seperate chunks.
check is string is in list of strings.
apply a sedstring to a string.
return a stripped userhost (everything before the ‘/’).
strip control characters from txt.
convert to ascii.
convert to encoding.
convert to latin1.
touch a file.
return unique elements in a list (as list).
wait for results to arrive in a queue. return list of results.
# fbf/utils/generic.py # # """ generic functions. """
from .exception import handle_exception from .trace import calledfrom, whichmodule from .lazydict import LazyDict from fbf.imports import getjson json = getjson()
from stat import ST_UID, ST_MODE, S_IMODE import time import sys import re import getopt import types import os import os.path import random import queue import logging import io
class istr(str): pass
def fix_format(s): counters = { chr(2): 0, chr(3): 0 } for letter in s: if letter in counters: counters[letter] += 1 for char in counters: if counters[char] % 2: s += char return s
def isdebian(): """ checks if we are on debian. """ return os.path.isfile("/etc/debian_version")
def botuser(): """ checks if the user is fbf. """ try: import getpass return getpass.getuser() except ImportError: return ""
def checkpermissions(ddir, umode): """ see if ddir has umode permission and if not set them. """ try: uid = os.getuid() gid = os.getgid() except AttributeError: return try: stat = os.stat(ddir) except OSError: return if stat[ST_UID] != uid: try: os.chown(ddir, uid, gid) except: pass if S_IMODE(stat[ST_MODE]) != umode: try: os.chmod(ddir, umode) except: handle_exception()
def printline(lineno, txt): try: print(txt.split("\n")[lineno-1]) except: handle_exception()
def jsonstring(s): """ convert s to a jsonstring. """ if type(s) == tuple: s = list(s) return json.dumps(s)
def getwho(bot, who, channel=None): """ get userhost from bots userhost cache """ who = who.lower() try: if bot.type in ["xmpp", "sxmpp", "sleek"]: return stripped(bot.userhosts[who]) else: return bot.userhosts[who] except KeyError: pass def getnick(bot, userhost): """ get nick from bots userhost cache """ userhost = userhost.lower() try: return bot.nicks[userhost] except KeyError: pass
def splittxt(what, l=375): """ split output into seperate chunks. """ txtlist = [] start = 0 end = l length = len(what) for i in range(round(length/end+1)): starttag = what.find("</", end) if starttag != -1: endword = what.find('>', end) + 1 else: endword = what.find(' ', end) if endword == -1: endword = length res = what[start:endword] if res: txtlist.append(res) start = endword end = start + l return txtlist
def getrandomnick(): """ return a random nick. """ return "fbf-" + str(random.randint(0, 100))
def decodeperchar(txt, encoding='utf-8', what=""): """ decode a string char by char. strip chars that can't be decoded. """ res = [] nogo = [] for i in txt: try: res.append(i.decode(encoding)) except UnicodeDecodeError: if i not in nogo: nogo.append(i) if nogo: if what: logging.debug("%s: can't decode %s characters to %s" % (what, nogo, encoding)) else: logging.debug("%s - can't decode %s characters to %s" % (whichmodule(), nogo, encoding)) return "".join(res)
def toenc(what, encoding='utf-8'): """ convert to encoding. """ if not what: what= "" try: return str(what, encoding) except UnicodeDecodeError: logging.debug("%s - can't encode %s to %s" % (whichmodule(2), what, encoding)) raise
def fromenc(txt, encoding='utf-8', what=""): """ convert from encoding. """ if not txt: txt = "" if type(txt) == str: return txt try: return txt.decode(encoding) except UnicodeDecodeError: logging.debug("%s - can't decode %s - decoding per char" % (whichmodule(), encoding)) return decodeperchar(txt, encoding, what)
def toascii(what): """ convert to ascii. """ return what.encode('ascii', 'replace')
def tolatin1(what): """ convert to latin1. """ return what.encode('latin-1', 'replace')
def strippedtxt(what, allowed=[]): """ strip control characters from txt. """ txt = [] for i in what: if ord(i) > 31 or (allowed and i in allowed): txt.append(i) return ''.join(txt)
REcolor = re.compile("\003\d\d(.*?)\003") def matchcolor(match): return match.group(1) def stripcolor(txt): find = REcolor.findall(txt) for c in find: if c: txt = re.sub(REcolor, c, txt, 1) ; logging.info(txt) return txt
def uniqlist(l): """ return unique elements in a list (as list). """ result = [] for i in l: if i not in result: result.append(i) return result
def jabberstrip(text, allowed=[]): """ strip control characters for jabber transmission. """ txt = [] allowed = allowed + ['\n', '\t'] for i in text: if ord(i) > 31 or (allowed and i in allowed): txt.append(i) return ''.join(txt)
def filesize(path): """ return filesize of a file. """ return os.stat(path)[6]
def touch(fname): """ touch a file. """ fd = os.open(fname, os.O_WRONLY | os.O_CREAT) os.close(fd)
def stringinlist(s, l): """ check is string is in list of strings. """ for i in l: if s in i: return True return False
def stripped(userhost): """ return a stripped userhost (everything before the '/'). """ return userhost.split('/')[0]
def gethighest(ddir, ffile): """ get filename with the highest extension (number). """ highest = 0 for i in os.listdir(ddir): if not os.path.isdir(ddir + os.sep + i) and ffile in i: try: seqnr = i.split('.')[-1] except IndexError: continue try: if int(seqnr) > highest: highest = int(seqnr) except ValueError: continue ffile += '.' + str(highest + 1) return ffile
def waitevents(eventlist, millisec=5000): result = [] for e in eventlist: if not e: continue #logging.warn("waitevents - waiting for %s" % e.txt) #e.finished.wait(millisec) res = waitforqueue(e.outqueue, 5000) result.append(res) e.finished.clear() return result
def waitforqueue(queue, timeout=10000, maxitems=None, bot=None): """ wait for results to arrive in a queue. return list of results. """ #if len(queue) > 1: return list(queue) result = [] counter = 0 if not maxitems: maxitems = 100 logging.warn("waiting for queue: %s - %s" % (timeout, maxitems)) try: while not len(queue): if len(queue) > maxitems: break if counter > timeout: break time.sleep(0.001) ; counter += 10 logging.warn("waitforqueue - result is %s items (%s) - %s" % (len(queue), counter, str(queue))) return queue except (AttributeError, TypeError): q = [] while 1: if counter > timeout: break try: q.append(queue.get_nowait()) except queue.Empty: break time.sleep(0.001) ; counter += 10 logging.warn("waitforqueue - result is %s items (%s) - %s" % (len(q), counter, str(q))) return q #time.sleep(0.2)
def checkqueues(self, queues, resultlist): """ check if resultlist is to be sent to the queues. if so do it! """ for queue in queues: for item in resultlist: queue.put_nowait(item) return True return False
def sedstring(input, sedstring): seds = sedstring.split('/') fr = seds[1].replace('\\', '') to = seds[2].replace('\\', '') return input.replace(fr,to)
def sedfile(filename, sedstring): result = io.StringIO() f = open(filename, 'r') seds = sedstring.split('/') fr = seds[1].replace('\\', '') to = seds[2].replace('\\', '') try: for line in f: l = line.replace(fr,to) result.write(l) finally: f.close() return result
def dosed(filename, sedstring): """ apply a sedstring to the file. """ try: f = open(filename, 'r') except IOError: return tmp = filename + '.tmp' fout = open(tmp, 'w') seds = sedstring.split('/') fr = seds[1].replace('\\', '') to = seds[2].replace('\\', '') try: for line in f: if 'googlecode' in line or 'github' in line or 'google.com' in line or '' in line: l = line else: l = line.replace(fr,to) fout.write(l) finally: fout.flush() fout.close() try: os.rename(tmp, filename) except WindowsError: os.remove(filename) os.rename(tmp, filename) def stringsed(instring, sedstring): """ apply a sedstring to a string. """ seds = sedstring.split('/') fr = seds[1].replace('\\', '') to = seds[2].replace('\\', '') mekker = instring.replace(fr,to) return mekker def copyfile(filename, filename2, sedstring=None): """ copy a file with optional sed. """ if os.path.isdir(filename): return try: f = open(filename, 'r') except IOError: return ddir = "" for x in filename2.split(os.sep)[:-1]: ddir += os.sep + x if not os.path.isdir(ddir): try: os.mkdir(ddir) except: pass try: fout = open(filename2, 'w') except: return if sedstring: seds = sedstring.split('/') fr = seds[1].replace('\\', '') to = seds[2].replace('\\', '') try: for line in f: if sedstring: l = line.replace(fr,to) else: l = line fout.write(l) finally: fout.flush() fout.close()