Source code for pyqt_fit.kde

r"""
:Author: Pierre Barbier de Reuille <pierre.barbierdereuille@gmail.com>

Module implementing kernel-based estimation of density of probability.

Given a kernel :math:`K`, the density function is estimated from a sampling 
:math:`X = \{X_i \in \mathbb{R}^n\}_{i\in\{1,\ldots,m\}}` as:

.. math::

    f(\mathbf{z}) \triangleq \frac{1}{hW} \sum_{i=1}^m \frac{w_i}{\lambda_i}
    K\left(\frac{X_i-\mathbf{z}}{h\lambda_i}\right)

    W = \sum_{i=1}^m w_i

where :math:`h` is the bandwidth of the kernel, :math:`w_i` are the weights of 
the data points and :math:`\lambda_i` are the adaptation factor of the kernel 
width.

The kernel is a function of :math:`\mathbb{R}^n` such that:

.. math::

    \begin{array}{rclcl}
       \idotsint_{\mathbb{R}^n} f(\mathbf{z}) d\mathbf{z} 
       & = & 1 & \Longleftrightarrow & \text{$f$ is a probability}\\
       \idotsint_{\mathbb{R}^n} \mathbf{z}f(\mathbf{z}) d\mathbf{z} &=& 
       \mathbf{0} & \Longleftrightarrow & \text{$f$ is 
       centered}\\
       \forall \mathbf{u}\in\mathbb{R}^n, \|\mathbf{u}\| 
       = 1\qquad\int_{\mathbb{R}} t^2f(t \mathbf{u}) dt &\approx& 
       1 & \Longleftrightarrow & \text{The co-variance matrix of $f$ is close 
       to be the identity.}
    \end{array}

The constraint on the covariance is only required to provide a uniform meaning 
for the bandwidth of the kernel.

If the domain of the density estimation is bounded to the interval 
:math:`[L,U]`, the density is then estimated with:

.. math::

    f(x) \triangleq \frac{1}{hW} \sum_{i=1}^n \frac{w_i}{\lambda_i}
    \hat{K}(x;X,\lambda_i h,L,U)

where :math:`\hat{K}` is a modified kernel that depends on the exact method 
used. Currently, only 1D KDE supports bounded domains.
"""

from __future__ import division, absolute_import, print_function
import numpy as np
from .kernels import normal_kernel1d
from . import kde_methods
from .kde_bandwidth import variance_bandwidth, silverman_covariance, scotts_covariance, botev_bandwidth
from .utils import numpy_method_idx

