# Source code for pyqt_fit.kde

r"""
:Author: Pierre Barbier de Reuille <pierre.barbierdereuille@gmail.com>

Module implementing kernel-based estimation of density of probability.

Given a kernel :math:K, the density function is estimated from a sampling
:math:X = \{X_i \in \mathbb{R}^n\}_{i\in\{1,\ldots,m\}} as:

.. math::

f(\mathbf{z}) \triangleq \frac{1}{hW} \sum_{i=1}^m \frac{w_i}{\lambda_i}
K\left(\frac{X_i-\mathbf{z}}{h\lambda_i}\right)

W = \sum_{i=1}^m w_i

where :math:h is the bandwidth of the kernel, :math:w_i are the weights of
the data points and :math:\lambda_i are the adaptation factor of the kernel
width.

The kernel is a function of :math:\mathbb{R}^n such that:

.. math::

\begin{array}{rclcl}
\idotsint_{\mathbb{R}^n} f(\mathbf{z}) d\mathbf{z}
& = & 1 & \Longleftrightarrow & \text{$f$ is a probability}\\
\idotsint_{\mathbb{R}^n} \mathbf{z}f(\mathbf{z}) d\mathbf{z} &=&
\mathbf{0} & \Longleftrightarrow & \text{$f$ is
centered}\\
\forall \mathbf{u}\in\mathbb{R}^n, \|\mathbf{u}\|
= 1\qquad\int_{\mathbb{R}} t^2f(t \mathbf{u}) dt &\approx&
1 & \Longleftrightarrow & \text{The co-variance matrix of $f$ is close
to be the identity.}
\end{array}

The constraint on the covariance is only required to provide a uniform meaning
for the bandwidth of the kernel.

If the domain of the density estimation is bounded to the interval
:math:[L,U], the density is then estimated with:

.. math::

f(x) \triangleq \frac{1}{hW} \sum_{i=1}^n \frac{w_i}{\lambda_i}
\hat{K}(x;X,\lambda_i h,L,U)

where :math:\hat{K} is a modified kernel that depends on the exact method
used. Currently, only 1D KDE supports bounded domains.
"""

from __future__ import division, absolute_import, print_function
import numpy as np
from .kernels import normal_kernel1d
from . import kde_methods
from .kde_bandwidth import variance_bandwidth, silverman_covariance, scotts_covariance, botev_bandwidth
from .utils import numpy_method_idx

[docs]class KDE1D(object):
r"""
Perform a kernel based density estimation in 1D, possibly on a bounded
domain :math:[L,U].

:param ndarray data: 1D array with the data points
:param dict kwords: setting attributes at construction time.
Any named argument will be equivalent to setting the property
after the fact. For example::

>>> xs = [1,2,3]
>>> k = KDE1D(xs, lower=0)

will be equivalent to::

>>> k = KDE1D(xs)
>>> k.lower = 0

The calculation is separated in three parts:

- The kernel (:py:attr:kernel)
- The bandwidth or covariance estimation (:py:attr:bandwidth,
:py:attr:covariance)
- The estimation method (:py:attr:method)

"""

def __init__(self, xdata, **kwords):
self._xdata = None
self._upper = np.inf
self._lower = -np.inf
self._kernel = normal_kernel1d()

self._bw_fct = None
self._bw = None
self._cov_fct = None
self._covariance = None
self._method = None

self.weights = 1.
self.lambdas = 1.

self._fitted = False

for n in kwords:
setattr(self, n, kwords[n])

self.xdata = xdata

has_bw = (self._bw is not None or self._bw_fct is not None or
self._covariance is not None or self._cov_fct is not None)
if not has_bw:
self.covariance = scotts_covariance

if self._method is None:
self.method = kde_methods.default_method

@property
def fitted(self):
"""
Test if the fitting has been done
"""
return self._fitted

def fit_if_needed(self):
"""
Fit only if needed (testing self.fitted)
"""
if not self._fitted:
self.fit()

def need_fit(self):
"""
Calling this function will mark the object as needing fitting.
"""
self._fitter = False

[docs]    def copy(self):
"""
Shallow copy of the KDE object
"""
res = KDE1D.__new__(KDE1D)
for m in self.__dict__:
if len(m) > 1 and m == '_' and m != '_':
setattr(res, m, getattr(self, m))
return res

def compute_bandwidth(self):
"""
Method computing the bandwidth if needed (i.e. if it was defined by functions)
"""
self._bw, self._covariance = kde_methods.compute_bandwidth(self)

