Source code for hullwhite_model

# -*- coding: utf-8 -*-

#  shortrate
#  -----------
#  risk factor model library python style.
#
#  Author:  pbrisk <pbrisk@icloud.com>
#  Website: https://github.com/pbrisk/shortrate
#  License: MIT (see LICENSE file)


from math import sqrt, exp

from businessdate import BusinessDate
from dcf import ZeroRateCurve, TIME_SHIFT, compounding
from scipy import integrate

from risk_factor_model import RiskFactorModel


[docs]class HullWhiteCurve(ZeroRateCurve, RiskFactorModel): """ calculation of discount factors in the Hull White model """ @classmethod
[docs] def cast(cls, other, mean_reversion=0.0, volatility=0.0, terminal_date=None): """ :param ZeroRateCurve other: :param mean_reversion: mean reversion speed of short rate process :type mean_reversion: float or function :param volatility: short rate volatility :type volatility: float or function :param BusinessDate terminal_date: date of terminal measure :return: HullWhiteCurve build HullWhiteCurve i.e. Hull White model in terminal measure from ZeroRateCurve, mean reversion speed, volatility and terminal measure date. """ new = cls(mean_reversion=mean_reversion, volatility=volatility, terminal_date=terminal_date, inner_factor=other) return new
def __init__(self, x_list=None, y_list=None, y_inter=None, origin=None, day_count=None, forward_tenor=None, mean_reversion=0.0, volatility=0.0, terminal_date=None, inner_factor=None): """ :param list(float) x_list: :param list(float) y_list: :param list(interpolation) y_inter: :param BusinessDate origin: :param DayCount day_count: :param BusinessPeriod forward_tenor: standard forward :param mean_reversion: mean reversion speed of short rate process :type mean_reversion: float or function :param volatility: short rate volatility :type volatility: float or function :param BusinessDate terminal_date: date of terminal measure :param RateCurve inner_factor: initializes Hull White curve """ if inner_factor is None: inner_factor = ZeroRateCurve(x_list, y_list, y_inter, origin, day_count, forward_tenor) else: if any([x_list, y_list, y_inter, origin, day_count, forward_tenor]): raise (TypeError, 'If `inner_factor` is given all other `RateCurve` properties must be `None`.') RiskFactorModel.__init__(self, inner_factor, 0.0) super(HullWhiteCurve, self).__init__(inner_factor.domain, [inner_factor.get_storage_type(x) for x in inner_factor.domain], (inner_factor._y_left, inner_factor._y_mid, inner_factor._y_right), inner_factor.origin, inner_factor.day_count, inner_factor.forward_tenor) # init mean reversion # time dependent mean reversion speed -> not implemented yet # if isinstance(mean_reversion, float): # self.mean_reversion = (lambda x: mean_reversion) # elif hasattr(mean_reversion, 'origin'): # self.mean_reversion = mean_reversion.to_curve() # else: # self.mean_reversion = mean_reversion # init mean reversion if isinstance(mean_reversion, float): self.mean_reversion = mean_reversion else: raise NotImplementedError # init volatility if isinstance(volatility, float): self.volatility = (lambda x: volatility) elif hasattr(volatility, 'to_curve'): self.volatility = volatility.to_curve() else: self.volatility = volatility # terminal_date if terminal_date is None: terminal_date = self.domain[-1] self.terminal_date = terminal_date self._pre_calc_diffusion = dict() self._pre_calc_drift = dict() self._integral_vol_squared_I1 = dict() # factor state variables self._factor_date = self.origin self._factor_yf = 0.0 self._factor_value = 0.0 self._integral = 0.0 if isinstance(self.inner_factor, RiskFactorModel): self._diffusion_driver = self.inner_factor else: self._diffusion_driver = self # integration helpers for Hull White model
[docs] def calc_integral_I1(self, t1, t2): r""" :param float t1: start time as year fraction / float :param float t2: end time as year fraction / float :return: float returns the value of the helper function I1 .. math:: I_1(t_1, t_2) = \exp \left( -\int_{t_1}^{t_2} a(\tau) \,\mathrm{d}\tau \right) = \mathrm{e}^{-a(t_2 - t_1)} """ return exp(-self.mean_reversion * (t2 - t1))
[docs] def calc_integral_I1_squared(self, t1, t2): r""" :param float t1: start time as year fraction / float :param float t2: end time as year fraction / float :return float: returns the value of the helper function I1^2 .. math:: I_1(t_1, t_2)^2 = \exp \left( -2\int_{t_1}^{t_2} a(\tau) \,\mathrm{d}\tau \right) = \mathrm{e}^{-2a(t_2 - t_1)} """ return exp(-2.0 * self.mean_reversion * (t2 - t1))
[docs] def calc_integral_B(self, t1, t2): r""" returns the value of the helper function B .. math:: B(t_1, t_2) = \int_{t_1}^{t_2} I_1(t_1, \tau) \, \mathrm{d}\tau = \frac{1}{a}\Big(1 - \mathrm{e}^{-a(t_2 - t_1)}\Big) :param float t1: start time as year fraction / float :param float t2: end time as year fraction / float :return float: """ return (1 - exp(- self.mean_reversion * (t2 - t1))) / self.mean_reversion
[docs] def calc_integral_volatility_squared_with_I1(self, t1, t2): """ :param t1: :param t2: :return float: Calculates integral of integrand :math:`f` with :math:`I_1` between two time points :math:`t_1` and :math:`t_2` with :math:`t_1 \le t_2` is as: .. math:: \textrm{Var}_r(t_1,t_2) = \int_{t_1}^{t_2} vol(u)^2 I_1(u,t_2) \,\mathrm{d} u """ func = (lambda x: self.volatility(x) ** 2 * self.calc_integral_I1(x, t2)) result, err = integrate.quad(func, t1, t2) return result
[docs] def calc_integral_volatility_squared_with_I1_squared(self, t1, t2): """ :param t1: :param t2: :return float: calculates drift integral :math:`I_2` """ func = (lambda x: self.volatility(x) ** 2 * self.calc_integral_I1_squared(x, t2)) result, err = integrate.quad(func, t1, t2) return result
[docs] def calc_integral_I2(self, s, t): """ :param float s: start time as year fraction / float :param float t: end time as year fraction / float :return float: returns the value of the helper function Integrals One of the deterministic terms of a step in the MC simulation is calculated here with last observation date for T-Bond numeraire T .. math:: \int_s^t \sigma^2(u) I_1(u,t) (B(u,t)-B(u,T)) \,\mathrm{d} u + B(s,t)I_1(s,t)\int_0^s \sigma^2(u) I_1^2(u,s)\,\mathrm{d}u """ terminal_date_yf = BusinessDate.diff_in_years(self.origin, self.terminal_date) func1 = (lambda u: self.volatility(u) ** 2 * self.calc_integral_I1(u, t) * (self.calc_integral_B(u, t) - self.calc_integral_B(u, terminal_date_yf))) part1and3, err1 = integrate.quad(func1, s, t) part2 = self.calc_integral_B(s, t) * \ self.calc_integral_I1(s, t) * \ self.calc_integral_volatility_squared_with_I1_squared(0., s) return part1and3 + part2
# integrate drift integrals of curve part def _calc_integrals(self, e): """ :param BusinessDate e: end date :return float: method to pre calculate :math:` \int_0^t \sigma(u)^2 I_1(u, t) du` along a grid """ end = BusinessDate.diff_in_years(self.origin, e) return self.calc_integral_volatility_squared_with_I1(0.0, end) # integrate drift and diffusion integrals of stochastic process part def _calc_drift_integrals(self, s, e): """ :param s: :param e: :return: method to pre calculate drift integrals along `s` to `e` on a grid """ start = BusinessDate.diff_in_years(self.origin, s) end = BusinessDate.diff_in_years(self.origin, e) i1 = self.calc_integral_I1(start, end) i2 = self.calc_integral_I2(start, end) return i1, i2 def _calc_diffusion_integrals(self, s, e): """ :param s: :param e: :return: method to pre calculate diffusion integrals along `s` to `e` on a grid """ start = BusinessDate.diff_in_years(self.origin, s) end = BusinessDate.diff_in_years(self.origin, e) var = self.calc_integral_volatility_squared_with_I1_squared(start, end) return sqrt(var) # RiskFactorModel methods
[docs] def pre_calculate(self, s, e): """ :param BusinessDate s: start date :param BusinessDate e: end date pre calculate values based only on grid points """ self._integral_vol_squared_I1[e] = self._calc_integrals(e) self._pre_calc_drift[s, e] = self._calc_drift_integrals(s, e) self._pre_calc_diffusion[s, e] = self._calc_diffusion_integrals(s, e)
[docs] def evolve(self, x, s, e, q): """ :param x: :param s: :param e: :param q: :return: evolve Hull White process of shortrate diviation math:: y = r - y """ i1, i2 = self._calc_drift_integrals(s, e) if (s, e) not in self._pre_calc_drift else self._pre_calc_drift[s, e] v = self._calc_diffusion_integrals(s, e) \ if (s, e) not in self._pre_calc_diffusion else self._pre_calc_diffusion[s, e] return x * i1 + i2 + v * q
[docs] def set_risk_factor(self, factor_date, factor_value=0.0): """ :param BusinessDate factor_date: date of t :param float factor_value: value of risk factor y set :math:`y=r(t)-F(0,t)` risk factor and prepare discount factor integral .. :math: ` \int_0^t \sigma(u)^2 I_1(u, t) du` """ self._factor_date = factor_date self._factor_yf = BusinessDate.diff_in_years(self.origin, factor_date) self._factor_value = factor_value if factor_date in self._integral_vol_squared_I1: self._integral = self._integral_vol_squared_I1[factor_date] else: self._integral = self._calc_integrals(factor_date) return self
# ZeroRateCurve methods
[docs] def get_discount_factor(self, start_date, end_date): r""" :param BusinessDate start_date: start date :param BusinessDate end_date: end date :return float: calculate the discount rate for the given start date and end date .. math:: P_{u,y}: \textrm{BusinessDate} {\times} \textrm{BusinessDate} \to \mathbb{R} and .. math:: (s,t) \mapsto P_{\text{init}}(s,t) \exp \left(-\frac{1}{2}(B^2(u,t)-B^2(u,s)) \int_0^u \sigma^2(\tau)I_1(\tau,t), \mathrm{d}\tau\right)\mathrm{e}^{-(B(t,T)-B(t,S))y} with :math:`P_{\text{init}}(s,t) = \verb|Curve.get_discount_curve(s,t)|` Here the variables with subscript :math:`\textrm{pld}` are dates (BusinessDate instances) and the variables without subscripts are year fractions between the correspondent :math:`\textrm{pld}` variables and :math:`\verb|validity_date|` in the default DCC (Act/365.25). """ # assert self._factor_date <= S <= T df = self.inner_factor.get_discount_factor(start_date, end_date) loc_diff_in_years = BusinessDate.diff_in_years # for speed opt use pointer factor_yf = self._factor_yf start_yf = loc_diff_in_years(self.origin, start_date) end_yf = loc_diff_in_years(self.origin, end_date) loc_calc_b = self.calc_integral_B # for speed opt use pointer start_arg = loc_calc_b(factor_yf, start_yf) end_arg = loc_calc_b(factor_yf, end_yf) arg = (start_arg - end_arg) * (0.5 * (start_arg + end_arg) * self._integral + self._factor_value) return df * exp(arg)
[docs] def get_zero_rate(self, S, T): """ :param S: :param T: :return: calculate zero rate between `S` and `T`. """ if S == T: T += TIME_SHIFT pass df = self.get_discount_factor(S, T) yf = self.day_count(S, T) return compounding.continuous_rate(df, yf)