.. _fbf.utils.generic:

generic
~~~~~~~

.. automodule:: fbf.utils.generic
    :show-inheritance:
    :members:
    :undoc-members:

CODE
----

::

    # fbf/utils/generic.py
    #
    #

    """ generic functions. """

.. _fbf.utils.generic_lib_imports:

lib imports
--------------

::

    from .exception import handle_exception
    from .trace import calledfrom, whichmodule
    from .lazydict import LazyDict
    from fbf.imports import getjson
    json = getjson()

.. _fbf.utils.generic_generic_imports:

generic imports
------------------

::

    from stat import ST_UID, ST_MODE, S_IMODE
    import time
    import sys
    import re
    import getopt
    import types
    import os
    import os.path
    import random
    import queue
    import logging
    import io

.. _fbf.utils.generic_istr_class:

istr class
-------------

::

    class istr(str):

        """ plain subclass of str, adds no behaviour of its own. """

        pass

.. _fbf.utils.generic_fix_format_function:

fix_format function
----------------------

::

    def fix_format(s):
        """
            balance the chr(2) and chr(3) control characters in s.

            each of the two characters that occurs an odd number of times
            is appended once more to the end of the string (presumably these
            are the IRC bold and colour markers - verify against callers).

        """
        counters = {
            chr(2): 0,
            chr(3): 0
        }
        for letter in s:
            if letter in counters:
                counters[letter] += 1
        for char in counters:
            if counters[char] % 2:
                s += char
        return s

.. _fbf.utils.generic_isdebian_function:

isdebian function
--------------------

::

    def isdebian():
        """ checks if we are on debian (looks for /etc/debian_version). """
        return os.path.isfile("/etc/debian_version")

.. _fbf.utils.generic_isfbfuser_function:

isfbfuser function
---------------------

::

    def botuser():
        """
            return the login name of the user running the process.

            an empty string is returned when the name cannot be determined.

        """
        try:
            import getpass
            return getpass.getuser()
        except (ImportError, KeyError, OSError):
            # getuser() falls back to the pwd database, which may be missing
            # (ImportError) or may not know the current uid (KeyError/OSError).
            return ""

.. _fbf.utils.generic_checkpermission_function:

checkpermission function
---------------------------

::

    def checkpermissions(ddir, umode):
        """
            see if ddir has umode permission and if not set them.

            ownership is changed to the current uid/gid when it differs.
            does nothing on platforms without os.getuid()/os.getgid() or
            when ddir cannot be stat'ed.

        """
        try:
            uid = os.getuid()
            gid = os.getgid()
        except AttributeError:
            return
        try:
            st = os.stat(ddir)
        except OSError:
            return
        if st[ST_UID] != uid:
            # best effort: changing the owner normally needs extra privileges.
            try:
                os.chown(ddir, uid, gid)
            except OSError:
                pass
        if S_IMODE(st[ST_MODE]) != umode:
            try:
                os.chmod(ddir, umode)
            except Exception:
                handle_exception()

..
.. _fbf.utils.generic_printline_function:

printline function
---------------------

::

    def printline(lineno, txt):
        """
            print line number lineno (counting from 1) of txt.

            any error (for example a line number past the end of txt) is
            handed to handle_exception().

        """
        try:
            print(txt.split("\n")[lineno-1])
        except Exception:
            handle_exception()

.. _fbf.utils.generic_jsonstring_function:

jsonstring function
----------------------

::

    def jsonstring(s):
        """ convert s to a jsonstring. tuples are dumped as lists. """
        if isinstance(s, tuple):
            s = list(s)
        return json.dumps(s)

.. _fbf.utils.generic_getwho_function:

getwho function
------------------

::

    def getwho(bot, who, channel=None):
        """
            get userhost from bots userhost cache.

            who is lowercased before the lookup. for bots of type "xmpp",
            "sxmpp" or "sleek" the resource part is removed with stripped().
            returns None when who is not in the cache. channel is not used.

        """
        who = who.lower()
        try:
            if bot.type in ["xmpp", "sxmpp", "sleek"]:
                return stripped(bot.userhosts[who])
            else:
                return bot.userhosts[who]
        except KeyError:
            return None

    def getnick(bot, userhost):
        """
            get nick from bots userhost cache.

            userhost is lowercased before the lookup. returns None when the
            userhost is not known.

        """
        userhost = userhost.lower()
        try:
            return bot.nicks[userhost]
        except KeyError:
            return None

.. _fbf.utils.generic_splitxt_function:

splitxt function
-------------------

