from abc import ABCMeta, abstractmethod
import numpy as np
[docs]class Statistics(metaclass = ABCMeta):
"""This abstract base class defines how to calculate statistics from dataset.
The base class also implements a polynomial expansion with cross-product
terms that can be used to get desired polynomial expansion of the calculated statistics.
"""
@abstractmethod
[docs] def __init__(self, degree = 2, cross = True):
"""Constructor that must be overwritten by the sub-class.
The constructor of a sub-class must accept arguments for the polynomial
expansion after extraction of the summary statistics, one has to define
the degree of polynomial expansion and cross, indicating whether cross-prodcut
terms are included.
Parameters
----------
degree: integer, optional
Of polynomial expansion. The default value is 2 meaning second order polynomial expansion.
cross: boolean, optional
Defines whether to include the cross-product terms. The default value is TRUE, meaning the cross product term is included.
"""
raise NotImplemented
@abstractmethod
[docs] def statistics(self, data):
"""To be overwritten by any sub-class: should extract statistics from the
data set data. It is assumed that data is a list of n same type
elements(eg., The data can be a list containing n timeseries, n graphs or n np.ndarray).
Parameters
----------
data: python list
Contains n data sets.
Returns
-------
numpy.ndarray
nxp matrix where for each of the n data points p statistics are calculated.
"""
raise NotImplemented
def _polynomial_expansion(self, summary_statistics):
"""Helper function that does the polynomial expansion and includes cross-product
terms of summary_statistics, already calculated summary statistics.
Parameters
----------
summary_statistics: numpy.ndarray
nxp matrix where n is number of data points in the datasets data set and p number os
summary statistics calculated.
Returns
-------
numpy.ndarray
nx(p+degree*p+cross*nchoosek(p,2)) matrix where for each of the n pointss with
p statistics, degree*p polynomial expansion term and cross*nchoosek(p,2) many
cross-product terms are calculated.
"""
# Check summary_statistics is a np.ndarry
if not isinstance(summary_statistics, (np.ndarray)):
raise TypeError('Summary statisticss is not of allowed types')
# Include the polynomial expansion
result = summary_statistics
for ind in range(2,self.degree+1):
result = np.column_stack((result,pow(summary_statistics,ind)))
# Include the cross-product term
if self.cross == True and summary_statistics.shape[1]>1:
# Convert to a matrix
for ind1 in range(0,summary_statistics.shape[1]):
for ind2 in range(ind1+1,summary_statistics.shape[1]):
result = np.column_stack((result,summary_statistics[:,ind1]*summary_statistics[:,ind2]))
return result
[docs]class Identity(Statistics):
"""
This class implements identity statistics returning a nxp matrix when the data set
contains n numpy.ndarray of length p.
"""
[docs] def __init__(self, degree = 2, cross = True):
self.degree = degree
self.cross = cross
[docs] def statistics(self, data):
if isinstance(data, list):
if np.array(data).shape == (len(data),):
if len(data) == 1:
data = np.array(data).reshape(1,1)
data = np.array(data).reshape(len(data),1)
else:
data = np.concatenate(data).reshape(len(data),-1)
else:
raise TypeError('Input data should be of type list, but found type {}'.format(type(data)))
# Expand the data with polynomial expansion
result = self._polynomial_expansion(data)
return result
[docs]class HakkarainenLorenzStatistics(Statistics):
"""
This class implements the statistics function from the Statistics protocol. This
extracts the statistics following Hakkarainen et. al. [1] from the multivariate timesereis
generated by solving Lorenz 95 odes.
[1] J. Hakkarainen, A. Ilin, A. Solonen, M. Laine, H. Haario, J. Tamminen, E. Oja, and
H. Järvinen. On closure parameter estimation in chaotic systems. Nonlinear Processes
in Geophysics, 19(1):127–143, Feb. 2012.
"""
[docs] def __init__(self, degree = 2, cross = True):
self.degree = degree
self.cross = cross
[docs] def statistics(self, data):
num_element = len(data)
result = np.zeros(shape=(num_element,6))
# Compute statistics
for ind_element in range(0,num_element):
data_ind_element = np.array(data[ind_element])
# Mean
s1 = np.mean(np.mean(data_ind_element,1))
# Variance
s2 = np.mean(np.var(data_ind_element,1))
## Auto Covariance with lag 1
s3 = 0.0
for ind in range(0,data_ind_element.shape[0]):
s3 += self._auto_covariance(data_ind_element[ind,:], lag = 1)
s3 = s3/data_ind_element.shape[0]
## Covariance with a neighboring node
s4 = 0.0
for ind in range(0,data_ind_element.shape[0]-1):
s4 += np.mean(data_ind_element[ind,:]*data_ind_element[ind+1,:])\
- np.mean(data_ind_element[ind,:])*np.mean(data_ind_element[ind+1,:])
s4 = s4/data_ind_element.shape[0]
## Cross-Cov with 2 neighbors with time lag 1
s5 = 0.0
s6 = self._cross_covariance(data_ind_element[1,:],data_ind_element[2,:])
for ind in range(1,data_ind_element.shape[0]-1):
s5 += self._cross_covariance(data_ind_element[ind,:],data_ind_element[ind-1,:])
s6 += self._cross_covariance(data_ind_element[ind,:],data_ind_element[ind+1,:])
s5 += 0.0
s6 += self._cross_covariance(data_ind_element[-2,:],data_ind_element[-1,:])
s5 = s5/data_ind_element.shape[0]
s6 = s6/data_ind_element.shape[0]
result[ind_element,:] = [s1, s2, s3, s4, s5, s6]
# Expand the data with polynomial expansion
result = self._polynomial_expansion(result)
return np.array(result)
def _cross_covariance(self, x, y):
""" Computes cross-covariance between x and y
Parameters
----------
x: numpy.ndarray
Vector of real numbers.
y: numpy.ndarray
Vector of real numbers.
Returns
-------
numpy.ndarray
Cross-covariance calculated between x and y.
"""
return np.mean(np.insert(x,0,1)*np.insert(y,-1,1))-np.mean(np.insert(x,0,1))*np.mean(np.insert(y,-1,1))
def _auto_covariance(self, x, lag = 1):
"""
Calculate the autocovarriance coefficient of x with lag k.
Parameters
----------
x: numpy.ndarray
Vector of real numbers.
k: integer
Time-lag.
Returns
-------
numpy.ndarray
Returns the auto-covariance of x with time-lag k.
"""
N = x.shape[0]
x_mean = np.average(x)
autoCov = 0.0
for ind in range(0, N-lag):
autoCov += (x[ind+lag]-x_mean)*(x[ind]-x_mean)
return (1/(N-1))*autoCov