sMAP 2.0 documentation

smap.util

Contents

Source code for smap.util

"""
Copyright (c) 2011, 2012, Regents of the University of California
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions 
are met:

 - Redistributions of source code must retain the above copyright
   notice, this list of conditions and the following disclaimer.
 - Redistributions in binary form must reproduce the above copyright
   notice, this list of conditions and the following disclaimer in the
   documentation and/or other materials provided with the
   distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL 
THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, 
INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR 
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, 
STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED 
OF THE POSSIBILITY OF SUCH DAMAGE.
"""
"""
@author Stephen Dawson-Haggerty <stevedh@eecs.berkeley.edu>
"""

import os
import time
import re
import uuid
import errno
import cPickle as pickle
import ConfigParser
import traceback as trace
import collections

from twisted.internet import task, reactor, threads, defer
from twisted.internet.protocol import Protocol
from twisted.python.lockfile import FilesystemLock
from twisted.python import log, failure

is_string = lambda x: isinstance(x, str) or isinstance(x, unicode)
is_integer = lambda x: isinstance(x, int) or isinstance(x, long)
def to_bool(s):
    if isinstance(s, bool): return s
    elif isinstance(s, int): return not s == 0
    else: return s.lower().strip() in ['true', '1']

def now():
    return int(time.time())

def split_path(path):
    path = re.split('/+', path)
    return filter(lambda x: len(x), path)

def join_path(path):
    return '/' + '/'.join(path)

norm_path = lambda x: join_path(split_path(x))

def str_path(s):
    """Make a string appropriate to be a path compnent"""
    return s.lower().replace(' ', '_').replace('/', '_')

def find(f, lst):
    for o in lst:
        if f(o): return o
    return None

def buildkv(fullname, obj, separator='/'):
    if isinstance(obj, dict):
        rv = []
        for newk, newv in obj.iteritems():
            if len(fullname):
                rv += buildkv(fullname + separator + newk, newv, separator)
            else:
                rv += buildkv(newk, newv, separator)
        return rv
    else:
        return [(fullname, obj)]

# make a nested object from a config file line
def build_recursive(d, suppress=['type', 'key', 'uuid']):
    rv = {}
    for k, v in d.iteritems():
        if k in suppress: continue
        pieces = k.split('/')
        cur = rv
        for cmp in pieces[:-1]:
            if not cur.has_key(cmp):
                cur[cmp] = {}
            cur = cur[cmp]
        cur[pieces[-1]] = v
    return rv

def dict_merge(o1, o2):
    """Recursively merge dict o1 into dict o2.  
    """
    if not isinstance(o1, dict) or not isinstance(o2, dict): 
        return o2
    o2 = dict(o2)
    for k, v in o1.iteritems():
        if k in o2:
            o2[k] = dict_merge(v, o2[k])
        else:
            o2[k] = v
    return o2

def dict_all(dlist):
    keys = set.intersection(*map(lambda x: set(x.iterkeys()), dlist))
    keys = dict(((k, None) for k in keys))
    for t in dlist[1:]:
        for k in keys.keys():
            if t[k] != dlist[0][k]:
                del keys[k]
    return dict(((k, dlist[0][k]) for k in keys.iterkeys()))
    
def flatten(lst):
    rv = []
    for l in lst:
        rv.extend(l)
    return rv

"""Push all metadata down to the leaves and remove the collections
"""
def push_metadata(rpt):
    for k, v in rpt.iteritems():
        sp = split_path(k)
        if 'Readings' in v:
            for i in xrange(0, len(sp)):
                if join_path(sp[:i]) in rpt:
                    upobj = rpt[join_path(sp[:i])]
                    if 'Contents' in upobj:
                        del upobj['Contents']
                    v.update(dict_merge(upobj, v))
    for k, v in rpt.items():
        if not 'Readings' in v:
            del rpt[k]

class FixedSizeList(list):
    """
    A class for keeping a circular buffer with a maximum size.
    Used for storing a fixed history of "profile" data.
    """
    def __init__(self, size=None, init=None, sort_profile=False, seqno=0):
        self.size = size
        self.seqno = seqno
        self.sort_profile = sort_profile
        if not init:
            init = []
        list.__init__(self, init)

    def __repr__(self):
        return "FixedSizeList(size=" + str(self.size) + \
            ", seqno=" + str(self.seqno) + ", init=" + \
            list.__repr__(self) + ")"

    def append(self, val):
        if self.sort_profile == True:
            # Find insert point in sorted list
            idx = bisect.bisect_left([r.time for r in self], val.time)
            # Ignore duplicate times
            if idx >= len(self) or self[idx].time != val.time:
                self.insert(idx, val)
            else:
                return False
        else:
            list.append(self, val)
            self.seqno += 1

        if self.size and len(self) > self.size:
            self.pop(0)

        return True

    def extend(self, val):
        list.extend(self, val)
        self.seqno += len(val)
        if self.size and len(self) > self.size:
            self.reverse()
            del self[self.size:]
            self.reverse()

    def truncate(self, seq):
        """Remove first n values from the list"""
        rmpt = seq - (self.seqno - len(self) )
        if rmpt >= 0:
            del self[:rmpt]

    def set_size(self, size):
        if len(self) > size:
            self.__delslice__(0, self.size  - size)
        self.size = size

    def idxtoseq(self, idx):
        return self.seqno - len(self) + idx

def pickle_load(filename):
    """Load an object from a gzipped pickle file while holding a
    filesystem lock
    """
