Source code for abcpy.statistics

from abc import ABCMeta, abstractmethod
import numpy as np

[docs]class Statistics(metaclass = ABCMeta): """This abstract base class defines how to calculate statistics from dataset. The base class also implements a polynomial expansion with cross-product terms that can be used to get desired polynomial expansion of the calculated statistics. """ @abstractmethod
[docs] def __init__(self, degree = 2, cross = True): """Constructor that must be overwritten by the sub-class. The constructor of a sub-class must accept arguments for the polynomial expansion after extraction of the summary statistics, one has to define the degree of polynomial expansion and cross, indicating whether cross-prodcut terms are included. Parameters ---------- degree: integer, optional Of polynomial expansion. The default value is 2 meaning second order polynomial expansion. cross: boolean, optional Defines whether to include the cross-product terms. The default value is TRUE, meaning the cross product term is included. """ raise NotImplemented
@abstractmethod
[docs] def statistics(self, data): """To be overwritten by any sub-class: should extract statistics from the data set data. It is assumed that data is a list of n same type elements(eg., The data can be a list containing n timeseries, n graphs or n np.ndarray). Parameters ---------- data: python list Contains n data sets. Returns ------- numpy.ndarray nxp matrix where for each of the n data points p statistics are calculated. """ raise NotImplemented
def _polynomial_expansion(self, summary_statistics): """Helper function that does the polynomial expansion and includes cross-product terms of summary_statistics, already calculated summary statistics. Parameters ---------- summary_statistics: numpy.ndarray nxp matrix where n is number of data points in the datasets data set and p number os summary statistics calculated. Returns ------- numpy.ndarray nx(p+degree*p+cross*nchoosek(p,2)) matrix where for each of the n pointss with p statistics, degree*p polynomial expansion term and cross*nchoosek(p,2) many cross-product terms are calculated. """ # Check summary_statistics is a np.ndarry if not isinstance(summary_statistics, (np.ndarray)): raise TypeError('Summary statisticss is not of allowed types') # Include the polynomial expansion result = summary_statistics for ind in range(2,self.degree+1): result = np.column_stack((result,pow(summary_statistics,ind))) # Include the cross-product term if self.cross == True and summary_statistics.shape[1]>1: # Convert to a matrix for ind1 in range(0,summary_statistics.shape[1]): for ind2 in range(ind1+1,summary_statistics.shape[1]): result = np.column_stack((result,summary_statistics[:,ind1]*summary_statistics[:,ind2])) return result
[docs]class Identity(Statistics): """ This class implements identity statistics returning a nxp matrix when the data set contains n numpy.ndarray of length p. """
[docs] def __init__(self, degree = 2, cross = True): self.degree = degree self.cross = cross
[docs] def statistics(self, data): if isinstance(data, list): if np.array(data).shape == (len(data),): if len(data) == 1: data = np.array(data).reshape(1,1) data = np.array(data).reshape(len(data),1) else: data = np.concatenate(data).reshape(len(data),-1) else: raise TypeError('Input data should be of type list, but found type {}'.format(type(data))) # Expand the data with polynomial expansion result = self._polynomial_expansion(data) return result
[docs]class HakkarainenLorenzStatistics(Statistics): """ This class implements the statistics function from the Statistics protocol. This extracts the statistics following Hakkarainen et. al. [1] from the multivariate timesereis generated by solving Lorenz 95 odes. [1] J. Hakkarainen, A. Ilin, A. Solonen, M. Laine, H. Haario, J. Tamminen, E. Oja, and H. Järvinen. On closure parameter estimation in chaotic systems. Nonlinear Processes in Geophysics, 19(1):127–143, Feb. 2012. """
[docs] def __init__(self, degree = 2, cross = True): self.degree = degree self.cross = cross
[docs] def statistics(self, data): num_element = len(data) result = np.zeros(shape=(num_element,6)) # Compute statistics for ind_element in range(0,num_element): data_ind_element = np.array(data[ind_element]) # Mean s1 = np.mean(np.mean(data_ind_element,1)) # Variance s2 = np.mean(np.var(data_ind_element,1)) ## Auto Covariance with lag 1 s3 = 0.0 for ind in range(0,data_ind_element.shape[0]): s3 += self._auto_covariance(data_ind_element[ind,:], lag = 1) s3 = s3/data_ind_element.shape[0] ## Covariance with a neighboring node s4 = 0.0 for ind in range(0,data_ind_element.shape[0]-1): s4 += np.mean(data_ind_element[ind,:]*data_ind_element[ind+1,:])\ - np.mean(data_ind_element[ind,:])*np.mean(data_ind_element[ind+1,:]) s4 = s4/data_ind_element.shape[0] ## Cross-Cov with 2 neighbors with time lag 1 s5 = 0.0 s6 = self._cross_covariance(data_ind_element[1,:],data_ind_element[2,:]) for ind in range(1,data_ind_element.shape[0]-1): s5 += self._cross_covariance(data_ind_element[ind,:],data_ind_element[ind-1,:]) s6 += self._cross_covariance(data_ind_element[ind,:],data_ind_element[ind+1,:]) s5 += 0.0 s6 += self._cross_covariance(data_ind_element[-2,:],data_ind_element[-1,:]) s5 = s5/data_ind_element.shape[0] s6 = s6/data_ind_element.shape[0] result[ind_element,:] = [s1, s2, s3, s4, s5, s6] # Expand the data with polynomial expansion result = self._polynomial_expansion(result) return np.array(result)
def _cross_covariance(self, x, y): """ Computes cross-covariance between x and y Parameters ---------- x: numpy.ndarray Vector of real numbers. y: numpy.ndarray Vector of real numbers. Returns ------- numpy.ndarray Cross-covariance calculated between x and y. """ return np.mean(np.insert(x,0,1)*np.insert(y,-1,1))-np.mean(np.insert(x,0,1))*np.mean(np.insert(y,-1,1)) def _auto_covariance(self, x, lag = 1): """ Calculate the autocovarriance coefficient of x with lag k. Parameters ---------- x: numpy.ndarray Vector of real numbers. k: integer Time-lag. Returns ------- numpy.ndarray Returns the auto-covariance of x with time-lag k. """ N = x.shape[0] x_mean = np.average(x) autoCov = 0.0 for ind in range(0, N-lag): autoCov += (x[ind+lag]-x_mean)*(x[ind]-x_mean) return (1/(N-1))*autoCov