Source code for uncertainty_wrapper.core

"""
Uncertainty wrapper calculates uncertainties of wrapped functions using
central finite difference approximation of the Jacobian matrix.

.. math::

    \\frac{\\partial f_i}{\\partial x_{j,k}}

Uncertainty of the output is propagated using first order terms of a Taylor
series expansion around :math:`x`.


.. math::

    dF_{ij} = J_{ij} * S_{x_i, x_j} * J_{ij}^{T}

Diagonals of :math:`dF_{ij}` are standard deviations squared.

SunPower Corp. (c) 2016
"""

from functools import wraps
import numpy as np
import logging
from multiprocessing import Pool
from scipy.sparse import csr_matrix

# NOTE(review): this configures logging as a side effect of importing the
# module -- confirm that is intended for library code
logging.basicConfig()
# module level logger, named after this module
LOGGER = logging.getLogger(__name__)
# default relative step size used by ``partial_derivative``: half of the cube
# root of the machine precision of ``float``
DELTA = np.finfo(float).eps ** (1.0 / 3.0) / 2.0


def prop_unc(jc):
    """
    Propagate covariance through a Jacobian, i.e. compute ``J * C * J^T``.

    :param jc: pair of the Jacobian matrix and the covariance matrix
    :type jc: sequence
    :return: propagated covariance matrix

    Both matrices arrive packed in a single argument so that this function
    can be mapped directly over a sequence of pairs, e.g. as the target of a
    multiprocessing pool.
    """
    jac_matrix, cov_matrix = jc
    left_product = np.dot(jac_matrix, cov_matrix)
    return np.dot(left_product, jac_matrix.T)


def partial_derivative(f, x, n, nargs, delta=DELTA):
    """
    Calculate partial derivative using central finite difference
    approximation.

    :param f: function that takes the sequence of arguments as its only
        parameter
    :param x: sequence of arguments, each one an array of observations
    :param n: index of argument derivative is with respect to
    :param nargs: number of arguments
    :param delta: optional relative step size, default is
        :math:`\\epsilon^{1/3} / 2` where :math:`\\epsilon` is machine
        precision
    :return: ``(f(x + dx) - f(x - dx)) / (2 * dx)`` where only argument ``n``
        is perturbed
    """
    # only the row of the n-th argument is ever non-zero
    dx = np.zeros((nargs, len(x[n])))
    # use a step relative to x where x is non-zero and the absolute step
    # ``delta`` where x is exactly zero, so that the step is never zero
    dx[n] += np.where(x[n], x[n] * delta, delta)
    # apply central difference approximation; the perturbed arguments are
    # built as tuples explicitly instead of indexing the result of ``zip``,
    # which is an iterator and not subscriptable on Python 3
    x_plus = tuple(xi + dxi for xi, dxi in zip(x, dx))
    x_minus = tuple(xi - dxi for xi, dxi in zip(x, dx))
    # a negative step (negative x) flips the sign of both the numerator and
    # the denominator, so the quotient is unaffected
    return (f(x_plus) - f(x_minus)) / dx[n] / 2.0
# TODO: make this a class, add DELTA as class variable and flatten as method
def jacobian(func, x, nf, nobs, *args, **kwargs):
    """
    Estimate Jacobian matrices :math:`\\frac{\\partial f_i}{\\partial x_{j,k}}`
    where :math:`k` are independent observations of :math:`x`.

    The independent variable, :math:`x`, must be a numpy array with exactly 2
    dimensions. The first dimension is the number of independent arguments,
    and the second dimension is the number of observations.

    The function must return a Numpy array with exactly 2 dimensions. The
    first is the number of returns and the second dimension corresponds to
    the number of observations. If the input argument is 2-D then the output
    should also be 2-D.

    Constant arguments can be passed as additional positional arguments or
    keyword arguments. If any constant argument increases the number of
    observations of the return value, tile the input arguments to match.

    Use :func:`numpy.atleast_2d` or :func:`numpy.reshape` to get the correct
    dimensions for scalars.

    :param func: function
    :param x: independent variables grouped by observation
    :param nf: number of returns in output (1st dimension)
    :param nobs: number of observations in output (2nd dimension)
    :return: Jacobian matrices for each observation, with shape
        ``(nobs, nf, nargs)``
    """
    nargs = len(x)  # degrees of freedom

    def f(x_):
        """call ``func`` with the constant arguments bound"""
        return func(x_, *args, **kwargs)

    j = np.zeros((nargs, nf, nobs))  # one (nf, nobs) slab per argument
    # ``range`` instead of ``xrange`` so the loop runs on Python 2 and 3
    for n in range(nargs):
        j[n] = partial_derivative(f, x, n, nargs)
    # better to transpose J once than to transpose each partial derivative
    return j.T
