# -*- coding: utf-8 -*-
# This file is part of AudioLazy, the signal processing Python package.
# Copyright (C) 2012-2016 Danilo de Jesus da Silva Bellini
#
# AudioLazy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Audio analysis and block processing module
"""
from __future__ import division
from math import sin, cos, pi
from collections import deque, Sequence, Iterable
from functools import wraps, reduce
from itertools import chain
import operator
# Audiolazy internal imports
from .lazy_core import StrategyDict
from .lazy_stream import tostream, thub, Stream
from .lazy_math import cexp, ceil
from .lazy_filters import lowpass, z
from .lazy_compat import xrange, xmap, xzip, iteritems
from .lazy_text import format_docstring
__all__ = ["window", "wsymm", "acorr", "lag_matrix", "dft", "zcross",
"envelope", "maverage", "clip", "unwrap", "amdf", "overlap_add",
"stft"]
window = StrategyDict("window")
wsymm = StrategyDict("wsymm")
window.symm = wsymm.symm = wsymm
window.periodic = wsymm.periodic = window
window._doc_kwargs = (lambda sname, symm=False, distinct=True, formula=None,
name=None, names=None, params=None, math=None,
math_symm=None, bib=None, out=None,
params_def=None, seealso=None: dict(
sname = sname,
symm = symm,
name = name or sname.capitalize(),
params = params or "",
seealso = seealso or "",
sp_detail = (" (symmetric)" if symm else " (periodic)") if distinct else "",
out = out or "",
template_ = """
{name} windowing/apodization function{sp_detail}.
{expl}{bib}
Parameters
----------
size :
Window size in samples.{params}
Returns
-------
List with the window samples. {out}
{sp_note}
Hint
----
All ``window`` and ``wsymm`` strategies have both a ``periodic`` and
``symm`` attribute with the respective strategy. The StrategyDict instances
themselves also have these attributes (with the respective StrategyDict
instance).{hint_extra}
See Also
--------{see_other}{seealso}{see_stft_ola}
""",
see_other = "" if not distinct else """
{other_sdict} :
StrategyDict instance with {other_sp} windowing/apodization functions.
{other_sdict}.{sname} :
{name} windowing/apodization function ({other_sp}).""".format(
sname = sname,
name = name or sname.capitalize(),
other_sp = "periodic" if symm else "symmetric",
other_sdict = "window" if symm else "wsymm",
),
see_stft_ola = "" if symm or not distinct else """
stft :
Short Time Fourier Transform block processor / phase vocoder wrapper.
overlap_add :
Overlap-add algorithm for an interables of blocks.""",
expl = "" if not math else """
For this model, the resulting :math:`n`-th sample
(where :math:`n = 0, 1, \\cdots, size - 1`) is:
.. math:: {math}
""".format(math = (math_symm or math.replace("size", "size - 1"))
if symm else math),
bib = bib or """
This window model was taken from:
``Harris, F. J. "On the Use of Windows for Harmonic Analysis with the
Discrete Fourier Transform". Proceedings of the IEEE, vol. 66, no. 1,
January 1978.``
""",
sp_note = ("""
Warning
-------
Don't use this strategy for FFT/DFT/STFT windowing! You should use the
periodic approach for that. See the F. J. Harris paper for more information.
""" if symm else """
Note
----
Be careful as this isn't a "symmetric" window implementation by default, you
should append the first sample at the end to get a ``size + 1`` symmetric
window. The "periodic" window implementation returned by this function
is designed for using directly with DFT/STFT. See the F. J. Harris paper
for more information on these.
By default, Numpy, Scipy signal subpackage, GNU Octave and MatLab uses the
symmetric approach for the window functions, with [1.0] as the result when
the size is 1 (which means the window is actually empty). Here the
implementation differ expecting that these functions will be mainly used in
a DFT/STFT process.
""") if distinct else """
Note
----
As this strategy is both "symmetric" and "periodic", ``window.{sname}``
and ``wsymm.{sname}`` are the very same function/strategy.
""".format(sname=sname),
hint_extra = (""" However, in this case, they're the same, i.e.,
``window.{sname}`` is ``wsymm.{sname}``.""" if not distinct else
""" You can get the {other_sp} strategy ``{other_sdict}.{sname}`` with:
* ``{sdict}.{sname}.{other_meth}``;
* ``{sdict}.{other_meth}.{sname}`` ({sdict}.{other_meth} is {other_sdict});
* ``{other_sdict}.{sname}.{other_meth}`` (unneeded ``.{other_meth}``);
* ``{other_sdict}.{other_meth}.{sname}`` (pleonastically, as
{other_sdict}.{other_meth} is {other_sdict}).""")
.format(
sname = sname,
sdict = "wsymm" if symm else "window",
other_meth = "periodic" if symm else "symm",
other_sp = "periodic" if symm else "symmetric",
other_sdict = "window" if symm else "wsymm",
),
))
window._content_generation_table = [
dict(
names = ("hann", "hanning",),
formula = ".5 * (1 - cos(2 * pi * n / size))",
math = r"\frac{1}{2} \left[ 1 - \cos \left( \frac{2 \pi n}{size} \right) "
r"\right]",
),
dict(
names = ("hamming",),
formula = ".54 - .46 * cos(2 * pi * n / size)",
math = r"0.54 - 0.46 \cos \left( \frac{2 \pi n}{size} \right)",
),
dict(
names = ("rect", "dirichlet", "rectangular",),
formula = "1.0",
name = "Dirichlet/rectangular",
out = "All values are ones (1.0).",
distinct = False,
seealso = """
ones :
Lazy ``1.0`` stream generator.""",
),
dict(
names = ("bartlett",),
formula = "1 - 2.0 / size * abs(n - size / 2.0)",
math = r"1 - \frac{2}{size} \left| \frac{n - size}{2} \right|",
name = "Bartlett (triangular starting with zero)",
bib = " ",
seealso = """
window.triangular :
Triangular with no zero end-point (periodic).
wsymm.triangular :
Triangular with no zero end-point (symmetric).""",
),
dict(
names = ("triangular", "triangle",),
formula = "1 - 2.0 / (size + 2) * abs(n - size / 2.0)",
math = r"1 - \frac{2}{size + 2} \left| \frac{n - size}{2} \right|",
math_symm = r"1 - \frac{2}{size + 1} \left| \frac{n - size - 1}{2} "
r"\right|",
name = "Triangular (with no zero end-point)",
bib = " ",
seealso = """
window.bartlett :
Triangular starting with zero (periodic).
wsymm.bartlett :
Triangular starting with zero (symmetric).""",
),
dict(
names = ("blackman",),
formula = "(1 - alpha) / 2 + alpha / 2 * cos(4 * pi * n / size)"
" - .5 * cos(2 * pi * n / size)",
math = r"\frac{1 - \alpha}{2} "
r" - \frac{1}{2} \cos \left( \frac{2 \pi n}{size} \right)"
r" + \frac{\alpha}{2} \cos \left( \frac{4 \pi n}{size} \right)",
params = """
alpha :
Blackman window alpha value. Defaults to 0.16. Use ``2.0 * 1430 / 18608``
for the 'exact Blackman' window.""",
params_def = ", alpha=.16"
),
dict(
names = ("cos",),
formula = "sin(pi * n / size) ** alpha",
math = r"\left[ \sin \left( \frac{\pi n}{size} \right) \right]^{\alpha}",
name = "Cosine to the power of alpha",
params = """
alpha :
Power value. Defaults to 1.""",
params_def = ", alpha=1"
),
]
window._code_template = """
def {sname}(size{params_def}):
return [{formula} for n in xrange(size)]
"""
wsymm._code_template = """
def {sname}(size{params_def}):
if size == 1:
return [1.0]
size, indexes = size - 1, xrange(size)
return [{formula} for n in indexes]
"""
def _generate_window_strategies():
""" Create all window and wsymm strategies """
for wnd_dict in window._content_generation_table:
names = wnd_dict["names"]
sname = wnd_dict["sname"] = names[0]
wnd_dict.setdefault("params_def", "")
for sdict in [window, wsymm]:
docs_dict = window._doc_kwargs(symm = sdict is wsymm, **wnd_dict)
decorators = [format_docstring(**docs_dict), sdict.strategy(*names)]
ns = dict(pi=pi, sin=sin, cos=cos, xrange=xrange, __name__=__name__)
exec(sdict._code_template.format(**wnd_dict), ns, ns)
reduce(lambda func, dec: dec(func), decorators, ns[sname])
if not wnd_dict.get("distinct", True):
wsymm[sname] = window[sname]
break
wsymm[sname].periodic = window[sname].periodic = window[sname]
wsymm[sname].symm = window[sname].symm = wsymm[sname]
_generate_window_strategies()
[docs]def acorr(blk, max_lag=None):
"""
Calculate the autocorrelation of a given 1-D block sequence.
Parameters
----------
blk :
An iterable with well-defined length. Don't use this function with Stream
objects!
max_lag :
The size of the result, the lags you'd need. Defaults to ``len(blk) - 1``,
since any lag beyond would result in zero.
Returns
-------
A list with lags from 0 up to max_lag, where its ``i``-th element has the
autocorrelation for a lag equals to ``i``. Be careful with negative lags!
You should use abs(lag) indexes when working with them.
Examples
--------
>>> seq = [1, 2, 3, 4, 3, 4, 2]
>>> acorr(seq) # Default max_lag is len(seq) - 1
[59, 52, 42, 30, 17, 8, 2]
>>> acorr(seq, 9) # Zeros at the end
[59, 52, 42, 30, 17, 8, 2, 0, 0, 0]
>>> len(acorr(seq, 3)) # Resulting length is max_lag + 1
4
>>> acorr(seq, 3)
[59, 52, 42, 30]
"""
if max_lag is None:
max_lag = len(blk) - 1
return [sum(blk[n] * blk[n + tau] for n in xrange(len(blk) - tau))
for tau in xrange(max_lag + 1)]
[docs]def lag_matrix(blk, max_lag=None):
"""
Finds the lag matrix for a given 1-D block sequence.
Parameters
----------
blk :
An iterable with well-defined length. Don't use this function with Stream
objects!
max_lag :
The size of the result, the lags you'd need. Defaults to ``len(blk) - 1``,
the maximum lag that doesn't create fully zeroed matrices.
Returns
-------
The covariance matrix as a list of lists. Each cell (i, j) contains the sum
of ``blk[n - i] * blk[n - j]`` elements for all n that allows such without
padding the given block.
"""
if max_lag is None:
max_lag = len(blk) - 1
elif max_lag >= len(blk):
raise ValueError("Block length should be higher than order")
return [[sum(blk[n - i] * blk[n - j] for n in xrange(max_lag, len(blk))
) for i in xrange(max_lag + 1)
] for j in xrange(max_lag + 1)]
[docs]def dft(blk, freqs, normalize=True):
"""
Complex non-optimized Discrete Fourier Transform
Finds the DFT for values in a given frequency list, in order, over the data
block seen as periodic.
Parameters
----------
blk :
An iterable with well-defined length. Don't use this function with Stream
objects!
freqs :
List of frequencies to find the DFT, in rad/sample. FFT implementations
like numpy.fft.ftt finds the coefficients for N frequencies equally
spaced as ``line(N, 0, 2 * pi, finish=False)`` for N frequencies.
normalize :
If True (default), the coefficient sums are divided by ``len(blk)``,
and the coefficient for the DC level (frequency equals to zero) is the
mean of the block. If False, that coefficient would be the sum of the
data in the block.
Returns
-------
A list of DFT values for each frequency, in the same order that they appear
in the freqs input.
Note
----
This isn't a FFT implementation, and performs :math:`O(M . N)` float
pointing operations, with :math:`M` and :math:`N` equals to the length of
the inputs. This function can find the DFT for any specific frequency, with
no need for zero padding or finding all frequencies in a linearly spaced
band grid with N frequency bins at once.
"""
dft_data = (sum(xn * cexp(-1j * n * f) for n, xn in enumerate(blk))
for f in freqs)
if normalize:
lblk = len(blk)
return [v / lblk for v in dft_data]
return list(dft_data)
@tostream
[docs]def zcross(seq, hysteresis=0, first_sign=0):
"""
Zero-crossing stream.
Parameters
----------
seq :
Any iterable to be used as input for the zero crossing analysis
hysteresis :
Crossing exactly zero might happen many times too fast due to high
frequency oscilations near zero. To avoid this, you can make two
threshold limits for the zero crossing detection: ``hysteresis`` and
``-hysteresis``. Defaults to zero (0), which means no hysteresis and only
one threshold.
first_sign :
Optional argument with the sign memory from past. Gets the sig from any
signed number. Defaults to zero (0), which means "any", and the first sign
will be the first one found in data.
Returns
-------
A Stream instance that outputs 1 for each crossing detected, 0 otherwise.
"""
neg_hyst = -hysteresis
seq_iter = iter(seq)
# Gets the first sign
if first_sign == 0:
last_sign = 0
for el in seq_iter:
yield 0
if (el > hysteresis) or (el < neg_hyst): # Ignores hysteresis region
last_sign = -1 if el < 0 else 1 # Define the first sign
break
else:
last_sign = -1 if first_sign < 0 else 1
# Finds the full zero-crossing sequence
for el in seq_iter: # Keep the same iterator (needed for non-generators)
if el * last_sign < neg_hyst:
last_sign = -1 if el < 0 else 1
yield 1
else:
yield 0
envelope = StrategyDict("envelope")
@envelope.strategy("rms")
def envelope(sig, cutoff=pi/512):
"""
Envelope non-linear filter.
This strategy finds a RMS by passing the squared data through a low pass
filter and taking its square root afterwards.
Parameters
----------
sig :
The signal to be filtered.
cutoff :
Lowpass filter cutoff frequency, in rad/sample. Defaults to ``pi/512``.
Returns
-------
A Stream instance with the envelope, without any decimation.
See Also
--------
maverage :
Moving average linear filter.
"""
return lowpass(cutoff)(thub(sig, 1) ** 2) ** .5
@envelope.strategy("abs")
def envelope(sig, cutoff=pi/512):
"""
Envelope non-linear filter.
This strategy make an ideal half wave rectification (get the absolute value
of each signal) and pass the resulting data through a low pass filter.
Parameters
----------
sig :
The signal to be filtered.
cutoff :
Lowpass filter cutoff frequency, in rad/sample. Defaults to ``pi/512``.
Returns
-------
A Stream instance with the envelope, without any decimation.
See Also
--------
maverage :
Moving average linear filter.
"""
return lowpass(cutoff)(abs(thub(sig, 1)))
@envelope.strategy("squared")
def envelope(sig, cutoff=pi/512):
"""
Squared envelope non-linear filter.
This strategy squares the input, and apply a low pass filter afterwards.
Parameters
----------
sig :
The signal to be filtered.
cutoff :
Lowpass filter cutoff frequency, in rad/sample. Defaults to ``pi/512``.
Returns
-------
A Stream instance with the envelope, without any decimation.
See Also
--------
maverage :
Moving average linear filter.
"""
return lowpass(cutoff)(thub(sig, 1) ** 2)
maverage = StrategyDict("maverage")
@maverage.strategy("deque")
def maverage(size):
"""
Moving average
This is the only strategy that uses a ``collections.deque`` object
instead of a ZFilter instance. Fast, but without extra capabilites such
as a frequency response plotting method.
Parameters
----------
size :
Data block window size. Should be an integer.
Returns
-------
A callable that accepts two parameters: a signal ``sig`` and the starting
memory element ``zero`` that behaves like the ``LinearFilter.__call__``
arguments. The output from that callable is a Stream instance, and has
no decimation applied.
See Also
--------
envelope :
Signal envelope (time domain) strategies.
"""
size_inv = 1. / size
@tostream
def maverage_filter(sig, zero=0.):
data = deque((zero * size_inv for _ in xrange(size)), maxlen=size)
mean_value = zero
for el in sig:
mean_value -= data.popleft()
new_value = el * size_inv
data.append(new_value)
mean_value += new_value
yield mean_value
return maverage_filter
@maverage.strategy("recursive", "feedback")
def maverage(size):
"""
Moving average
Linear filter implementation as a recursive / feedback ZFilter.
Parameters
----------
size :
Data block window size. Should be an integer.
Returns
-------
A ZFilter instance with the feedback filter.
See Also
--------
envelope :
Signal envelope (time domain) strategies.
"""
return (1. / size) * (1 - z ** -size) / (1 - z ** -1)
@maverage.strategy("fir")
def maverage(size):
"""
Moving average
Linear filter implementation as a FIR ZFilter.
Parameters
----------
size :
Data block window size. Should be an integer.
Returns
-------
A ZFilter instance with the FIR filter.
See Also
--------
envelope :
Signal envelope (time domain) strategies.
"""
return sum((1. / size) * z ** -i for i in xrange(size))
[docs]def clip(sig, low=-1., high=1.):
"""
Clips the signal up to both a lower and a higher limit.
Parameters
----------
sig :
The signal to be clipped, be it a Stream instance, a list or any iterable.
low, high :
Lower and higher clipping limit, "saturating" the input to them. Defaults
to -1.0 and 1.0, respectively. These can be None when needed one-sided
clipping. When both limits are set to None, the output will be a Stream
that yields exactly the ``sig`` input data.
Returns
-------
Clipped signal as a Stream instance.
"""
if low is None:
if high is None:
return Stream(sig)
return Stream(el if el < high else high for el in sig)
if high is None:
return Stream(el if el > low else low for el in sig)
if high < low:
raise ValueError("Higher clipping limit is smaller than lower one")
return Stream(high if el > high else
(low if el < low else el) for el in sig)
@tostream
[docs]def unwrap(sig, max_delta=pi, step=2*pi):
"""
Parametrized signal unwrapping.
Parameters
----------
sig :
An iterable seen as an input signal.
max_delta :
Maximum value of :math:`\Delta = sig_i - sig_{i-1}` to keep output
without another minimizing step change. Defaults to :math:`\pi`.
step :
The change in order to minimize the delta is an integer multiple of this
value. Defaults to :math:`2 . \pi`.
Returns
-------
The signal unwrapped as a Stream, minimizing the step difference when any
adjacency step in the input signal is higher than ``max_delta`` by
summing/subtracting ``step``.
"""
idata = iter(sig)
d0 = next(idata)
yield d0
delta = d0 - d0 # Get the zero (e.g., integer, float) from data
for d1 in idata:
d_diff = d1 - d0
if abs(d_diff) > max_delta:
delta += - d_diff + min((d_diff) % step,
(d_diff) % -step, key=lambda x: abs(x))
yield d1 + delta
d0 = d1
[docs]def amdf(lag, size):
"""
Average Magnitude Difference Function non-linear filter for a given
size and a fixed lag.
Parameters
----------
lag :
Time lag, in samples. See ``freq2lag`` if needs conversion from
frequency values.
size :
Moving average size.
Returns
-------
A callable that accepts two parameters: a signal ``sig`` and the starting
memory element ``zero`` that behaves like the ``LinearFilter.__call__``
arguments. The output from that callable is a Stream instance, and has
no decimation applied.
See Also
--------
freq2lag :
Frequency (in rad/sample) to lag (in samples) converter.
"""
filt = (1 - z ** -lag).linearize()
@tostream
def amdf_filter(sig, zero=0.):
return maverage(size)(abs(filt(sig, zero=zero)), zero=zero)
return amdf_filter
overlap_add = StrategyDict("overlap_add")
@overlap_add.strategy("numpy")
@tostream
def overlap_add(blk_sig, size=None, hop=None, wnd=None, normalize=True):
"""
Overlap-add algorithm using Numpy arrays.
Parameters
----------
blk_sig :
An iterable of blocks (sequences), such as the ``Stream.blocks`` result.
size :
Block size for each ``blk_sig`` element, in samples.
hop :
Number of samples for two adjacent blocks (defaults to the size).
wnd :
Windowing function to be applied to each block or any iterable with
exactly ``size`` elements. If ``None`` (default), applies a rectangular
window.
normalize :
Flag whether the window should be normalized so that the process could
happen in the [-1; 1] range, dividing the window by its hop gain.
Default is ``True``.
Returns
-------
A Stream instance with the blocks overlapped and added.
See Also
--------
Stream.blocks :
Splits the Stream instance into blocks with given size and hop.
blocks :
Same to Stream.blocks but for without using the Stream class.
chain :
Lazily joins all iterables given as parameters.
chain.from_iterable :
Same to ``chain(*data)``, but the ``data`` evaluation is lazy.
window :
Window/apodization/tapering functions for a given size as a StrategyDict.
Note
----
Each block has the window function applied to it and the result is the
sum of the blocks without any edge-case special treatment for the first
and last few blocks.
"""
import numpy as np
# Finds the size from data, if needed
if size is None:
blk_sig = Stream(blk_sig)
size = len(blk_sig.peek())
if hop is None:
hop = size
# Find the right windowing function to be applied
if wnd is None:
wnd = np.ones(size)
elif callable(wnd) and not isinstance(wnd, Stream):
wnd = wnd(size)
if isinstance(wnd, Sequence):
wnd = np.array(wnd)
elif isinstance(wnd, Iterable):
wnd = np.hstack(wnd)
else:
raise TypeError("Window should be an iterable or a callable")
# Normalization to the [-1; 1] range
if normalize:
steps = Stream(wnd).blocks(hop).map(np.array)
gain = np.sum(np.abs(np.vstack(steps)), 0).max()
if gain: # If gain is zero, normalization couldn't have any effect
wnd = wnd / gain # Can't use "/=" nor "*=" as Numpy would keep datatype
# Overlap-add algorithm
old = np.zeros(size)
for blk in (wnd * blk for blk in blk_sig):
blk[:-hop] += old[hop:]
for el in blk[:hop]:
yield el
old = blk
for el in old[hop:]: # No more blocks, finish yielding the last one
yield el
@overlap_add.strategy("list")
@tostream
def overlap_add(blk_sig, size=None, hop=None, wnd=None, normalize=True):
"""
Overlap-add algorithm using lists instead of Numpy arrays. The behavior
is the same to the ``overlap_add.numpy`` strategy, besides the data types.
"""
# Finds the size from data, if needed
if size is None:
blk_sig = Stream(blk_sig)
size = len(blk_sig.peek())
if hop is None:
hop = size
# Find the window to be applied, resulting on a list or None
if wnd is not None:
if callable(wnd) and not isinstance(wnd, Stream):
wnd = wnd(size)
if isinstance(wnd, Iterable):
wnd = list(wnd)
else:
raise TypeError("Window should be an iterable or a callable")
# Normalization to the [-1; 1] range
if normalize:
if wnd:
steps = Stream(wnd).map(abs).blocks(hop).map(tuple)
gain = max(xmap(sum, xzip(*steps)))
if gain: # If gain is zero, normalization couldn't have any effect
wnd[:] = (w / gain for w in wnd)
else:
wnd = [1 / ceil(size / hop)] * size
# Window application
if wnd:
mul = operator.mul
if len(wnd) != size:
raise ValueError("Incompatible window size")
wnd = wnd + [0.] # Allows detecting when block size is wrong
blk_sig = (xmap(mul, wnd, blk) for blk in blk_sig)
# Overlap-add algorithm
add = operator.add
mem = [0.] * size
s_h = size - hop
for blk in xmap(iter, blk_sig):
mem[:s_h] = xmap(add, mem[hop:], blk)
mem[s_h:] = blk # Remaining elements
if len(mem) != size:
raise ValueError("Wrong block size or declared")
for el in mem[:hop]:
yield el
for el in mem[hop:]: # No more blocks, finish yielding the last one
yield el
stft = StrategyDict("stft")
@stft.strategy("rfft", "base", "real")
def stft(func=None, **kwparams):
"""
Short Time Fourier Transform block processor / phase vocoder wrapper.
This function can be used in many ways:
* Directly as a signal processor builder, wrapping a spectrum block/grain
processor function;
* Directly as a decorator to a block processor;
* Called without the ``func`` parameter for a partial evalution style
changing the defaults.
See the examples below for more information about these use cases.
The resulting function performs a full block-by-block analysis/synthesis
phase vocoder keeping this sequence of actions:
1. Blockenize the signal with the given ``size`` and ``hop``;
2. Lazily apply the given ``wnd`` window to each block;
3. Perform the 5 actions calling their functions in order:
a. ``before``: Pre-processing;
b. ``transform``: A transform like the FFT;
c. ``func``: the positional parameter with the single block processor;
d. ``inverse_transform``: inverse FFT;
e. ``after``: Post-processing.
4. Overlap-add with the ``ola`` overlap-add strategy. The given ``ola``
would deal with its own window application and normalization.
Any parameter from steps 3 and 4 can be set to ``None`` to skip it from
the full process, without changing the other [sub]steps. The parameters
defaults are based on the Numpy FFT subpackage.
Parameters
----------
func :
The block/grain processor function that receives a transformed block in
the frequency domain (the ``transform`` output) and should return the
processed data (it will be the first ``inverse_transform`` input). This
parameter shouldn't appear when this function is used as a decorator.
size :
Block size for the STFT process, in samples.
hop :
Duration in samples between two blocks. Defaults to the ``size`` value.
transform :
Function that receives the windowed block (in time domain) and the
``size`` as two positional inputs and should return the block (in
frequency domain). Defaults to ``numpy.fft.rfft``, which outputs a
Numpy 1D array with length equals to ``size // 2 + 1``.
inverse_transform :
Function that receives the processed block (in frequency domain) and the
``size`` as two positional inputs and should return the block (in
time domain). Defaults to ``numpy.fft.irfft``.
wnd :
Window function to be called as ``wnd(size)`` or window iterable with
length equals to ``size``. The windowing/apodization values are used
before taking the FFT of each block. Defaults to None, which means no
window should be applied (same behavior of a rectangular window).
before :
Function to be applied just before taking the transform, after the
windowing. Defaults to the ``numpy.fft.ifftshift``, which, together with
the ``after`` default, puts the time reference at the ``size // 2``
index of the block, centralizing it for the FFT (e.g. blocks
``[0, 1, 0]`` and ``[0, 0, 1, 0]`` would have zero phase). To disable
this realignment, just change both ``before=None`` and ``after=None``
keywords.
after :
Function to be applied just after the inverse transform, before calling
the overlap-add (as well as before its windowing, if any). Defaults to
the ``numpy.fft.fftshift`` function, which undo the changes done by the
default ``before`` pre-processing for block phase alignment. To avoid
the default time-domain realignment, set both ``before=None`` and
``after=None`` keywords.
ola :
Overlap-add strategy. Uses the ``overlap_add`` default strategy when
not given. The strategy should allow at least size and hop keyword
arguments, besides a first positional argument for the iterable with
blocks. If ``ola=None``, the result from using the STFT processor will be
the ``Stream`` of blocks that would be the overlap-add input.
ola_* :
Extra keyword parameters for the overlap-add strategy, if any. The extra
``ola_`` prefix is removed when calling it. See the overlap-add strategy
docs for more information about the valid parameters.
Returns
-------
A function with the same parameters above, besides ``func``, which is
replaced by the signal input (if func was given). The parameters used when
building the function should be seen as defaults that can be changed when
calling the resulting function with the respective keyword arguments.
Examples
--------
Let's process something:
>>> my_signal = Stream(.1, .3, -.1, -.3, .5, .4, .3)
Wrapping directly the processor function:
>>> processor_w = stft(abs, size=64)
>>> sig = my_signal.copy() # Any iterable
>>> processor_w(sig)
<audiolazy.lazy_stream.Stream object at 0x...>
>>> peek200_w = _.peek(200) # Needs Numpy
>>> type(peek200_w[0]).__name__ # Result is a signal (numpy.float64 data)
'float64'
Keyword parameters in a partial evaluation style (can be reassigned):
>>> stft64 = stft(size=64) # Same to ``stft`` but with other defaults
>>> processor_p = stft64(abs)
>>> sig = my_signal.copy() # Any iterable
>>> processor_p(sig)
<audiolazy.lazy_stream.Stream object at 0x...>
>>> _.peek(200) == peek200_w # This should do the same thing
True
As a decorator, this time with other windowing configuration:
>>> stft64hann = stft64(wnd=window.hann, ola_wnd=window.hann)
>>> @stft64hann # stft(...) can also be used as an anonymous decorator
... def processor_d(blk):
... return abs(blk)
>>> processor_d(sig) # This leads to a different result
<audiolazy.lazy_stream.Stream object at 0x...>
>>> _.peek(200) == peek200_w
False
You can also use other iterables as input, and keep the parameters to be
passed afterwards, as well as change transform calculation:
>>> stft_no_zero_phase = stft(before=None, after=None)
>>> stft_no_wnd = stft_no_zero_phase(ola=overlap_add.list, ola_wnd=None,
... ola_normalize=False)
>>> on_blocks = stft_no_wnd(transform=None, inverse_transform=None)
>>> processor_a = on_blocks(reversed, hop=4) # Reverse
>>> processor_a([1, 2, 3, 4, 5], size=4, hop=2)
<audiolazy.lazy_stream.Stream object at 0x...>
>>> list(_) # From blocks [1, 2, 3, 4] and [3, 4, 5, 0.0]
[4.0, 3.0, 2.0, 6, 4, 3]
>>> processor_a([1, 2, 3, 4, 5], size=4) # Default hop instead
<audiolazy.lazy_stream.Stream object at 0x...>
>>> list(_) # No overlap, blocks [1, 2, 3, 4] and [5, 0.0, 0.0, 0.0]
[4, 3, 2, 1, 0.0, 0.0, 0.0, 5]
>>> processor_a([1, 2, 3, 4, 5]) # Size was never given
Traceback (most recent call last):
...
TypeError: Missing 'size' argument
For analysis only, one can set ``ola=None``:
>>> from numpy.fft import ifftshift # [1, 2, 3, 4, 5] -> [3, 4, 5, 1, 2]
>>> analyzer = stft(ifftshift, ola=None, size=8, hop=2)
>>> sig = Stream(1, 0, -1, 0) # A pi/2 rad/sample cosine signal
>>> result = analyzer(sig)
>>> result
<audiolazy.lazy_stream.Stream object at 0x...>
Let's see the result contents. That processing "rotates" the frequencies,
converting the original ``[0, 0, 4, 0, 0]`` real FFT block to a
``[4, 0, 0, 0, 0]`` block, which means the block cosine was moved to
a DC-only signal keeping original energy/integral:
>>> result.take()
array([ 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5])
>>> result.take() # From [0, 0, -4, 0, 0] to [-4, 0, 0, 0, 0]
array([-0.5, -0.5, -0.5, -0.5, -0.5, -0.5, -0.5, -0.5])
Note
----
Parameters should be passed as keyword arguments. The only exception
is ``func`` for this function and ``sig`` for the returned function,
which are always the first positional argument, ald also the one that
shouldn't appear when using this function as a decorator.
Hint
----
1. When using Numpy FFT, one can keep data in place and return the
changed input block to save time;
2. Actually, there's nothing in this function that imposes FFT or Numpy
besides the default values. One can still use this even for other
transforms that have nothing to do with the Fourier Transform.
See Also
--------
overlap_add :
Overlap-add algorithm for an iterable (e.g. a Stream instance) of blocks
(sequences such as lists or Numpy arrays). It's also a StrategyDict.
window :
Window/apodization/tapering functions for a given size as a StrategyDict.
"""
# Using as a decorator or to "replicate" this function with other defaults
if func is None:
cfi = chain.from_iterable
mix_dict = lambda *dicts: dict(cfi(iteritems(d) for d in dicts))
result = lambda f=None, **new_kws: stft(f, **mix_dict(kwparams, new_kws))
return result
# Using directly
@tostream
@wraps(func)
def wrapper(sig, **kwargs):
kws = kwparams.copy()
kws.update(kwargs)
if "size" not in kws:
raise TypeError("Missing 'size' argument")
if "hop" in kws and kws["hop"] > kws["size"]:
raise ValueError("Hop value can't be higher than size")
blk_params = {"size": kws.pop("size")}
blk_params["hop"] = kws.pop("hop", None)
ola_params = blk_params.copy() # Size and hop
blk_params["wnd"] = kws.pop("wnd", None)
ola = kws.pop("ola", overlap_add)
class NotSpecified(object):
pass
for name in ["transform", "inverse_transform", "before", "after"]:
blk_params[name] = kws.pop(name, NotSpecified)
for k, v in kws.items():
if k.startswith("ola_"):
if ola is not None:
ola_params[k[len("ola_"):]] = v
else:
raise TypeError("Extra '{}' argument with no overlap-add "
"strategy".format(k))
else:
raise TypeError("Unknown '{}' extra argument".format(k))
def blk_gen(size, hop, wnd, transform, inverse_transform, before, after):
if transform is NotSpecified:
from numpy.fft import rfft as transform
if inverse_transform is NotSpecified:
from numpy.fft import irfft as inverse_transform
if before is NotSpecified:
from numpy.fft import ifftshift as before
if after is NotSpecified:
from numpy.fft import fftshift as after
# Find the right windowing function to be applied
if callable(wnd) and not isinstance(wnd, Stream):
wnd = wnd(size)
if isinstance(wnd, Iterable):
wnd = list(wnd)
if len(wnd) != size:
raise ValueError("Incompatible window size")
elif wnd is not None:
raise TypeError("Window should be an iterable or a callable")
# Pad size lambdas
trans = transform and (lambda blk: transform(blk, size))
itrans = inverse_transform and (lambda blk:
inverse_transform(blk, size))
# Continuation style calling
funcs = [f for f in [before, trans, func, itrans, after]
if f is not None]
process = lambda blk: reduce(lambda data, f: f(data), funcs, blk)
if wnd is None:
for blk in Stream(sig).blocks(size=size, hop=hop):
yield process(blk)
else:
blk_with_wnd = wnd[:]
mul = operator.mul
for blk in Stream(sig).blocks(size=size, hop=hop):
blk_with_wnd[:] = xmap(mul, blk, wnd)
yield process(blk_with_wnd)
if ola is None:
return blk_gen(**blk_params)
else:
return ola(blk_gen(**blk_params), **ola_params)
return wrapper
@stft.strategy("cfft", "complex")
def stft(func=None, **kwparams):
"""
Short Time Fourier Transform for complex data.
Same to the default STFT strategy, but with new defaults. This is the same
to:
.. code-block:: python
stft.base(transform=numpy.fft.fft, inverse_transform=numpy.fft.ifft)
See ``stft.base`` docs for more.
"""
from numpy.fft import fft, ifft
return stft.base(transform=fft, inverse_transform=ifft)(func, **kwparams)
@stft.strategy("cfftr", "complex_real")
def stft(func=None, **kwparams):
"""
Short Time Fourier Transform for real data keeping the full FFT block.
Same to the default STFT strategy, but with new defaults. This is the same
to:
.. code-block:: python
stft.base(transform=numpy.fft.fft,
inverse_transform=lambda *args: numpy.fft.ifft(*args).real)
See ``stft.base`` docs for more.
"""
from numpy.fft import fft, ifft
ifft_r = lambda *args: ifft(*args).real
return stft.base(transform=fft, inverse_transform=ifft_r)(func, **kwparams)