Source code for audiolazy.lazy_misc

# -*- coding: utf-8 -*-
# This file is part of AudioLazy, the signal processing Python package.
# Copyright (C) 2012-2016 Danilo de Jesus da Silva Bellini
#
# AudioLazy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Common miscellaneous tools and constants for general use
"""

from collections import deque, Iterable
from functools import wraps
import itertools as it
import sys
from math import pi
import operator

# Audiolazy internal imports
from . import _internals
from .lazy_compat import (xrange, xzip_longest, STR_TYPES, SOME_GEN_TYPES,
                          iteritems)
from .lazy_core import StrategyDict

# Public names of this module (the ones a ``from ... import *`` brings in)
__all__ = ["DEFAULT_SAMPLE_RATE", "rint", "blocks", "zero_pad", "elementwise",
           "almost_eq", "sHz", "freq2lag", "lag2freq", "freq_to_lag",
           "lag_to_freq", "cached"]

# Default sample rate constant exported by this module
DEFAULT_SAMPLE_RATE = 44100 # Hz (samples/second)


def rint(x, step=1):
    """
    Round a number to the nearest multiple of ``step``, as an integer.

    Parameters
    ----------
    x :
      Number (integer or float) to be rounded.
    step :
      Quantization level (defaults to 1). With ``step=2`` the output is the
      nearest even number.

    Returns
    -------
    The multiple of ``step`` nearest to ``x``, cast to ``int``. When ``x`` is
    exactly halfway between two candidates, the one farthest from zero is
    chosen.

    """
    quotient, remainder = divmod(x, step)

    # Small offset pushed towards the sign of x; presumably there to keep the
    # final int() truncation from falling on the wrong side of the multiple
    nudge = min(step / 10., .1)
    if x > 0:
        rounded = quotient * step + nudge
    elif x < 0:
        rounded = quotient * step - nudge
    else:
        rounded = quotient * step

    # At this point "rounded" sits next to the multiple given by divmod.
    # Move one step up when x is past the midpoint; the midpoint itself
    # only goes up for non-negative x (ties go away from zero)
    if x >= 0:
        go_up = 2 * remainder >= step
    else:
        go_up = 2 * remainder > step
    if go_up:
        rounded += step

    return int(rounded)
def blocks(seq, size=None, hop=None, padval=0.):
    """
    General iterable blockenizer.

    Generator that collects ``size`` elements from ``seq`` at a time and
    yields them in a ``collections.deque`` (mutable circular queue). Each
    output starts ``hop`` elements after the first element of the previous
    one, so blocks overlap when ``hop < size`` and input data is skipped
    when ``hop > size``. When ``hop`` is None, it is taken as ``size``
    (adjacent blocks, no overlap).

    The last block may be completed with ``padval`` to reach the desired
    size. The ``seq`` contents can be heterogeneous, it just needs to be an
    iterable. Something like ``None`` can be used as ``padval`` to tell the
    padding apart from the data.

    Note
    ----
    The very same deque object is yielded every time, so a copy is needed
    to keep a block after asking for the next one. When hop is less than
    size, changing the yielded contents keeps the changed value in the next
    yielded block.

    """
    window = deque(maxlen=size)  # Circular queue, shared by every output
    final_pos = size - 1
    if hop is None:
        hop = size
    restart_pos = size - hop  # Negative when hop > size (data to be skipped)
    pos = 0

    for item in seq:
        if pos < 0:  # Only happens for hop > size: this data is lost
            pos += 1
            continue
        window.append(item)
        if pos == final_pos:
            yield window
            pos = restart_pos
        else:
            pos += 1

    # Padding to finish: something was stored after the last yielded block
    if pos > max(restart_pos, 0):
        for _ in xrange(pos, size):
            window.append(padval)
        yield window
def zero_pad(seq, left=0, right=0, zero=0.):
    """
    Zero padding sample generator (not a Stream!).

    Parameters
    ----------
    seq :
      Iterable with the data to be padded.
    left :
      Integer number of padding elements yielded before the data.
      Defaults to zero.
    right :
      Integer number of padding elements yielded after the data.
      Defaults to zero.
    zero :
      The padding element itself. Defaults to a float zero (0.0).

    Returns
    -------
    A generator that yields ``zero`` ``left`` times, then every item from
    ``seq``, then ``zero`` ``right`` times.

    """
    # Leading padding
    for _ in xrange(left):
        yield zero

    # The data itself, untouched
    for sample in seq:
        yield sample

    # Trailing padding
    for _ in xrange(right):
        yield zero
def elementwise(name="", pos=None):
    """
    Function auto-map decorator broadcaster.

    Creates an "elementwise" decorator for one input parameter. To create
    such, it should know the parameter name (for use as a keyword argument)
    and its position ``pos`` (for use as a positional argument). Without a
    name, only the positional argument will be used. Without both name and
    position, the first positional argument will be used.

    Parameters
    ----------
    name :
      Keyword name of the parameter to be mapped. Defaults to an empty
      string.
    pos :
      Positional index of the parameter to be mapped. Defaults to None.

    Returns
    -------
    A decorator. The decorated function, when called with an iterable
    (other than a string) in the chosen parameter, is applied to each
    element and the results are gathered in a container based on the input
    type. Any other input is passed straight to the function.

    """
    if (name == "") and (pos is None):
        pos = 0

    def elementwise_decorator(func):
        """
        Element-wise decorator for functions known to have 1 input and 1
        output be applied directly on iterables. When made to work with more
        than 1 input, all "secondary" parameters will be the same in all
        function calls (i.e., they will not even be a copy).
        """
        @wraps(func)
        def wrapper(*args, **kwargs):

            # Find the possibly Iterable argument; it is taken from kwargs
            # when it wasn't given positionally (KeyError if not there)
            positional = (pos is not None) and (pos < len(args))
            arg = args[pos] if positional else kwargs[name]

            if isinstance(arg, Iterable) and not isinstance(arg, STR_TYPES):
                # Lazy mapping: one func call per element, replacing only
                # the chosen argument
                if positional:
                    data = (func(*(args[:pos] + (x,) + args[pos+1:]),
                                 **kwargs)
                            for x in arg)
                else:
                    data = (func(*args,
                                 **dict(it.chain(iteritems(kwargs),
                                                 [(name, x)])))
                            for x in arg)

                # Generators should still return generators
                if isinstance(arg, SOME_GEN_TYPES):
                    return data

                # Cast to numpy array or matrix, if needed, without actually
                # importing its package
                type_arg = type(arg)
                try:
                    is_numpy = type_arg.__module__ == "numpy"
                except AttributeError:
                    is_numpy = False
                if is_numpy:
                    # Only the needed constructor is looked up in numpy:
                    # "numpy.mat" no longer exists in NumPy 2.0, so touching
                    # it for a plain ndarray input would raise. "asmatrix"
                    # is the function "mat" used to be an alias for.
                    # Other numpy type names still raise KeyError here.
                    np_ctor_name = {"ndarray": "array",
                                    "matrix": "asmatrix",
                                   }[type_arg.__name__]
                    np_ctor = getattr(sys.modules["numpy"], np_ctor_name)
                    return np_ctor(list(data))

                # If it's a Stream, let's use the Stream constructor
                # (imported here, at call time, not at module level)
                from .lazy_stream import Stream
                if issubclass(type_arg, Stream):
                    return Stream(data)

                # Tuple, list, set, dict, deque, etc.. all falls here
                return type_arg(data)

            return func(*args, **kwargs) # wrapper returned value
        return wrapper # elementwise_decorator returned value
    return elementwise_decorator
almost_eq = StrategyDict("almost_eq")


@almost_eq.strategy("bits")
def almost_eq(a, b, bits=32, tol=1, ignore_type=True, pad=0.):
    """
    Almost equal, based on the amount of floating point significand bits.

    Alternative to "a == b" for float numbers and iterables with float
    numbers, and tests for sequence contents (i.e., an elementwise a == b,
    that also works with generators, nested lists, nested generators, etc.).
    If the type of both the contents and the containers should be tested
    too, set the ignore_type keyword arg to False.

    Default version is based on 32 bits IEEE 754 format (23 bits
    significand). Could use 64 bits (52 bits significand) but needs a native
    float type with at least that size in bits. The ``bits`` value should be
    32, 64, 80 or 128; ``tol`` is the number of least significant bits
    allowed to differ.

    If a and b sizes differ, at least one will be padded with the pad input
    value to keep going with the comparison. The pad value isn't forwarded
    to the nested comparisons, which use their default pad.

    Note
    ----
    Be careful with endless generators!

    """
    if not ignore_type and type(a) != type(b):
        return False

    # Either both are iterables or none of them are
    a_is_iterable = isinstance(a, Iterable)
    if a_is_iterable != isinstance(b, Iterable):
        return False

    if a_is_iterable:
        pairs = xzip_longest(a, b, fillvalue=pad)
        return all(almost_eq.bits(item_a, item_b, bits, tol, ignore_type)
                   for item_a, item_b in pairs)

    # Significand sizes for each format, not including the sign bit
    significand_sizes = {32: 23, 64: 52, 80: 63, 128: 112}
    power = tol - significand_sizes[bits] - 1
    return abs(a - b) <= 2 ** power * abs(a + b)


@almost_eq.strategy("diff")
def almost_eq(a, b, max_diff=1e-7, ignore_type=True, pad=0.):
    """
    Almost equal, based on the :math:`|a - b|` value.

    Alternative to "a == b" for float numbers and iterables with float
    numbers. See almost_eq for more information.

    This version is based on the non-normalized absolute diff, similar to
    what unittest does with its assertAlmostEquals: two numbers match when
    their difference isn't greater than ``max_diff``.

    If a and b sizes differ, at least one will be padded with the pad input
    value to keep going with the comparison. The pad value isn't forwarded
    to the nested comparisons, which use their default pad.

    Note
    ----
    Be careful with endless generators!

    """
    if not ignore_type and type(a) != type(b):
        return False

    # Either both are iterables or none of them are
    a_is_iterable = isinstance(a, Iterable)
    if a_is_iterable != isinstance(b, Iterable):
        return False

    if a_is_iterable:
        pairs = xzip_longest(a, b, fillvalue=pad)
        return all(almost_eq.diff(item_a, item_b, max_diff, ignore_type)
                   for item_a, item_b in pairs)

    return abs(a - b) <= max_diff
def sHz(rate):
    """
    Unit conversion constants.

    Useful for casting to/from the default package units (number of samples
    for time and rad/sample for frequency). You can use expressions like
    ``440 * Hz`` to get a frequency value, or assign like ``kHz = 1e3 * Hz``
    to get other unit, as you wish.

    Parameters
    ----------
    rate :
      Sample rate in samples per second

    Returns
    -------
    A tuple ``(s, Hz)``, where ``s`` is the second unit and ``Hz`` is the
    hertz unit, as the number of samples and radians per sample,
    respectively.

    """
    second = float(rate)  # One second lasts "rate" samples
    hertz = 2 * pi / rate  # One cycle (2 pi rad) spread over "rate" samples
    return second, hertz
def freq2lag(v):
    """
    Converts from frequency (rad/sample) to lag (number of samples).

    The result is ``2 * pi / v``, so a zero input raises ZeroDivisionError.
    """
    full_cycle = 2 * pi
    return full_cycle / v
# Old name for freq2lag, wrapped by _internals.deprecate (presumably a
# deprecation warning wrapper -- confirm in _internals)
freq_to_lag = _internals.deprecate(freq2lag)
def lag2freq(v):
    """
    Converts from lag (number of samples) to frequency (rad/sample).

    The result is ``2 * pi / v``, so a zero input raises ZeroDivisionError.
    """
    full_cycle = 2 * pi
    return full_cycle / v
# Old name for lag2freq, wrapped by _internals.deprecate (presumably a
# deprecation warning wrapper -- confirm in _internals). It must wrap
# lag2freq, not freq2lag: both compute the same value, but the wrapper
# would otherwise be built from the wrong function
lag_to_freq = _internals.deprecate(lag2freq)
def cached(func):
    """
    Cache decorator for a function without keyword arguments.

    Each distinct tuple of positional arguments calls ``func`` only once;
    later calls with the same arguments get the stored result. The
    arguments are used as a dictionary key, so they have to be hashable.

    You can access the cache contents using the ``cache`` attribute in the
    resulting function, which is a dictionary mapping the arguments tuple to
    the previously returned function result.
    """
    class Cache(dict):
        def __missing__(self, key):
            # First time for this arguments tuple: evaluate and store
            value = func(*key)
            self[key] = value
            return value

    cache = Cache()

    @wraps(func)
    def f(*key):
        return cache[key]

    f.cache = cache
    return f