def jflatten(j):
    """
    Flatten 3-D Jacobian into 2-D.

    :param j: stack of matrices with shape ``(nobs, nf, nargs)``
    :return: dense block diagonal matrix with shape
        ``(nobs * nf, nobs * nargs)`` where the n-th diagonal block is
        ``j[n]`` and everything else is zero
    """
    nobs, nf, nargs = j.shape
    jflat = np.zeros((nf * nobs, nargs * nobs))
    # ``enumerate`` instead of indexing with ``xrange``, which does not exist
    # on Python 3
    for n, block in enumerate(j):
        r, c = n * nf, n * nargs  # upper left corner of the n-th block
        jflat[r:(r + nf), c:(c + nargs)] = block
    return jflat
def jtosparse(j):
    """
    Generate sparse block diagonal matrix from 3-D Jacobian.

    :param j: stack of matrices with shape ``(nobs, nf, nargs)``
    :return: :class:`scipy.sparse.csr_matrix` with shape
        ``(nobs * nf, nobs * nargs)`` where the n-th diagonal block is
        ``j[n]``
    """
    nobs, nf, nargs = j.shape
    # values in C order: observation, then return, then argument
    data = j.ravel()
    # coordinates in the same order as ``data``; they are passed as an
    # explicit (rows, columns) pair because a ``zip`` object, which is what
    # Python 3 returns, is not accepted by ``csr_matrix``
    rows = np.repeat(np.arange(nobs * nf), nargs)
    cols = np.tile(
        np.arange(nobs * nargs).reshape(nobs, 1, nargs), (1, nf, 1)
    ).ravel()
    return csr_matrix((data, (rows, cols)), shape=(nobs * nf, nobs * nargs))