::

    def splittxt(what, l=375):
        """
            split output into separate chunks.

            every chunk is at least l characters long (except possibly the
            last one) and is extended up to the next space, or up to the end
            of the next closing tag when one follows. returns a list of
            strings; empty chunks are dropped.

        """
        txtlist = []
        start = 0
        end = l
        length = len(what)
        for i in range(round(length/end+1)):
            # NOTE(review): the tag lookup on the next lines was garbled in the
            # extracted page (the text between the quote and ", end) + 1" was
            # lost); it is reconstructed here as "cut after the closing tag" -
            # confirm against fbf/utils/generic.py.
            starttag = what.find("</", end)
            if starttag != -1:
                endword = what.find('>', end) + 1
                if endword == 0:
                    # no '>' after the cut position: take the rest of the text
                    # instead of restarting at index 0.
                    endword = length
            else:
                endword = what.find(' ', end)
                if endword == -1:
                    endword = length
            res = what[start:endword]
            if res:
                txtlist.append(res)
            start = endword
            end = start + l
        return txtlist

.. _fbf.utils.generic_getrandomnick_function:

getrandomnick function
-------------------------

::

    def getrandomnick():
        """ return a random nick ("fbf-" followed by a number from 0 to 100). """
        return "fbf-" + str(random.randint(0, 100))

.. _fbf.utils.generic_decodeperchar_function:

decodeperchar function
-------------------------

::

    def decodeperchar(txt, encoding='utf-8', what=""):
        """
            decode a string char by char. strip chars that can't be decoded.

            txt is expected to be bytes; every single byte is decoded on its
            own, so bytes that only make sense as part of a multi-byte
            sequence are dropped as well. text that already is a str is
            returned unchanged. the dropped bytes are logged at debug level,
            prefixed with what when given and with whichmodule() otherwise.

        """
        if isinstance(txt, str):
            return txt
        res = []
        nogo = []
        for index in range(len(txt)):
            # slicing keeps a bytes object; iterating bytes would yield ints,
            # which have no decode() method.
            char = txt[index:index+1]
            try:
                res.append(char.decode(encoding))
            except UnicodeDecodeError:
                if char not in nogo:
                    nogo.append(char)
        if nogo:
            if what:
                logging.debug("%s: can't decode %s characters to %s" % (what, nogo, encoding))
            else:
                logging.debug("%s - can't decode %s characters to %s" % (whichmodule(), nogo, encoding))
        return "".join(res)

.. _fbf.utils.generic_toenc_function:

toenc function
-----------------

::

    def toenc(what, encoding='utf-8'):
        """
            return what as text, decoding it with encoding when needed.

            falsy input gives an empty string and str input is returned as
            is (str(text, encoding) would raise a TypeError for both).
            a UnicodeDecodeError is logged at debug level and re-raised.

        """
        if not what:
            return ""
        if isinstance(what, str):
            return what
        try:
            return str(what, encoding)
        except UnicodeDecodeError:
            logging.debug("%s - can't encode %s to %s" % (whichmodule(2), what, encoding))
            raise

.. _fbf.utils.generic_fromenc_function:

fromenc function
-------------------

::

    def fromenc(txt, encoding='utf-8', what=""):
        """
            convert from encoding.

            falsy input gives an empty string, str input is returned as is.
            when decoding the whole text fails, decodeperchar() is used so
            the undecodable parts are stripped.

        """
        if not txt:
            txt = ""
        if isinstance(txt, str):
            return txt
        try:
            return txt.decode(encoding)
        except UnicodeDecodeError:
            logging.debug("%s - can't decode %s - decoding per char" % (whichmodule(), encoding))
            return decodeperchar(txt, encoding, what)

.. _fbf.utils.generic_toascii_function:

toascii function
-------------------

::

    def toascii(what):
        """ convert to ascii. characters that can't be encoded are replaced. """
        return what.encode('ascii', 'replace')

.. _fbf.utils.generic_tolatin1_function:

tolatin1 function
--------------------

::

    def tolatin1(what):
        """ convert to latin1. characters that can't be encoded are replaced. """
        return what.encode('latin-1', 'replace')

.. _fbf.utils.generic_strippedtxt_function:

strippedtxt function
-----------------------

::

    def strippedtxt(what, allowed=None):
        """
            strip control characters from txt.

            characters with a code point of 31 or lower are removed, unless
            they are listed in allowed.

        """
        return ''.join(i for i in what if ord(i) > 31 or (allowed and i in allowed))

.. _fbf.utils.generic_stripcolor_function:

stripcolor function
----------------------

