# Source code for pyqt_fit.npr_methods

"""
:Author: Pierre Barbier de Reuille <pierre.barbierdereuille@gmail.com>

Module implementing non-parametric regressions using kernel methods.
"""

from __future__ import division, absolute_import, print_function
from scipy import linalg
import scipy
import numpy as np
from .compat import irange
from .cyth import HAS_CYTHON
from . import kde
from . import py_local_linear
from . import kernels

# Module-level alias for the local-linear implementation module; it is
# bound below by useCython()/usePython() at import time.
local_linear = None

def useCython():
    """
    Switch to the compiled Cython implementation of the methods, if available.

    Rebinds the module-level ``local_linear`` alias to the Cython extension
    module; this is a no-op when Cython support is absent.
    """
    global local_linear
    if not HAS_CYTHON:
        return
    from . import cy_local_linear
    local_linear = cy_local_linear

def usePython():
    """
    Switch to the pure-Python implementation of the methods.

    Rebinds the module-level ``local_linear`` alias to
    :py:mod:`pyqt_fit.py_local_linear`.
    """
    global local_linear
    local_linear = py_local_linear

# At import time, prefer the compiled Cython implementation when it is
# available, otherwise fall back to the pure-Python one.
if HAS_CYTHON:
    useCython()
else:
    usePython()

[docs]def compute_bandwidth(reg): """ Compute the bandwidth and covariance for the model, based of its xdata attribute """ if reg.bandwidth_function: bw = np.atleast_2d(reg.bandwidth_function(reg.xdata, model=reg)) cov = np.dot(bw, bw).real elif reg.covariance_function: cov = np.atleast_2d(reg.covariance_function(reg.xdata, model=reg)) bw = linalg.sqrtm(cov) else: return reg.bandwidth, reg.covariance return bw, cov
class RegressionKernelMethod(object):
    r"""
    Base class for regression kernel methods
    """

    def fit(self, reg):
        """
        Fit the method and return the fitted object that will be used for
        actual evaluation.

        The object needs to call the
        :py:meth:`pyqt_fit.nonparam_regression.NonParamRegression.set_actual_bandwidth`
        method with the computed bandwidth and covariance.

        :Default: compute the bandwidth from the real data (via
            :py:func:`compute_bandwidth`) and set it on the regression object.
        """
        reg.set_actual_bandwidth(*compute_bandwidth(reg))
        return self

    def evaluate(self, points, out):
        """
        Evaluate the regression of the provided points.

        :param ndarray points: 2d-array of points to compute the regression
            on. Each column is a point.
        :param ndarray out: 1d-array in which to store the result
        :rtype: ndarray
        :return: The method must return the ``out`` array, updated with the
            regression values

        .. note:: NOTE(review): the subclasses in this module override this
            with the signature ``evaluate(self, reg, points, out)`` — the base
            signature here looks out of date; confirm against callers before
            relying on it.
        """
        raise NotImplementedError()