# TODO: allow user to supply analytical Jacobian, only fall back on Jacob
# estimate if jac is None
def unc_wrapper_args(*covariance_keys):
    """
    Wrap function, calculate its Jacobian and calculate the covariance of the
    outputs given the covariance of the specified inputs.

    :param covariance_keys: indices and names of arguments corresponding to
        covariance
    :return: wrapped function bound to specified covariance keys

    This is the outer uncertainty wrapper that allows you to specify the
    arguments in the original function that correspond to the covariance. The
    inner wrapper takes the original function to be wrapped. ::

        def f(a, b, c, d, kw1='foo', *args, **kwargs):
            pass

        # arguments a, c, d and kw1 correspond to the covariance matrix
        f_wrapped = unc_wrapper_args(0, 2, 3, 'kw1')(f)

        cov = np.array([[0.0001, 0., 0., 0.], [0., 0.0001, 0., 0.],
                        [0., 0., 0.0001, 0.], [0., 0., 0., 0.0001]])
        y, cov, jac = f_wrapped(a, b, c, d, kw1='bar', __covariance__=cov)

    The covariance keys can be indices of positional arguments or the names
    of keyword arguments used in calling the function. If no covariance keys
    are specified then the arguments that correspond to the covariance should
    be grouped into a sequence. If ``None`` is anywhere in
    ``covariance_keys`` then all of the arguments will be used to calculate
    the Jacobian.

    The covariance matrix must be a symmetrical matrix with positive numbers
    on the diagonal that correspond to the square of the standard deviation,
    second moment around the mean or root-mean-square(RMS) of the function
    with respect to the arguments specified as covariance keys. The other
    elements are the covariances corresponding to the arguments intersecting
    at that element. Pass the covariance matrix with the keyword
    ``__covariance__`` and it will be popped from the dictionary of keyword
    arguments provided to the wrapped function.

    The wrapped function will return, in this order, the evaluation of the
    original function, the covariance propagated using the first order terms
    of a Taylor series expansion around the arguments, and the Jacobian,
    which is the sensitivity of the return output to each argument specified
    as a covariance key. If no covariance is passed, ``None`` is returned in
    its place.

    An optional keyword argument ``__method__`` can also be passed to the
    wrapped function (not the wrapper) that specifies the method used to
    calculate the dot product. The default method is ``'loop'``. The other
    methods are ``'dense'``, ``'sparse'`` and ``'pool'``.

    If the arguments specified as covariance keys are arrays, they should all
    be the same size. These dimensions will be considered as separate
    observations. Another argument, not in the covariance keys, may also
    create observations. The resulting Jacobian will have dimensions of
    number of observations (nobs) by number of return output (nf) by number
    of covariance keys (nargs). The resulting covariance will be nobs x nf x
    nf.
    """
    # names of keyword arguments are strings, indices of positional arguments
    # are not; ``basestring`` only exists on Python 2
    try:
        string_types = basestring  # noqa: F821
    except NameError:
        string_types = str

    def wrapper(f):
        @wraps(f)
        def wrapped_function(*args, **kwargs):
            cov = kwargs.pop('__covariance__', None)  # pop covariance
            method = kwargs.pop('__method__', 'loop')  # pop method
            # covariance keys cannot be defaults, they must be in args or
            # kwargs
            cov_keys = covariance_keys
            # convert args to kwargs by index
            kwargs.update(enumerate(args))
            args = ()  # empty args
            if None in cov_keys:
                # use all keys; take a snapshot because the covariance keys
                # are popped from kwargs below
                cov_keys = list(kwargs)
            # group covariance keys
            if len(cov_keys) > 0:
                # uses specified keys
                x = [np.atleast_1d(kwargs.pop(k)) for k in cov_keys]
            else:
                # arguments already grouped
                x = kwargs.pop(0)  # use first argument
            # remaining args
            args_dict = {}

            def args_from_kwargs(kwargs_):
                """unpack positional arguments from keyword arguments"""
                # positional arguments are keyed by index, sort them by it
                positional = sorted(
                    ((n, v) for n, v in kwargs_.items()
                     if not isinstance(n, string_types)),
                    key=lambda m: m[0]
                )
                if not positional:
                    # only keyword arguments are left, nothing to unpack
                    return (), {}
                idx, args_ = zip(*positional)
                # remove args_ and their indices from kwargs_
                args_dict_ = {n: kwargs_.pop(n) for n in idx}
                return args_, args_dict_

            if kwargs:
                args, args_dict = args_from_kwargs(kwargs)

            def f_(x_, *args_, **kwargs_):
                """call original function with independent variables grouped"""
                if cov_keys:
                    # put the independent variables and the remaining
                    # positional arguments back where the original function
                    # expects them; two updates because keyword expansion
                    # rejects the integer keys on Python 3
                    kwargs_.update(zip(cov_keys, x_))
                    kwargs_.update(args_dict)
                    args_, _ = args_from_kwargs(kwargs_)
                    return np.array(f(*args_, **kwargs_))
                # assumes independent variables already grouped
                return f(x_, *args_, **kwargs_)

            # evaluate function and Jacobian
            avg = f_(x, *args, **kwargs)
            # number of returns and observations
            if avg.ndim > 1:
                nf, nobs = avg.shape
            else:
                nf, nobs = avg.size, 1
            jac = jacobian(f_, x, nf, nobs, *args, **kwargs)
            # calculate covariance
            if cov is not None:
                cov = np.asarray(cov)
                # covariance must account for all observations
                # scale covariances by x squared in each direction
                if cov.ndim == 3:
                    x = np.array([np.repeat(y, nobs) if len(y) == 1 else y
                                  for y in x])
                    LOGGER.debug('x:\n%r', x)
                    # np.vstack turns each 1-D row into a column
                    cov = np.array([c * y * np.vstack(y)
                                    for c, y in zip(cov, x.T)])
                else:  # x are all only one dimension
                    x = np.asarray(x)
                    cov = cov * x * x.T
                    assert jac.size / nf / nobs == cov.size / len(x)
                    cov = np.tile(cov, (nobs, 1, 1))
                # propagate uncertainty using different methods
                method_name = method.lower()
                if method_name == 'dense':
                    j, c = jflatten(jac), jflatten(cov)
                    cov = prop_unc((j, c))
                # sparse
                elif method_name == 'sparse':
                    j, c = jtosparse(jac), jtosparse(cov)
                    cov = j.dot(c).dot(j.transpose())
                    cov = cov.todense()
                # pool
                elif method_name == 'pool':
                    # create the pool before entering ``try`` so that a
                    # failure to create it is not masked by the clean up
                    pool = Pool()
                    try:
                        cov = np.array(
                            pool.map(prop_unc, list(zip(jac, cov)))
                        )
                    finally:
                        pool.terminate()
                # loop is the default
                else:
                    cov = np.array([prop_unc((jac[o], cov[o]))
                                    for o in range(nobs)])
                # dense and sparse are flattened, unravel them into 3-D list
                # of observations
                if method_name in ('dense', 'sparse'):
                    cov = np.array([
                        cov[(nf * o):(nf * (o + 1)), (nf * o):(nf * (o + 1))]
                        for o in range(nobs)
                    ])
            # unpack returns for original function with ungrouped arguments
            if None in cov_keys or len(cov_keys) > 0:
                return tuple(avg.tolist() + [cov, jac])
            # independent variables were already grouped
            return avg, cov, jac
        return wrapped_function
    return wrapper
# short cut for functions with arguments already grouped unc_wrapper = unc_wrapper_args() unc_wrapper.__doc__ = "equivalent to unc_wrapper_args() with no arguments" unc_wrapper.__name__ = "unc_wrapper"