::

    # chr(3), two digits, the coloured text (captured) and a closing chr(3).
    REcolor = re.compile("\003\\d\\d(.*?)\003")

    def matchcolor(match):
        """ return the text captured between the colour markers. """
        return match.group(1)

    def stripcolor(txt):
        """
            remove the colour markers matched by REcolor from txt.

            each match is replaced by the text it encloses; a match that
            encloses no text is left untouched. the result is logged at info
            level when something was replaced.

        """
        def keeptext(match):
            # a function is used as replacement so backslashes in the enclosed
            # text are not interpreted as regex escapes.
            return match.group(1) or match.group(0)
        result = REcolor.sub(keeptext, txt)
        if result != txt:
            logging.info(result)
        return result

..
.. _fbf.utils.generic_uniqlist_function:

uniqlist function
--------------------

::

    def uniqlist(l):
        """
            return unique elements in a list (as list).

            the order of first occurrence is kept. membership is tested with
            "in" on a list, so the elements do not have to be hashable.

        """
        result = []
        for i in l:
            if i not in result:
                result.append(i)
        return result

.. _fbf.utils.generic_jabberstrip_function:

jabberstrip function
-----------------------

::

    def jabberstrip(text, allowed=None):
        """
            strip control characters for jabber transmission.

            characters with a code point of 31 or lower are removed, except
            newline, tab and the characters listed in allowed.

        """
        # copy into a list so any iterable of characters is accepted and the
        # argument of the caller is never modified.
        keep = list(allowed or []) + ['\n', '\t']
        return ''.join(i for i in text if ord(i) > 31 or i in keep)

.. _fbf.utils.generic_filesize_function:

filesize function
--------------------

::

    def filesize(path):
        """ return filesize of a file (st_size as reported by os.stat). """
        return os.stat(path).st_size

.. _fbf.utils.generic_touch_function:

touch function
-----------------

::

    def touch(fname):
        """
            touch a file.

            the file is created when it does not exist. an existing file is
            only opened and closed again; its content is left alone.

        """
        fd = os.open(fname, os.O_WRONLY | os.O_CREAT)
        os.close(fd)

.. _fbf.utils.generic_stringinlist_function:

stringinlist function
------------------------

::

    def stringinlist(s, l):
        """ check if string s is contained in one of the strings in list l. """
        return any(s in i for i in l)

.. _fbf.utils.generic_stripped_function:

stripped function
--------------------

::

    def stripped(userhost):
        """ return a stripped userhost (everything before the '/'). """
        return userhost.split('/')[0]

.. _fbf.utils.generic_gethighest_function:

gethighest function
----------------------

::

    def gethighest(ddir, ffile):
        """
            get filename with the highest extension (number).

            looks at the non-directory entries of ddir whose name contains
            ffile and whose last dot separated part is a number. returns
            ffile with the highest number found plus one appended (ffile.1
            when there is none).

        """
        highest = 0
        for i in os.listdir(ddir):
            if os.path.isdir(os.path.join(ddir, i)) or ffile not in i:
                continue
            try:
                seqnr = int(i.split('.')[-1])
            except ValueError:
                continue
            if seqnr > highest:
                highest = seqnr
        return ffile + '.' + str(highest + 1)

..
.. _fbf.utils.generic_waitevents_function:

waitevents function
----------------------

::

    def waitevents(eventlist, millisec=5000):
        """
            collect the results of the events in eventlist.

            for every event waitforqueue() is called on its outqueue with
            millisec as timeout and the finished flag of the event is cleared
            afterwards. falsy entries are skipped. returns a list with one
            result per event.

        """
        result = []
        for e in eventlist:
            if not e:
                continue
            res = waitforqueue(e.outqueue, millisec)
            result.append(res)
            e.finished.clear()
        return result

.. _fbf.utils.generic_waitforqueue_function:

waitforqueue function
------------------------

::

    def waitforqueue(queue, timeout=10000, maxitems=None, bot=None):
        """
            wait for results to arrive in a queue. return list of results.

            when queue supports len() it is polled until it is not empty
            anymore or the timeout is reached and the queue object itself is
            returned. otherwise the items are fetched with get_nowait() until
            the queue is empty or the timeout is reached and a new list is
            returned.

            the internal counter advances by 10 for every sleep of 1
            millisecond, so the time actually spent waiting is roughly a
            tenth of timeout in milliseconds. maxitems is only used in the
            log message and bot is not used.

        """
        # the queue parameter hides the queue module inside this function, so
        # the exception class is imported under its own name.
        from queue import Empty
        counter = 0
        if not maxitems:
            maxitems = 100
        logging.warning("waiting for queue: %s - %s" % (timeout, maxitems))
        try:
            while not len(queue):
                if counter > timeout:
                    break
                time.sleep(0.001)
                counter += 10
            logging.warning("waitforqueue - result is %s items (%s) - %s" % (len(queue), counter, str(queue)))
            return queue
        except (AttributeError, TypeError):
            # no len() available - treat it as a queue.Queue like object.
            q = []
            while 1:
                if counter > timeout:
                    break
                try:
                    q.append(queue.get_nowait())
                except Empty:
                    break
                time.sleep(0.001)
                counter += 10
            logging.warning("waitforqueue - result is %s items (%s) - %s" % (len(q), counter, str(q)))
            return q

