Skip to content

Commit

Permalink
pairwise_cross_correlation scaleopt flag (#277)
Browse files Browse the repository at this point in the history
scaleopt is equivalent with matlab xcorr(). Default is ubiased.
  • Loading branch information
dizcza authored Jan 27, 2020
1 parent 2062dc7 commit ca0ff03
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 92 deletions.
162 changes: 95 additions & 67 deletions elephant/signal_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@
'''

from __future__ import division, print_function
import numpy as np
import scipy.signal
import quantities as pq

import warnings

import neo
import numpy as np
import numpy.matlib as npm
import quantities as pq
import scipy.signal


def zscore(signal, inplace=True):
Expand Down Expand Up @@ -139,44 +142,67 @@ def zscore(signal, inplace=True):
return result


def cross_correlation_function(signal, ch_pairs, env=False, nlags=None):

"""
Computes unbiased estimator of the cross-correlation function.
Calculates the unbiased estimator of the cross-correlation function [1]_
def cross_correlation_function(signal, ch_pairs, env=False, nlags=None,
scaleopt='unbiased'):
r"""
Calculates the pairwise cross-correlation estimate of `signal[ch_pairs]`
[1]_.
.. math::
R(\\tau) = \\frac{1}{N-|k|} R'(\\tau) \\ ,
where :math:`R'(\\tau) = \\left<x(t)y(t+\\tau)\\right>` in a pairwise
manner, i.e. `signal[ch_pairs[0,0]]` vs `signal2[ch_pairs[0,1]]`,
`signal[ch_pairs[1,0]]` vs `signal2[ch_pairs[1,1]]`, and so on. The
cross-correlation function is obtained by `scipy.signal.fftconvolve`.
Time series in signal are zscored beforehand. Alternatively returns the
Hilbert envelope of :math:`R(\\tau)`, which is useful to determine the
R_{xy}(\tau) = \frac{1}{normalizer} \left<x(t),y(t+\tau)\right>
:math:`R_{xy}(\tau)` is calculated in a pairwise manner, i.e.
`signal[ch_pairs[0,0]]` vs `signal[ch_pairs[0,1]]`,
`signal[ch_pairs[1,0]]` vs `signal[ch_pairs[1,1]]`, and so on.
The input time series are z-scored beforehand. `scaleopt` controls the
choice of :math:`R_{xy}(\tau)` normalizer. Alternatively, returns the
Hilbert envelope of :math:`R_{xy}(\tau)`, which is useful to determine the
correlation length of oscillatory signals.
Parameters
-----------
signal : neo.AnalogSignal (`nt` x `nch`)
Signal with nt number of samples that contains nch LFP channels
ch_pairs : list (or array with shape `(n,2)`)
list with n channel pairs for which to compute cross-correlation,
----------
signal : neo.AnalogSignal
Shape: `[nt, nch]`
Signal with `nt` number of samples that contains `nch` LFP channels
ch_pairs : np.ndarray or list
Shape: `[n, 2]`
List with `n` channel pairs for which to compute cross-correlation,
each element of list must contain 2 channel indices
env : bool
env : bool, optional
Return Hilbert envelope of cross-correlation function
Default: False
nlags : int
Defines number of lags for cross-correlation function. Float will be
rounded to nearest integer. Number of samples of output is `2*nlags+1`.
If None, number of samples of output is equal to number of samples of
input signal, namely `nt`
nlags : int, optional
Defines the number of lags for the cross-correlation function. The
number of output samples is `2*nlags+1`. If `None`, the number of
output samples is equal to the number of input samples, namely `nt`.
Default: None
scaleopt : {'none', 'biased', 'unbiased', 'normalized', 'coeff'}, optional
Normalization option, equivalent to matlab `xcorr(..., scaleopt)`.
Specified as one of the following.
* 'none': raw, unscaled cross-correlation :math:`R_{xy}(\tau)`.
* 'biased': biased estimate of the cross-correlation:
.. math::
R_{xy,biased}(\tau) = \frac{1}{N} R_{xy}(\tau)
* 'unbiased': unbiased estimate of the cross-correlation:
.. math::
R_{xy,unbiased}(\tau) = \frac{1}{N-\tau) R_{xy}{\tau)
* 'normalized' or 'coeff': normalizes the sequence so that the
autocorrelations at zero lag equal 1:
.. math::
R_{xy,coeff}(\tau) = \frac{1}{\sqrt{R_{xx}(0) R_{yy}(0)}
R_{xy}(\tau)
Default: 'unbiased'
Returns
-------
cross_corr : neo.AnalogSignal (`2*nlag+1` x `n`)
cross_corr : neo.AnalogSignal
Shape: `[2*nlags+1, n]`
Pairwise cross-correlation functions for channel pairs given by
`ch_pairs`. If `env=True`, the output is the Hilbert envelope of the
pairwise cross-correlation function. This is helpful to compute the
Expand All @@ -185,13 +211,15 @@ def cross_correlation_function(signal, ch_pairs, env=False, nlags=None):
Raises
------
ValueError
If the input signal is not a neo.AnalogSignal.
If the input signal is not an object of `neo.AnalogSignal`.
ValueError
If `ch_pairs` is not a list of channel pair indices with shape `(n,2)`.
KeyError
If keyword `env` is not a boolean.
KeyError
If `nlags` is not an integer or float larger than 0.
ValueError
If `env` is not a boolean.
ValueError
If `nlags` is not a positive integer.
ValueError
If `scaleopt` is not one of the predefined above keywords.
Examples
--------
Expand All @@ -215,68 +243,68 @@ def cross_correlation_function(signal, ch_pairs, env=False, nlags=None):
References
----------
.. [1] Hall & River (2009) "Spectral Analysis of Signals, Spectral Element
Method in Structural Dynamics", Eq. 2.2.3
.. [1] Stoica, P., & Moses, R. (2005). Spectral Analysis of Signals.
Prentice Hall. Retrieved from http://user.it.uu.se/~ps/SAS-new.pdf,
Eq. 2.2.3.
"""

# Make ch_pairs a 2D array
pairs = np.array(ch_pairs)
pairs = np.asarray(ch_pairs)
if pairs.ndim == 1:
pairs = pairs[:, np.newaxis]
pairs = np.expand_dims(pairs, axis=0)

# Check input
if not isinstance(signal, neo.AnalogSignal):
raise ValueError('Input signal is not a neo.AnalogSignal!')
if np.shape(pairs)[1] != 2:
pairs = pairs.T
if np.shape(pairs)[1] != 2:
raise ValueError('ch_pairs is not a list of channel pair indices.'\
raise ValueError('Input signal must be of type neo.AnalogSignal')
if pairs.shape[1] != 2:
raise ValueError('`ch_pairs` is not a list of channel pair indices. '
'Cannot define pairs for cross-correlation.')
if not isinstance(env, bool):
raise KeyError('env is not a boolean!')
raise ValueError('`env` must be a boolean value')
if nlags is not None:
if not isinstance(nlags, (int, float)):
raise KeyError('nlags must be an integer or float larger than 0!')
if nlags <= 0:
raise KeyError('nlags must be an integer or float larger than 0!')
if not isinstance(nlags, int) or nlags <= 0:
raise ValueError('nlags must be a non-negative integer')

# z-score analog signal and store channel time series in different arrays
# Cross-correlation will be calculated between xsig and ysig
xsig = np.array([zscore(signal).magnitude[:, pair[0]] \
for pair in pairs]).T
ysig = np.array([zscore(signal).magnitude[:, pair[1]] \
for pair in pairs]).T
z_transformed = zscore(signal, inplace=False).magnitude
# transpose (nch, xy, nt) -> (xy, nt, nch)
xsig, ysig = np.transpose(z_transformed.T[pairs], (1, 2, 0))

# Define vector of lags tau
nt, nch = np.shape(xsig)
tau = (np.arange(nt) - nt//2)
nt, nch = xsig.shape
tau = np.arange(nt) - nt // 2

# Calculate cross-correlation by taking Fourier transform of signal,
# multiply in Fourier space, and transform back. Correct for bias due
# to zero-padding
xcorr = np.zeros((nt, nch))
for i in range(nch):
xcorr[:, i] = scipy.signal.fftconvolve(xsig[:, i], ysig[::-1, i],
mode='same')
xcorr = xcorr / npm.repmat((nt-abs(tau)), nch, 1).T
xcorr = scipy.signal.fftconvolve(xsig, ysig[::-1], mode='same', axes=0)
if scaleopt == 'biased':
xcorr /= nt
elif scaleopt == 'unbiased':
normalizer = np.expand_dims(nt - np.abs(tau), axis=1)
xcorr /= normalizer
elif scaleopt in ('normalized', 'coeff'):
normalizer = np.sqrt((xsig ** 2).sum(axis=0) * (ysig ** 2).sum(axis=0))
xcorr /= normalizer
elif scaleopt != 'none':
raise ValueError("Invalid scaleopt mode: '{}'".format(scaleopt))

# Calculate envelope of cross-correlation function with Hilbert transform.
# This is useful for transient oscillatory signals.
if env:
for i in range(nch):
xcorr[:, i] = np.abs(scipy.signal.hilbert(xcorr[:, i]))
xcorr = np.abs(scipy.signal.hilbert(xcorr, axis=0))

# Cut off lags outside desired range
# Cut off lags outside the desired range
if nlags is not None:
nlags = int(np.round(nlags))
tau0 = int(np.argwhere(tau == 0))
xcorr = xcorr[tau0-nlags:tau0+nlags+1, :]
tau0 = np.argwhere(tau == 0).item()
xcorr = xcorr[tau0 - nlags: tau0 + nlags + 1, :]

# Return neo.AnalogSignal
cross_corr = neo.AnalogSignal(xcorr,
units='',
t_start=np.min(tau)*signal.sampling_period,
t_stop=np.max(tau)*signal.sampling_period,
t_start=tau[0]*signal.sampling_period,
t_stop=tau[-1]*signal.sampling_period,
sampling_rate=signal.sampling_rate,
dtype=float)
return cross_corr
Expand Down
95 changes: 70 additions & 25 deletions elephant/test/test_signal_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,22 @@

import neo
import numpy as np
import quantities as pq
import scipy.signal as spsig
import scipy.stats
from numpy.testing.utils import assert_array_almost_equal
import quantities as pq
import elephant.signal_processing
from numpy.ma.testutils import assert_array_equal, assert_allclose
from numpy.testing.utils import assert_array_almost_equal

import elephant.signal_processing

class XCorrelationTestCase(unittest.TestCase):

class PairwiseCrossCorrelationTest(unittest.TestCase):
# Set parameters
sampling_period = 0.02*pq.s
sampling_rate = 1./sampling_period
sampling_period = 0.02 * pq.s
sampling_rate = 1. / sampling_period
n_samples = 2018
time = np.arange(n_samples)*sampling_period
freq = 1.*pq.Hz
times = np.arange(n_samples) * sampling_period
freq = 1. * pq.Hz

def test_cross_correlation_freqs(self):
'''
Expand All @@ -35,28 +35,32 @@ def test_cross_correlation_freqs(self):
E.g., f=0.1 and N=2018 only has an accuracy on the order decimal=1
'''
freq_arr = np.linspace(0.5, 15, 8) * pq.Hz
signal = np.zeros((self.n_samples, 2))
signal = np.zeros((self.n_samples, 3))
for freq in freq_arr:
signal[:, 0] = np.sin(2.*np.pi*freq*self.time)
signal[:, 1] = np.cos(2.*np.pi*freq*self.time)
signal[:, 0] = np.sin(2. * np.pi * freq * self.times)
signal[:, 1] = np.cos(2. * np.pi * freq * self.times)
signal[:, 2] = np.cos(2. * np.pi * freq * self.times + 0.2)
# Convert signal to neo.AnalogSignal
signal_neo = neo.AnalogSignal(signal, units='mV', t_start=0.*pq.ms,
sampling_rate=self.sampling_rate,
dtype=float)
signal_neo = neo.AnalogSignal(signal, units='mV',
t_start=0. * pq.ms,
sampling_rate=self.sampling_rate,
dtype=float)
rho = elephant.signal_processing.cross_correlation_function(
signal_neo, [0, 1])
signal_neo, [[0, 1], [0, 2]])
# Cross-correlation of sine and cosine should be sine
assert_array_almost_equal(
rho.magnitude[:, 0], np.sin(2.*np.pi*freq*rho.times), decimal=2)
rho.magnitude[:, 0], np.sin(2. * np.pi * freq * rho.times),
decimal=2)
self.assertEqual(rho.shape, (signal.shape[0], 2)) # 2 pairs

def test_cross_correlation_nlags(self):
'''
Sine vs cosine for specific nlags
'''
nlags = 30
signal = np.zeros((self.n_samples, 2))
signal[:, 0] = 0.2 * np.sin(2.*np.pi*self.freq*self.time)
signal[:, 1] = 5.3 * np.cos(2.*np.pi*self.freq*self.time)
signal[:, 0] = 0.2 * np.sin(2. * np.pi * self.freq * self.times)
signal[:, 1] = 5.3 * np.cos(2. * np.pi * self.freq * self.times)
# Convert signal to neo.AnalogSignal
signal = neo.AnalogSignal(signal, units='mV', t_start=0.*pq.ms,
sampling_rate=self.sampling_rate,
Expand All @@ -72,8 +76,8 @@ def test_cross_correlation_phi(self):
'''
phi = np.pi/6.
signal = np.zeros((self.n_samples, 2))
signal[:, 0] = 0.2 * np.sin(2.*np.pi*self.freq*self.time+phi)
signal[:, 1] = 5.3 * np.cos(2.*np.pi*self.freq*self.time)
signal[:, 0] = 0.2 * np.sin(2. * np.pi * self.freq * self.times + phi)
signal[:, 1] = 5.3 * np.cos(2. * np.pi * self.freq * self.times)
# Convert signal to neo.AnalogSignal
signal = neo.AnalogSignal(signal, units='mV', t_start=0.*pq.ms,
sampling_rate=self.sampling_rate,
Expand All @@ -85,23 +89,64 @@ def test_cross_correlation_phi(self):
rho.magnitude[:, 0], np.sin(2.*np.pi*self.freq*rho.times+phi),
decimal=2)

def test_cross_correlation_env(self):
def test_cross_correlation_envelope(self):
'''
Envelope of sine vs cosine
'''
# Sine with phase shift phi vs cosine for different frequencies
nlags = 800 # nlags need to be smaller than N/2 b/c border effects
signal = np.zeros((self.n_samples, 2))
signal[:, 0] = 0.2 * np.sin(2.*np.pi*self.freq*self.time)
signal[:, 1] = 5.3 * np.cos(2.*np.pi*self.freq*self.time)
signal[:, 0] = 0.2 * np.sin(2. * np.pi * self.freq * self.times)
signal[:, 1] = 5.3 * np.cos(2. * np.pi * self.freq * self.times)
# Convert signal to neo.AnalogSignal
signal = neo.AnalogSignal(signal, units='mV', t_start=0.*pq.ms,
sampling_rate=self.sampling_rate,
dtype=float)
env = elephant.signal_processing.cross_correlation_function(
envelope = elephant.signal_processing.cross_correlation_function(
signal, [0, 1], nlags=nlags, env=True)
# Envelope should be one for sinusoidal function
assert_array_almost_equal(env, np.ones_like(env), decimal=2)
assert_array_almost_equal(envelope, np.ones_like(envelope), decimal=2)

def test_cross_correlation_biased(self):
signal = np.c_[np.sin(2. * np.pi * self.freq * self.times),
np.cos(2. * np.pi * self.freq * self.times)] * pq.mV
signal = neo.AnalogSignal(signal, t_start=0. * pq.ms,
sampling_rate=self.sampling_rate)
raw = elephant.signal_processing.cross_correlation_function(
signal, [0, 1], scaleopt='none'
)
biased = elephant.signal_processing.cross_correlation_function(
signal, [0, 1], scaleopt='biased'
)
assert_array_almost_equal(biased, raw / biased.shape[0])

def test_cross_correlation_coeff(self):
signal = np.c_[np.sin(2. * np.pi * self.freq * self.times),
np.cos(2. * np.pi * self.freq * self.times)] * pq.mV
signal = neo.AnalogSignal(signal, t_start=0. * pq.ms,
sampling_rate=self.sampling_rate)
normalized = elephant.signal_processing.cross_correlation_function(
signal, [0, 1], scaleopt='coeff'
)
sig1, sig2 = signal.magnitude.T
target_numpy = np.correlate(sig1, sig2, mode="same")
target_numpy /= np.sqrt((sig1 ** 2).sum() * (sig2 ** 2).sum())
target_numpy = np.expand_dims(target_numpy, axis=1)
assert_array_almost_equal(normalized.magnitude,
target_numpy,
decimal=3)

def test_cross_correlation_coeff_autocorr(self):
# Numpy/Matlab equivalent
signal = np.sin(2. * np.pi * self.freq * self.times)
signal = signal[:, np.newaxis] * pq.mV
signal = neo.AnalogSignal(signal, t_start=0. * pq.ms,
sampling_rate=self.sampling_rate)
normalized = elephant.signal_processing.cross_correlation_function(
signal, [0, 0], scaleopt='coeff'
)
# auto-correlation at zero lag should equal 1
self.assertAlmostEqual(normalized[normalized.shape[0] // 2], 1)


class ZscoreTestCase(unittest.TestCase):
Expand Down

0 comments on commit ca0ff03

Please sign in to comment.