Source code for NeuroTools.analysis

"""
NeuroTools.analysis
===================

A collection of analysis functions that may be used by NeuroTools.signals or other packages.

.. currentmodule:: NeuroTools.analysis

Classes
-------
.. autosummary::

   TuningCurve

Functions
---------
.. autosummary::
   :nosignatures:

   ccf
   crosscorrelate
   make_kernel
   simple_frequency_spectrum

"""

import numpy as np

from NeuroTools import check_dependency

HAVE_MATPLOTLIB = check_dependency('matplotlib')
if HAVE_MATPLOTLIB:
    import matplotlib
    matplotlib.use('Agg')
else:
    MATPLOTLIB_ERROR = "The matplotlib package was not detected"

HAVE_PYLAB = check_dependency('pylab')
if HAVE_PYLAB:
    import pylab
else:
    PYLAB_ERROR = "The pylab package was not detected"


def ccf(x, y, axis=None):
    """Fast cross-correlation function based on FFT.

    Computes the cross-correlation function of two series.
    Note that the computations are performed on anomalies (deviations from
    average).
    Returns the values of the cross-correlation at different lags.

    Parameters
    ----------
    x, y : 1D MaskedArrays
        The two input arrays.
    axis : integer, optional
        Axis along which to compute (0 for rows, 1 for cols).
        If `None`, the array is flattened first.

    Examples
    --------
    >>> z = np.arange(5)
    >>> ccf(z, z)
    array([  3.90798505e-16,  -4.00000000e-01,  -4.00000000e-01,
            -1.00000000e-01,   4.00000000e-01,   1.00000000e+00,
             4.00000000e-01,  -1.00000000e-01,  -4.00000000e-01,
            -4.00000000e-01])

    """
    assert x.ndim == y.ndim, "Inconsistent shape !"
    if axis is None:
        if x.ndim > 1:
            x = x.ravel()
            y = y.ravel()
        npad = x.size + y.size
        xanom = (x - x.mean(axis=None))
        yanom = (y - y.mean(axis=None))
        Fx = np.fft.fft(xanom, npad)
        Fy = np.fft.fft(yanom, npad)
        iFxy = np.fft.ifft(Fx.conj() * Fy).real
        varxy = np.sqrt(np.inner(xanom, xanom) * np.inner(yanom, yanom))
    else:
        npad = x.shape[axis] + y.shape[axis]
        if axis == 1:
            if x.shape[0] != y.shape[0]:
                raise ValueError("Arrays should have the same length!")
            xanom = (x - x.mean(axis=1)[:, None])
            yanom = (y - y.mean(axis=1)[:, None])
            varxy = np.sqrt((xanom * xanom).sum(1) *
                            (yanom * yanom).sum(1))[:, None]
        else:
            if x.shape[1] != y.shape[1]:
                raise ValueError("Arrays should have the same width!")
            xanom = (x - x.mean(axis=0))
            yanom = (y - y.mean(axis=0))
            varxy = np.sqrt((xanom * xanom).sum(0) * (yanom * yanom).sum(0))
        Fx = np.fft.fft(xanom, npad, axis=axis)
        Fy = np.fft.fft(yanom, npad, axis=axis)
        iFxy = np.fft.ifft(Fx.conj() * Fy, n=npad, axis=axis).real
    # Shift the lags into conventional order: negative lags first, then zero
    # and positive lags. Integer division (//) keeps this valid in Python 3.
    iFxy = np.concatenate((iFxy[len(iFxy) // 2:], iFxy[:len(iFxy) // 2]))
    return iFxy / varxy
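
# Illustrative sketch (not part of the original module; the helper name
# `_demo_ccf` and the synthetic data are assumptions): cross-correlating a
# noisy sine wave with a delayed copy of itself. The zero-lag bin sits at the
# centre of the returned array, so the offset of the peak from the centre
# estimates the delay.
def _demo_ccf():
    t = np.arange(1000)
    x = np.sin(2 * np.pi * t / 100.0) + np.random.normal(0.0, 0.1, t.size)
    y = np.roll(x, 10)                         # delay x by 10 samples
    corr = ccf(x, y)
    lag = np.argmax(corr) - corr.size // 2     # expected to be close to 10
    return lag
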
from NeuroTools.plotting import get_display, set_labels
def crosscorrelate(sua1, sua2, lag=None, n_pred=1, predictor=None,
                   display=False, kwargs={}):
    """Cross-correlation between two series of discrete events (e.g. spikes).

    Calculates the cross-correlation between two vectors containing event
    times. Returns ``(differences, pred, norm)``. See below for details.

    Adapted from original script written by Martin P. Nawrot for the FIND
    MATLAB toolbox [1]_.

    Parameters
    ----------
    sua1, sua2 : 1D row or column `ndarray` or `SpikeTrain`
        Event times. If sua2 == sua1, the result is the autocorrelogram.
    lag : float
        Lag for which relative event timing is considered with a max
        difference of +/- lag. A default lag is computed from the inter-event
        interval of the longer of the two sua arrays.
    n_pred : int
        Number of surrogate compilations for the predictor. This influences
        the total length of the predictor output array.
    predictor : {None, 'shuffle'}
        Determines the type of bootstrap predictor to be used. 'shuffle'
        shuffles inter-event intervals of the longer input array and
        calculates relative differences with the shorter input array.
        `n_pred` determines the number of repeated shufflings; the resulting
        differences are pooled from all repeated shufflings.
    display : boolean
        If True, the corresponding plots will be displayed. If False,
        `differences`, `pred` and `norm` are returned without plotting.
    kwargs : dict
        Arguments to be passed to `np.histogram`.

    Returns
    -------
    differences : np array
        Accumulated differences of events in `sua1` minus the events in
        `sua2`. Thus positive values relate to events of `sua2` that lead
        events of `sua1`. Units are the same as the input arrays.
    pred : np array
        Accumulated differences based on the prediction method. The length of
        `pred` is ``n_pred * length(differences)``. Units are the same as the
        input arrays.
    norm : float
        Normalization factor used to scale the bin heights in `differences`
        and `pred`. ``differences/norm`` and ``pred/norm`` correspond to the
        linear correlation coefficient.

    Examples
    --------
    >> crosscorrelate(np_array1, np_array2)
    >> crosscorrelate(spike_train1, spike_train2)
    >> crosscorrelate(spike_train1, spike_train2, lag=150.0)
    >> crosscorrelate(spike_train1, spike_train2, display=True,
                      kwargs={'bins': 100})

    See also
    --------
    ccf

    .. [1] Meier R, Egert U, Aertsen A, Nawrot MP, "FIND - a unified
       framework for neural data analysis"; Neural Netw. 2008 Oct;
       21(8):1085-93.

    """
    assert predictor == 'shuffle' or predictor is None, \
        "predictor must be either None or 'shuffle'. " \
        "Other predictors are not yet implemented."
    # Check whether sua1 and sua2 are SpikeTrains or arrays
    sua = []
    for x in (sua1, sua2):
        if hasattr(x, 'spike_times'):  # duck-typed check for SpikeTrain
            sua.append(x.spike_times)
        elif x.ndim == 1:
            sua.append(x)
        elif x.ndim == 2 and (x.shape[0] == 1 or x.shape[1] == 1):
            sua.append(x.ravel())
        else:
            raise TypeError("sua1 and sua2 must be either instances of the "
                            "SpikeTrain class or column/row vectors")
    sua1 = sua[0]
    sua2 = sua[1]

    if sua1.size < sua2.size:
        if lag is None:
            lag = np.ceil(10 * np.mean(np.diff(sua1)))
        reverse = False
    else:
        if lag is None:
            lag = np.ceil(20 * np.mean(np.diff(sua2)))
        sua1, sua2 = sua2, sua1
        reverse = True

    # Construct the predictor from shuffled inter-event intervals
    if predictor == 'shuffle':
        isi = np.diff(sua2)
        sua2_ = np.array([])
        for ni in range(1, n_pred + 1):
            idx = np.random.permutation(isi.size - 1)
            sua2_ = np.append(sua2_, np.add(np.insert(
                (np.cumsum(isi[idx])), 0, 0), sua2.min() + (
                np.random.exponential(isi.mean()))))

    # Calculate cross differences in spike times
    differences = np.array([])
    pred = np.array([])
    for k in range(0, sua1.size):
        differences = np.append(differences, sua1[k] - sua2[np.nonzero(
            (sua2 > sua1[k] - lag) & (sua2 < sua1[k] + lag))])
    if predictor == 'shuffle':
        for k in range(0, sua1.size):
            pred = np.append(pred, sua1[k] - sua2_[np.nonzero(
                (sua2_ > sua1[k] - lag) & (sua2_ < sua1[k] + lag))])
    if reverse:
        differences = -differences
        pred = -pred

    norm = np.sqrt(sua1.size * sua2.size)

    # Plot the results if display=True
    if display:
        subplot = get_display(display)
        if not subplot or not HAVE_PYLAB:
            return differences, pred, norm
        else:
            # Plot the cross-correlation
            try:
                counts, bin_edges = np.histogram(differences, **kwargs)
                edge_distances = np.diff(bin_edges)
                bin_centers = bin_edges[1:] - edge_distances / 2
                counts = counts / norm
                xlabel = "Time"
                ylabel = "Cross-correlation coefficient"
                subplot.plot(bin_centers, counts,
                             label='cross-correlation', color='b')
                if predictor is None:
                    set_labels(subplot, xlabel, ylabel)
                    pylab.draw()
                elif predictor == 'shuffle':
                    # Plot the predictor
                    # NOTE: the x axis corresponds to the upper edge of
                    # each bin
                    norm_ = norm * n_pred
                    counts_, bin_edges_ = np.histogram(pred, **kwargs)
                    counts_ = counts_ / norm_
                    subplot.plot(bin_edges_[1:], counts_, label='predictor')
                    subplot.legend()
                    pylab.draw()
            except ValueError:
                print("There are no correlated events within the selected "
                      "lag window of %s" % lag)
    else:
        return differences, pred, norm
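
# Illustrative sketch (the helper name and the synthetic data are
# assumptions, not part of the original module): cross-correlogram of two
# independent Poisson-like spike trains. With predictor=None and
# display=False, the function returns the raw time differences, an empty
# predictor array and the normalization factor.
def _demo_crosscorrelate():
    np.random.seed(0)
    sua1 = np.cumsum(np.random.exponential(10.0, size=200))  # ~10 ms ISIs
    sua2 = np.cumsum(np.random.exponential(10.0, size=200))
    differences, pred, norm = crosscorrelate(sua1, sua2, lag=50.0)
    counts, edges = np.histogram(differences, bins=20)
    return counts / norm    # bin heights as correlation coefficients
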
def _dict_max(D):
    """For a dict containing numerical values, return the key for the
    highest value. If there is more than one item with the same highest
    value, return one of them (arbitrary - depends on the order produced
    by the iterator).
    """
    max_val = max(D.values())
    for k in D:
        if D[k] == max_val:
            return k
def make_kernel(form, sigma, time_stamp_resolution, direction=1):
    """Creates kernel functions for convolution.

    Constructs a numeric linear convolution kernel of basic shape to be used
    for data smoothing (linear low-pass filtering) and firing rate estimation
    from single-trial or trial-averaged spike trains.

    Exponential and alpha kernels may also be used to represent postsynaptic
    currents/potentials in a linear (current-based) model.

    Adapted from original script written by Martin P. Nawrot for the FIND
    MATLAB toolbox [1]_ [2]_.

    Parameters
    ----------
    form : {'BOX', 'TRI', 'GAU', 'EPA', 'EXP', 'ALP'}
        Kernel form. Currently implemented forms are BOX (boxcar), TRI
        (triangle), GAU (gaussian), EPA (epanechnikov), EXP (exponential),
        ALP (alpha function). EXP and ALP are asymmetric kernel forms and
        assume the optional parameter `direction`.
    sigma : float
        Standard deviation of the distribution associated with kernel shape.
        This parameter defines the time resolution (in ms) of the kernel
        estimate and makes different kernels comparable (cf. [1]_ for
        symmetric kernels). This is used here as an alternative definition
        to the cut-off frequency of the associated linear filter.
    time_stamp_resolution : float
        Temporal resolution of input and output in ms.
    direction : {-1, 1}
        Asymmetric kernels have two possible directions. The values are -1
        or 1; default is 1. The definition here is that for direction = 1
        the kernel represents the impulse response function of the linear
        filter.

    Returns
    -------
    kernel : array_like
        Array of kernel. The length of this array is always an odd number to
        represent symmetric kernels such that the center bin coincides with
        the median of the numeric array, i.e. for a triangle, the maximum
        will be at the center bin with an equal number of bins to the right
        and to the left.
    norm : float
        For rate estimates. The kernel vector is normalized such that the
        sum of all entries equals unity, sum(kernel)=1. When estimating rate
        functions from discrete spike data (0/1) the additional parameter
        `norm` allows for the normalization to rate in spikes per second.
        For example:
        ``rate = norm * scipy.signal.lfilter(kernel, 1, spike_data)``
    m_idx : int
        Index of the numerically determined median (center of gravity) of
        the kernel function.

    Examples
    --------
    To obtain the single-trial rate function of a spike train, use::

        r = norm * scipy.signal.fftconvolve(sua, kernel)

    To obtain the trial-averaged rate function, use::

        r_avg = norm * scipy.signal.fftconvolve(sua, np.mean(X, 1))

    where `X` is an array of shape `(l, n)`, `n` is the number of trials and
    `l` is the length of each trial.

    See also
    --------
    SpikeTrain.instantaneous_rate
    SpikeList.averaged_instantaneous_rate

    .. [1] Meier R, Egert U, Aertsen A, Nawrot MP, "FIND - a unified
       framework for neural data analysis"; Neural Netw. 2008 Oct;
       21(8):1085-93.

    .. [2] Nawrot M, Aertsen A, Rotter S, "Single-trial estimation of
       neuronal firing rates - from single neuron spike trains to population
       activity"; J. Neurosci Meth 94: 81-92; 1999.

    """
    assert form.upper() in ('BOX', 'TRI', 'GAU', 'EPA', 'EXP', 'ALP'), \
        "form must be one of 'BOX', 'TRI', 'GAU', 'EPA', 'EXP' or 'ALP'!"
    assert direction in (1, -1), "direction must be either 1 or -1"

    # convert to SI units (ms -> s)
    SI_sigma = sigma / 1000.0
    SI_time_stamp_resolution = time_stamp_resolution / 1000.0
    norm = 1.0 / SI_time_stamp_resolution

    if form.upper() == 'BOX':
        w = 2.0 * SI_sigma * np.sqrt(3)
        # always an odd number of bins
        width = 2 * int(np.floor(w / 2.0 / SI_time_stamp_resolution)) + 1
        height = 1.0 / width
        kernel = np.ones(width) * height                # area = 1

    elif form.upper() == 'TRI':
        w = 2 * SI_sigma * np.sqrt(6)
        halfwidth = int(np.floor(w / 2.0 / SI_time_stamp_resolution))
        trileft = np.arange(1, halfwidth + 2)
        triright = np.arange(halfwidth, 0, -1)          # odd number of bins
        triangle = np.append(trileft, triright)
        kernel = triangle / triangle.sum()              # area = 1

    elif form.upper() == 'EPA':
        w = 2.0 * SI_sigma * np.sqrt(5)
        halfwidth = int(np.floor(w / 2.0 / SI_time_stamp_resolution))
        base = np.arange(-halfwidth, halfwidth + 1)
        parabola = base ** 2
        epanech = parabola.max() - parabola             # inverted parabola
        kernel = epanech / epanech.sum()                # area = 1

    elif form.upper() == 'GAU':
        w = 2.0 * SI_sigma * 2.7     # > 99% of distribution weight
        # always an odd number of bins
        halfwidth = int(np.floor(w / 2.0 / SI_time_stamp_resolution))
        base = np.arange(-halfwidth, halfwidth + 1) * SI_time_stamp_resolution
        g = np.exp(-(base ** 2) / 2.0 / SI_sigma ** 2) \
            / SI_sigma / np.sqrt(2.0 * np.pi)
        kernel = g / g.sum()

    elif form.upper() == 'ALP':
        w = 5.0 * SI_sigma
        n_bins = 2.0 * np.floor(w / SI_time_stamp_resolution / 2.0) + 1
        alpha = np.arange(1, n_bins + 1) * SI_time_stamp_resolution
        alpha = (2.0 / SI_sigma ** 2) * alpha * \
            np.exp(-alpha * np.sqrt(2) / SI_sigma)
        kernel = alpha / alpha.sum()                    # normalization
        if direction == -1:
            kernel = np.flipud(kernel)

    elif form.upper() == 'EXP':
        w = 5.0 * SI_sigma
        n_bins = 2.0 * np.floor(w / SI_time_stamp_resolution / 2.0) + 1
        expo = np.arange(1, n_bins + 1) * SI_time_stamp_resolution
        expo = np.exp(-expo / SI_sigma)
        kernel = expo / expo.sum()
        if direction == -1:
            kernel = np.flipud(kernel)

    kernel = kernel.ravel()
    m_idx = np.nonzero(kernel.cumsum() >= 0.5)[0].min()

    return kernel, norm, m_idx
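
# Illustrative sketch (assumes scipy is installed; the helper name and the
# synthetic binary spike vector are assumptions): single-trial firing rate
# estimation with a Gaussian kernel, following the recipe given in the
# make_kernel docstring.
def _demo_make_kernel():
    import scipy.signal
    dt = 1.0                                     # time stamp resolution, ms
    spikes = np.zeros(10000)                     # 10 s at 1 ms resolution
    spikes[np.random.randint(0, spikes.size, 100)] = 1.0
    kernel, norm, m_idx = make_kernel('GAU', sigma=20.0,
                                      time_stamp_resolution=dt)
    rate = norm * scipy.signal.fftconvolve(spikes, kernel, mode='same')
    return rate                                  # in spikes per second
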
def simple_frequency_spectrum(x):
    """Simple frequency spectrum.

    Very simple calculation of frequency spectrum with no detrending,
    windowing, etc. Just the first half (positive frequency components) of
    abs(fft(x)).

    Parameters
    ----------
    x : array_like
        The input array, in the time-domain.

    Returns
    -------
    spec : array_like
        The frequency spectrum of `x`.

    """
    spec = np.absolute(np.fft.fft(x))
    spec = spec[:len(x) // 2]  # take positive frequency components
    spec /= len(x)             # normalize
    spec *= 2.0                # to get amplitudes of sine components,
                               # need to multiply by 2
    spec[0] /= 2.0             # except for the DC component
    return spec
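
# Illustrative sketch (the helper name and the synthetic signal are
# assumptions): verifying that simple_frequency_spectrum recovers the
# amplitude of a pure sine component. With 1000 samples at 1 kHz, a 50 Hz
# sine of amplitude 3 should produce a peak of height ~3.0 in bin 50.
def _demo_spectrum():
    n, fs = 1000, 1000.0                        # samples, sampling rate (Hz)
    t = np.arange(n) / fs
    x = 3.0 * np.sin(2 * np.pi * 50.0 * t)      # 50 Hz, amplitude 3
    spec = simple_frequency_spectrum(x)
    freqs = np.arange(len(spec)) * fs / n
    return freqs[np.argmax(spec)], spec.max()   # ~(50.0, 3.0)
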
class TuningCurve(object):
    """Class to facilitate working with tuning curves."""

    def __init__(self, D=None):
        """
        If `D` is a dict, it is used to give initial values to the tuning
        curve.
        """
        self._tuning_curves = {}
        self._counts = {}
        if D is not None:
            for k, v in D.items():
                self._tuning_curves[k] = [v]
                self._counts[k] = 1
            self.n = 1
        else:
            self.n = 0
    def add(self, D):
        """Append the responses from a further trial to the tuning curve."""
        for k, v in D.items():
            self._tuning_curves[k].append(v)
            self._counts[k] += 1
        self.n += 1
    def __getitem__(self, i):
        D = {}
        for k, v in self._tuning_curves.items():
            D[k] = v[i]
        return D

    def __repr__(self):
        return "TuningCurve: %s" % self._tuning_curves
    def stats(self):
        """Return the mean tuning curve with standard errors."""
        mean = {}
        stderr = {}
        n = self.n
        for k in self._tuning_curves.keys():
            arr = np.array(self._tuning_curves[k])
            mean[k] = arr.mean()
            stderr[k] = arr.std() * n / (n - 1) / np.sqrt(n)
        return mean, stderr
    def max(self):
        """Return the key of the max value and the max value."""
        k = _dict_max(self._tuning_curves)
        return k, self._tuning_curves[k]
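
# Illustrative sketch (the helper name and the response values are
# assumptions): accumulating responses at several stimulus orientations
# across two trials, then reading out the mean curve with standard errors.
def _demo_tuning_curve():
    tc = TuningCurve({0: 2.0, 45: 5.0, 90: 3.0})    # first trial
    tc.add({0: 3.0, 45: 6.0, 90: 2.0})              # second trial
    mean, stderr = tc.stats()
    return mean[45], stderr[45]                     # ~(5.5, 0.707)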