Source code for infpy.dbn.harmonium

#
# Copyright John Reid 2007
#

"""
Code for exponential family harmoniums

Follows Welling, Rosen-Zvi, Hinton 2005
"""

from numpy import asmatrix, array, empty, matrix, zeros_like, zeros

[docs]class Harmonium( object ): """ An exponential family harmonium with hidden and observed variables Follows Welling, Rosen-Zvi, Hinton 2005 """ def __init__( self, observed, hidden ): """ Initialise with the observed and hidden variables """ from numpy.random import gamma self.observed = array( observed ) self.hidden = array( hidden ) self.W = empty( (len(observed), len(hidden)), dtype = numpy.object_ ) # randomise W for i, o in enumerate(observed): for j, h in enumerate(hidden): self.W[i,j] = asmatrix( -gamma( 1.0, size = (o.family.size(), h.family.size()) ) )
[docs] def theta_hat( self, i, hidden_values ): """ The natural parameters of an observed variable taking quadratic term for interactions into account """ assert len(hidden_values) == len(self.hidden) theta = matrix(self.observed[i].phi) for j, h in enumerate(hidden_values): assert self.hidden[j].family._check_u(h) update = self.W[i,j] * asmatrix(h).T theta += update.T return theta.A.reshape( (self.observed[i].family.size()) )
[docs] def lambda_hat( self, j, observed_values ): """ The natural parameters of a hidden variable taking quadratic term for interactions into account """ assert len(observed_values) == len(self.observed) lambda_ = matrix(self.hidden[j].phi) for i, o in enumerate(observed_values): assert self.observed[i].family._check_u(o) lambda_ += asmatrix(o) * self.W[i,j] return lambda_.A.reshape( (self.hidden[j].family.size(),) )
[docs]def contrastive_divergence_iteration( harmonium, observed_data, eta ): """ One iteration of contrastive divergence """ theta_update = [ zeros_like( o.phi ) for o in harmonium.observed ] lambda_update = [ zeros_like( h.phi ) for h in harmonium.hidden ] W_update = empty( (len(theta_update), len(lambda_update)), dtype = numpy.object_ ) for i in xrange( len(theta_update) ): for j in xrange( len(lambda_update) ): W_update[i,j] = zeros( (len(theta_update[i]), len(lambda_update[j])) ) for d in observed_data: assert len(d) == len(harmonium.observed) # get parameters for hidden variables lambda_hat = [ harmonium.lambda_hat(j,d) for j, h in enumerate(harmonium.hidden) ] # sample the hidden variables hidden_samples = [ h.family.sample( phi ) for h, phi in zip( harmonium.hidden, lambda_hat ) ] # get parameters for observed variables theta_hat = [ harmonium.theta_hat(i,hidden_samples) for i, o in enumerate(harmonium.observed) ] # sample the observed variables observed_samples = [ o.family.sample( phi ) for o, phi in zip( harmonium.observed, theta_hat ) ] theta_update += d
if '__main__' == __name__: from infpy.exp import * discrete = DiscreteExpFamily(3) gaussian = GaussianExpFamily() h = Harmonium( observed = [ Variable( discrete, discrete.phi([ .5, .3, .2 ]) ), ], hidden = [ Variable( gaussian, gaussian.phi([ .0, 1. ]) ), ] ) observed_data = [[numpy.array([1,0,0])]] hidden_data = [[numpy.array([0.0, 0.0])]] #h.lambda_hat(0, observed_data[0]) #h.theta_hat(0, hidden_data[0]) contrastive_divergence_iteration( h, observed_data, 1e-3 )