Source code for stochasticproducer

# coding=utf-8
"""
module containing stochastic process model producer
"""

from math import sqrt

from scipy.linalg import cholesky

from engine import Producer, State
from producers import MultiProducer

from indexedmatrix import IndexMatrix

# producer

[docs]class GaussEvolutionFunctionProducer(Producer): """ class implementing general Gauss process between grid dates """ def __init__(self, func=None, initial_state=None, length=None): """ :param callable func: evolve function, e.g. `lambda x, s, e, q: x + sqrt(e - s) * q` by default with `x` current state value, `s` current point in time, i.e. start point of next evolution step, `e` next point in time, i.e. end point of evolution step, `q` standard normal random number to do step :param initial_state: initial state (value) of evolution, :param int or None length: length of `q` as a list of Gauss random numbers, if `None` or `0` the evolution function `func` will be invoked with `q` not as a list but a float random number. class implementing general Gauss process between grid dates and provides state to any evolve style function `foo(x, s, e, q)` with `x` last state, `s` last state time, `e` current point in time and `q` current Gauss process state """ if func is None: func = (lambda x, s, e, q: x + sqrt(e - s) * q) self._len = length super(GaussEvolutionFunctionProducer, self).__init__(func, initial_state) def __len__(self): return 0 if self._len is None else self._len
[docs] def evolve(self, new_date): """ evolve to the new process state at the next date :param date new_date: date or point in time of the new state :return State: """ if self.state.date == new_date and not self.initial_state.date == new_date: return self.state if self._len: q = [self.random.gauss(0., 1.) for _ in range(int(self._len))] else: q = self.random.gauss(0., 1.) self.state.value = self.func(self.state.value, self.state.date, new_date, q) self.state.date = new_date return self.state
[docs]class GaussEvolutionProducer(GaussEvolutionFunctionProducer): """ producer to bring diffusion process to life """ def __init__(self, process): """ :param StochasticProcess process: diffusion process to evolve """ self.process = process self.diffusion_driver = process.diffusion_driver length = len(process) if len(process) > 1 else None super(GaussEvolutionProducer, self).__init__(process.evolve, State(process.start), length)
[docs]class CorrelatedGaussEvolutionProducer(MultiProducer): """ class implementing general correlated Gauss process between grid dates """ def __init__(self, producers, correlation=None, diffusion_driver=None): """ :param list(GaussEvolutionProducer) producers: list of producers to evolve :param list(list(float)) or dict((StochasticProcess, StochasticProcess): float) or None correlation: correlation matrix of underlying multivariate Gauss process of diffusion drivers. If `dict` keys must be pairs of diffusion drivers, diagonal and zero entries can be omitted. If not give, all drives are evolve independently. :param list(StochasticProcess) or None diffusion_driver: list of diffusion drivers indexing the correlation matrix. If not given and `correlation` is not an IndexMatrix, e.g. comes already with list of drivers, it is assumed that each process producer has different drivers and the correlation is order in the same way. """ # faking random().gauss() class _FakeGaussRandom(list): def seed(self, *args): pass def gauss(self, *args): return self.pop(0) for p in producers: p.random = _FakeGaussRandom() super(CorrelatedGaussEvolutionProducer, self).__init__(producers) # build correlation from sparse matrix (dict) if isinstance(correlation, dict): assert diffusion_driver is None # collect all drives in correlation drivers = set() for i, j in correlation.keys(): drivers.add(i) drivers.add(j) drivers = tuple(drivers) list_correlation = [[0.0] * len(drivers) for _ in drivers] for i in range(len(list_correlation)): list_correlation[i][i] = 1. # fill sparse correlation matrix for rf_1 in drivers: for rf_2 in drivers: if (rf_1, rf_2) in correlation: if (rf_2, rf_1) in correlation: assert correlation[rf_1, rf_2] == correlation[rf_2, rf_1] i, j = drivers.index(rf_1), drivers.index(rf_2) list_correlation[i][j] = correlation[rf_1, rf_2] list_correlation[j][i] = correlation[rf_1, rf_2] correlation = list_correlation diffusion_driver = drivers self._correlation = correlation # build valid driver list (according correlation and producers) if correlation and not diffusion_driver: # no given diffusion_drivers nor keys in correlation drivers = list() for p in producers: for d in p.diffusion_driver: if d not in drivers: drivers.append(d) diffusion_driver = tuple(drivers) # require enough correlation because here we cannot decide which drives # should be independent or omitted assert len(diffusion_driver) == len(correlation) self._diffusion_driver = () if diffusion_driver is None else diffusion_driver # build index lists for each producers diffusion drivers self._driver_index = dict.fromkeys(producers, []) for p in producers: dd = [d for d in self._diffusion_driver if d in p.diffusion_driver] self._driver_index[p] = [diffusion_driver.index(d) for d in dd] # build cholesky self._cholesky = cholesky(correlation).T if correlation else None def __len__(self): l = 0 for p in self.producers: l += 1 if len(p) == 0 else len(p) return l
[docs] def evolve(self, new_date): """ evolve to the new process state at the next date :param date new_date: date or point in time of the new state :return State: """ if all(p.state.date == new_date for p in self.producers): return [p.state for p in self.producers] if self._cholesky is not None: q = [self.random.gauss(0., 1.) for _ in range(len(self._cholesky))] q = list(self._cholesky.dot(q)) else: q = list() state = list() for p in self.producers: if len(self._driver_index[p]) == len(p.diffusion_driver): qq = [q[i] for i in self._driver_index[p]] elif len(self._driver_index[p]) < len(p.diffusion_driver): qq = list() for d in p.diffusion_driver: qqq = q[self._diffusion_driver.index(d)] if d in self._diffusion_driver else self.random.gauss(0., 1.) qq.append(qqq) else: qq = [self.random.gauss(0., 1.) for _ in p.diffusion_driver] p.random.extend(qq) state.append(p.evolve(new_date)) return state
[docs]class MultiGaussEvolutionProducer(CorrelatedGaussEvolutionProducer): """ class implementing multi variant GaussEvolutionProducer """ def __init__(self, process_list, correlation=None, diffusion_driver=None): producers = [GaussEvolutionProducer(p) for p in process_list] super(MultiGaussEvolutionProducer, self).__init__(producers, correlation, diffusion_driver)