#
# Copyright John Reid 2006
#
"""See Blei, Jordan - Variational Methods for Dirichlet Processes"""
import numpy, scipy.special, math, infpy
[docs]class ExpFamilyDP( object ):
"""
Implements a variational algorithm for inference in a dirichlet process
mixture model of exponential family distributions.
"""
def __init__(
self,
X,
K,
alpha,
family,
lambda_
):
"""Initialise the algorithm
@param X: The data (in canonical form, i.e. not sufficient statistics)
@param K: Variational truncation parameter
@param alpha: Parameter for dirichlet process
@param family: The exponential family
@param lambda_: Conjugate prior for exp family parameters
"""
self.gamma = None
"The variational parameters for the V's (array of shape (K,2))."
self.tau = None
"The variational parameters for the eta's (array of length K)."
self.tau_0 = None
"The pseudo count variational parameters for the eta's (array of length K)."
self.phi = None
"The variational parameters for the Z's (array of shape (N,2))."
self._do_ll_checks = True
"Check the log likelihood increases."
self.family = family
"Exponential family."
self.K = K
"Variational # mixtures truncation parameter."
self.N = len( X )
"# of data."
self.alpha = alpha
"Dirichlet process parameter."
self.data = [
self.family.T( x )
for x
in X
]
"The data as sufficient statistics."
self.lambda_ = lambda_
"Conjugate prior parameter."
self.initialise_variational_parameters()
[docs] def initialise_variational_parameters(self):
"""Set the variational parameters to some starting point."""
# gamma will be updated first so set it to anything...
self.gamma = numpy.ones( ( self.K - 1, 2 ), dtype = numpy.float64 )
self.phi = numpy.random.uniform( size = (self.N, self.K) )
self._normalise_phi()
self.tau = [
(
self.lambda_[0],
self.lambda_[1]
)
for i in xrange( self.K )
]
# assign each data point to the mixtures according to random phi
for n in range( self.N ):
if len( self.data[n] ) != len( self.lambda_[1] ):
raise RuntimeError(
'Sufficient statistic length (%d) and prior length (%d) differ' % (
( len( self.data[n] ), len( self.lambda_[1] ) )
)
)
for i in xrange( self.K ):
self.tau[i] = (
self.tau[i][0] + self.phi[n,i],
self.tau[i][1] + self.phi[n,i] * self.data[n]
)
[docs] def update(self):
"""Perform one update step."""
self.update_gamma()
self.update_tau()
self.update_phi()
def _get_LL_check(self):
"""
If log likelihood checking is switched
on this returns the LL otherwise None
"""
if hasattr(self, '_do_ll_checks' ) and self._do_ll_checks:
return self.log_likelihood()
else:
return None
def _check_LL(self, last_LL):
"""
If last_LL != None then checks that it is greater than current LL
"""
if None != last_LL:
LL = self.log_likelihood()
if last_LL > LL and not infpy.check_is_close( last_LL, LL ):
raise RuntimeError(
'LL has decreased by %g. %g -> %g'
% ( last_LL - LL, last_LL, LL )
)
[docs] def update_gamma(self):
"""Update gamma"""
#LL = self._get_LL_check()
for i in xrange( self.K - 1 ):
self.gamma[i,0] = 1.0 + sum(
[
self.phi[n,i]
for n in xrange( self.N )
]
)
self.gamma[i,1] = self.alpha + sum(
[
sum(
[
self.phi[n,j]
for j in xrange( i + 1, self.K )
]
)
for n in xrange( self.N )
]
)
# print 'Gamma:\n', self.gamma
# raise
#self._check_LL( LL )
[docs] def update_tau(self):
"""Update tau"""
#LL = self._get_LL_check()
for i in xrange( self.K ):
# each tau is a tuple of the pseudo count and a vector
self.tau[i] = (
self.lambda_[0] + sum(
[
self.phi[n,i]
for n
in xrange( self.N )
]
),
self.lambda_[1] + sum(
[
self.phi[n,i] * self.data[n]
for n
in xrange( self.N )
]
)
)
# print 'Tau:\n', self.tau
#self._check_LL( LL )
[docs] def update_phi(self):
"""Update phi."""
#LL = self._get_LL_check()
digamma = scipy.special.digamma
for i in xrange( self.K ):
# E(log V_i)
if i == self.K - 1:
E_log_V_i = 0.0 # last mixture's v is always 1
else:
E_log_V_i = (
digamma( self.gamma[i,0] )
- digamma( self.gamma[i,0] + self.gamma[i,1] )
)
# E(a(eta_i))
E_a = self.family.expected_a( self.tau[i] )
# Sum of E(log(1-V_j))
E_sum_log_1_minus_V_j = sum(
[
digamma( self.gamma[j,1] )
- digamma( self.gamma[j,0] + self.gamma[j,1] )
for j
in xrange( i - 1 )
]
)
E_eta = self.family.expected_eta( self.tau[i] ).T
for n in xrange( self.N ):
# E(eta_i).X_n
E_eta_dot_X = ( E_eta * self.data[n] )[0,0]
# we have to watch out for underflow - so just store E in phi
# for the moment
self.phi[n,i] = (
E_log_V_i
+ E_eta_dot_X
+ E_a
+ E_sum_log_1_minus_V_j
)
# to cater for underflow - add constant to each row to make largest 0...
# phi is proportional to exponent of E so is invariant to this
for n in xrange( self.N ):
largest = max( self.phi[n,:] )
self.phi[n,:] -= largest
# only now take exponent
self.phi = numpy.exp( self.phi )
# now normalise
self._normalise_phi()
#self._check_LL( LL )
def _normalise_phi(self):
#normalise phi
for n in xrange( self.N ):
s = sum( self.phi[n,:] )
if 0.0 == s:
# print self.phi
raise RuntimeError( 'sum(phi[%d,:]) is 0' % n )
self.phi[n,:] /= s
[docs] def log_likelihood(self):
"""The log likelihood of the data."""
return sum(
[
math.log(
numpy.dot(
self._p_T_in_mixture( self.data[n] ),
self.phi[n,:]
)
)
for n in xrange( self.N )
]
)
# return sum(
# [
# math.log( self.p_T( self.data[n] ) )
# for n in xrange( self.N )
# ]
# )
def _gamma_factor(self, i):
"""Get the factor for the i'th gamma."""
return self.gamma[i,0] / (self.gamma[i,0] + self.gamma[i,1])
[docs] def expected_theta(self):
"""
Expected value of theta under the variational distribution.
"""
result = numpy.ones( self.K, dtype = numpy.float64 )
running_product = 1.0
for i in xrange( self.K - 1 ):
factor = self._gamma_factor( i )
result[i] = factor * running_product
running_product *= ( 1.0 - factor )
result[-1] = running_product
return result
def _p_T_in_mixture(self, T):
"""Array of pdf's of T given tau, one for each mixture.
@param T: sufficient statistic
"""
import infpy.exp_family
return numpy.array(
[
math.exp(
infpy.exp_family.log_p_T_given_tau( self.family, T, self.tau[i] )
)
for i in xrange( self.K )
],
dtype = numpy.float64
)
[docs] def p_T(self, T):
"""The pdf of the variational density at T.
@param T: sufficient statistic
"""
return numpy.dot(
self.expected_theta(),
self._p_T_in_mixture( T )
)
[docs] def p_z_given_T(self, T):
"""The pdf of variational density at z given T as an array over all z's
@param T: sufficient statistic
"""
expected_theta = self.expected_theta()
p_T_in_mixture = self._p_T_in_mixture( T )
total = numpy.dot( expected_theta, p_x_in_mixture )
return numpy.array(
[
expected_theta[i] * p_T_in_mixture[i] / total
for i in xrange( self.K )
],
dtype = numpy.float64
)