[docs]    def fit(self):
"""
Compute the various parameters needed by the kde method
"""
if self._weights.shape:
assert self._weights.shape == self._xdata.shape, \
"There must be as many weights as data points"
self._total_weights = sum(self._weights)
else:
self._total_weights = len(self._xdata)
self.method.fit(self)
self._fitted = True

@property
def xdata(self):
return self._xdata

@xdata.setter
def xdata(self, xs):
self.need_fit()
self._xdata = np.atleast_1d(xs)
assert len(self._xdata.shape) == 1, "The attribute xdata must be a one-dimension array"

@property
def kernel(self):
r"""
Kernel object. This must be an object modeled on
:py:class:pyqt_fit.kernels.Kernel1D. It is recommended to inherit
this class to provide numerical approximation for all methods.

By default, the kernel is an instance of
:py:class:pyqt_fit.kernels.normal_kernel1d
"""
return self._kernel

@kernel.setter
[docs]    def kernel(self, val):
self.need_fit()
self._kernel = val

@property
def lower(self):
r"""
Lower bound of the density domain. If deleted, becomes set to
:math:-\infty
"""
return self._lower

@lower.setter
def lower(self, val):
self.need_fit()
self._lower = float(val)

@lower.deleter
[docs]    def lower(self):
self.need_fit()
self._lower = -np.inf

@property
def upper(self):
r"""
Upper bound of the density domain. If deleted, becomes set to
:math:\infty
"""
return self._upper

@upper.setter
def upper(self, val):
self.need_fit()
self._upper = float(val)

@upper.deleter
[docs]    def upper(self):
self.need_fit()
self._upper = np.inf

@property
def weights(self):
"""
Weigths associated to each data point. It can be either a single value,
or an array with a value per data point. If a single value is provided,
the weights will always be set to 1.
"""
return self._weights

@weights.setter
def weights(self, ws):
self.need_fit()
try:
ws = float(ws)
self._weights = np.asarray(1.)
except TypeError:
ws = np.array(ws, dtype=float)
self._weights = ws
self._total_weights = None

@weights.deleter
[docs]    def weights(self):
self.need_fit()
self._weights = np.asarray(1.)
self._total_weights = None

@property
def total_weights(self):
return self._total_weights

@property
def lambdas(self):
"""
Scaling of the bandwidth, per data point. It can be either a single
value or an array with one value per data point.

When deleted, the lamndas are reset to 1.
"""
return self._lambdas

@lambdas.setter
def lambdas(self, ls):
self.need_fit()
try:
self._lambdas = np.asarray(float(ls))
except TypeError:
ls = np.array(ls, dtype=float)
self._lambdas = ls

@lambdas.deleter
[docs]    def lambdas(self):
self.need_fit()
self._lambdas = np.asarray(1.)

@property
def bandwidth(self):
"""
Bandwidth of the kernel.
Can be set either as a fixed value or using a bandwidth calculator,
that is a function of signature w(xdata) that returns a single
value.

.. note::

A ndarray with a single value will be converted to a floating point
value.
"""
return self._bw

@bandwidth.setter
[docs]    def bandwidth(self, bw):
self.need_fit()
self._bw_fct = None
self._cov_fct = None
if callable(bw):
self._bw_fct = bw
else:
bw = float(bw)
self._bw = bw
self._covariance = bw * bw

@property
def bandwidth_function(self):
return self._bw_fct

@property
def covariance(self):
"""
Covariance of the gaussian kernel.
Can be set either as a fixed value or using a bandwidth calculator,
that is a function of signature w(xdata) that returns a single
value.

.. note::

A ndarray with a single value will be converted to a floating point
value.
"""
return self._covariance

@covariance.setter
[docs]    def covariance(self, cov):
self.need_fit()
self._bw_fct = None
self._cov_fct = None
if callable(cov):
self._cov_fct = cov
else:
cov = float(cov)
self._covariance = cov
self._bw = np.sqrt(cov)

@property
def covariance_function(self):
return self._cov_fct

@numpy_method_idx
def pdf(self, points, out=None):
"""
Compute the PDF of the distribution on the set of points points
"""
self.fit_if_needed()
return self._method.pdf(self, points, out)

[docs]    def evaluate(self, points, out=None):
"""
Compute the PDF of the distribution on the set of points points
"""
return self.pdf(points, out)