.. _fbf.utils.generic_checkqueues_function:

checkqueues function
-----------------------

::

    def checkqueues(self, queues, resultlist):
        """
            check if resultlist is to be sent to the queues. if so do it!

            every item of resultlist is put on every queue in queues.
            returns True when there was at least one queue, False otherwise.
            self is not used.

        """
        # NOTE(review): the collapsed page does not show the indentation of the
        # original "return True"; returning inside the loop would serve only the
        # first queue, so the result is returned after all queues are done.
        served = False
        for queue in queues:
            for item in resultlist:
                queue.put_nowait(item)
            served = True
        return served

.. _fbf.utils.generic_sedstring_function:

sedstring function
---------------------

::

    def sedstring(input, sedstring):
        """
            apply a sed like expression ("s/from/to/") to the input string.

            the expression is split on '/'; the second and third part are
            used as search and replacement text after backslashes are
            removed from them. it is a plain text replace, not a regular
            expression. raises IndexError when the expression has fewer than
            two '/' characters.

        """
        seds = sedstring.split('/')
        fr = seds[1].replace('\\', '')
        to = seds[2].replace('\\', '')
        return input.replace(fr, to)

..
.. _fbf.utils.generic_sedfile_function:

sedfile function
-------------------

::

    def sedfile(filename, sedstring):
        """
            return the content of a file with a sedstring applied to it.

            sedstring has the form "s/from/to/"; backslashes are removed
            from both parts and a plain text replace is done on every line.
            the result is an io.StringIO object, positioned at its end (use
            getvalue() or seek(0) to read it). the file itself is not
            changed.

        """
        result = io.StringIO()
        with open(filename, 'r') as f:
            seds = sedstring.split('/')
            fr = seds[1].replace('\\', '')
            to = seds[2].replace('\\', '')
            for line in f:
                result.write(line.replace(fr, to))
        return result

.. _fbf.utils.generic_dosed_function:

dosed function
-----------------

::

    def dosed(filename, sedstring):
        """
            apply a sedstring to the file.

            the result is written to filename + '.tmp', which then replaces
            the original file. lines containing 'googlecode', 'github' or
            'google.com' are copied unchanged. nothing is done when the file
            cannot be opened.

        """
        try:
            f = open(filename, 'r')
        except IOError:
            return
        tmp = filename + '.tmp'
        with f:
            seds = sedstring.split('/')
            fr = seds[1].replace('\\', '')
            to = seds[2].replace('\\', '')
            with open(tmp, 'w') as fout:
                for line in f:
                    # NOTE(review): the page also listed "or '' in line" here,
                    # which is true for every line and kept the replace from
                    # ever running; a marker may have been lost in extraction -
                    # confirm against fbf/utils/generic.py.
                    if 'googlecode' in line or 'github' in line or 'google.com' in line:
                        l = line
                    else:
                        l = line.replace(fr, to)
                    fout.write(l)
        # os.replace overwrites an existing target on every platform, so no
        # Windows specific remove-and-rename fallback is needed.
        os.replace(tmp, filename)

    def stringsed(instring, sedstring):
        """
            apply a sedstring to a string.

            sedstring has the form "s/from/to/"; backslashes are removed
            from both parts and a plain text replace is done.

        """
        seds = sedstring.split('/')
        fr = seds[1].replace('\\', '')
        to = seds[2].replace('\\', '')
        mekker = instring.replace(fr, to)
        return mekker

    def copyfile(filename, filename2, sedstring=None):
        """
            copy a file with optional sed.

            the file is copied line by line in text mode; when sedstring
            ("s/from/to/") is given the replacement is applied to every
            line. missing directories of the target are created. nothing is
            done when filename is a directory or when one of the files
            cannot be opened.

        """
        if os.path.isdir(filename):
            return
        try:
            f = open(filename, 'r')
        except IOError:
            return
        with f:
            ddir = os.path.dirname(filename2)
            if ddir:
                # best effort: a failure shows up when the target is opened.
                try:
                    os.makedirs(ddir, exist_ok=True)
                except OSError:
                    pass
            try:
                fout = open(filename2, 'w')
            except (OSError, ValueError):
                return
            with fout:
                if sedstring:
                    seds = sedstring.split('/')
                    fr = seds[1].replace('\\', '')
                    to = seds[2].replace('\\', '')
                for line in f:
                    if sedstring:
                        l = line.replace(fr, to)
                    else:
                        l = line
                    fout.write(l)