[docs]class KDE1D(object): r""" Perform a kernel based density estimation in 1D, possibly on a bounded domain :math:`[L,U]`. :param ndarray data: 1D array with the data points :param dict kwords: setting attributes at construction time. Any named argument will be equivalent to setting the property after the fact. For example:: >>> xs = [1,2,3] >>> k = KDE1D(xs, lower=0) will be equivalent to:: >>> k = KDE1D(xs) >>> k.lower = 0 The calculation is separated in three parts: - The kernel (:py:attr:`kernel`) - The bandwidth or covariance estimation (:py:attr:`bandwidth`, :py:attr:`covariance`) - The estimation method (:py:attr:`method`) """ def __init__(self, xdata, **kwords): self._xdata = None self._upper = np.inf self._lower = -np.inf self._kernel = normal_kernel1d() self._bw_fct = None self._bw = None self._cov_fct = None self._covariance = None self._method = None self.weights = 1. self.lambdas = 1. self._fitted = False for n in kwords: setattr(self, n, kwords[n]) self.xdata = xdata has_bw = (self._bw is not None or self._bw_fct is not None or self._covariance is not None or self._cov_fct is not None) if not has_bw: self.covariance = scotts_covariance if self._method is None: self.method = kde_methods.default_method @property def fitted(self): """ Test if the fitting has been done """ return self._fitted def fit_if_needed(self): """ Fit only if needed (testing self.fitted) """ if not self._fitted: self.fit() def need_fit(self): """ Calling this function will mark the object as needing fitting. """ self._fitter = False
[docs] def copy(self): """ Shallow copy of the KDE object """ res = KDE1D.__new__(KDE1D) # Copy private members: start with a single '_' for m in self.__dict__: if len(m) > 1 and m[0] == '_' and m[1] != '_': setattr(res, m, getattr(self, m)) return res
def compute_bandwidth(self): """ Method computing the bandwidth if needed (i.e. if it was defined by functions) """ self._bw, self._covariance = kde_methods.compute_bandwidth(self)
[docs] def fit(self): """ Compute the various parameters needed by the kde method """ if self._weights.shape: assert self._weights.shape == self._xdata.shape, \ "There must be as many weights as data points" self._total_weights = sum(self._weights) else: self._total_weights = len(self._xdata) self.method.fit(self) self._fitted = True
@property def xdata(self): return self._xdata @xdata.setter def xdata(self, xs): self.need_fit() self._xdata = np.atleast_1d(xs) assert len(self._xdata.shape) == 1, "The attribute xdata must be a one-dimension array" @property def kernel(self): r""" Kernel object. This must be an object modeled on :py:class:`pyqt_fit.kernels.Kernel1D`. It is recommended to inherit this class to provide numerical approximation for all methods. By default, the kernel is an instance of :py:class:`pyqt_fit.kernels.normal_kernel1d` """ return self._kernel @kernel.setter
[docs] def kernel(self, val): self.need_fit() self._kernel = val
@property def lower(self): r""" Lower bound of the density domain. If deleted, becomes set to :math:`-\infty` """ return self._lower @lower.setter def lower(self, val): self.need_fit() self._lower = float(val) @lower.deleter
[docs] def lower(self): self.need_fit() self._lower = -np.inf
@property def upper(self): r""" Upper bound of the density domain. If deleted, becomes set to :math:`\infty` """ return self._upper @upper.setter def upper(self, val): self.need_fit() self._upper = float(val) @upper.deleter
[docs] def upper(self): self.need_fit() self._upper = np.inf
@property def weights(self): """ Weigths associated to each data point. It can be either a single value, or an array with a value per data point. If a single value is provided, the weights will always be set to 1. """ return self._weights @weights.setter def weights(self, ws): self.need_fit() try: ws = float(ws) self._weights = np.asarray(1.) except TypeError: ws = np.array(ws, dtype=float) self._weights = ws self._total_weights = None @weights.deleter
[docs] def weights(self): self.need_fit() self._weights = np.asarray(1.) self._total_weights = None
@property def total_weights(self): return self._total_weights @property def lambdas(self): """ Scaling of the bandwidth, per data point. It can be either a single value or an array with one value per data point. When deleted, the lamndas are reset to 1. """ return self._lambdas @lambdas.setter def lambdas(self, ls): self.need_fit() try: self._lambdas = np.asarray(float(ls)) except TypeError: ls = np.array(ls, dtype=float) self._lambdas = ls @lambdas.deleter
[docs] def lambdas(self): self.need_fit() self._lambdas = np.asarray(1.)
@property def bandwidth(self): """ Bandwidth of the kernel. Can be set either as a fixed value or using a bandwidth calculator, that is a function of signature ``w(xdata)`` that returns a single value. .. note:: A ndarray with a single value will be converted to a floating point value. """ return self._bw @bandwidth.setter
[docs] def bandwidth(self, bw): self.need_fit() self._bw_fct = None self._cov_fct = None if callable(bw): self._bw_fct = bw else: bw = float(bw) self._bw = bw self._covariance = bw * bw
@property def bandwidth_function(self): return self._bw_fct @property def covariance(self): """ Covariance of the gaussian kernel. Can be set either as a fixed value or using a bandwidth calculator, that is a function of signature ``w(xdata)`` that returns a single value. .. note:: A ndarray with a single value will be converted to a floating point value. """ return self._covariance @covariance.setter
[docs] def covariance(self, cov): self.need_fit() self._bw_fct = None self._cov_fct = None if callable(cov): self._cov_fct = cov else: cov = float(cov) self._covariance = cov self._bw = np.sqrt(cov)
@property def covariance_function(self): return self._cov_fct @numpy_method_idx def pdf(self, points, out=None): """ Compute the PDF of the distribution on the set of points ``points`` """ self.fit_if_needed() return self._method.pdf(self, points, out)
[docs] def evaluate(self, points, out=None): """ Compute the PDF of the distribution on the set of points ``points`` """ return self.pdf(points, out)
[docs] def __call__(self, points, out=None): """ This method is an alias for :py:meth:`BoundedKDE1D.evaluate` """ return self.pdf(points, out=out)
@numpy_method_idx def cdf(self, points, out=None): r""" Compute the cumulative distribution function defined as: .. math:: cdf(x) = P(X \leq x) = \int_l^x p(t) dt where :math:`l` is the lower bound of the distribution domain and :math:`p` the density of probability. """ self.fit_if_needed() return self.method.cdf(self, points, out)
[docs] def cdf_grid(self, N=None, cut=None): """ Compute the cdf from the lower bound to the points given as argument. """ self.fit_if_needed() return self.method.cdf_grid(self, N, cut)
@numpy_method_idx def icdf(self, points, out=None): r""" Compute the inverse cumulative distribution (quantile) function. """ self.fit_if_needed() return self.method.icdf(self, points, out)
[docs] def icdf_grid(self, N=None, cut=None): """ Compute the inverse cumulative distribution (quantile) function on a grid. """ self.fit_if_needed() return self.method.icdf_grid(self, N, cut)
@numpy_method_idx def sf(self, points, out=None): r""" Compute the survival function. The survival function is defined as: .. math:: sf(x) = P(X \geq x) = \int_x^u p(t) dt = 1 - cdf(x) where :math:`u` is the upper bound of the distribution domain and :math:`p` the density of probability. """ self.fit_if_needed() return self.method.sf(self, points, out) def sf_grid(self, N=None, cut=None): r""" Compute the survival function on a grid """ self.fit_if_needed() return self.method.sf_grid(self, N, cut) @numpy_method_idx def isf(self, points, out=None): r""" Compute the inverse survival function, defined as: .. math:: isf(p) = \sup\left\{x\in\mathbb{R} : sf(x) \leq p\right\} """ self.fit_if_needed() return self.method.isf(self, points, out) def isf_grid(self, N=None, cut=None): r""" Compute the inverse survival function on a grid. """ self.fit_if_needed() return self.method.isf_grid(self, N, cut) @numpy_method_idx def hazard(self, points, out=None): r""" Compute the hazard function evaluated on the points. The hazard function is defined as: .. math:: h(x) = \frac{p(x)}{sf(x)} """ self.fit_if_needed() return self.method.hazard(self, points, out) def hazard_grid(self, N=None, cut=None): """ Compute the hazard function evaluated on a grid. """ self.fit_if_needed() return self.method.hazard_grid(self, N, cut) @numpy_method_idx def cumhazard(self, points, out=None): r""" Compute the cumulative hazard function evaluated on the points. The cumulative hazard function is defined as: .. math:: ch(x) = \int_l^x h(t) dt = -\ln sf(x) where :math:`l` is the lower bound of the domain, :math:`h` the hazard function and :math:`sf` the survival function. """ self.fit_if_needed() return self.method.cumhazard(self, points, out) def cumhazard_grid(self, N=None, cut=None): """ Compute the cumulative hazard function evaluated on a grid. """ self.fit_if_needed() return self.method.cumhazard_grid(self, N, cut) @property def method(self): """ Select the method to use. The method should be an object modeled on :py:class:`pyqt_fit.kde_methods.KDE1DMethod`, and it is recommended to inherit the model. Available methods in the :py:mod:`pyqt_fit.kde_methods` sub-module. :Default: :py:data:`pyqt_fit.kde_methods.default_method` """ return self._method @method.setter def method(self, m): self.need_fit() self._method = m @method.deleter
[docs] def method(self): self.need_fit() self._method = kde_methods.renormalization
@property
[docs] def closed(self): """ Returns true if the density domain is closed (i.e. lower and upper are both finite) """ return self.lower > -np.inf and self.upper < np.inf
@property def bounded(self): """ Returns true if the density domain is actually bounded """ return self.lower > -np.inf or self.upper < np.inf
[docs] def grid(self, N=None, cut=None): """ Evaluate the density on a grid of N points spanning the whole dataset. :returns: a tuple with the mesh on which the density is evaluated and the density itself """ self.fit_if_needed() return self._method.grid(self, N, cut)