class SpatialAverage(RegressionKernelMethod):
    r"""
    Perform a Nadaraya-Watson regression on the data (i.e. also called
    local-constant regression) using a gaussian kernel.

    The Nadaraya-Watson estimate is given by:

    .. math::

        f_n(x) \triangleq \frac{\sum_i K\left(\frac{x-X_i}{h}\right) Y_i}
        {\sum_i K\left(\frac{x-X_i}{h}\right)}

    Where :math:`K(x)` is the kernel and must be such that :math:`E(K(x)) = 0`
    and :math:`h` is the bandwidth of the method.

    :param ndarray xdata: Explaining variables (at most 2D array)
    :param ndarray ydata: Explained variables (should be 1D array)
    :type  cov: ndarray or callable
    :param cov: If an ndarray, it should be a 2D array giving the matrix of
        covariance of the gaussian kernel. Otherwise, it should be a function
        ``cov(xdata, ydata)`` returning the covariance matrix.
    """

    def __init__(self):
        # Default: no per-point kernel-width correction.
        self.correction = 1.

    def fit(self, reg):
        # Let the base class compute and install the bandwidth, then cache
        # the inverse bandwidth matrix.
        self = super(SpatialAverage, self).fit(reg)
        self.inv_bw = linalg.inv(reg.bandwidth)
        return self

    def evaluate(self, reg, points, out):
        """
        Evaluate the Nadaraya-Watson estimate on ``points``, writing the
        weighted average of the fitted ydata into ``out``.
        """
        d, m = points.shape
        weight_sum = np.zeros((m,), points.dtype)
        xdata = reg.xdata[..., np.newaxis]
        ydata = reg.fitted_ydata
        correction = self.correction
        N = reg.N
        # NOTE(review): recomputed here even though fit() caches self.inv_bw;
        # kept as-is to preserve behavior (evaluate works without fit state).
        inv_bw = scipy.linalg.inv(reg.bandwidth)
        kernel = reg.kernel
        out.fill(0)
        # Iterate over the data points; the broadcast pairs each point index
        # with a correction index (a single correction value is reused).
        for i, ci in np.broadcast(irange(N), irange(correction.shape[0])):
            delta = correction[ci] * (xdata[:, i, :] - points)
            w = kernel(np.dot(inv_bw, delta)).squeeze()
            out += ydata[i] * w
            weight_sum += w
        # Normalize only where some kernel weight accumulated, to avoid 0/0.
        out[weight_sum > 0] /= weight_sum[weight_sum > 0]
        return out

    @property
    def correction(self):
        """
        The correction coefficient allows to change the width of the kernel
        depending on the point considered. It can be either a constant (to
        correct globaly the kernel width), or a 1D array of same size as the
        input.
        """
        return self._correction

    @correction.setter
    def correction(self, value):
        value = np.atleast_1d(value)
        assert len(value.shape) == 1, "Error, the correction must be a single value or a 1D array"
        self._correction = value

    def set_density_correction(self):
        """
        Add a correction coefficient depending on the density of the input.

        .. note:: NOTE(review): this reads ``self.xdata``, but no attribute of
            that name is ever set on this class (the data lives on the
            regression object) — likely should use the fitted regression's
            xdata; confirm against callers.
        """
        est = kde.KDE1D(self.xdata)
        dens = est(self.xdata)
        dm = dens.max()
        # Clamp near-zero densities so the correction stays finite.
        dens[dens < 1e-50] = dm
        self._correction = dm / dens

    @property
    def q(self):
        """
        Degree of the fitted polynom
        """
        return 0
class LocalLinearKernel1D(RegressionKernelMethod):
    r"""
    Perform a local-linear regression using a gaussian kernel.

    The local constant regression is the function that minimises, for each
    position:

    .. math::

        f_n(x) \triangleq \argmin_{a_0\in\mathbb{R}}
            \sum_i K\left(\frac{x-X_i}{h}\right)
            \left(Y_i - a_0 - a_1(x-X_i)\right)^2

    Where :math:`K(x)` is the kernel and must be such that :math:`E(K(x)) = 0`
    and :math:`h` is the bandwidth of the method.
    """

    def fit(self, reg):
        # No extra state: bandwidth setup is done by the base class.
        return super(LocalLinearKernel1D, self).fit(reg)

    def evaluate(self, reg, points, out):
        """
        Evaluate the spatial averaging on a set of points

        :param ndarray points: Points to evaluate the averaging on
        :param ndarray result: If provided, the result will be put
            in this array
        """
        pts = points[0]
        xs = reg.xdata[0]
        # The (possibly Cython) fast path only handles the gaussian kernel;
        # any other kernel falls back to the pure-Python implementation.
        if isinstance(reg.kernel, kernels.normal_kernel1d):
            impl = local_linear.local_linear_1d
        else:
            impl = py_local_linear.local_linear_1d
        li2, out = impl(reg.bandwidth, xs, reg.fitted_ydata, pts,
                        reg.kernel, out)
        # Keep the auxiliary output for later inspection by callers.
        self.li2 = li2
        return out

    @property
    def q(self):
        """
        Degree of the fitted polynom
        """
        return 1
class PolynomialDesignMatrix1D(object):
    """
    Design matrix for 1D polynomial fitting.

    Calling an instance on a column of offsets returns each offset raised to
    the powers ``0 .. degree`` (one column per power, via broadcasting).
    """

    def __init__(self, degree):
        self.degree = degree
        # Row vector of exponents, shape (1, degree + 1), so that a column
        # of inputs broadcasts into an (N, degree + 1) matrix.
        self.powers = np.arange(degree + 1).reshape((1, degree + 1))

    def __call__(self, dX, out=None):
        """Return ``dX`` raised to each power; write into ``out`` if given."""
        return np.power(dX, self.powers, out)
