Source code for wheezy.core.benchmark

""" ``bechmark`` module.
"""

from wheezy.core.comp import PY2
from wheezy.core.comp import PY_MINOR


if PY2 and PY_MINOR < 6:  # pragma: nocover
    import gc
    from itertools import repeat
    from time import time as default_timer

    def timeit(f, number=1000000):
        it = repeat(None, number)
        e = gc.isenabled()
        gc.disable()
        try:
            t0 = default_timer()
            for i in it:
                f()
            t1 = default_timer()
            return t1 - t0
        finally:
            if e:
                gc.enable()

else:  # pragma: nocover
    from timeit import timeit
    from timeit import default_timer  # noqa


class Benchmark(object):
[docs] """ Measure execution time of your code. """ def __init__(self, targets, number, warmup_number=None, timer=None): """ ``targets`` - a list of targets (callables) to be tested. ``number`` - how many times each target is executed. ``warmup_number`` - how many times each target is warmed up before the bechmark is measured. """ self.targets = targets self.number = number self.warmup_number = warmup_number or max(int(number / 100), 10) if timer is not None: self.timer = timer self.bench = self.bench_timer def bench(self, number): for target in self.targets: yield (target.__name__, timeit(target, number=number)) def bench_timer(self, number): for target in self.targets: self.timer.start() timeit(target, number=number) self.timer.stop() yield (target.__name__, self.timer.timing) def run(self): # warm up list(self.bench(self.warmup_number)) # run return self.bench(self.number) def report(self, name=None, baselines=None): baselines = baselines or {} print("%s: %s x %s" % (name or 'noname', len(self.targets), self.number)) print("%s %s %s %s" % ("baseline", "throughput", "change", "target")) base = None for (name, result) in self.run(): if not result: print(' - % - rps - % ' + name) continue if base is None: base = result base_relative = round(base / result, 3) rps = round(self.number / result, 1) previous_relative = baselines.get(name, base_relative) delta = base_relative / previous_relative - 1.0 print("%7.1f%% %7drps %+5.1f%% %s" % ( base_relative * 100, rps, delta * 100, name)) class Timer(object):
[docs] """ Intercept a call to given method in order to compute timing. """ def __init__(self, target, name): assert hasattr(target, name) assert callable(getattr(target, name)) self.target = target self.name = name def start(self): self.timing = 0.0 self.saved = saved = getattr(self.target, self.name) def timing_wrapper(*args, **kwargs): t0 = default_timer() result = saved(*args, **kwargs) t1 = default_timer() self.timing += t1 - t0 return result setattr(self.target, self.name, timing_wrapper) def stop(self): setattr(self.target, self.name, self.saved)