#     lock = FilesystemLock(filename + ".lock")
#     if not lock.lock():
#         raise SmapException("Could not acquire report file lock")

    try:
        fp = open(filename, 'rb')
    except IOError:
        return None

    try:
        return pickle.load(fp)
    except (IOError, EOFError, pickle.PickleError), e:
        return None
    finally:
        fp.close()

def pickle_dump(filename, obj):
    """Pickle an object to a gzipped file while holding a filesystem
    lock.
    """
    if not filename:
        return

    try:
        fp = open(filename + '.tmp', 'wb')
    except IOError, e:
        return

    try:
        pickle.dump(obj, fp, protocol=2)
    except pickle.PickleError, TypeError:
        log.err()
    finally:
        os.fsync(fp)
        fp.close()

    try:
        # move it atomically if we were able to pickle the object
        os.rename(filename + '.tmp', filename)
    except OSError, e:
        # Windows versions prior to Vista don't support atomic renames
        if e.errno != errno.EEXIST:
            raise
        os.remove(filename)
        os.rename(filename + '.tmp', filename)
    except IOError, e:
        pass

[docs]def periodicCallInThread(fn, *args): """Periodically enqueue a task to be run in the ``twisted`` threadpool (not in the main loop). You'll need to call the ``start(interval)`` method on the return. Multiple copies may run concurrently, depending on the thread pool size if you do not finish fast enough. :param fn: the function to be called :param args: arguments to be passed to `fn` :rtype: the :py:class:`twisted.internet.task.LoopingCall` result """ return task.LoopingCall(lambda: reactor.callInThread(fn, *args))
class PeriodicCaller: """The problem with doing a LoopingCall and then deferring to a thread is that you might have multiple copies of your function running simultaneously. There are a variety of reasons this might be undesirable, so you can use this class instead, which will wait for a previous invocation to complete before running the next one. """ def __init__(self, fn, args, where='thread'): self.fn, self.args = fn, args self.where = where self.stopping = False if not self.where in ['thread', 'reactor']: raise ValueError("Invalid run mode: must be 'thread' or 'reactor'") def _go(self): # bad things seem to happen when we throw an exception in the # thread pool... let's catch that and just log it try: if self.where == 'thread': self.fn(*self.args) else: return self.fn(*self.args) except: log.err() return defer.succeed(False) def _run(self): if self.stopping: return self.last = time.time() if self.where == 'thread': d = threads.deferToThread(self._go) else: d = task.deferLater(reactor, 0, self._go) d.addBoth(self._post_run) def _post_run(self, result): if isinstance(result, failure.Failure): log.err("PeriodicCaller -- error: " + str(result.value)) now = time.time() sleep_time = self.interval - (now - self.last) if sleep_time < 0: self._run() else: reactor.callLater(sleep_time, self._run) def start(self, interval, now=True): self.interval = interval self.last = time.time() if now: self._run() else: reactor.callLater(self.interval, self._post_run, None) def stop(self): self.stopping = True
[docs]def periodicSequentialCall(fn, *args): """Periodically run `fn(*args)` in a threadpool. unlike :py:func:`~smap.util.periodicCallInThread`, will not run your task concurrently with itself -- if the last invocation didn't finish in time for your next execution, it will wait rather than running it in a different thread. You also need to call `start(interval)` on the result. """ return PeriodicCaller(fn, args)
def syncMaybeDeferred(fn, *args): """Version of maybeDeferred which calls fn(*args) immediately, rather than from the event loop as the library version does. """ rv = fn(*args) if issubclass(rv.__class__, defer.Deferred): return rv else: return defer.succeed(rv) def import_module(modname): """Import a module named by a classic dotted-name""" cmps = modname.split('.') mod = __import__('.'.join(cmps[:-1]), globals(), locals(), [cmps[-1]]) return getattr(mod, cmps[-1]) class SetDict(dict): def __init__(self, *args, **kwargs): dict.__init__(self) if len(args): for k, v in args[0]: self[k] = v for k, v in kwargs.iteritems(): self[k] = v def __setitem__(self, i, y): if i in self: self[i].add(y) else: dict.__setitem__(self, i, set([y])) def __getitem__(self, i): if i in self: return dict.__getitem__(self, i) else: return set([]) def __iter__(self): for k, s in self.iteritems(): for v in s: yield (k, v) class BufferProtocol(Protocol): def __init__(self, finished): self.finished = finished self.buffer = [] def dataReceived(self, data): self.buffer.append(data) def connectionLost(self, reason): self.finished.callback(''.join(self.buffer)) class RateLimiter: """Class for rate-limiting function call @ratelimit minimum time between method calls. If None, method_if_allowed will never be called. If callable, will allow the call if it returns True. @method_if_allowed method to be called if allowed by the rate limit @method_if_disallowed @return (boolean, result) the boolean indicates if the call was allowed by the rate limit, and the result is the result of calling the appropriate method Since this class overwrites __call__, you can get the result of calling the appropriate method just by "calling" this class.it """ def __init__(self, ratelimit, method_if_allowed=None, method_if_disallowed=None): self.ratelimit = ratelimit self.method_if_allowed = method_if_allowed self.method_if_disallowed = method_if_disallowed self.last_value = None self.last_call = 0 def __call__(self, *args, **kwargs): allowed = False rv = None now = time.time() if (callable(self.ratelimit) and self.ratelimit()) or \ (is_integer(self.ratelimit) and \ now - self.last_call > self.ratelimit): allowed = True if self.method_if_allowed: rv = self.method_if_allowed(*args, **kwargs) self.last_call = now elif self.method_if_disallowed: rv = self.method_if_disallowed(*args, **kwargs) return allowed, rv

Contents