Source code for stochasticprocess

# coding=utf-8
"""
module containing brownian motion model related classes
"""

from scipy.linalg import cholesky

from math import sqrt, exp, log


[docs]class StochasticProcess(object): def __init__(self, start): self.start = start self._diffusion_driver = self def __len__(self): return len(self.diffusion_driver) @property def diffusion_driver(self): """ diffusion driver are the underlying `dW` of each process `X` in a SDE like `dX = m dt + s dW` :return list(StochasticProcess): """ if self._diffusion_driver is None: return self, if isinstance(self._diffusion_driver, list): return tuple(self._diffusion_driver) if isinstance(self._diffusion_driver, tuple): return self._diffusion_driver return self._diffusion_driver, # return as a tuple
[docs] def evolve(self, x, s, e, q): """ :param float x: current state value, i.e. value before evolution step :param float s: current point in time, i.e. start point of next evolution step :param float e: next point in time, i.e. end point of evolution step :param float q: standard normal random number to do step :return float: next state value, i.e. value after evolution step evolves process state `x` from `s` to `e` in time depending of standard normal random variable `q` """ return 0.0
[docs]class MultivariateStochasticProcess(StochasticProcess): pass
[docs]class WienerProcess(StochasticProcess): """ class implementing general Gauss process between grid dates """ def __init__(self, mu=0., sigma=1., start=0.): super(WienerProcess, self).__init__(start) self._mu = mu self._sigma = sigma def _drift(self, x, s, e): return self._mu * (e - s) def _diffusion(self, x, s, e): return self._sigma * sqrt(e - s)
[docs] def evolve(self, x, s, e, q): return x + self._drift(x, s, e) + self._diffusion(x, s, e) * q
[docs] def mean(self, t): return self.start + self._mu * t
[docs] def variance(self, t): return self._sigma ** 2 * t
[docs]class OrnsteinUhlenbeckProcess(StochasticProcess): """ class implementing Ornstein Uhlenbeck process """ def __init__(self, theta=0.1, mu=0.1, sigma=0.1, start=0.0): r""" :param flaot theta: mean reversion speed :param float mu: drift :param float sigma: diffusion :param float start: initial value .. math:: dx_t = \theta ( \mu - x_t) dt + \sigma dW_t, x_0 = a """ super(OrnsteinUhlenbeckProcess, self).__init__(start) self._theta = theta self._mu = mu self._sigma = sigma def _drift(self, x, s, e): if self._theta: return x * exp(-self._theta * float(e - s)) + self._mu * (1. - exp(-self._theta * float(e - s))) else: return x def _diffusion(self, x, s, e): # return self._sigma * (1. - exp(-self._theta * float(e - s))) return sqrt(self.variance(float(e-s)))
[docs] def evolve(self, x, s, e, q): return self._drift(x, s, e) + self._diffusion(x, s, e) * q
[docs] def mean(self, t): return self._drift(self.start, 0., t)
[docs] def variance(self, t): if self._theta: return self._sigma * self._sigma * (1. - exp(-2. * self._theta * t)) * .5 / self._theta else: return t
[docs]class GeometricBrownianMotion(WienerProcess): """ class implementing general Gauss process between grid dates """ def __init__(self, mu=0., sigma=1., start=1.): super(GeometricBrownianMotion, self).__init__(mu, sigma, start) self._diffusion_driver = super(GeometricBrownianMotion, self).diffusion_driver
[docs] def evolve(self, x, s, e, q): return x * exp(super(GeometricBrownianMotion, self).evolve(0., s, e, q))
[docs] def mean(self, t): return self.start * exp(self._mu * t)
[docs] def variance(self, t): return self.start ** 2 * exp(2 * self._mu * t) * (exp(self._sigma ** 2 * t) - 1)
[docs]class SABR(MultivariateStochasticProcess): """ class implementing the Hagan et al SABR model """ def __init__(self, alpha=.1, beta=.2, nu=.3, rho=-.2, start=.05): super(SABR, self).__init__((start, alpha)) self._blend_process = GeometricBrownianMotion(0.0) self._vol_process = GeometricBrownianMotion(0.0, nu, alpha) self._beta = beta self._rho = rho self._diffusion_driver = self._blend_process.diffusion_driver + self._vol_process.diffusion_driver
[docs] def evolve(self, x, s, e, q): f, a = x if e-s > 0: q1, q2 = q q2 = self._rho * q1 + sqrt(1.-self._rho ** 2) * q2 a = self._vol_process.evolve(a, s, e, q2) sgn = -1. if f < 0. else 1. f += sgn * abs(f) ** self._beta * sqrt(e-s) * a * q1 return f, a
[docs] def mean(self, t): return self.start[0]
[docs] def variance(self, t): return self._vol_process.variance(t) * t # todo give better sabr variance proxy
[docs]class MultiGauss(MultivariateStochasticProcess): """ class implementing multi dimensional brownian motion """ def __init__(self, mu=list([0.]), covar=list([[1.]]), start=list([0.])): super(MultiGauss, self).__init__(start) self._mu = mu self._dim = len(start) self._cholesky = None if covar is None else cholesky(covar).T self._variance = [1.] * self._dim if covar is None else [covar[i][i] for i in range(self._dim)] self._diffusion_driver = [WienerProcess(m, sqrt(s)) for m, s in zip(self._mu, self._variance)] def _drift(self, x, s, e): return [m * (e - s) for m in self._mu]
[docs] def evolve(self, x, s, e, q): dt = sqrt(e - s) q = [qq * dt for qq in q] q = list(self._cholesky.dot(q)) d = self._drift(x, s, e) return [xx + dd + qq for xx, dd, qq in zip(x, d, q)]
[docs] def mean(self, t): return [s + m * t for s, m in zip(self.start, self._mu)]
[docs] def variance(self, t): return [v * t for v in self._variance]