Source code for pyqt_fit.kernel_smoothing

"""
:Author: Pierre Barbier de Reuille <pierre.barbierdereuille@gmail.com>

Module implementing non-parametric regressions using kernel smoothing methods.
"""

from __future__ import division, absolute_import, print_function
from scipy import stats
from scipy.linalg import sqrtm, solve
import scipy
import numpy as np
from .compat import irange

from .cyth import HAS_CYTHON

local_linear = None


def useCython():
    """
    Switch to using Cython methods if available
    """
    global local_linear
    if HAS_CYTHON:
        from . import cy_local_linear
        local_linear = cy_local_linear


def usePython():
    """
    Switch to using the python implementation of the methods
    """
    global local_linear
    from . import py_local_linear
    local_linear = py_local_linear

if HAS_CYTHON:
    useCython()
else:
    usePython()

from .kde import scotts_covariance
from .kernels import normal_kernel, normal_kernel1d


[docs]class SpatialAverage(object): r""" Perform a Nadaraya-Watson regression on the data (i.e. also called local-constant regression) using a gaussian kernel. The Nadaraya-Watson estimate is given by: .. math:: f_n(x) \triangleq \frac{\sum_i K\left(\frac{x-X_i}{h}\right) Y_i} {\sum_i K\left(\frac{x-X_i}{h}\right)} Where :math:`K(x)` is the kernel and must be such that :math:`E(K(x)) = 0` and :math:`h` is the bandwidth of the method. :param ndarray xdata: Explaining variables (at most 2D array) :param ndarray ydata: Explained variables (should be 1D array) :type cov: ndarray or callable :param cov: If an ndarray, it should be a 2D array giving the matrix of covariance of the gaussian kernel. Otherwise, it should be a function ``cov(xdata, ydata)`` returning the covariance matrix. """ def __init__(self, xdata, ydata, cov=scotts_covariance): self.xdata = np.atleast_2d(xdata) self.ydata = np.atleast_1d(ydata) self._bw = None self._covariance = None self._inv_cov = None self.covariance = cov self.d, self.n = self.xdata.shape self.correction = 1. @property
[docs] def bandwidth(self): """ Bandwidth of the kernel. It cannot be set directly, but rather should be set via the covariance attribute. """ if self._bw is None and self._covariance is not None: self._bw = np.real(sqrtm(self._covariance)) return self._bw
@property def covariance(self): """ Covariance of the gaussian kernel. Can be set either as a fixed value or using a bandwith calculator, that is a function of signature ``w(xdata, ydata)`` that returns a 2D matrix for the covariance of the kernel. """ return self._covariance @covariance.setter # noqa
[docs] def covariance(self, cov): if callable(cov): _cov = np.atleast_2d(cov(self.xdata, self.ydata)) else: _cov = np.atleast_2d(cov) self._bw = None self._covariance = _cov self._inv_cov = scipy.linalg.inv(_cov)
[docs] def evaluate(self, points, result=None): """ Evaluate the spatial averaging on a set of points :param ndarray points: Points to evaluate the averaging on :param ndarray result: If provided, the result will be put in this array """ points = np.atleast_2d(points).astype(self.xdata.dtype) #norm = self.kde(points) d, m = points.shape if result is None: result = np.zeros((m,), points.dtype) norm = np.zeros((m,), points.dtype) # iterate on the internal points for i, ci in np.broadcast(irange(self.n), irange(self._correction.shape[0])): diff = np.dot(self._correction[ci], self.xdata[:, i, np.newaxis] - points) tdiff = np.dot(self._inv_cov, diff) energy = np.exp(-np.sum(diff * tdiff, axis=0) / 2.0) result += self.ydata[i] * energy norm += energy result[norm > 0] /= norm[norm > 0] return result
[docs] def __call__(self, *args, **kwords): """ This method is an alias for :py:meth:`SpatialAverage.evaluate` """ return self.evaluate(*args, **kwords)
@property def correction(self): """ The correction coefficient allows to change the width of the kernel depending on the point considered. It can be either a constant (to correct globaly the kernel width), or a 1D array of same size as the input. """ return self._correction @correction.setter # noqa
[docs] def correction(self, value): self._correction = np.atleast_1d(value)
[docs] def set_density_correction(self): """ Add a correction coefficient depending on the density of the input """ kde = stats.gaussian_kde(self.xdata) dens = kde(self.xdata) dm = dens.max() dens[dens < 1e-50] = dm self._correction = dm / dens
[docs]class LocalLinearKernel1D(object): r""" Perform a local-linear regression using a gaussian kernel. The local constant regression is the function that minimises, for each position: .. math:: f_n(x) \triangleq \argmin_{a_0\in\mathbb{R}} \sum_i K\left(\frac{x-X_i}{h}\right) \left(Y_i - a_0 - a_1(x-X_i)\right)^2 Where :math:`K(x)` is the kernel and must be such that :math:`E(K(x)) = 0` and :math:`h` is the bandwidth of the method. :param ndarray xdata: Explaining variables (at most 2D array) :param ndarray ydata: Explained variables (should be 1D array) :type cov: float or callable :param cov: If an float, it should be a variance of the gaussian kernel. Otherwise, it should be a function ``cov(xdata, ydata)`` returning the variance. """ def __init__(self, xdata, ydata, cov=scotts_covariance): self.xdata = np.atleast_1d(xdata) self.ydata = np.atleast_1d(ydata) self.n = self.xdata.shape[0] self._bw = None self._covariance = None self.covariance = cov @property
[docs] def bandwidth(self): """ Bandwidth of the kernel. """ return self._bw
@property def covariance(self): """ Covariance of the gaussian kernel. Can be set either as a fixed value or using a bandwith calculator, that is a function of signature ``w(xdata, ydata)`` that returns a single value. .. note:: A ndarray with a single value will be converted to a floating point value. """ return self._covariance @covariance.setter # noqa
[docs] def covariance(self, cov): if callable(cov): _cov = float(cov(self.xdata, self.ydata)) else: _cov = float(cov) self._covariance = _cov self._bw = np.sqrt(_cov)
[docs] def evaluate(self, points, out=None): """ Evaluate the spatial averaging on a set of points :param ndarray points: Points to evaluate the averaging on :param ndarray result: If provided, the result will be put in this array """ li2, out = local_linear.local_linear_1d(self._bw, self.xdata, self.ydata, points, out) self.li2 = li2 return out
[docs] def __call__(self, *args, **kwords): """ This method is an alias for :py:meth:`LocalLinearKernel1D.evaluate` """ return self.evaluate(*args, **kwords)
class PolynomialDesignMatrix1D(object): def __init__(self, dim): self.dim = dim powers = np.arange(0, dim + 1).reshape((1, dim + 1)) self.powers = powers def __call__(self, dX, out=None): return np.power(dX, self.powers, out) # / self.frac
[docs]class LocalPolynomialKernel1D(object): r""" Perform a local-polynomial regression using a user-provided kernel (Gaussian by default). The local constant regression is the function that minimises, for each position: .. math:: f_n(x) \triangleq \argmin_{a_0\in\mathbb{R}} \sum_i K\left(\frac{x-X_i}{h}\right) \left(Y_i - a_0 - a_1(x-X_i) - \ldots - a_q \frac{(x-X_i)^q}{q!}\right)^2 Where :math:`K(x)` is the kernel such that :math:`E(K(x)) = 0`, :math:`q` is the order of the fitted polynomial and :math:`h` is the bandwidth of the method. It is also recommended to have :math:`\int_\mathbb{R} x^2K(x)dx = 1`, (i.e. variance of the kernel is 1) or the effective bandwidth will be scaled by the square-root of this integral (i.e. the standard deviation of the kernel). :param ndarray xdata: Explaining variables (at most 2D array) :param ndarray ydata: Explained variables (should be 1D array) :param int q: Order of the polynomial to fit. **Default:** 3 :type cov: float or callable :param cov: If an float, it should be a variance of the gaussian kernel. Otherwise, it should be a function ``cov(xdata, ydata)`` returning the variance. **Default:** ``scotts_covariance`` """ def __init__(self, xdata, ydata, q=3, **kwords): self.xdata = np.atleast_1d(xdata) self.ydata = np.atleast_1d(ydata) self.n = self.xdata.shape[0] self.q = q self._kernel = None self._bw = None self._covariance = None self.designMatrix = None for n in kwords: setattr(self, n, kwords[n]) if self.kernel is None: self.kernel = normal_kernel1d() if self.covariance is None: self.covariance = scotts_covariance if self.designMatrix is None: self.designMatrix = PolynomialDesignMatrix1D @property def bandwidth(self): """ Bandwidth of the kernel. """ return self._bw @bandwidth.setter # noqa
[docs] def bandwidth(self, bw): if callable(bw): _bw = float(bw(self.xdata, self.ydata)) else: _bw = float(bw) self._bw = _bw self._covariance = _bw * _bw
@property def covariance(self): """ Covariance of the gaussian kernel. Can be set either as a fixed value or using a bandwith calculator, that is a function of signature ``w(xdata, ydata)`` that returns a single value. .. note:: A ndarray with a single value will be converted to a floating point value. """ return self._covariance @covariance.setter # noqa
[docs] def covariance(self, cov): if callable(cov): _cov = float(cov(self.xdata, self.ydata)) else: _cov = float(cov) self._covariance = _cov self._bw = np.sqrt(_cov)
@property def cov(self): """ Covariance of the gaussian kernel. Can be set either as a fixed value or using a bandwith calculator, that is a function of signature ``w(xdata, ydata)`` that returns a single value. .. note:: A ndarray with a single value will be converted to a floating point value. """ return self.covariance @cov.setter # noqa def cov(self, val): self.covariance = val @property def kernel(self): r""" Kernel object. Should provide the following methods: ``kernel.pdf(xs)`` Density of the kernel, denoted :math:`K(x)` By default, the kernel is an instance of :py:class:`kernels.normal_kernel1d` """ return self._kernel @kernel.setter # noqa def kernel(self, val): self._kernel = val
[docs] def evaluate(self, points, out=None): """ Evaluate the spatial averaging on a set of points :param ndarray points: Points to evaluate the averaging on :param ndarray result: If provided, the result will be put in this array """ xdata = self.xdata[:, np.newaxis] # make it a column vector ydata = self.ydata[:, np.newaxis] # make it a column vector q = self.q bw = self.bandwidth kernel = self.kernel designMatrix = self.designMatrix(q) if out is None: out = np.empty(points.shape, dtype=float) for i, p in enumerate(points): dX = (xdata - p) Wx = kernel(dX / bw) Xx = designMatrix(dX) WxXx = Wx * Xx XWX = np.dot(Xx.T, WxXx) Lx = solve(XWX, WxXx.T)[0] out[i] = np.dot(Lx, ydata) return out
[docs] def __call__(self, *args, **kwords): """ This method is an alias for :py:meth:`LocalLinearKernel1D.evaluate` """ return self.evaluate(*args, **kwords)
[docs]class PolynomialDesignMatrix(object): """ Class used to create a design matrix for polynomial regression """ def __init__(self, dim, deg): self.dim = dim self.deg = deg self._designMatrixSize() def _designMatrixSize(self): """ Compute the size of the design matrix for a n-D problem of order d. Can also compute the Taylors factors (i.e. the factors that would be applied for the taylor decomposition) :param int dim: Dimension of the problem :param int deg: Degree of the fitting polynomial :param bool factors: If true, the out includes the Taylor factors :returns: The number of columns in the design matrix and, if required, a ndarray with the taylor coefficients for each column of the design matrix. """ dim = self.dim deg = self.deg init = 1 dims = [0] * (dim + 1) cur = init prev = 0 #if factors: # fcts = [1] fact = 1 for i in irange(deg): diff = cur - prev prev = cur old_dims = list(dims) fact *= (i + 1) for j in irange(dim): dp = diff - old_dims[j] cur += dp dims[j + 1] = dims[j] + dp # if factors: # fcts += [fact]*(cur-prev) self.size = cur #self.factors = np.array(fcts)
[docs] def __call__(self, x, out=None): """ Creates the design matrix for polynomial fitting using the points x. :param ndarray x: Points to create the design matrix. Shape must be (D,N) or (N,), where D is the dimension of the problem, 1 if not there. :param int deg: Degree of the fitting polynomial :param ndarray factors: Scaling factor for the columns of the design matrix. The shape should be (M,) or (M,1), where M is the number of columns of the out. This value can be obtained using the :py:func:`designMatrixSize` function. :returns: The design matrix as a (M,N) matrix. """ dim, deg = self.dim, self.deg #factors = self.factors x = np.atleast_2d(x) dim = x.shape[0] if out is None: s = self._designMatrixSize(dim, deg) out = np.empty((s, x.shape[1]), dtype=x.dtype) dims = [0] * (dim + 1) out[0, :] = 1 cur = 1 for i in irange(deg): old_dims = list(dims) prev = cur for j in irange(x.shape[0]): dims[j] = cur for k in irange(old_dims[j], prev): np.multiply(out[k], x[j], out[cur]) cur += 1 #if factors is not None: # factors = np.asarray(factors) # if len(factors.shape) == 1: # factors = factors[:,np.newaxis] # out /= factors return out
[docs]class LocalPolynomialKernel(object): r""" Perform a local-polynomial regression in N-D using a user-provided kernel (Gaussian by default). The local constant regression is the function that minimises, for each position: .. math:: f_n(x) \triangleq \argmin_{a_0\in\mathbb{R}} \sum_i K\left(\frac{x-X_i}{h}\right) \left(Y_i - a_0 - \mathcal{P}_q(X_i-x)\right)^2 Where :math:`K(x)` is the kernel such that :math:`E(K(x)) = 0`, :math:`q` is the order of the fitted polynomial, :math:`\mathcal{P}_q(x)` is a polynomial of order :math:`d` in :math:`x` and :math:`h` is the bandwidth of the method. The polynomial :math:`\mathcal{P}_q(x)` is of the form: .. math:: \mathcal{F}_d(k) = \left\{ \n \in \mathbb{N}^d \middle| \sum_{i=1}^d n_i = k \right\} \mathcal{P}_q(x_1,\ldots,x_d) = \sum_{k=1}^q \sum_{\n\in\mathcal{F}_d(k)} a_{k,\n} \prod_{i=1}^d x_i^{n_i} For example we have: .. math:: \mathcal{P}_2(x,y) = a_{110} x + a_{101} y + a_{220} x^2 + a_{211} xy + a_{202} y^2 :param ndarray xdata: Explaining variables (at most 2D array). The shape should be (N,D) with D the dimension of the problem and N the number of points. For 1D array, the shape can be (N,), in which case it will be converted to (N,1) array. :param ndarray ydata: Explained variables (should be 1D array). The shape must be (N,). :param int q: Order of the polynomial to fit. **Default:** 3 :param callable kernel: Kernel to use for the weights. Call is ``kernel(points)`` and should return an array of values the same size as ``points``. If ``None``, the kernel will be ``normal_kernel(D)``. :type cov: float or callable :param cov: If an float, it should be a variance of the gaussian kernel. Otherwise, it should be a function ``cov(xdata, ydata)`` returning the variance. **Default:** ``scotts_covariance`` """ def __init__(self, xdata, ydata, q=3, cov=scotts_covariance, kernel=None): self.xdata = np.atleast_2d(xdata) self.ydata = np.atleast_1d(ydata) self.d, self.n = self.xdata.shape self.q = q if kernel is None: kernel = normal_kernel(self.d) self.kernel = kernel self._bw = None self._covariance = None self.covariance = cov @property
[docs] def bandwidth(self): """ Bandwidth of the kernel. """ return self._bw
@property def covariance(self): """ Covariance of the gaussian kernel. Can be set either as a fixed value or using a bandwith calculator, that is a function of signature ``w(xdata, ydata)`` that returns a DxD matrix. .. note:: A ndarray with a single value will be converted to a floating point value. """ return self._covariance @covariance.setter # noqa
[docs] def covariance(self, cov): if callable(cov): _cov = cov(self.xdata, self.ydata) else: _cov = np.atleast_2d(cov) self._covariance = _cov self._bw = np.real(sqrtm(_cov))
[docs] def evaluate(self, points, out=None): """ Evaluate the spatial averaging on a set of points :param ndarray points: Points to evaluate the averaging on :param ndarray out: Pre-allocated array for the result """ xdata = self.xdata ydata = self.ydata[:, np.newaxis] # make it a column vector points = np.atleast_2d(points) n = self.n q = self.q d = self.d designMatrix = PolynomialDesignMatrix(d, q) dm_size = designMatrix.size Xx = np.empty((dm_size, n), dtype=xdata.dtype) WxXx = np.empty(Xx.shape, dtype=xdata.dtype) XWX = np.empty((dm_size, dm_size), dtype=xdata.dtype) inv_bw = scipy.linalg.inv(self.bandwidth) kernel = self.kernel if out is None: out = np.empty((points.shape[1],), dtype=float) for i in irange(points.shape[1]): dX = (xdata - points[:, i:i + 1]) Wx = kernel(np.dot(inv_bw, dX)) designMatrix(dX, out=Xx) np.multiply(Wx, Xx, WxXx) np.dot(Xx, WxXx.T, XWX) Lx = solve(XWX, WxXx)[0] out[i] = np.dot(Lx, ydata) return out
[docs] def __call__(self, *args, **kwords): """ This method is an alias for :py:meth:`LocalLinearKernel1D.evaluate` """ return self.evaluate(*args, **kwords)