class LocalPolynomialKernel1D(RegressionKernelMethod):
    r"""
    Perform a local-polynomial regression using a user-provided kernel
    (Gaussian by default).

    The local constant regression is the function that minimises, for each
    position:

    .. math::

        f_n(x) \triangleq \argmin_{a_0\in\mathbb{R}}
            \sum_i K\left(\frac{x-X_i}{h}\right)
            \left(Y_i - a_0 - a_1(x-X_i) - \ldots -
                  a_q \frac{(x-X_i)^q}{q!}\right)^2

    Where :math:`K(x)` is the kernel such that :math:`E(K(x)) = 0`, :math:`q`
    is the order of the fitted polynomial and :math:`h` is the bandwidth of
    the method. It is also recommended to have
    :math:`\int_\mathbb{R} x^2K(x)dx = 1`, (i.e. variance of the kernel is 1)
    or the effective bandwidth will be scaled by the square-root of this
    integral (i.e. the standard deviation of the kernel).

    :param ndarray xdata: Explaining variables (at most 2D array)
    :param ndarray ydata: Explained variables (should be 1D array)
    :param int q: Order of the polynomial to fit. **Default:** 3
    :type  cov: float or callable
    :param cov: If an float, it should be a variance of the gaussian kernel.
        Otherwise, it should be a function ``cov(xdata, ydata)`` returning
        the variance. **Default:** ``scotts_covariance``
    """

    def __init__(self, q=3):
        self._q = q

    @property
    def q(self):
        '''
        Degree of the fitted polynomials
        '''
        return self._q

    @q.setter
    def q(self, val):
        self._q = int(val)

    def fit(self, reg):
        assert reg.dim == 1, "This method can only be used with 1D data"
        # Degrees 0 and 1 have dedicated, cheaper implementations; delegate.
        if self.q == 0:
            return SpatialAverage().fit(reg)
        if self.q == 1:
            return LocalLinearKernel1D().fit(reg)
        self = super(LocalPolynomialKernel1D, self).fit(reg)
        self.designMatrix = PolynomialDesignMatrix1D(self.q)
        return self

    def evaluate(self, reg, points, out):
        """
        Evaluate the spatial averaging on a set of points

        :param ndarray points: Points to evaluate the averaging on
        :param ndarray result: If provided, the result will be put
            in this array
        """
        xcol = reg.xdata[0, :, np.newaxis]       # column vector of x values
        ycol = reg.fitted_ydata[:, np.newaxis]   # column vector of y values
        pts = points[0]                          # line vector of query points
        bw = reg.bandwidth
        kernel = reg.kernel
        design = self.designMatrix
        # Solve one weighted least-squares system per query point; out[i]
        # receives the fitted constant term (the local estimate).
        for i, p in enumerate(pts):
            dX = (xcol - p)
            Wx = kernel(dX / bw)
            Xx = design(dX)
            WxXx = Wx * Xx
            XWX = np.dot(Xx.T, WxXx)
            Lx = linalg.solve(XWX, WxXx.T)[0]
            out[i] = np.dot(Lx, ycol)
        return out
class PolynomialDesignMatrix(object):
    """
    Class used to create a design matrix for polynomial regression in n
    dimensions.

    :param int dim: Dimension of the problem
    :param int deg: Degree of the fitting polynomial

    After construction, ``self.size`` holds the number of columns of the
    design matrix.
    """

    def __init__(self, dim, deg):
        self.dim = dim
        self.deg = deg
        self._designMatrixSize()

    def _designMatrixSize(self):
        """
        Compute the size of the design matrix for a ``self.dim``-dimensional
        problem of order ``self.deg`` and store it in ``self.size``.

        ``dims[j + 1]`` tracks, for each degree level, how many monomials of
        the current degree do not involve variables up to index ``j``; this
        lets the count be built up one degree at a time.
        """
        dim = self.dim
        deg = self.deg
        dims = [0] * (dim + 1)
        cur = 1       # number of monomials so far (starts with the constant)
        prev = 0
        for _ in irange(deg):
            diff = cur - prev
            prev = cur
            old_dims = list(dims)
            for j in irange(dim):
                dp = diff - old_dims[j]
                cur += dp
                dims[j + 1] = dims[j] + dp
        self.size = cur

    def __call__(self, x, out=None):
        """
        Creates the design matrix for polynomial fitting using the points x.

        :param ndarray x: Points to create the design matrix.
            Shape must be (D,N) or (N,), where D is the dimension of
            the problem, 1 if not there.
        :param ndarray out: Optional pre-allocated (M,N) output array, where
            M is ``self.size``.
        :returns: The design matrix as a (M,N) matrix.
        """
        deg = self.deg
        x = np.atleast_2d(x)
        dim = x.shape[0]
        if out is None:
            # BUG FIX: this used to call self._designMatrixSize(dim, deg),
            # but that method takes no arguments and returns None, so the
            # out=None path always raised TypeError. The size was already
            # computed in __init__; use it directly.
            out = np.empty((self.size, x.shape[1]), dtype=x.dtype)
        dims = [0] * (dim + 1)
        out[0, :] = 1
        cur = 1
        # Build degree k+1 monomials by multiplying each degree-k monomial
        # by each variable, skipping duplicates via the dims bookkeeping.
        for i in irange(deg):
            old_dims = list(dims)
            prev = cur
            for j in irange(x.shape[0]):
                dims[j] = cur
                for k in irange(old_dims[j], prev):
                    np.multiply(out[k], x[j], out[cur])
                    cur += 1
        return out
