Source code for curve

# -*- coding: utf-8 -*-

#  dcf (discounted cashflow)
#  -------------------------
#  A fast, efficient Python library for generating business cashflows inherited.
#  Typical banking business methods are provided like interpolation, compounding,
#  discounting and fx.
#
#  Author:  pbrisk <pbrisk@icloud.com>
#  Copyright: 2016, 2017 Deutsche Postbank AG
#  Website: https://github.com/pbrisk/dcf
#  License: APACHE Version 2 License (see LICENSE file)


from math import exp, log
import interpolation
import compounding


def _day_count(start, end):
    if hasattr(start, 'diff_in_years'):
        # duck typing businessdate.BusinessDate.diff_in_years
        return start.diff_in_years(end)
    else:
        d = end - start
        if hasattr(d, 'days'):
            # assume datetime.date or finance.BusinessDate (else days as float)
            d = d.days
        return float(d) / 365.25


DAY_COUNT = _day_count
FORWARD_RATE_TENOR = '3M'
FORWARD_CREDIT_TENOR = '1Y'
TIME_SHIFT = '1D'


[docs]class Curve(object): def __init__(self, x_list=None, y_list=None, y_inter=None): r""" Curve object to build function :param list(float) x_list: source values :param list(float) y_list: target values :param list(interpolation.interpolation) y_inter: interpolation function on x_list (optional) or triple of (left, mid, right) interpolation functions with left for x < x_list[0] (as default triple.right is used) right for x > x_list][-1] (as default interpolation.constant is used) mid else (as default interpolation.linear is used) Curve object to build function :math:`f:R \rightarrow R, x \mapsto y` from finite point vectors :math:`x` and :math:`y` using piecewise various interpolation functions. """ if not y_inter: y_inter = interpolation.linear() y_left, y_mid, y_right = interpolation.constant(), interpolation.linear(), interpolation.constant() if isinstance(y_inter, (tuple, list)): if len(y_inter) == 3: y_left, y_mid, y_right = y_inter elif len(y_inter) == 2: y_mid, y_right = y_inter y_left = y_right elif len(y_inter) == 1: y_mid = y_inter[0] else: raise ValueError elif isinstance(y_inter, interpolation.interpolation): y_mid = y_inter else: raise (AttributeError, str(y_inter) + " is not a proper interpolation.") assert len(x_list) == len(y_list) assert len(x_list) == len(set(x_list)) #: Interpolation: self._y_mid = type(y_mid)(x_list, y_list) self._y_right = type(y_right)(x_list, y_list) self._y_left = type(y_left)(x_list, y_list) @property def domain(self): return self._y_mid.x_list def __call__(self, x): if isinstance(x, (tuple, list)): return [self(xx) for xx in x] y = 0.0 if x < self._y_left.x_list[0]: # extrapolate to left y = self._y_left(x) elif x > self._y_right.x_list[-1]: # extrapolate to right y = self._y_right(x) else: # interpolate in the middle y = self._y_mid(x) return y def __add__(self, other): x_list = sorted(set(self.domain + other.domain)) y_list = [self(x) + other(x) for x in x_list] return self.__class__(x_list, y_list, (self._y_left, self._y_mid, self._y_right)) def __sub__(self, other): x_list = sorted(set(self.domain + other.domain)) y_list = [self(x) - other(x) for x in x_list] return self.__class__(x_list, y_list, (self._y_left, self._y_mid, self._y_right)) def __mul__(self, other): x_list = sorted(set(self.domain + other.domain)) y_list = [self(x) * other(x) for x in x_list] return self.__class__(x_list, y_list, (self._y_left, self._y_mid, self._y_right)) def __div__(self, other): x_list = sorted(set(self.domain + other.domain)) y_list = [self(x) / other(x) for x in x_list] return self.__class__(x_list, y_list, (self._y_left, self._y_mid, self._y_right)) def __str__(self): return str([z for z in zip(self.domain, self(self.domain))]) def __repr__(self): return self.__class__.__name__ + '(' + self.__str__() + ')'
[docs] def update(self, x_list=list(), y_list=list()): self._y_left.update(x_list, y_list) self._y_mid.update(x_list, y_list) self._y_right.update(x_list, y_list)
[docs] def shifted(self, delta=0.0): if delta: x_list = [x + delta for x in self.domain] else: x_list = self.domain y_list = self(self.domain) return self.__class__(x_list, y_list, (self._y_left, self._y_mid, self._y_right))
[docs]class DateCurve(Curve): def __init__(self, x_list, y_list, y_inter=None, origin=None, day_count=None): if origin is not None: self.origin = origin else: self.origin = x_list[0] if day_count is not None: self.day_count = day_count else: self.day_count = DAY_COUNT self._relative_mode = not isinstance(self.origin, (int, float)) if self._relative_mode: super(DateCurve, self).__init__([x - self.origin for x in x_list], y_list, y_inter) else: super(DateCurve, self).__init__(x_list, y_list, y_inter) self._domain = x_list @property def domain(self): return self._domain def __call__(self, x): if isinstance(x, (list, tuple)): return [self(xx) for xx in x] if self._relative_mode: return super(DateCurve, self).__call__(x - self.origin) else: return super(DateCurve, self).__call__(x) def __add__(self, other): new = super(DateCurve, self).__add__(other.shifted(self.origin - other.origin)) new.origin = self.origin return new def __sub__(self, other): new = super(DateCurve, self).__sub__(other.shifted(self.origin - other.origin)) new.origin = self.origin return new def __mul__(self, other): new = super(DateCurve, self).__mul__(other.shifted(self.origin - other.origin)) new.origin = self.origin return new def __div__(self, other): new = super(DateCurve, self).__div__(other.shifted(self.origin - other.origin)) new.origin = self.origin return new
[docs] def to_curve(self): x_list = self.domain if self._relative_mode: y_list = self([x - self.origin for x in x_list]) else: y_list = self(x_list) return Curve(x_list, y_list, (self._y_left, self._y_mid, self._y_right))
[docs] def update(self, x_list=list(), y_list=list()): if y_list: for x in x_list: if x not in self._domain: self._domain.append(x) self._domain = sorted(self._domain) if self._relative_mode: super(DateCurve, self).update([x - self.origin for x in x_list], y_list) else: super(DateCurve, self).update(x_list, y_list)
[docs]class RateCurve(DateCurve): @classmethod
[docs] def cast(cls, other): new = cls(other.domain, [other.get_storage_type(x) for x in other.domain], (other._y_left, other._y_mid, other._y_right), other.origin, other.day_count, other.forward_tenor) return new
def __init__(self, x_list, y_list, y_inter=None, origin=None, day_count=None, forward_tenor=None): super(RateCurve, self).__init__(x_list, y_list, y_inter, origin, day_count) if forward_tenor is not None: self.forward_tenor = forward_tenor else: self.forward_tenor = FORWARD_RATE_TENOR def __add__(self, other): casted = self.__class__.cast(other) new = super(RateCurve, self).__add__(casted) new.forward_tenor = self.forward_tenor return new def __sub__(self, other): casted = self.__class__.cast(other) new = super(RateCurve, self).__sub__(casted) new.forward_tenor = self.forward_tenor return new def __mul__(self, other): casted = self.__class__.cast(other) new = super(RateCurve, self).__mul__(casted) new.forward_tenor = self.forward_tenor return new def __div__(self, other): casted = self.__class__.cast(other) new = super(RateCurve, self).__div__(casted) new.forward_tenor = self.forward_tenor return new
[docs] def get_storage_type(self, x): raise NotImplementedError
[docs] def get_discount_factor(self, start, stop): ir = self.get_zero_rate(start, stop) t = self.day_count(start, stop) return compounding.continuous_compounding(ir, t)
[docs] def get_zero_rate(self, start, stop): if start==stop: stop += TIME_SHIFT df = self.get_discount_factor(start, stop) t = self.day_count(start, stop) return compounding.continuous_rate(df, t)
[docs] def get_short_rate(self, start, shift=TIME_SHIFT): up = self.get_zero_rate(self.origin, start + shift) dn = self.get_zero_rate(self.origin, start - shift) t = self.day_count(start - shift, start + shift) return (up - dn) / t
[docs] def get_cash_rate(self, start, stop=None, step=None): if stop is None: if step is None: stop = start + self.forward_tenor else: stop = start + step df = self.get_discount_factor(start, stop) t = self.day_count(start, stop) return compounding.simple_rate(df, t)
[docs] def get_swap_annuity(self, date_list): return sum([self.get_discount_factor(self.origin, t) for t in date_list])
[docs] def get_swap_leg_valuation(self, date_list, flow_list): if isinstance(flow_list, float): return flow_list * self.get_swap_annuity(date_list) else: return sum([self.get_discount_factor(self.origin, t) * r for t, r in zip(date_list, flow_list)])
[docs]class DiscountFactorCurve(RateCurve):
[docs] def get_storage_type(self, x): return self.get_discount_factor(self.origin, x)
[docs] def get_discount_factor(self, start, stop): if stop is None or stop is self.origin: return self(start) else: return self(start) / self(stop)
[docs]class ZeroRateCurve(RateCurve):
[docs] def get_storage_type(self, x): return self.get_zero_rate(self.origin, x)
[docs] def get_zero_rate(self, start, stop): if stop is None or stop is self.origin or start == stop: return self(start) else: df = exp(self(start) * self.day_count(self.origin, start) - self(stop) * self.day_count(self.origin, stop)) t = self.day_count(start, stop) return compounding.continuous_rate(df, t)
[docs]class CashRateCurve(RateCurve):
[docs] def get_storage_type(self, x): return self.get_cash_rate(x)
[docs] def get_discount_factor(self, start, stop): df = 1.0 current = start while current < stop: t = self.day_count(current, current + self.forward_tenor) df *= compounding.simple_compounding(self(current), t) current += self.forward_tenor t = self.day_count(current, stop) df *= compounding.simple_compounding(self(current), t) return df
[docs] def get_cash_rate(self, start, stop=None, step=None): if step is not None and stop is not None: raise TypeError, "one argument (stop or step) must be None." if stop is None: if step is None or step is self.forward_tenor: return self(start) else: return super(CashRateCurve, self).get_cash_rate(start, step=step) else: return super(CashRateCurve, self).get_cash_rate(start, stop)
[docs]class ShortRateCurve(RateCurve):
[docs] def get_storage_type(self, x): return self.get_short_rate(x)
[docs] def get_zero_rate(self, start, stop): # integrate from start to stop ir = 0.0 current = start while current < stop: t = self.day_count(current, current + TIME_SHIFT) ir = self(current) * t current += TIME_SHIFT t = self.day_count(current, stop) ir = self(current) * t return ir
[docs] def get_short_rate(self, start, shift=None): return self(start)
[docs]class CreditCurve(RateCurve): """ generic curve for default probabilities (under construction) """ _inner_curve = RateCurve def __init__(self, x_list, y_list, y_inter=None, origin=None, day_count=None, forward_tenor=None): if forward_tenor is None: forward_tenor = FORWARD_CREDIT_TENOR super(self.__class__, self).__init__(x_list, y_list, y_inter, origin, day_count, forward_tenor)
[docs] def get_survival_prob(self, start, stop): return self.get_discount_factor(start, stop)
[docs] def get_flat_intensity(self, start, stop): return self.get_zero_rate(start, stop)
[docs] def get_forward_survival_rate(self, start, stop=None, step=None): if step is not None and stop is not None: raise TypeError, "one argument (stop or step) must be None." if stop is None: return self.get_cash_rate(start, step=step) else: return self.get_cash_rate(start, stop)
[docs] def get_hazard_rate(self, start, shift=None): return self.get_short_rate(start, shift)
[docs]class SurvivalProbabilityCurve(DiscountFactorCurve, CreditCurve): pass
[docs]class FlatIntensityCurve(CreditCurve): pass
[docs]class ForwardSurvivalRate(CreditCurve): pass
[docs]class HazardRateCurve(CreditCurve): pass