# Source code for pyqt_fit.bootstrap

"""
:Author: Pierre Barbier de Reuille <pierre.barbierdereuille@gmail.com>

This modules provides function for bootstrapping a regression method.
"""

from __future__ import division, print_function, absolute_import
import numpy as np
from numpy.random import randint
from scipy import optimize
from collections import namedtuple
from . import nonparam_regression
from . import sharedmem
import multiprocessing as mp
from . import bootstrap_workers
from .compat import irange, izip

def percentile(array, p, axis=0):
"""
Compute the percentiles of the values in array
"""
a = np.asarray(array).sort(axis=axis)
n = (len(a) - 1) * p / 100
n0 = np.floor(n)
n1 = n0 + 1
#print("%g percentile on %d = [%d-%d]" % (p*100, len(array), n0, n1))
d = n - n0
v0 = array[n0]
v1 = array[n1]
return v0 + d * (v1 - v0)

[docs]def bootstrap_residuals(fct, xdata, ydata, repeats=3000, residuals=None,
"""
This implements the residual bootstrapping method for non-linear
regression.

:type  fct: callable
:param fct: Function evaluating the function on xdata at least with
fct(xdata)

:type  xdata: ndarray of shape (N,) or (k,N) for function with k predictors
:param xdata: The independent variable where the data is measured

:type  ydata: ndarray
:param ydata: The dependant data

:type  residuals: ndarray or callable or None
:param residuals: Residuals for the estimation on each xdata. If callable,
the call will be residuals(ydata, yopt).

:type  repeats: int
:param repeats: Number of repeats for the bootstrapping

:param add_residual: Function that add a residual to a value. The call
add_residual(yopt, residual) should return the new ydata, with
the residuals 'applied'. If None, it is considered the residuals should

:type  correct_bias: boolean
:param correct_bias: If true, the additive bias of the residuals is
computed and restored

:type  kwrds: dict
:param kwrds: Dictionnary present to absorbed unknown named parameters

:rtype: (ndarray, ndarray)
:returns:

1. xdata, with a new axis at position -2. This correspond to the
'shuffled' xdata (as they are *not* shuffled here)

2.Second item is the shuffled ydata. There is a line per repeat, each
line is shuffled independently.

.. todo::

explain the method here, as well as how to create add_residual
"""
if residuals is None:
residuals = np.subtract

yopt = fct(xdata)

if not isinstance(residuals, np.ndarray):
res = residuals(ydata, yopt)
else:
res = np.array(residuals)

res -= np.mean(res)

shuffle = randint(0, len(ydata), size=(repeats, len(ydata)))

shuffled_res = res[shuffle]

if correct_bias:
kde = nonparam_regression.NonParamRegression(xdata, res)
kde.fit()
bias = kde(xdata)
shuffled_res += bias

return xdata[..., np.newaxis, :], modified_ydata

[docs]def bootstrap_regression(fct, xdata, ydata, repeats=3000, **kwrds):
"""
This implements the shuffling of standard bootstrapping method for
non-linear regression.

:type  fct: callable
:param fct: This is the function to optimize

:type  xdata: ndarray of shape (N,) or (k,N) for function with k predictors
:param xdata: The independent variable where the data is measured

:type  ydata: ndarray
:param ydata: The dependant data

:type  repeats: int
:param repeats: Number of repeats for the bootstrapping

:type  kwrds: dict
:param kwrds: Dictionnary to absorbed unknown named parameters

:rtype: (ndarray, ndarray)
:returns:
1. The shuffled x data. The axis -2 has one element per repeat, the
other axis are shuffled independently.

2. The shuffled ydata. There is a line per repeat, each line is
shuffled independently.

.. todo::

explain the method here
"""
shuffle = randint(0, len(ydata), size=(repeats, len(ydata)))
shuffled_x = xdata[..., shuffle]
shuffled_y = ydata[shuffle]
return shuffled_x, shuffled_y

def getCIs(CI, *arrays):
#sorted_arrays = [ np.sort(a, axis=0) for a in arrays ]

if not np.iterable(CI):
CI = (CI,)

def make_CI(a):
return np.zeros((len(CI), 2) + a.shape[1:], dtype=float)
CIs = tuple(make_CI(a) for a in arrays)
for i, ci in enumerate(CI):
ci = (100. - ci) / 2
for cis, arr in izip(CIs, arrays):
low = np.percentile(arr, ci, axis=0)
high = np.percentile(arr, 100 - ci, axis=0)
cis[i] = [low, high]

return CIs

BootstrapResult = namedtuple('BootstrapResult', '''y_fit y_est eval_points y_eval CIs_val CIs
shuffled_xs shuffled_ys full_results''')

[docs]def bootstrap(fit, xdata, ydata, CI, shuffle_method=bootstrap_residuals,
shuffle_args=(), shuffle_kwrds={}, repeats=3000,
eval_points=None, full_results=False, nb_workers=None,
extra_attrs=(), fit_args=(), fit_kwrds={}):
"""
This function implement the bootstrap algorithm for a regression algorithm.
and the :py:mod:multiprocess module.

:type  fit: callable
:param fit:
Method used to compute regression. The call is::

f = fit(xdata, ydata, *fit_args, **fit_kwrds)

Fit should return an object that would evaluate the regression on a
set of points. The next call will be::

f(eval_points)

:type  xdata: ndarray of shape (N,) or (k,N) for function with k predictors
:param xdata: The independent variable where the data is measured

:type  ydata: ndarray
:param ydata: The dependant data

:type  CI: tuple of float
:param CI: List of percentiles to extract

:type  shuffle_method: callable
:param shuffle_method:
Create shuffled dataset. The call is::

shuffle_method(xdata, ydata, y_est, repeat=repeats, *shuffle_args,
**shuffle_kwrds)

where y_est is the estimated dependant variable on the xdata.

:type  shuffle_args: tuple
:param shuffle_args: List of arguments for the shuffle method

:type  shuffle_kwrds: dict
:param shuffle_kwrds: Dictionnary of arguments for the shuffle method

:type  repeats: int
:param repeats: Number of repeats for the bootstraping

:type  eval_points: ndarray or None
:param eval_points: List of points to evaluate. If None, eval_point
is xdata.

:type  full_results: bool
:param full_results: if True, output also the whole set of evaluations

:type  nb_workers: int or None
:param nb_worders: Number of worker threads. If None, the number of
detected CPUs will be used. And if 1 or less, a single thread
will be used.

:type  extra_attrs: tuple of str
:param extra_attrs: List of attributes of the fitting method to extract on
top of the y values for confidence intervals

:type  fit_args: tuple
:param fit_args: List of extra arguments for the fit callable

:type  fit_kwrds: dict
:param fit_kwrds: Dictionnary of extra named arguments for the fit callable

:rtype: :py:class:BootstrapResult
:return: Estimated y on the data, on the evaluation points, the requested
confidence intervals and, if requested, the shuffled X, Y and the full
estimated distributions.
"""
xdata = np.asarray(xdata)
ydata = np.asarray(ydata)
y_fit = fit(xdata, ydata, *fit_args, **fit_kwrds)
y_fit.fit()

shuffled_x, shuffled_y = shuffle_method(y_fit, xdata, ydata,
repeats=repeats,
*shuffle_args, **shuffle_kwrds)
nx = shuffled_x.shape[-2]
ny = shuffled_y.shape[0]
extra_values = []
for attr in extra_attrs:
extra_values.append(getattr(y_fit, attr))

if eval_points is None:
eval_points = xdata
if nb_workers is None:
nb_workers = mp.cpu_count()

multiprocess = nb_workers > 1

# Copy everything in shared mem
if multiprocess:
ra = sharedmem.zeros((repeats + 1, len(eval_points)), dtype=float)
result_array = ra.np
sx = sharedmem.array(shuffled_x)
sy = sharedmem.array(shuffled_y)
ep = sharedmem.array(eval_points)

def make_ea(ev):
return sharedmem.zeros((repeats + 1, len(ev)), dtype=float)
eas = [make_ea(ev) for ev in extra_values]
extra_arrays = [ea.np for ea in eas]
pool = mp.Pool(mp.cpu_count(), bootstrap_workers.initialize_shared,
(nx, ny, ra, eas, sx, sy, ep, extra_attrs,
fit, fit_args, fit_kwrds))
else:
result_array = np.empty((repeats + 1, len(eval_points)), dtype=float)

def make_ea(ev):
return np.empty((repeats + 1, len(ev)), dtype=float)
extra_arrays = [make_ea(ev) for ev in extra_values]
bootstrap_workers.initialize(nx, ny, result_array, extra_arrays,
shuffled_x, shuffled_y, eval_points,
extra_attrs, fit, fit_args, fit_kwrds)

result_array[0] = y_fit(eval_points)

for ea, ev in izip(extra_arrays, extra_values):
ea[0] = ev

base_repeat = repeats // nb_workers
if base_repeat * nb_workers < repeats:
base_repeat += 1

for i in irange(nb_workers):
end_repeats = (i + 1) * base_repeat
if end_repeats > repeats:
end_repeats = repeats
if multiprocess:
pool.apply_async(bootstrap_workers.bootstrap_result,
(i, i * base_repeat, end_repeats))
else:
bootstrap_workers.bootstrap_result(i, i * base_repeat, end_repeats)

if multiprocess:
pool.close()
pool.join()
CIs = getCIs(CI, result_array, *extra_arrays)

# copy the array to not return a view on a larger array
y_eval = np.array(result_array[0])

if not full_results:
shuffled_y = shuffled_x = result_array = None
extra_arrays = ()
elif multiprocess:
result_array = result_array.copy()  # copy in local memory
extra_arrays = [ea.copy for ea in extra_arrays]

return BootstrapResult(y_fit, y_fit(xdata), eval_points, y_eval, tuple(CI), CIs,
shuffled_x, shuffled_y, result_array)

def test():
from numpy.random import rand, randn
from pylab import plot, clf, legend, arange, figure, title, show
from curve_fitting import curve_fit

p0, p1, p2 = params
return p0 + p1 * x + p2 * x ** 2

init = (10, 1, 1)
target = np.array([10, 4, 1.2])
print("Target parameters: {}".format(target))
x = 6 * rand(200) - 3
y = test(x, target) * (1 + 0.3 * randn(x.shape[0]))
xr = arange(-3, 3, 0.01)
yr = test(xr, target)

print("Estimage best parameters, fixing the first one")
popt, pcov, _, _ = curve_fit(test, x, y, init, fix_params=(0,))
print("Best parameters: {}".format(popt))

print("Estimate best parameters from data")
popt, pcov, _, _ = curve_fit(test, x, y, init)
print("Best parameters: {}".format(popt))

figure(1)
clf()
plot(x, y, '+', label='data')
plot(xr, yr, 'r', label='function')
legend(loc='upper left')

print("Residual bootstrap calculation")
result_r = bootstrap(test, x, y, init, (95, 99),
shuffle_method=bootstrap_residuals, eval_points=xr,
fit=curve_fit)
popt_r, pcov_r, res_r, CI_r, CIp_r, extra_r = result_r
yopt_r = test(xr, popt_r)

figure(2)
clf()
plot(xr, yopt_r, 'g', label='estimate')
plot(xr, yr, 'r', label='target')
plot(xr, CI_r[0][0], 'b--', label='95% CI')
plot(xr, CI_r[0][1], 'b--')
plot(xr, CI_r[1][0], 'k--', label='99% CI')
plot(xr, CI_r[1][1], 'k--')
legend(loc='upper left')
title('Residual Bootstrapping')

print("Regression bootstrap calculation")
(popt_c, pcov_c, res_c, CI_c, CIp_r,
extra_c) = bootstrap(test, x, y, init, CI=(95, 99),
shuffle_method=bootstrap_regression, eval_points=xr,
fit=curve_fit)
yopt_c = test(xr, popt_c)

figure(3)
clf()
plot(xr, yopt_c, 'g', label='estimate')
plot(xr, yr, 'r', label='target')
plot(xr, CI_c[0][0], 'b--', label='95% CI')
plot(xr, CI_c[0][1], 'b--')
plot(xr, CI_c[1][0], 'k--', label='99% CI')
plot(xr, CI_c[1][1], 'k--')
legend(loc='upper left')
title('Regression Bootstrapping (also called Case Resampling)')

print("Done")

show()

return locals()

def profile(filename='bootstrap_profile'):
import cProfile
import pstats
cProfile.run('res = bootstrap.test()', 'bootstrap_profile')
p = pstats.Stats('bootstrap_profile')
return p

if __name__ == "__main__":
test()