class LocalPolynomialKernel(RegressionKernelMethod):
    r"""
    Perform a local-polynomial regression in N-D using a user-provided kernel
    (Gaussian by default).

    The local constant regression is the function that minimises,
    for each position:

    .. math::

        f_n(x) \triangleq \argmin_{a_0\in\mathbb{R}}
            \sum_i K\left(\frac{x-X_i}{h}\right)
            \left(Y_i - a_0 - \mathcal{P}_q(X_i-x)\right)^2

    Where :math:`K(x)` is the kernel such that :math:`E(K(x)) = 0`, :math:`q`
    is the order of the fitted polynomial, :math:`\mathcal{P}_q(x)` is a
    polynomial of order :math:`d` in :math:`x` and :math:`h` is the bandwidth
    of the method.

    The polynomial :math:`\mathcal{P}_q(x)` is of the form:

    .. math::

        \mathcal{F}_d(k) = \left\{ \n \in \mathbb{N}^d \middle|
            \sum_{i=1}^d n_i = k \right\}

        \mathcal{P}_q(x_1,\ldots,x_d) = \sum_{k=1}^q
            \sum_{\n\in\mathcal{F}_d(k)} a_{k,\n}
            \prod_{i=1}^d x_i^{n_i}

    For example we have:

    .. math::

        \mathcal{P}_2(x,y) = a_{110} x + a_{101} y + a_{220} x^2 +
            a_{211} xy + a_{202} y^2

    :param ndarray xdata: Explaining variables (at most 2D array).
        The shape should be (N,D) with D the dimension of the problem
        and N the number of points. For 1D array, the shape can be (N,),
        in which case it will be converted to (N,1) array.
    :param ndarray ydata: Explained variables (should be 1D array). The shape
        must be (N,).
    :param int q: Order of the polynomial to fit. **Default:** 3
    :param callable kernel: Kernel to use for the weights. Call is
        ``kernel(points)`` and should return an array of values the same size
        as ``points``. If ``None``, the kernel will be ``normal_kernel(D)``.
    :type  cov: float or callable
    :param cov: If an float, it should be a variance of the gaussian kernel.
        Otherwise, it should be a function ``cov(xdata, ydata)`` returning
        the variance. **Default:** ``scotts_covariance``
    """

    def __init__(self, q=3):
        self._q = q

    @property
    def q(self):
        '''
        Degree of the fitted polynomials
        '''
        return self._q

    @q.setter
    def q(self, val):
        self._q = int(val)

    def fit(self, reg):
        # Cheaper specialized methods handle the degenerate cases; delegate.
        if self.q == 0:
            return SpatialAverage().fit(reg)
        if reg.dim == 1:
            return LocalPolynomialKernel1D(self.q).fit(reg)
        self = super(LocalPolynomialKernel, self).fit(reg)
        self.designMatrix = PolynomialDesignMatrix(reg.dim, self.q)
        return self

    def evaluate(self, reg, points, out):
        """
        Evaluate the spatial averaging on a set of points

        :param ndarray points: Points to evaluate the averaging on
        :param ndarray out: Pre-allocated array for the result
        """
        xdata = reg.xdata
        ycol = reg.fitted_ydata[:, np.newaxis]  # column vector
        d, n = xdata.shape
        design = self.designMatrix
        dm_size = design.size
        # Pre-allocate the per-point scratch matrices once, outside the loop.
        Xx = np.empty((dm_size, n), dtype=xdata.dtype)
        WxXx = np.empty(Xx.shape, dtype=xdata.dtype)
        XWX = np.empty((dm_size, dm_size), dtype=xdata.dtype)
        inv_bw = scipy.linalg.inv(reg.bandwidth)
        kernel = reg.kernel
        # Solve one weighted least-squares system per query point; out[i]
        # receives the fitted constant term (the local estimate).
        for i in irange(points.shape[1]):
            dX = (xdata - points[:, i:i + 1])
            Wx = kernel(np.dot(inv_bw, dX))
            design(dX, out=Xx)
            np.multiply(Wx, Xx, WxXx)
            np.dot(Xx, WxXx.T, XWX)
            Lx = linalg.solve(XWX, WxXx)[0]
            out[i] = np.dot(Lx, ycol)
        return out
# Default regression method: local polynomial of degree 1 (i.e. local-linear;
# LocalPolynomialKernel.fit delegates to the specialized implementations).
default_method = LocalPolynomialKernel(q=1)