[docs]    def __call__(self, points, out=None):
"""
This method is an alias for :py:meth:BoundedKDE1D.evaluate
"""
return self.pdf(points, out=out)

@numpy_method_idx
def cdf(self, points, out=None):
r"""
Compute the cumulative distribution function defined as:

.. math::

cdf(x) = P(X \leq x) = \int_l^x p(t) dt

where :math:l is the lower bound of the distribution domain and
:math:p the density of probability.
"""
self.fit_if_needed()
return self.method.cdf(self, points, out)

[docs]    def cdf_grid(self, N=None, cut=None):
"""
Compute the cdf from the lower bound to the points given as argument.
"""
self.fit_if_needed()
return self.method.cdf_grid(self, N, cut)

@numpy_method_idx
def icdf(self, points, out=None):
r"""
Compute the inverse cumulative distribution (quantile) function.
"""
self.fit_if_needed()
return self.method.icdf(self, points, out)

[docs]    def icdf_grid(self, N=None, cut=None):
"""
Compute the inverse cumulative distribution (quantile) function on a grid.
"""
self.fit_if_needed()
return self.method.icdf_grid(self, N, cut)

@numpy_method_idx
def sf(self, points, out=None):
r"""
Compute the survival function.

The survival function is defined as:

.. math::

sf(x) = P(X \geq x) = \int_x^u p(t) dt = 1 - cdf(x)

where :math:u is the upper bound of the distribution domain and
:math:p the density of probability.

"""
self.fit_if_needed()
return self.method.sf(self, points, out)

def sf_grid(self, N=None, cut=None):
r"""
Compute the survival function on a grid
"""
self.fit_if_needed()
return self.method.sf_grid(self, N, cut)

@numpy_method_idx
def isf(self, points, out=None):
r"""
Compute the inverse survival function, defined as:

.. math::

isf(p) = \sup\left\{x\in\mathbb{R} : sf(x) \leq p\right\}
"""
self.fit_if_needed()
return self.method.isf(self, points, out)

def isf_grid(self, N=None, cut=None):
r"""
Compute the inverse survival function on a grid.
"""
self.fit_if_needed()
return self.method.isf_grid(self, N, cut)

@numpy_method_idx
def hazard(self, points, out=None):
r"""
Compute the hazard function evaluated on the points.

The hazard function is defined as:

.. math::

h(x) = \frac{p(x)}{sf(x)}
"""
self.fit_if_needed()
return self.method.hazard(self, points, out)

def hazard_grid(self, N=None, cut=None):
"""
Compute the hazard function evaluated on a grid.
"""
self.fit_if_needed()
return self.method.hazard_grid(self, N, cut)

@numpy_method_idx
def cumhazard(self, points, out=None):
r"""
Compute the cumulative hazard function evaluated on the points.

The cumulative hazard function is defined as:

.. math::

ch(x) = \int_l^x h(t) dt = -\ln sf(x)

where :math:l is the lower bound of the domain, :math:h the hazard
function and :math:sf the survival function.
"""
self.fit_if_needed()
return self.method.cumhazard(self, points, out)

def cumhazard_grid(self, N=None, cut=None):
"""
Compute the cumulative hazard function evaluated on a grid.
"""
self.fit_if_needed()
return self.method.cumhazard_grid(self, N, cut)

@property
def method(self):
"""
Select the method to use. The method should be an object modeled on
:py:class:pyqt_fit.kde_methods.KDE1DMethod, and it is recommended to
inherit the model.

Available methods in the :py:mod:pyqt_fit.kde_methods sub-module.

:Default: :py:data:pyqt_fit.kde_methods.default_method
"""
return self._method

@method.setter
def method(self, m):
self.need_fit()
self._method = m

@method.deleter
[docs]    def method(self):
self.need_fit()
self._method = kde_methods.renormalization

@property
[docs]    def closed(self):
"""
Returns true if the density domain is closed (i.e. lower and upper
are both finite)
"""
return self.lower > -np.inf and self.upper < np.inf

@property
def bounded(self):
"""
Returns true if the density domain is actually bounded
"""
return self.lower > -np.inf or self.upper < np.inf

[docs]    def grid(self, N=None, cut=None):
"""
Evaluate the density on a grid of N points spanning the whole dataset.

:returns: a tuple with the mesh on which the density is evaluated and
the density itself
"""
self.fit_if_needed()
return self._method.grid(self, N, cut)