Source code for kohonen.kohonen

# Copyright (c) 2009 Leif Johnson <leif@leifjohnson.net>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

'''Basic self-organizing map implementation.

This module contains the following Kohonen map implementations:

  - Map. A standard rectangular N-dimensional Kohonen map.

    - Gas. A vector quantizer that does not have a fixed topology. Neurons in a
      gas are sorted for updates based on their distance from the cue, with the
      sort order defining a topology for each cue presentation.

      - GrowingGas. A Gas-based quantizer that can add neurons dynamically to
        explain high-error areas of the input space.

  - Filter. A wrapper over an underlying Map instance that maintains an explicit
    estimate of the likelihood of each neuron.

These are tested using the kohonen_test.py file in this source distribution.

Because they have a grid topology, Map objects have some cool visualization
options, including Map.neuron_heatmap and Map.distance_heatmap. These require
the Python Imaging Library (PIL).

There is also a collection of distance metrics:

  - cosine_metric. A callable that calculates the cosine distance between a cue
    and each neuron in a Kohonen Map.

  - euclidean_metric. A callable that calculates the Euclidean distance between
    a cue and each neuron in a Kohonen Map.

  - manhattan_metric. A callable that calculates the Manhattan distance between
    a cue and each neuron in a Kohonen Map.

There are also some small utility classes for modeling time series values:

  - Timeseries. A callable that takes no arguments and returns a value that
    might vary over time. Each call to the function will generally return a
    unique value (though this is not required).

    - ExponentialTimeseries. A callable that takes no arguments and returns an
      exponentially decreasing (or increasing) series of values, dependent on
      the parameters passed in at construction time.

    - etc.

These distance functions and time series objects are generally used to regulate
the learning parameters in Kohonen Map objects.
'''

import numpy
from numpy import random as rng


def cosine_metric(x, y):
    '''Returns the cosine distance between x and y.'''
    nx = numpy.sqrt(numpy.sum(x * x, axis=-1))
    ny = numpy.sqrt(numpy.sum(y * y, axis=-1))
    # cosine similarity is 1 when the args are parallel, 0 when they are
    # orthogonal, and -1 when they are opposite. subtracting it from 1 turns
    # the similarity into a distance that is always nonnegative.
    return 1 - numpy.sum(x * y, axis=-1) / nx / ny

def euclidean_metric(x, y):
    '''Returns the euclidean distance (L-2 norm) between x and y.'''
    d = x - y
    return numpy.sqrt(numpy.sum(d * d, axis=-1))

def manhattan_metric(x, y):
    '''Returns the manhattan distance (L-1 norm) between x and y.'''
    d = x - y
    return numpy.sum(numpy.abs(d), axis=-1)

def weighted_euclidean_metric(weights):
    '''Implements a standard euclidean distance with weighted dimensions.'''
    def calculate(x, y):
        d = x - y
        return numpy.sqrt(numpy.sum(d * d * weights, axis=-1))
    return calculate
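
# Added sketch (not part of the original module): how the metrics above
# broadcast a single cue against an array of neurons, one distance per
# neuron, mirroring the way Map.distances calls them.
def _example_metrics():
    '''Hypothetical helper demonstrating the metric callables.'''
    neurons = rng.randn(5, 3)                  # five 3-dimensional neurons
    cue = numpy.resize(rng.randn(3), neurons.shape)
    return (euclidean_metric(cue, neurons),    # shape (5, )
            manhattan_metric(cue, neurons),    # shape (5, )
            cosine_metric(cue, neurons))       # shape (5, )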


class Timeseries(object):
    '''Represents some sort of value that changes over time.'''

    def __init__(self):
        '''Set up this timeseries.'''
        super(Timeseries, self).__init__()
        self.ticks = 0

    def __call__(self):
        '''Call this timeseries.'''
        t = self.ticks
        self.ticks += 1
        return t

    def reset(self):
        '''Reset the time for this series.'''
        self.ticks = 0

class ConstantTimeseries(Timeseries):
    '''This timeseries just returns a constant value.'''

    def __init__(self, k=1):
        '''Set up this series with a constant value.'''
        self.k = k

    def __call__(self):
        '''Return the constant.'''
        return self.k

class ExponentialTimeseries(Timeseries):
    '''Represents an exponential decay process.'''

    def __init__(self, rate=-1, initial=1, final=0):
        '''Create a new exponential timeseries object.'''
        super(ExponentialTimeseries, self).__init__()
        self.initial = initial - final
        self.rate = rate
        self.final = final

    def __call__(self):
        '''Return an exponentially-decreasing series of values.'''
        super(ExponentialTimeseries, self).__call__()
        return self.final + self.initial * numpy.exp(self.rate * self.ticks)

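# Added sketch (not part of the original module): the annealing schedule an
# ExponentialTimeseries produces, as used for learning rates and neighborhood
# sizes. Values decay from `initial` toward `final` as ticks accumulate.
def _example_timeseries():
    '''Hypothetical helper demonstrating ExponentialTimeseries.'''
    et = ExponentialTimeseries(rate=-1e-3, initial=1, final=0.2)
    first = et()       # just under 1.0 (ticks advance before evaluation)
    for _ in range(998):
        et()
    later = et()       # about 0.2 + 0.8 * exp(-1) ~= 0.49 at tick 1000
    return first, later
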
class Parameters(object):
    '''We are plain old data holding self-organizing map parameters.'''

    def __init__(self,
                 dimension=None,
                 shape=None,
                 metric=None,
                 learning_rate=None,
                 neighborhood_size=None,
                 noise_variance=None):
        '''This class holds standard parameters for self-organizing maps.

        dimension: The length of a neuron vector in a Map or a Gas.

        shape: The shape of the neuron topology in whatever Map or Gas we are
          building.

        metric: The distance metric to use when comparing cues to neurons in
          the map. Defaults to euclidean_metric.

        learning_rate: This parameter determines the time course of the
          learning rate for a Map. It should be a callable that takes no
          arguments and returns a floating point value for the learning rate.
          If this parameter is None, a default learning rate series will be
          used, equivalent to ExponentialTimeseries(-1e-3, 1, 0.2). If this
          parameter is a numeric value, it will be used as the constant value
          for the learning rate: ConstantTimeseries(value).

        neighborhood_size: Like the learning rate, this parameter determines
          the time course of the neighborhood size. It should be a callable
          that takes no arguments and returns a neighborhood size for storing
          each cue. If this is None, a default neighborhood size series will
          be used: the initial size is the maximum of the dimensions given in
          shape, and the decay is -1e-3, i.e.
          ExponentialTimeseries(-1e-3, max(shape), 1). If this is a numeric
          value, it will be used as a constant neighborhood size:
          ConstantTimeseries(value).

        noise_variance: Like the learning rate and neighborhood size, this
          should be a callable that takes no arguments and returns a noise
          variance value. If this is None, no noise will be included in the
          created Maps. If this parameter is a number, it will be used as a
          constant noise variance.
        '''
        assert dimension is not None
        self.dimension = dimension

        assert shape is not None
        self.shape = shape

        self.metric = metric or euclidean_metric

        ET = ExponentialTimeseries
        CT = ConstantTimeseries

        self.learning_rate = learning_rate
        if isinstance(learning_rate, (float, int)):
            self.learning_rate = CT(learning_rate)
        if learning_rate is None:
            self.learning_rate = ET(-1e-3, 1, 0.2)

        self.neighborhood_size = neighborhood_size
        if isinstance(neighborhood_size, (float, int)):
            self.neighborhood_size = CT(neighborhood_size)
        if neighborhood_size is None:
            self.neighborhood_size = ET(-1e-3, max(shape), 1)

        self.noise_variance = noise_variance
        if isinstance(noise_variance, (float, int)):
            self.noise_variance = CT(noise_variance)

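# Added sketch (not part of the original module): Parameters coerces numeric
# arguments into ConstantTimeseries objects and replaces None with the
# default exponential schedules described in the docstring above.
def _example_parameters():
    '''Hypothetical helper demonstrating Parameters coercion.'''
    p = Parameters(dimension=3, shape=(8, 8), learning_rate=0.05)
    assert isinstance(p.learning_rate, ConstantTimeseries)
    assert isinstance(p.neighborhood_size, ExponentialTimeseries)
    assert p.noise_variance is None            # None means no noise is added
    return p
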
def heatmap(raw, axes=(0, 1), lower=None, upper=None):
    '''Create a heat map image from the given raw matrix.

    raw: An array of values to use for the image pixels.

    axes: The axes in the array that we want to preserve for the final image.
      All other axes will be summed away.

    lower: If given, clip values in the matrix to this lower limit. If not
      given, raw.min() will be used.

    upper: If given, clip values in the matrix to this upper limit. If not
      given, raw.max() will be used.

    Returns an annotated Image object (as returned from _image).
    '''
    assert len(axes) == 2
    for ax in range(len(raw.shape) - 1, -1, -1):
        if ax in axes:
            continue
        raw = raw.sum(axis=ax)
    l = lower
    if l is None:
        l = raw.min()
    l *= l < 0 and 1.01 or 0.99
    u = upper
    if u is None:
        u = raw.max()
    u *= u > 0 and 1.01 or 0.99
    return _image(raw, l, u, 'L')

def colormap(raw, axes=(0, 1, 2), layers=(0, 1, 2)):
    '''Create an RGB image using the given layers of a 3D raw values matrix.

    raw: An array of raw values to use for the image.

    axes: The axes in the array that we want to preserve for the final image.
      All other axes will be summed away.

    layers: The indices of the third preserved axis that we should use for the
      red, green, and blue channels in the output image.

    Raw values will be scaled along each layer to lie in [lower, upper], where
    lower (upper) is the global lower (upper) bound of all values in each of
    the raw layers.

    Returns an Image object, as in the heatmap() function.
    '''
    assert len(axes) == len(layers) == 3
    for ax in range(len(raw.shape) - 1, -1, -1):
        if ax in axes:
            continue
        raw = raw.sum(axis=ax)
    u = -numpy.inf
    l = numpy.inf
    for i in layers:
        v = raw[:, :, i]
        l = min(l, v.min())
        u = max(u, v.max())
    l *= l < 0 and 1.01 or 0.99
    u *= u > 0 and 1.01 or 0.99
    return _image(raw[:, :, layers], l, u, 'RGB')

def _image(values, lower, upper, format='L'):
    '''Create a PIL image using the given 2D array of values.

    Pixel values in the range [lower, upper] are scaled linearly to [0, 1]
    before creating the image.

    Returns an Image object annotated with the lower and upper bounds that
    were used to scale the values to convert them to pixels.
    '''
    from PIL import Image
    ratios = (values - lower) / (upper - lower)
    img = Image.fromarray(numpy.array(256 * ratios, numpy.uint8), format)
    img.lower_bound = lower
    img.upper_bound = upper
    return img

def _zeros(shape, dtype='d'):
    '''Get a blank (all-zero) matrix with a certain shape.'''
    return numpy.zeros(shape, dtype=dtype)

def itershape(shape):
    '''Given a shape tuple, iterate over all indices in that shape.'''
    if not shape:
        yield ()
        return
    for i in range(shape[0]):
        for z in itershape(shape[1:]):
            yield (i, ) + z

def argsample(pdf, n=1):
    '''Return n indices drawn proportionally from a discrete mass vector.'''
    assert (pdf >= 0).all(), 'cannot sample from %r!' % pdf
    cdf = pdf.cumsum()
    return numpy.searchsorted(cdf, rng.uniform(0, cdf[-1], n))

def sample(pdf, n=1):
    '''Return n samples drawn proportionally from a discrete mass vector.'''
    assert len(pdf.shape) == 1
    return pdf[argsample(pdf, n)]

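# Added sketch (not part of the original module): argsample draws indices in
# proportion to the mass in an unnormalized discrete distribution; Filter
# (below) uses it to sample neurons according to their activity.
def _example_argsample():
    '''Hypothetical helper demonstrating argsample.'''
    pdf = numpy.array([0.1, 0.1, 0.8])
    idx = argsample(pdf, n=1000)
    return (idx == 2).mean()                   # roughly 0.8
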
class Map(object):
    '''Basic implementation of a rectangular N-dimensional self-organizing map.

    A Self-Organizing or Kohonen Map (henceforth just Map) is a group of
    lightweight processing units called neurons, which are here implemented
    as vectors of real numbers. Neurons in a Map are arranged in a specific
    topology, so that a given neuron is connected to a small, specific subset
    of the overall neurons in the Map. In addition, the Map uses a distance
    metric (e.g., Euclidean distance) for computing similarity between
    neurons and cue vectors, as described below.

    The Map accepts cues---vectors of real numbers---as inputs. In standard
    Map usage, cues represent some data point of interest. Normally
    applications of Maps use input vectors like the activation patterns for
    an array of sensors, term frequency vectors for a document, etc. Cues are
    stored in the Map as follows: First, a "winner" neuron w is chosen from
    the Map, and, second, the neurons in the Map topologically near w are
    altered so that they become closer to the cue. Each of these steps is
    described briefly below.

    For the first step, the Map computes the distance between the cue and
    each of the Map neurons using its metric. The neuron closest to the cue
    under this metric is declared the "winner" w. Alternatively, the winner
    can be selected probabilistically based on the overall distance
    landscape.

    Next, the Map alters the neurons in the neighborhood of w, normally using
    some function of the difference between the cue and the neuron being
    modified. The weight of the alteration decreases exponentially as the
    topological distance from w increases. The learning rule for a neuron n
    is

    n += eta * exp(-d**2 / sigma**2) * (c - n)

    where eta is the learning rate, sigma is called the neighborhood size, d
    is the topological distance between n and w, and c is the cue vector
    being stored in the map. Eta and sigma normally decrease in value over
    time, to take advantage of the empirical machine learning benefits of
    simulated annealing.

    The storage mechanism in a Map has the effect of grouping cues with
    similar characteristics into similar areas of the Map. Because the
    winner---and its neighborhood---are altered to look more like the cues
    that they capture, the winner for a given cue will tend to win similar
    inputs in the future. This tends to cluster similar Map inputs, and can
    lead to interesting data organization patterns.
    '''

    def __init__(self, params):
        '''Initialize this Map.'''
        self._shape = params.shape
        self.dimension = params.dimension
        self.neurons = _zeros(self.shape + (self.dimension, ))

        self._metric = params.metric

        self._learning_rate = params.learning_rate
        self._neighborhood_size = params.neighborhood_size
        self._noise_variance = params.noise_variance

        # precompute a neighborhood mask for performing fast storage updates.
        # this mask has the same dimensionality as self.shape, but each axis
        # is 2 * size - 1 elements long. the maximum value in the mask is 1,
        # occurring in the center. values decrease in a gaussian fashion from
        # the center.
        S = tuple(2 * size - 1 for size in self.shape)
        self._neighborhood_mask = _zeros(S)
        for coords in itershape(S):
            z = 0
            for axis, offset in enumerate(coords):
                d = offset + 1 - self.shape[axis]
                z += d * d
            # use a float divisor to avoid integer division under python 2.
            self._neighborhood_mask[coords] = numpy.exp(-z / 2.0)

    @property
    def shape(self):
        return self._shape

    def neuron(self, coords):
        '''Get the current state of a specific neuron.'''
        return self.neurons[coords]

    def reset(self, f=None):
        '''Reset the neurons and timeseries in the Map.

        f: A callable that takes a neuron coordinate and returns a value for
          that neuron. Defaults to random values from the standard normal.
        '''
        self._learning_rate.reset()
        self._neighborhood_size.reset()
        if f is None:
            self.neurons = rng.randn(*self.neurons.shape)
        else:
            for z in itershape(self.shape):
                self.neurons[z] = f(z)

    def weights(self, distances):
        '''Get an array of learning weights to use for storing a cue.'''
        i = self.smallest(distances)
        z = []
        for axis, coord in enumerate(self.flat_to_coords(i)):
            offset = self.shape[axis] - coord - 1
            z.append(slice(offset, offset + self.shape[axis]))
        sigma = self._neighborhood_size()
        # index with a tuple of slices; a plain list of slices is deprecated.
        return self._neighborhood_mask[tuple(z)] ** (1.0 / sigma / sigma)

    def distances(self, cue):
        '''Get the distance of each neuron in the Map to a particular cue.'''
        z = numpy.resize(cue, self.neurons.shape)
        return self._metric(z, self.neurons)

    def flat_to_coords(self, i):
        '''Given a flattened index, convert it to a coordinate tuple.'''
        coords = []
        for limit in reversed(self.shape[1:]):
            i, j = divmod(i, limit)
            coords.append(j)
        coords.append(i)
        return tuple(reversed(coords))

    def winner(self, cue):
        '''Get the coordinates of the most similar neuron to the given cue.

        Returns a flat index; use flat_to_coords to convert this to a neuron
        index.
        '''
        return self.smallest(self.distances(cue))

    def sample(self, n):
        '''Get a sample of n neuron coordinates from the map.

        The returned values will be flat indices; use flat_to_coords to
        convert them to neuron indices.
        '''
        # randint's upper bound is exclusive, so pass the neuron count itself.
        return rng.randint(0, self.neurons.size // self.dimension, n)

    def smallest(self, distances):
        '''Get the index of the smallest element in the given distances array.

        Returns a flat index; use flat_to_coords to convert this to a neuron
        index.
        '''
        assert distances.shape == self.shape
        return distances.argmin()

    def learn(self, cue, weights=None, distances=None):
        '''Add a new cue vector to the Map, moving neurons as needed.'''
        if weights is None:
            if distances is None:
                distances = self.distances(cue)
            weights = self.weights(distances)
        assert weights.shape == self.shape
        weights.shape += (1, )
        delta = numpy.resize(cue, self.neurons.shape) - self.neurons
        eta = self._learning_rate()
        self.neurons += eta * weights * delta
        if self._noise_variance:
            self.neurons += rng.normal(
                0, self._noise_variance(), self.neurons.shape)

    def neuron_heatmap(self, axes=(0, 1), lower=None, upper=None):
        '''Return an image representation of this Map.'''
        return heatmap(self.neurons, axes, lower, upper)

    def distance_heatmap(self, cue, axes=(0, 1), lower=None, upper=None):
        '''Return an image representation of the distance to a cue.'''
        return heatmap(self.distances(cue), axes, lower, upper)

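# Added sketch (not part of the original module): an end-to-end pass over the
# Map API with arbitrary parameter values -- build a 10x10 map of 3-D
# neurons, store random cues, then look up the winner for a fresh cue.
def _example_train_map():
    '''Hypothetical helper demonstrating basic Map usage.'''
    params = Parameters(dimension=3, shape=(10, 10))
    som = Map(params)
    som.reset()                                # standard normal init
    for _ in range(1000):
        som.learn(rng.rand(3))                 # store random cues
    w = som.flat_to_coords(som.winner(rng.rand(3)))
    return som.neuron(w)                       # the winning codebook vector
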
class Gas(Map):
    '''A neural Gas is a topologically unordered collection of neurons.

    Learning takes place in the Gas by ordering the neurons according to
    their distance from each cue that is presented. Neurons are updated using
    this sorted order, with an exponentially decreasing weight for neurons
    that are further (in sort order) from the cue.
    '''

    def __init__(self, params):
        '''Initialize this Gas. A Gas must have a 1D shape.'''
        super(Gas, self).__init__(params)
        assert len(params.shape) == 1
        self.N = params.shape[0]

    def weights(self, distances):
        # this is slightly different from a traditional gas, which uses a
        # linear negative exponential for update weights:
        #
        #   return numpy.exp(-distances.argsort() / sigma)
        #
        # quadratic weights more closely match the standard kohonen behavior.
        z = distances.argsort() / self._neighborhood_size()
        return numpy.exp(-z * z)

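# Added sketch (not part of the original module): a Gas is driven exactly
# like a Map, but its shape is 1D and updates follow distance rank rather
# than grid neighborhoods.
def _example_train_gas():
    '''Hypothetical helper demonstrating basic Gas usage.'''
    gas = Gas(Parameters(dimension=2, shape=(20, ), neighborhood_size=5.0))
    gas.reset()
    for _ in range(500):
        gas.learn(rng.randn(2))
    return gas.neurons                         # (20, 2) codebook vectors
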
def _array_without(a, i):
    '''Remove the ith row and column from square array a.'''
    if i == 0:
        return a[1:, 1:].copy()
    if i == a.shape[0] - 1:
        return a[:-1, :-1].copy()
    return numpy.hstack((numpy.vstack((a[:i, :i], a[i+1:, :i])),
                         numpy.vstack((a[:i, i+1:], a[i+1:, i+1:]))))

def _vector_without(v, i):
    '''Remove the ith element from vector v.'''
    if i == 0:
        return v[1:].copy()
    if i == v.shape[0] - 1:
        return v[:-1].copy()
    return numpy.concatenate((v[:i], v[i+1:]))

class GrowingGasParameters(Parameters):
    '''Parameters for Growing Neural Gases.'''

    def __init__(self,
                 growth_interval=2,
                 max_connection_age=5,
                 error_decay=0.99,
                 neighbor_error_decay=0.99,
                 **kwargs):
        super(GrowingGasParameters, self).__init__(**kwargs)
        self.growth_interval = growth_interval
        self.max_connection_age = max_connection_age
        self.error_decay = error_decay
        self.neighbor_error_decay = neighbor_error_decay

class GrowingGas(Gas):
    '''A Growing Neural Gas uses a variable number of variable-topology neurons.

    In essence, a GNG is similar to a standard Gas, but there is additional
    logic in this class for adding new neurons to better explain areas of the
    sample space that currently have large error.
    '''

    def __init__(self, params):
        '''Initialize a new Growing Gas with parameters.'''
        self._size = 2
        super(GrowingGas, self).__init__(params)

        self._growth_interval = params.growth_interval
        self._max_connection_age = params.max_connection_age
        self._error_decay = params.error_decay
        self._neighbor_error_decay = params.neighbor_error_decay

        self._errors = _zeros(self.shape)
        self._connections = _zeros((self._size, self._size), '=i2') - 1
        self._cue_count = 0

    @property
    def shape(self):
        return (self._size, )

    def neighbors(self, i):
        return self._connections[i]

    def _connect(self, a, b):
        self._set_connection(a, b, 0)

    def _age_connection(self, a, b):
        self._set_connection(a, b, self._connections[a, b] + 1)

    def _disconnect(self, a, b):
        self._set_connection(a, b, -1)

    def _set_connection(self, a, b, age):
        self._connections[a, b] = self._connections[b, a] = age

    def learn(self, cue, weights=None, distances=None):
        '''Store a cue in the gas.'''
        distances = self.distances(cue)

        # find the two closest neurons. connect them. add error to the winner.
        w = distances.argmin()
        d = distances[w]
        self._errors[w] += d * d
        distances[w] = 1 + distances.max()
        self._connect(w, distances.argmin())

        # move the winner and all of its neighbors toward the cue.
        eta = self._learning_rate()

        def adjust(i):
            self.neurons[i] += eta * (cue - self.neurons[i])

        adjust(w)
        for j, age in enumerate(self.neighbors(w)):
            # a nonnegative age marks a connected neighbor; the upper bound
            # guards against huge stale values in the 16-bit age counters.
            if 0 <= age < 65535:
                adjust(j)
                self._age_connection(w, j)

        # add noise.
        if self._noise_variance:
            self.neurons += rng.normal(
                0, self._noise_variance(), self.neurons.shape)

        # manipulate the gas topology by pruning and growing as needed.
        self._prune()
        self._cue_count += 1
        if (self._cue_count % self._growth_interval == 0 and
                self._size < self.N):
            self._grow()

        # decrease unit error.
        self._errors *= self._error_decay

    def _prune(self):
        '''Remove old connections, and prune any disconnected neurons.'''
        mask = numpy.where(self._connections > self._max_connection_age)
        if self._size == 2 or len(mask[0]) == 0:
            return

        # remove connections older than max_connection_age (set to -1).
        self._connections[mask] = -1

        # remove neurons that were disconnected by the step above.
        indices, = numpy.where((self._connections < 0).all(axis=0))
        for i in indices[::-1]:
            self.neurons = _vector_without(self.neurons, i)
            self._errors = _vector_without(self._errors, i)
            self._connections = _array_without(self._connections, i)
            self._size -= 1

    def _grow(self):
        '''Add a single neuron between two high-error neurons.'''
        # identify the neuron with max error, and its max error neighbor.
        q = self._errors.argmax()
        f = (self._errors * (self.neighbors(q) >= 0)).argmax()

        r = self._size

        # allocate a new neurons array.
        neurons = _zeros((r + 1, self.dimension))
        neurons[:r] = self.neurons
        self.neurons = neurons
        self.neurons[r] = (self.neurons[q] + self.neurons[f]) / 2

        # insert the new node between the two old nodes.
        self._disconnect(q, f)
        conn = _zeros((r + 1, r + 1), '=i2') - 1
        conn[:r, :r] = self._connections
        self._connections = conn
        self._connect(q, r)
        self._connect(r, q)

        # update error for the new and old neurons.
        self._errors = numpy.concatenate((self._errors, [0]))
        self._errors[f] *= self._neighbor_error_decay
        self._errors[q] *= self._neighbor_error_decay
        self._errors[r] = (self._errors[f] + self._errors[q]) / 2

        self._size += 1

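# Added sketch (not part of the original module): a GrowingGas starts from
# two neurons and adds units (up to shape[0]) every growth_interval cues in
# high-error regions -- here, a two-cluster input distribution.
def _example_growing_gas():
    '''Hypothetical helper demonstrating GrowingGas usage.'''
    params = GrowingGasParameters(dimension=2,
                                  shape=(32, ),
                                  growth_interval=4,
                                  max_connection_age=10)
    gng = GrowingGas(params)
    gng.reset()
    for _ in range(500):
        center = 1.0 if rng.rand() < 0.5 else -1.0
        gng.learn(center + 0.1 * rng.randn(2))
    return gng.shape                           # (current neuron count, )
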
class Filter(object):
    '''A Filter is an estimate of the probability density of the inputs.'''

    def __init__(self, map, history=None):
        '''Initialize this Filter with an underlying Map implementation.

        history: A callable that returns values in the open interval (0, 1).
          These values determine how much new cues influence the activation
          state of the Filter. A 0 value would mean that no history is
          preserved (i.e. each new cue stored in the Filter completely
          determines the activity of the Filter), while a 1 value would mean
          that new cues have no impact on the activity of the Filter (i.e.
          the initial activity is the only activity that is ever used).
        '''
        self.map = map
        self.activity = _zeros(self.map.shape) + 1
        self.activity /= self.activity.sum()
        self._history = ConstantTimeseries(0.7) if history is None else history

    @property
    def shape(self):
        return self.map.shape

    def neuron(self, coords):
        return self.map.neuron(coords)

    def reset(self, f=None):
        return self.map.reset(f=f)

    def distances(self, cue):
        return self.map.distances(cue)

    def flat_to_coords(self, i):
        return self.map.flat_to_coords(i)

    def winner(self, cue):
        return self.map.winner(cue)

    def smallest(self, distances):
        return self.map.smallest(distances)

    def weights(self, distances):
        return self.map.weights(distances) * (1 - self.activity)

    def sample(self, n):
        return argsample(self.activity, n)

    def learn(self, cue, **kwargs):
        d = self.distances(cue)
        # reuse the distances computed above rather than recomputing them.
        p = numpy.exp(-d.argsort())
        l = self._history()
        self.activity = l * self.activity + (1 - l) * p / p.sum()
        self.map.learn(cue, **kwargs)

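# Added sketch (not part of the original module): wrapping a Map in a Filter
# maintains a per-neuron activity estimate; Filter.weights discounts highly
# active neurons, and Filter.sample draws neurons in proportion to activity.
def _example_filter():
    '''Hypothetical helper demonstrating Filter usage.'''
    som = Map(Parameters(dimension=2, shape=(8, 8)))
    som.reset()
    filt = Filter(som, history=ConstantTimeseries(0.9))
    for _ in range(200):
        filt.learn(rng.rand(2))
    return filt.sample(5)                      # flat indices, by activity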
