Source code for ineqpy.ineqpy

#!/usr/bin/env python

"""A PYTHON PACKAGE TO QUANTITATIVE ANALYSIS OF INEQUALITY.

Collection of estimators of a stratified sample associated to single
individuals, in this module are calculations as the mean, variance,
quasivariance, population variance of a stratified sample.

Todo
----

Rethinking this module, maybe must be a class.

"""
import pandas as pd
import numpy as np

# TODO: implement L-moments
# def legendre_pol(x):
#     """
#     https://en.wikipedia.org/wiki/Legendre_polynomials
#     https://es.wikipedia.org/wiki/Polinomios_de_Legendre
#     https://en.wikipedia.org/wiki/Binomial_coefficient
#     http://www.itl.nist.gov/div898/software/dataplot/refman2/auxillar/lmoment.htm
#     """
#     return None


def _to_df(*args, **kwargs):
    """Build a DataFrame from positional arrays (one column per argument) or
    from keyword arrays (column name -> values)."""
    res = None
    if args:
        res = pd.DataFrame([*args]).T
    elif kwargs:
        res = pd.DataFrame.from_dict(kwargs, orient='columns')
    return res


def _apply_to_df(func, df, x, weights, *args, **kwargs):
    """This function generlize main arguments as Series of a pd.Dataframe.

    Parameters
    ---------
    func : function
        Function to convert his arguments in Series of an Dataframe.
    df : pandas.Dataframe
        DataFrame whats contains the Series `x_name` and `w_name`
    x_name : str
        Name of the column in `df`
    weights_name : str
        Name of the column in `df

    Returns
    -------
    return : func return
        It's depends of func output type

    """
    return func(df[x], df[weights], *args, **kwargs)


def cmoment(x, weights=None, order=2, param=None, ddof=0):
    """Calculate the central moment of `x` of order `order` about `param`,
    given the weights `weights`.

    Parameters
    ----------
    x : 1d-array
        Variable.
    weights : 1d-array, optional
        Weights.
    order : int, optional
        Moment order, 2 by default (variance).
    param : int or array, optional
        Point about which the moment is calculated; the default is None,
        which implies the weighted mean.
    ddof : int, optional
        Delta degrees of freedom, zero by default.

    Returns
    -------
    central_moment : float

    Notes
    -----
    - The central moment of order 1 is 0.
    - The central moment of order 2 is the variance.

    Source: https://en.wikipedia.org/wiki/Moment_(mathematics)

    TODO
    ----
    Implement: https://en.wikipedia.org/wiki/L-moment#cite_note-wang:96-6

    """
    # np.sum((x - param) ** order * weights) / (np.sum(weights) - ddof)
    if param is None:
        param = xbar(x, weights)
    elif not isinstance(param, (np.ndarray, int, float)):
        raise NotImplementedError
    if weights is None:
        weights = np.repeat([1], len(x))
    return np.sum((x - param) ** order * weights) / (np.sum(weights) - ddof)


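# A minimal usage sketch for `cmoment` (illustrative values, not taken from
# the package documentation): the first central moment is ~0 and the second
# equals the weighted variance about the weighted mean.
#
#   x = np.array([1.0, 2.0, 3.0, 4.0])
#   w = np.array([1.0, 1.0, 2.0, 2.0])
#   cmoment(x, weights=w, order=1)   # ~0.0
#   cmoment(x, weights=w, order=2)   # weighted population variance

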
def stdmoment(x, weights=None, param=None, order=3, ddof=0):
    """Calculate the standardized moment of order `order` of the variable
    `x` about `param`.

    Parameters
    ----------
    x : 1d-array
        Random variable.
    weights : 1d-array, optional
        Weights or probabilities.
    order : int, optional
        Moment order, three by default.
    param : int or float or array, optional
        Central tendency; the default is the weighted mean.
    ddof : int, optional
        Delta degrees of freedom.

    Returns
    -------
    stdmoment : float
        The standardized moment of order `order`.

    Notes
    -----
    Source:
    - https://en.wikipedia.org/wiki/Moment_(mathematics)#Significance_of_the_moments
    - https://en.wikipedia.org/wiki/Standardized_moment

    TODO
    ----
    It is the general case of the raw and central moments. Review
    implementation.

    """
    if weights is None:
        weights = np.repeat([1], len(x))
    if param is None:
        param = xbar(x, weights)
    res = cmoment(x, weights, order, param=param, ddof=ddof)
    res /= var(x, weights, ddof=ddof) ** (order / 2)
    return res


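# Quick sanity check for `stdmoment` (illustrative values, not from the
# documentation): the standardized second-order moment is 1 by construction,
# since the central moment is divided by var(x) ** (order / 2).
#
#   x = np.array([2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0])
#   stdmoment(x, order=2)   # -> 1.0
#   stdmoment(x, order=3)   # skewness (see `skew` below)

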
def xbar(x, weights=None):
    """Calculate the mean of `x` given the weights `weights`.

    Parameters
    ----------
    x : 1d-array or pd.Series or pd.DataFrame
        Variable on which the mean is estimated.
    weights : 1d-array or pd.Series or pd.DataFrame, optional
        Weights of the `x` variable.

    Returns
    -------
    xbar : 1d-array or pd.Series or float

    """
    return np.average(x, weights=weights, axis=0)


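# `xbar` is a thin wrapper around np.average, so both calls below agree
# (illustrative sketch, values are not from the documentation):
#
#   x = np.array([10.0, 20.0, 30.0])
#   w = np.array([1.0, 2.0, 1.0])
#   xbar(x, w) == np.average(x, weights=w)   # -> True

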
def var(x, weights=None, ddof=0):
    """Calculate the population variance of `x` given the weights `weights`,
    for a homogeneous population.

    Parameters
    ----------
    x : 1d-array or pd.Series or pd.DataFrame
        Variable on which the variance is estimated.
    weights : 1d-array or pd.Series or pd.DataFrame, optional
        Weights of the `x` variable.
    ddof : int, optional
        Delta degrees of freedom, zero by default.

    Returns
    -------
    shat2 : 1d-array or pd.Series or float
        Estimation of the variance of `x`.

    Notes
    -----
    For a stratified sample, apply this function to each stratum, e.g. via
    groupby.

    """
    if weights is None:
        weights = np.repeat([1], len(x))
    return cmoment(x, weights=weights, order=2, ddof=ddof)


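# Relationship between `var` and `cmoment` (illustrative sketch): with the
# same weights and ddof, both return the weighted second central moment.
#
#   x = np.array([1.0, 2.0, 2.0, 3.0])
#   w = np.array([1.0, 3.0, 2.0, 1.0])
#   var(x, w) == cmoment(x, weights=w, order=2)   # -> True

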
def kurt(x, weights):
    """Calculate the kurtosis coefficient.

    Parameters
    ----------
    x : 1d-array
    weights : 1d-array

    Returns
    -------
    kurt : float
        Kurtosis coefficient.

    Notes
    -----
    It is an alias of the standardized fourth-order moment.

    """
    return stdmoment(x=x, weights=weights, order=4)


def skew(x, weights):
    """Return the asymmetry (skewness) coefficient of a sample.

    Parameters
    ----------
    x : 1d-array
    weights : 1d-array

    Returns
    -------
    skew : float

    Notes
    -----
    It is an alias of the standardized third-order moment.

    """
    return stdmoment(x=x, weights=weights, order=3)


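# `skew` and `kurt` are aliases of the standardized third- and fourth-order
# moments (illustrative sketch with unit weights):
#
#   x = np.array([2.0, 8.0, 0.0, 4.0, 1.0, 9.0, 9.0, 0.0])
#   w = np.ones_like(x)
#   skew(x, w) == stdmoment(x, weights=w, order=3)   # -> True
#   kurt(x, w) == stdmoment(x, weights=w, order=4)   # -> True

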
def shat2_h(x, weights, group, data=None):
    """Sample variance of `x` within each group, calculated as the
    second-order central moment.

    Parameters
    ----------
    x : array or str
        Variable on which the statistic is computed. If `data` is None this
        must be an array, otherwise the name of a column in `data`.
    weights : array or str
        Weights, which can be interpreted as the frequency, probability or
        density of each element in `x`. If `data` is None this must be an
        array, otherwise the name of a column in `data`.
    group : array or str
        Categorical variable defining the groups for which the statistic is
        calculated. If `data` is None this must be an array, otherwise the
        name of a column in `data`.
    data : pd.DataFrame, optional
        DataFrame that contains all the needed variables.

    Returns
    -------
    shat2_h : array or pd.Series

    Notes
    -----
    This function is useful to calculate the variance of the mean.

    TODO
    ----
    Review function.

    """
    if data is None:
        data = _to_df(x=x, weights=weights, group=group)
        x = 'x'
        weights = 'weights'
        group = 'group'

    def sd(df):
        xi = df.loc[:, x].copy().values
        wi = np.repeat([1], len(df))
        return cmoment(xi, wi, 2, param=xbar(xi))

    return data.groupby(group).apply(sd)


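# A grouped-usage sketch for `shat2_h` (hypothetical data; the column names
# and values are illustrative): when `data` is given, `x`, `weights` and
# `group` are column names and the result has one value per stratum.
#
#   df = pd.DataFrame({'x': [10., 12., 9., 30., 28., 31.],
#                      'w': [1., 1., 1., 2., 2., 2.],
#                      'h': [1, 1, 1, 2, 2, 2]})
#   shat2_h(x='x', weights='w', group='h', data=df)   # pd.Series indexed by h

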
def vhat_h(x='x', weights='w', group='h', data=None):
    """Calculate the sample variance of each stratum of a DataFrame.

    The objective of this function is to make it easy to calculate the
    moments of the distribution that an estimator follows; e.g. it can be
    used to calculate the variance of the mean.

    Parameters
    ----------
    x : str
        Name of the variable `x` in the DataFrame.
    weights : str
        Name of the weights `w` in the DataFrame.
    group : str
        Name of the stratum variable `h` in the DataFrame.
    data : pandas.DataFrame
        DataFrame containing the series needed for the calculation.

    Returns
    -------
    vhat_h : pandas.Series
        A series with the values of the variance of each stratum `h`.

    TODO
    ----
    Review improvements.

    Examples
    --------
    >>> # Computes the variance of the mean
    >>> data = pd.DataFrame({'income': income, 'weights': weights,
    ...                      'stratum': stratum})
    >>> v = vhat_h(x='income', weights='weights', group='stratum', data=data)
    >>> v
    stratum
    1                700.917.728,64
    2              9.431.897.980,96
    3            317.865.839.789,10
    4            741.304.873.092,88
    5            535.275.436.859,10
    6            225.573.783.240,68
    7            142.048.272.010,63
    8             40.136.989.131,06
    9             18.501.808.022,56
    dtype: float64
    >>> # the value of the variance of the mean:
    >>> v_total = v.sum() / weights.sum() ** 2
    24662655225.947945

    """
    if data is None:
        data = _to_df(x=x, weights=weights, group=group)
        x = 'x'
        weights = 'weights'
        group = 'group'

    def v(df):
        r"""Calculate the variance of one stratum `h`.

        Parameters
        ----------
        df : pandas.DataFrame
            DataFrame containing the data of the stratum.

        Returns
        -------
        vhat : float
            Value of the population variance for the stratum `h`.

        Notes
        -----
        .. math:: N_h^2 \cdot fpc \cdot \frac{\hat{S}^2_h}{n_h}

        """
        xi = df[x].copy().values
        Nh = df[weights].sum()
        fpc = 1 - (len(df) / Nh)
        ddof = 1 if len(df) > 1 else 0
        shat2h = cmoment(x=xi, order=2, ddof=ddof)
        return (Nh ** 2) * fpc * shat2h / len(df)

    return data.groupby(group).apply(v)


def moment_h(x='x', weights='w', group='h', data=None, order=2):
    """Calculate the standardized moment of order `order` of each stratum `h`.

    Parameters
    ----------
    x : array or str
    weights : array or str
    group : array or str
    data : pd.DataFrame, optional
    order : int, optional

    Returns
    -------
    moment_of_order : float

    TODO
    ----
    Review the calculation, it does not appear to be correct. It attempts to
    generalize `vhat_h` to any estimator.

    .. warning:: Currently does not work as expected!

    """
    if data is None:
        data = _to_df(x=x, weights=weights, group=group)
        x = 'x'
        weights = 'weights'
        group = 'group'

    def mh(df):
        xi = df.loc[:, x].copy().values
        wi = np.repeat([1], len(df))
        Nh = df.loc[:, weights].sum()
        fpc = 1 - (len(df) / Nh)
        ddof = 1 if len(df) > 1 else 0
        stdm = stdmoment(x=xi, weights=wi, order=order, ddof=ddof)
        return (Nh ** order) * fpc * stdm / len(df)

    return data.groupby(group).apply(mh)


'''Inequality functions'''


def lorenz(income, weights, data=None):
    """Compute the Lorenz curve and return it as a DataFrame whose two
    columns correspond to the x and y plot axes.

    Parameters
    ----------
    income : str or 1d-array
        Income (monetary) variable. If a DataFrame is passed through `data`,
        `income` should be the name of a column of that DataFrame; otherwise
        a pandas.Series or array can be passed.
    weights : str or 1d-array
        Weights (population). If a DataFrame is passed through `data`,
        `weights` should be the name of a column of that DataFrame; otherwise
        a pandas.Series or array can be passed.
    data : pandas.DataFrame, optional
        A pandas.DataFrame that contains the data.

    Returns
    -------
    lorenz : pandas.DataFrame
        Lorenz distribution as a DataFrame with two columns, labeled x and y,
        that correspond to the plot axes.

    """
    if data is None:
        data = _to_df(income=income, weights=weights)
        income = 'income'
        weights = 'weights'
    data[income] = data[income] * data[weights]
    res = data.sort_values(by=weights).cumsum() / data.sum()
    return res


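# Usage sketch for `lorenz` (illustrative arrays): both output columns are
# cumulative shares, so the last row of the result is (1.0, 1.0).
#
#   income = np.array([5., 10., 20., 65.])
#   w = np.array([1., 1., 1., 1.])
#   curve = lorenz(income, w)
#   curve.iloc[-1]   # income -> 1.0, weights -> 1.0

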
def gini(income='x', weights='w', data=None, sorted=False):
    r"""Calculate the Gini index.

    Parameters
    ----------
    income : str or np.array, optional
        Name of the monetary variable in `data`, or an array.
    weights : str or np.array, optional
        Name of the weights series in `data`, or an array.
    data : pandas.DataFrame, optional
        DataFrame that contains the data.
    sorted : bool, optional
        If the data are already sorted by the income variable, pass True;
        False by default.

    Returns
    -------
    gini : float
        Value of the Gini index.

    Notes
    -----
    The calculation follows the discrete probability distribution form:

    .. math:: G = 1 - \sum_{i=1}^{n} f(y_i) \cdot (S_{i-1} + S_i)

    where :math:`y_i` is income and
    :math:`S_i = \sum_{j=1}^{i} y_j \cdot f(y_j)`.

    The implementation uses the equivalent covariance form
    :math:`G = 2 \, \operatorname{Cov}(y, F(y)) / \mu`, where :math:`F` is
    the cumulative distribution of the weights.

    Source:
    - https://en.wikipedia.org/wiki/Gini_coefficient
    - CALCULATING INCOME DISTRIBUTION INDICES FROM MICRO-DATA -
      STEPHEN JENKINS

    TODO
    ----
    - Implement statistical deviation calculation, var(gini).
    - Rename output.

    """
    if data is None:
        data = _to_df(income=income, weights=weights)
        income = 'income'
        weights = 'weights'
    if not sorted:
        data = data[[income, weights]].sort_values(income,
                                                   ascending=True).copy()
    x = data[income]
    f_x = data[weights] / data[weights].sum()
    F_x = f_x.cumsum()
    mu = np.sum(x * f_x)
    cov = np.cov(x, F_x, rowvar=False, aweights=f_x)[0, 1]
    g = 2 * cov / mu
    return g


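# Usage sketch for `gini` (toy data, not from the documentation): a perfectly
# equal distribution gives 0, and concentrating income pushes the index
# towards 1.
#
#   equal = pd.DataFrame({'x': [10., 10., 10., 10.], 'w': [1., 1., 1., 1.]})
#   gini(income='x', weights='w', data=equal)    # -> 0.0
#
#   skewed = pd.DataFrame({'x': [1., 1., 1., 97.], 'w': [1., 1., 1., 1.]})
#   gini(income='x', weights='w', data=skewed)   # close to 1

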
def atk(income, weights=None, e=0.5, data=None):
    """Calculate the Atkinson coefficient.

    Parameters
    ----------
    income : array or str
        If `data` is None, `income` must be a 1d-array; when `data` is a
        pd.DataFrame, pass the name of the income variable as a string.
    weights : array or str, optional
        If `data` is None, `weights` must be a 1d-array; when `data` is a
        pd.DataFrame, pass the name of the weights variable as a string.
    e : float, optional
        Epsilon parameter, interpreted by the Atkinson index as inequality
        aversion; must be between 0 and 1.
    data : pd.DataFrame, optional
        pd.DataFrame that contains the variables.

    Returns
    -------
    atkinson : float

    Notes
    -----
    Source: https://en.wikipedia.org/wiki/Atkinson_index

    TODO
    ----
    - Implement: CALCULATING INCOME DISTRIBUTION INDICES FROM MICRO-DATA
      http://www.jstor.org/stable/41788716

    .. warning:: The results differ from Stata's; there may be a bug.

    """
    if (income is None) and (data is None):
        raise ValueError('Must pass at least one of `income` or `data`')
    if data is not None:
        income = data[income].values
        if weights is not None:
            weights = data[weights].values
    # keep only strictly positive incomes
    if np.any(income <= 0):
        mask = income > 0
        income = income[mask]
        if weights is not None:
            weights = weights[mask]
    if len(income) == 0:
        return 0
    N = len(income)  # observations
    if weights is None:
        weights = np.repeat(1, N)
    mu = xbar(income, weights)
    f_i = weights / sum(weights)  # density function
    if e == 1:
        atkinson = 1 - np.power(np.e,
                                np.sum(f_i * np.log(income)) - np.log(mu))
    elif 0 <= e < 1:
        atkinson = 1 - np.power(np.sum(f_i * np.power(income / mu, 1 - e)),
                                1 / (1 - e))
    else:
        raise ValueError("Not a valid e value, it must satisfy 0 <= e <= 1")
    return atkinson


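# Usage sketch for `atk` (toy data): with e = 0 the index is ~0 by
# construction, and larger e penalizes inequality more heavily.
#
#   income = np.array([10., 20., 30., 40.])
#   atk(income, e=0)     # ~0.0
#   atk(income, e=0.5)   # > 0 for an unequal distribution

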
def atk_h(income, weights, group, data=None, e=0.5):
    """Calculate the Atkinson index decomposed by groups (strata).

    Parameters
    ----------
    income : str or np.array
        Income variable; either the name of a column in `data` or array-like.
    weights : str or np.array
        Probabilities or weights; either the name of a column in `data` or
        array-like.
    group : str or np.array
        Stratum; either the name of a column in `data` or array-like.
    data : pd.DataFrame, optional
        DataFrame that contains the previous data.
    e : float, optional
        Value of the epsilon parameter.

    Returns
    -------
    atkinson_by_group : float

    Notes
    -----
    Source: https://en.wikipedia.org/wiki/Atkinson_index

    TODO
    ----
    Review function; the results differ from Stata's.

    """
    if data is None:
        data = _to_df(income=income, weights=weights, group=group)
        income = 'income'
        weights = 'weights'
        group = 'group'

    N = len(data)

    def a_h(df):
        """Calculate the within-group Atkinson term from a DataFrame."""
        if df is None:
            raise ValueError
        return atk(income=df[income].values,
                   weights=df[weights].values,
                   e=e) * len(df) / N

    atk_by_group = data.groupby(group).apply(a_h)
    mu_by_group = data.groupby(group).apply(
        lambda dw: xbar(dw[income], dw[weights]))
    return atk_by_group.sum() + atk(income=mu_by_group.values)
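

# Decomposition sketch for `atk_h` (hypothetical column names and values):
# the result is the sum of the within-group contributions plus a between-group
# term computed from the group means.
#
#   df = pd.DataFrame({'income': [5., 6., 7., 50., 55., 60.],
#                      'w': [1., 1., 1., 1., 1., 1.],
#                      'h': [1, 1, 1, 2, 2, 2]})
#   atk_h(income='income', weights='w', group='h', data=df, e=0.5)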