diff --git a/doc/conf.py b/doc/conf.py index cf163a150..aaab15245 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -123,6 +123,9 @@ # the autosummary fields of each module. autosummary_generate = True +# don't overwrite our custom toctree/*.rst +autosummary_generate_overwrite = False + # -- Options for HTML output --------------------------------------------- # The theme to use for HTML and HTML Help pages. See the documentation for diff --git a/doc/reference/toctree/kernels/elephant.kernels.AlphaKernel.rst b/doc/reference/toctree/kernels/elephant.kernels.AlphaKernel.rst index c605460d0..212853ca0 100644 --- a/doc/reference/toctree/kernels/elephant.kernels.AlphaKernel.rst +++ b/doc/reference/toctree/kernels/elephant.kernels.AlphaKernel.rst @@ -4,4 +4,29 @@ elephant.kernels.AlphaKernel .. currentmodule:: elephant.kernels .. autoclass:: AlphaKernel - :members: __call__, is_symmetric, boundary_enclosing_area_fraction, min_cutoff + :members: + :inherited-members: + + + .. rubric:: Methods + + .. autosummary:: + + ~AlphaKernel.__call__ + ~AlphaKernel.boundary_enclosing_area_fraction + ~AlphaKernel.cdf + ~AlphaKernel.icdf + ~AlphaKernel.is_symmetric + ~AlphaKernel.median_index + + + + + + .. rubric:: Attributes + + .. autosummary:: + + ~AlphaKernel.min_cutoff + + \ No newline at end of file diff --git a/doc/reference/toctree/kernels/elephant.kernels.EpanechnikovLikeKernel.rst b/doc/reference/toctree/kernels/elephant.kernels.EpanechnikovLikeKernel.rst index b8de7b4f5..01737f363 100644 --- a/doc/reference/toctree/kernels/elephant.kernels.EpanechnikovLikeKernel.rst +++ b/doc/reference/toctree/kernels/elephant.kernels.EpanechnikovLikeKernel.rst @@ -4,4 +4,29 @@ elephant.kernels.EpanechnikovLikeKernel .. currentmodule:: elephant.kernels .. autoclass:: EpanechnikovLikeKernel - :members: __call__, is_symmetric, boundary_enclosing_area_fraction, min_cutoff + :members: + :inherited-members: + + + .. rubric:: Methods + + .. autosummary:: + + ~EpanechnikovLikeKernel.__call__ + ~EpanechnikovLikeKernel.boundary_enclosing_area_fraction + ~EpanechnikovLikeKernel.cdf + ~EpanechnikovLikeKernel.icdf + ~EpanechnikovLikeKernel.is_symmetric + ~EpanechnikovLikeKernel.median_index + + + + + + .. rubric:: Attributes + + .. autosummary:: + + ~EpanechnikovLikeKernel.min_cutoff + + \ No newline at end of file diff --git a/doc/reference/toctree/kernels/elephant.kernels.ExponentialKernel.rst b/doc/reference/toctree/kernels/elephant.kernels.ExponentialKernel.rst index 32de24208..35ce10c35 100644 --- a/doc/reference/toctree/kernels/elephant.kernels.ExponentialKernel.rst +++ b/doc/reference/toctree/kernels/elephant.kernels.ExponentialKernel.rst @@ -4,4 +4,29 @@ elephant.kernels.ExponentialKernel .. currentmodule:: elephant.kernels .. autoclass:: ExponentialKernel - :members: __call__, is_symmetric, boundary_enclosing_area_fraction, min_cutoff + :members: + :inherited-members: + + + .. rubric:: Methods + + .. autosummary:: + + ~ExponentialKernel.__call__ + ~ExponentialKernel.boundary_enclosing_area_fraction + ~ExponentialKernel.cdf + ~ExponentialKernel.icdf + ~ExponentialKernel.is_symmetric + ~ExponentialKernel.median_index + + + + + + .. rubric:: Attributes + + .. autosummary:: + + ~ExponentialKernel.min_cutoff + + \ No newline at end of file diff --git a/doc/reference/toctree/kernels/elephant.kernels.GaussianKernel.rst b/doc/reference/toctree/kernels/elephant.kernels.GaussianKernel.rst index 35f8b11ba..aafad0ed7 100644 --- a/doc/reference/toctree/kernels/elephant.kernels.GaussianKernel.rst +++ b/doc/reference/toctree/kernels/elephant.kernels.GaussianKernel.rst @@ -4,5 +4,29 @@ elephant.kernels.GaussianKernel .. currentmodule:: elephant.kernels .. autoclass:: GaussianKernel - :members: __call__, is_symmetric, boundary_enclosing_area_fraction, min_cutoff + :members: + :inherited-members: + + + .. rubric:: Methods + + .. autosummary:: + + ~GaussianKernel.__call__ + ~GaussianKernel.boundary_enclosing_area_fraction + ~GaussianKernel.cdf + ~GaussianKernel.icdf + ~GaussianKernel.is_symmetric + ~GaussianKernel.median_index + + + + + + .. rubric:: Attributes + + .. autosummary:: + + ~GaussianKernel.min_cutoff + \ No newline at end of file diff --git a/doc/reference/toctree/kernels/elephant.kernels.Kernel.rst b/doc/reference/toctree/kernels/elephant.kernels.Kernel.rst deleted file mode 100644 index 870b1d0f2..000000000 --- a/doc/reference/toctree/kernels/elephant.kernels.Kernel.rst +++ /dev/null @@ -1,8 +0,0 @@ -elephant.kernels.Kernel -======================= - -.. currentmodule:: elephant.kernels - -.. autoclass:: Kernel - :members: __call__, is_symmetric, boundary_enclosing_area_fraction - \ No newline at end of file diff --git a/doc/reference/toctree/kernels/elephant.kernels.LaplacianKernel.rst b/doc/reference/toctree/kernels/elephant.kernels.LaplacianKernel.rst index 0c8b764f5..fceddec45 100644 --- a/doc/reference/toctree/kernels/elephant.kernels.LaplacianKernel.rst +++ b/doc/reference/toctree/kernels/elephant.kernels.LaplacianKernel.rst @@ -4,4 +4,29 @@ elephant.kernels.LaplacianKernel .. currentmodule:: elephant.kernels .. autoclass:: LaplacianKernel - :members: __call__, is_symmetric, boundary_enclosing_area_fraction, min_cutoff + :members: + :inherited-members: + + + .. rubric:: Methods + + .. autosummary:: + + ~LaplacianKernel.__call__ + ~LaplacianKernel.boundary_enclosing_area_fraction + ~LaplacianKernel.cdf + ~LaplacianKernel.icdf + ~LaplacianKernel.is_symmetric + ~LaplacianKernel.median_index + + + + + + .. rubric:: Attributes + + .. autosummary:: + + ~LaplacianKernel.min_cutoff + + \ No newline at end of file diff --git a/doc/reference/toctree/kernels/elephant.kernels.RectangularKernel.rst b/doc/reference/toctree/kernels/elephant.kernels.RectangularKernel.rst index e658c2418..bf0323bf4 100644 --- a/doc/reference/toctree/kernels/elephant.kernels.RectangularKernel.rst +++ b/doc/reference/toctree/kernels/elephant.kernels.RectangularKernel.rst @@ -4,5 +4,29 @@ elephant.kernels.RectangularKernel .. currentmodule:: elephant.kernels .. autoclass:: RectangularKernel - :members: __call__, is_symmetric, boundary_enclosing_area_fraction, min_cutoff + :members: + :inherited-members: + + + .. rubric:: Methods + + .. autosummary:: + + ~RectangularKernel.__call__ + ~RectangularKernel.boundary_enclosing_area_fraction + ~RectangularKernel.cdf + ~RectangularKernel.icdf + ~RectangularKernel.is_symmetric + ~RectangularKernel.median_index + + + + + + .. rubric:: Attributes + + .. autosummary:: + + ~RectangularKernel.min_cutoff + \ No newline at end of file diff --git a/doc/reference/toctree/kernels/elephant.kernels.SymmetricKernel.rst b/doc/reference/toctree/kernels/elephant.kernels.SymmetricKernel.rst deleted file mode 100644 index b44ad7c45..000000000 --- a/doc/reference/toctree/kernels/elephant.kernels.SymmetricKernel.rst +++ /dev/null @@ -1,8 +0,0 @@ -elephant.kernels.SymmetricKernel -================================ - -.. currentmodule:: elephant.kernels - -.. autoclass:: SymmetricKernel - :members: __call__, is_symmetric, boundary_enclosing_area_fraction - \ No newline at end of file diff --git a/doc/reference/toctree/kernels/elephant.kernels.TriangularKernel.rst b/doc/reference/toctree/kernels/elephant.kernels.TriangularKernel.rst index 7573937d9..3b5f83460 100644 --- a/doc/reference/toctree/kernels/elephant.kernels.TriangularKernel.rst +++ b/doc/reference/toctree/kernels/elephant.kernels.TriangularKernel.rst @@ -4,5 +4,29 @@ elephant.kernels.TriangularKernel .. currentmodule:: elephant.kernels .. autoclass:: TriangularKernel - :members: __call__, is_symmetric, boundary_enclosing_area_fraction, min_cutoff + :members: + :inherited-members: + + + .. rubric:: Methods + + .. autosummary:: + + ~TriangularKernel.__call__ + ~TriangularKernel.boundary_enclosing_area_fraction + ~TriangularKernel.cdf + ~TriangularKernel.icdf + ~TriangularKernel.is_symmetric + ~TriangularKernel.median_index + + + + + + .. rubric:: Attributes + + .. autosummary:: + + ~TriangularKernel.min_cutoff + \ No newline at end of file diff --git a/elephant/kernels.py b/elephant/kernels.py index 58b93dc83..fb7ac8ade 100644 --- a/elephant/kernels.py +++ b/elephant/kernels.py @@ -5,15 +5,6 @@ firing rate estimation. -Base kernel classes -~~~~~~~~~~~~~~~~~~~ - -.. autosummary:: - :toctree: toctree/kernels/ - - Kernel - SymmetricKernel - Symmetric kernels ~~~~~~~~~~~~~~~~~ @@ -40,7 +31,7 @@ -------- >>> import quantities as pq >>> kernel1 = GaussianKernel(sigma=100*pq.ms) ->>> kernel2 = ExponentialKernel(sigma=8*pq.mm, invert=True) +>>> kernel2 = ExponentialKernel(sigma=8*pq.ms, invert=True) :copyright: Copyright 2016 by the Elephant team, see `doc/authors.rst`. :license: Modified BSD, see LICENSE.txt for details. @@ -48,16 +39,25 @@ from __future__ import division, print_function, unicode_literals +import math + import numpy as np import quantities as pq +import scipy.optimize import scipy.special +import scipy.stats + +__all__ = [ + 'RectangularKernel', 'TriangularKernel', 'EpanechnikovLikeKernel', + 'GaussianKernel', 'LaplacianKernel', 'ExponentialKernel', 'AlphaKernel' +] class Kernel(object): r""" This is the base class for commonly used kernels. - **General definition of kernel:** + **General definition of a kernel:** A function :math:`K(x, y)` is called a kernel function if :math:`\int{K(x, y) g(x) g(y) \textrm{d}x \textrm{d}y} \ \geq 0 \quad @@ -80,13 +80,13 @@ class Kernel(object): to one. Popular choices are the rectangular or Gaussian kernels. Exponential and alpha kernels may also be used to represent the - postynaptic current/potentials in a linear (current-based) model. + postsynaptic current/potentials in a linear (current-based) model. Parameters ---------- sigma : pq.Quantity Standard deviation of the kernel. - invert: bool, optional + invert : bool, optional If True, asymmetric kernels (e.g., exponential or alpha kernels) are inverted along the time axis. Default: False. @@ -103,19 +103,22 @@ class Kernel(object): """ def __init__(self, sigma, invert=False): - - if not (isinstance(sigma, pq.Quantity)): - raise TypeError("sigma must be a quantity!") + if not isinstance(sigma, pq.Quantity): + raise TypeError("'sigma' must be a quantity") if sigma.magnitude < 0: - raise ValueError("sigma cannot be negative!") + raise ValueError("'sigma' cannot be negative") if not isinstance(invert, bool): - raise ValueError("invert must be bool!") + raise ValueError("'invert' must be bool") self.sigma = sigma self.invert = invert + def __repr__(self): + return "{cls}(sigma={sigma}, invert={invert})".format( + cls=self.__class__.__name__, sigma=self.sigma, invert=self.invert) + def __call__(self, t): """ Evaluates the kernel at all points in the array `t`. @@ -139,26 +142,12 @@ def __call__(self, t): If the dimensionality of `t` and :attr:`sigma` are different. """ - if not (isinstance(t, pq.Quantity)): - raise TypeError("The argument of the kernel callable must be " - "of type quantity!") - - if t.dimensionality.simplified != self.sigma.dimensionality.simplified: - raise TypeError("The dimensionality of sigma and the input array " - "to the callable kernel object must be the same. " - "Otherwise a normalization to 1 of the kernel " - "cannot be performed.") - - self._sigma_scaled = self.sigma.rescale(t.units) - # A hidden variable _sigma_scaled is introduced here in order to avoid - # accumulation of floating point errors of sigma upon multiple - # usages of the __call__ - function for the same Kernel instance. - + self._check_time_input(t) return self._evaluate(t) def _evaluate(self, t): """ - Evaluates the kernel. + Evaluates the kernel Probability Density Function, PDF. Parameters ---------- @@ -205,24 +194,9 @@ def boundary_enclosing_area_fraction(self, fraction): a boundary was not possible. """ - self._check_fraction(fraction) - sigma_division = 500 # arbitrary choice - interval = self.sigma / sigma_division - self._sigma_scaled = self.sigma - area = 0 - counter = 0 - while area < fraction: - area += (self._evaluate((counter + 1) * interval) + - self._evaluate(counter * interval)) * interval / 2 - area += (self._evaluate(-1 * (counter + 1) * interval) + - self._evaluate(-1 * counter * interval)) * interval / 2 - counter += 1 - if(counter > 250000): - raise ValueError("fraction was chosen too close to one such " - "that in combination with integral " - "approximation errors the calculation of a " - "boundary was not possible.") - return counter * interval + raise NotImplementedError( + "The Kernel class should not be used directly, " + "instead the subclasses for the single kernels.") def _check_fraction(self, fraction): """ @@ -244,14 +218,78 @@ def _check_fraction(self, fraction): """ if not isinstance(fraction, (float, int)): - raise TypeError("`fraction` must be float or integer!") - if not 0 <= fraction < 1: - raise ValueError("`fraction` must be in the interval [0, 1)!") + raise TypeError("`fraction` must be float or integer") + if isinstance(self, (TriangularKernel, RectangularKernel)): + valid = 0 <= fraction <= 1 + bracket = ']' + else: + valid = 0 <= fraction < 1 + bracket = ')' + if not valid: + raise ValueError("`fraction` must be in the interval " + "[0, 1{}".format(bracket)) + + def _check_time_input(self, t): + if not isinstance(t, pq.Quantity): + raise TypeError("The argument 't' of the kernel callable must be " + "of type Quantity") + + if t.dimensionality.simplified != self.sigma.dimensionality.simplified: + raise TypeError("The dimensionality of sigma and the input array " + "to the callable kernel object must be the same. " + "Otherwise a normalization to 1 of the kernel " + "cannot be performed.") + + def cdf(self, t): + r""" + Cumulative Distribution Function, CDF. + + Parameters + ---------- + t : pq.Quantity + The input time scalar. + + Returns + ------- + float + CDF at `t`. - def median_index(self, t): """ + raise NotImplementedError + + def icdf(self, fraction): + r""" + Inverse Cumulative Distribution Function, ICDF, also known as a + quantile. + + Parameters + ---------- + fraction : float + The fraction of CDF to compute the quantile from. + + Returns + ------- + pq.Quantity + The time scalar `t` such that `CDF(t) = fraction`. + + """ + raise NotImplementedError + + def median_index(self, t): + r""" Estimates the index of the Median of the kernel. - This parameter is not mandatory for symmetrical kernels but it is + + We define the Median index :math:`i` of a kernel as: + + .. math:: + t_i = \text{ICDF}\left( \frac{\text{CDF}(t_0) + + \text{CDF}(t_{N-1})}{2} \right) + + where :math:`t_0` and :math:`t_{N-1}` are the first and last entries of + the input array, CDF and ICDF stand for Cumulative Distribution + Function and its Inverse, respectively. + + This function is not mandatory for symmetrical kernels but it is required when asymmetrical kernels have to be aligned at their median. Parameters @@ -264,33 +302,56 @@ def median_index(self, t): int Index of the estimated value of the kernel median. - Notes - ----- - The formula in this method using retrieval of the sampling interval - from `t` only works for `t` with equidistant intervals! - The formula calculates the Median slightly wrong by the potentially - ignored probability in the distribution corresponding to lower values - than the minimum in the array `t`. + Raises + ------ + TypeError + If the input array is not a time pq.Quantity array. + + ValueError + If the input array is empty. + If the input array is not sorted. + + See Also + -------- + Kernel.cdf : cumulative distribution function + Kernel.icdf : inverse cumulative distribution function """ - return np.nonzero(self(t).cumsum() * - (t[len(t) - 1] - t[0]) / (len(t) - 1) >= 0.5)[0].min() + self._check_time_input(t) + if len(t) == 0: + raise ValueError("The input time array is empty.") + if len(t) <= 2: + # either left or right; choose left + return 0 + is_sorted = (np.diff(t.magnitude) >= 0).all() + if not is_sorted: + raise ValueError("The input time array must be sorted (in " + "ascending order).") + cdf_mean = 0.5 * (self.cdf(t[0]) + self.cdf(t[-1])) + if cdf_mean == 0.: + # any index of the kernel non-support is valid; choose median + return len(t) // 2 + icdf = self.icdf(fraction=cdf_mean) + icdf = icdf.rescale(t.units).magnitude + # icdf is guaranteed to be in (t_start, t_end) interval + median_index = np.nonzero(t.magnitude >= icdf)[0][0] + return median_index def is_symmetric(self): - """ - In the case of symmetric kernels, this method is overwritten in the - class `SymmetricKernel`, where it returns True, hence leaving the - here returned value False for the asymmetric kernels. + r""" + True for symmetric kernels and False otherwise (asymmetric kernels). + + A kernel is symmetric if its PDF is symmetric w.r.t. time: + + .. math:: + \text{pdf}(-t) = \text{pdf}(t) Returns ------- bool - True in classes `SymmetricKernel`, `RectangularKernel`, - `TriangularKernel`, `EpanechnikovLikeKernel`, `GaussianKernel`, - and `LaplacianKernel`. - False in classes `Kernel`, `ExponentialKernel`, and `AlphaKernel`. + Whether the kernels is symmetric or not. """ - return False + return isinstance(self, SymmetricKernel) @property def min_cutoff(self): @@ -310,38 +371,67 @@ class SymmetricKernel(Kernel): Base class for symmetric kernels. """ - def is_symmetric(self): - return True - class RectangularKernel(SymmetricKernel): r""" Class for rectangular kernels. .. math:: - K(t) = \left\{\begin{array}{ll} \frac{1}{2 \tau}, & |t| < \tau \\\ + K(t) = \left\{\begin{array}{ll} \frac{1}{2 \tau}, & |t| < \tau \\ 0, & |t| \geq \tau \end{array} \right. with :math:`\tau = \sqrt{3} \sigma` corresponding to the half width of the kernel. - Besides the standard deviation `sigma`, for consistency of interfaces the - parameter `invert` needed for asymmetric kernels also exists without - having any effect in the case of symmetric kernels. + The parameter `invert` has no effect on symmetric kernels. - Attributes - ---------- - min_cutoff : float + Examples + -------- + + .. plot:: + :include-source: + + from elephant import kernels + import quantities as pq + import numpy as np + import matplotlib.pyplot as plt + + time_array = np.linspace(-3, 3, num=100) * pq.s + kernel = kernels.RectangularKernel(sigma=1*pq.s) + kernel_time = kernel(time_array) + plt.plot(time_array, kernel_time) + plt.title("RectangularKernel with sigma=1s") + plt.xlabel("time, s") + plt.ylabel("kernel, 1/s") + plt.show() """ + @property def min_cutoff(self): min_cutoff = np.sqrt(3.0) return min_cutoff def _evaluate(self, t): - return (0.5 / (np.sqrt(3.0) * self._sigma_scaled)) * \ - (np.absolute(t) < np.sqrt(3.0) * self._sigma_scaled) + t_units = t.units + t_abs = np.abs(t.magnitude) + tau = math.sqrt(3) * self.sigma.rescale(t_units).magnitude + kernel = (t_abs < tau) * 1 / (2 * tau) + kernel = pq.Quantity(kernel, units=1 / t_units) + return kernel + + def cdf(self, t): + self._check_time_input(t) + tau = math.sqrt(3) * self.sigma.rescale(t.units).magnitude + t = np.clip(t.magnitude, a_min=-tau, a_max=tau) + cdf = (t + tau) / (2 * tau) + return cdf + + def icdf(self, fraction): + self._check_fraction(fraction) + tau = math.sqrt(3) * self.sigma + icdf = tau * (2 * fraction - 1) + return icdf def boundary_enclosing_area_fraction(self, fraction): self._check_fraction(fraction) @@ -349,36 +439,65 @@ def boundary_enclosing_area_fraction(self, fraction): class TriangularKernel(SymmetricKernel): - """ + r""" Class for triangular kernels. .. math:: - K(t) = \\left\\{ \\begin{array}{ll} \\frac{1}{\\tau} (1 - - \\frac{|t|}{\\tau}), & |t| < \\tau \\\\ - 0, & |t| \\geq \\tau \\end{array} \\right. + K(t) = \left\{ \begin{array}{ll} \frac{1}{\tau} (1 + - \frac{|t|}{\tau}), & |t| < \tau \\ + 0, & |t| \geq \tau \end{array} \right. - with :math:`\\tau = \\sqrt{6} \\sigma` corresponding to the half width of + with :math:`\tau = \sqrt{6} \sigma` corresponding to the half width of the kernel. - Besides the standard deviation `sigma`, for consistency of interfaces the - parameter `invert` needed for asymmetric kernels also exists without - having any effect in the case of symmetric kernels. + The parameter `invert` has no effect on symmetric kernels. - Attributes - ---------- - min_cutoff : float + Examples + -------- + + .. plot:: + :include-source: + + from elephant import kernels + import quantities as pq + import numpy as np + import matplotlib.pyplot as plt + + time_array = np.linspace(-3, 3, num=1000) * pq.s + kernel = kernels.TriangularKernel(sigma=1*pq.s) + kernel_time = kernel(time_array) + plt.plot(time_array, kernel_time) + plt.title("TriangularKernel with sigma=1s") + plt.xlabel("time, s") + plt.ylabel("kernel, 1/s") + plt.show() """ + @property def min_cutoff(self): min_cutoff = np.sqrt(6.0) return min_cutoff def _evaluate(self, t): - return (1.0 / (np.sqrt(6.0) * self._sigma_scaled)) * np.maximum( - 0.0, - (1.0 - (np.absolute(t) / - (np.sqrt(6.0) * self._sigma_scaled)).magnitude)) + tau = math.sqrt(6) * self.sigma.rescale(t.units).magnitude + kernel = scipy.stats.triang.pdf(t.magnitude, c=0.5, loc=-tau, + scale=2 * tau) + kernel = pq.Quantity(kernel, units=1 / t.units) + return kernel + + def cdf(self, t): + self._check_time_input(t) + tau = math.sqrt(6) * self.sigma.rescale(t.units).magnitude + cdf = scipy.stats.triang.cdf(t.magnitude, c=0.5, loc=-tau, + scale=2 * tau) + return cdf + + def icdf(self, fraction): + self._check_fraction(fraction) + tau = math.sqrt(6) * self.sigma.magnitude + icdf = scipy.stats.triang.ppf(fraction, c=0.5, loc=-tau, scale=2 * tau) + return icdf * self.sigma.units def boundary_enclosing_area_fraction(self, fraction): self._check_fraction(fraction) @@ -386,47 +505,80 @@ def boundary_enclosing_area_fraction(self, fraction): class EpanechnikovLikeKernel(SymmetricKernel): - """ - Class for epanechnikov-like kernels. + r""" + Class for Epanechnikov-like kernels. .. math:: - K(t) = \\left\\{\\begin{array}{ll} (3 /(4 d)) (1 - (t / d)^2), - & |t| < d \\\\ - 0, & |t| \\geq d \\end{array} \\right. + K(t) = \left\{\begin{array}{ll} (3 /(4 d)) (1 - (t / d)^2), + & |t| < d \\ + 0, & |t| \geq d \end{array} \right. - with :math:`d = \\sqrt{5} \\sigma` being the half width of the kernel. + with :math:`d = \sqrt{5} \sigma` being the half width of the kernel. The Epanechnikov kernel under full consideration of its axioms has a half - width of :math:`\\sqrt{5}`. Ignoring one axiom also the respective kernel + width of :math:`\sqrt{5}`. Ignoring one axiom also the respective kernel with half width = 1 can be called Epanechnikov kernel [1]_. However, arbitrary width of this type of kernel is here preferred to be called 'Epanechnikov-like' kernel. - Besides the standard deviation `sigma`, for consistency of interfaces the - parameter `invert` needed for asymmetric kernels also exists without - having any effect in the case of symmetric kernels. - - Attributes - ---------- - min_cutoff : float + The parameter `invert` has no effect on symmetric kernels. References ---------- .. [1] https://de.wikipedia.org/wiki/Epanechnikov-Kern + Examples + -------- + + .. plot:: + :include-source: + + from elephant import kernels + import quantities as pq + import numpy as np + import matplotlib.pyplot as plt + + time_array = np.linspace(-3, 3, num=100) * pq.s + kernel = kernels.EpanechnikovLikeKernel(sigma=1*pq.s) + kernel_time = kernel(time_array) + plt.plot(time_array, kernel_time) + plt.title("EpanechnikovLikeKernel with sigma=1s") + plt.xlabel("time, s") + plt.ylabel("kernel, 1/s") + plt.show() + """ + @property def min_cutoff(self): min_cutoff = np.sqrt(5.0) return min_cutoff def _evaluate(self, t): - return (3.0 / (4.0 * np.sqrt(5.0) * self._sigma_scaled)) * np.maximum( - 0.0, - 1 - (t / (np.sqrt(5.0) * self._sigma_scaled)).magnitude ** 2) + tau = math.sqrt(5) * self.sigma.rescale(t.units).magnitude + t_div_tau = np.clip(t.magnitude / tau, a_min=-1, a_max=1) + kernel = 3. / (4. * tau) * np.maximum(0., 1 - t_div_tau ** 2) + kernel = pq.Quantity(kernel, units=1 / t.units) + return kernel + + def cdf(self, t): + self._check_time_input(t) + tau = math.sqrt(5) * self.sigma.rescale(t.units).magnitude + t_div_tau = np.clip(t.magnitude / tau, a_min=-1, a_max=1) + cdf = 3. / 4 * (t_div_tau - t_div_tau ** 3 / 3.) + 0.5 + return cdf + + def icdf(self, fraction): + self._check_fraction(fraction) + # CDF(t) = -1/4 t^3 + 3/4 t + 1/2 + coefs = [-1. / 4, 0, 3. / 4, 0.5 - fraction] + roots = np.roots(coefs) + icdf = next(root for root in roots if -1 <= root <= 1) + tau = math.sqrt(5) * self.sigma + return icdf * tau def boundary_enclosing_area_fraction(self, fraction): - """ + r""" Refer to :func:`Kernel.boundary_enclosing_area_fraction` for the documentation. @@ -439,9 +591,9 @@ def boundary_enclosing_area_fraction(self, fraction): given in [1]_, where the following 3 solutions are given: * :math:`u_1 = 1`, solution on negative side; - * :math:`u_2 = \\frac{-1 + i\\sqrt{3}}{2}`, solution for larger + * :math:`u_2 = \frac{-1 + i\sqrt{3}}{2}`, solution for larger values than zero crossing of the density; - * :math:`u_3 = \\frac{-1 - i\\sqrt{3}}{2}`, solution for smaller + * :math:`u_3 = \frac{-1 - i\sqrt{3}}{2}`, solution for smaller values than zero crossing of the density. The solution :math:`u_3` is the relevant one for the problem at hand, @@ -455,45 +607,74 @@ def boundary_enclosing_area_fraction(self, fraction): self._check_fraction(fraction) # Python's complex-operator cannot handle quantities, hence the # following construction on quantities is necessary: - Delta_0 = complex(1.0 / (5.0 * self.sigma.magnitude**2), 0) / \ - self.sigma.units**2 + Delta_0 = complex(1.0 / (5.0 * self.sigma.magnitude ** 2), 0) / \ + self.sigma.units ** 2 Delta_1 = complex(2.0 * np.sqrt(5.0) * fraction / - (25.0 * self.sigma.magnitude**3), 0) / \ - self.sigma.units**3 - C = ((Delta_1 + (Delta_1**2.0 - 4.0 * Delta_0**3.0)**(1.0 / 2.0)) / - 2.0)**(1.0 / 3.0) + (25.0 * self.sigma.magnitude ** 3), 0) / \ + self.sigma.units ** 3 + C = ((Delta_1 + (Delta_1 ** 2.0 - 4.0 * Delta_0 ** 3.0) ** ( + 1.0 / 2.0)) / + 2.0) ** (1.0 / 3.0) u_3 = complex(-1.0 / 2.0, -np.sqrt(3.0) / 2.0) - b = -5.0 * self.sigma**2 * (u_3 * C + Delta_0 / (u_3 * C)) + b = -5.0 * self.sigma ** 2 * (u_3 * C + Delta_0 / (u_3 * C)) return b.real class GaussianKernel(SymmetricKernel): - """ + r""" Class for gaussian kernels. .. math:: - K(t) = (\\frac{1}{\\sigma \\sqrt{2 \\pi}}) - \\exp(-\\frac{t^2}{2 \\sigma^2}) + K(t) = (\frac{1}{\sigma \sqrt{2 \pi}}) \exp(-\frac{t^2}{2 \sigma^2}) - with :math:`\\sigma` being the standard deviation. + with :math:`\sigma` being the standard deviation. - Besides the standard deviation `sigma`, for consistency of interfaces the - parameter `invert` needed for asymmetric kernels also exists without - having any effect in the case of symmetric kernels. + The parameter `invert` has no effect on symmetric kernels. - Attributes - ---------- - min_cutoff : float + Examples + -------- + + .. plot:: + :include-source: + + from elephant import kernels + import quantities as pq + import numpy as np + import matplotlib.pyplot as plt + + time_array = np.linspace(-3, 3, num=100) * pq.s + kernel = kernels.GaussianKernel(sigma=1*pq.s) + kernel_time = kernel(time_array) + plt.plot(time_array, kernel_time) + plt.title("GaussianKernel with sigma=1s") + plt.xlabel("time, s") + plt.ylabel("kernel, 1/s") + plt.show() """ + @property def min_cutoff(self): min_cutoff = 3.0 return min_cutoff def _evaluate(self, t): - return (1.0 / (np.sqrt(2.0 * np.pi) * self._sigma_scaled)) * np.exp( - -0.5 * (t / self._sigma_scaled).magnitude ** 2) + sigma = self.sigma.rescale(t.units).magnitude + kernel = scipy.stats.norm.pdf(t.magnitude, loc=0, scale=sigma) + kernel = pq.Quantity(kernel, units=1 / t.units) + return kernel + + def cdf(self, t): + self._check_time_input(t) + sigma = self.sigma.rescale(t.units).magnitude + cdf = scipy.stats.norm.cdf(t, loc=0, scale=sigma) + return cdf + + def icdf(self, fraction): + self._check_fraction(fraction) + icdf = scipy.stats.norm.ppf(fraction, loc=0, + scale=self.sigma.magnitude) + return icdf * self.sigma.units def boundary_enclosing_area_fraction(self, fraction): self._check_fraction(fraction) @@ -501,31 +682,60 @@ def boundary_enclosing_area_fraction(self, fraction): class LaplacianKernel(SymmetricKernel): - """ + r""" Class for laplacian kernels. .. math:: - K(t) = \\frac{1}{2 \\tau} \\exp(-|\\frac{t}{\\tau}|) + K(t) = \frac{1}{2 \tau} \exp\left(-\left|\frac{t}{\tau}\right|\right) - with :math:`\\tau = \\sigma / \\sqrt{2}`. + with :math:`\tau = \sigma / \sqrt{2}`. - Besides the standard deviation `sigma`, for consistency of interfaces the - parameter `invert` needed for asymmetric kernels also exists without - having any effect in the case of symmetric kernels. + The parameter `invert` has no effect on symmetric kernels. - Attributes - ---------- - min_cutoff : float + Examples + -------- + + .. plot:: + :include-source: + + from elephant import kernels + import quantities as pq + import numpy as np + import matplotlib.pyplot as plt + + time_array = np.linspace(-3, 3, num=1000) * pq.s + kernel = kernels.LaplacianKernel(sigma=1*pq.s) + kernel_time = kernel(time_array) + plt.plot(time_array, kernel_time) + plt.title("LaplacianKernel with sigma=1s") + plt.xlabel("time, s") + plt.ylabel("kernel, 1/s") + plt.show() """ + @property def min_cutoff(self): min_cutoff = 3.0 return min_cutoff def _evaluate(self, t): - return (1 / (np.sqrt(2.0) * self._sigma_scaled)) * np.exp( - -(np.absolute(t) * np.sqrt(2.0) / self._sigma_scaled).magnitude) + tau = self.sigma.rescale(t.units).magnitude / math.sqrt(2) + kernel = scipy.stats.laplace.pdf(t.magnitude, loc=0, scale=tau) + kernel = pq.Quantity(kernel, units=1 / t.units) + return kernel + + def cdf(self, t): + self._check_time_input(t) + tau = self.sigma.rescale(t.units).magnitude / math.sqrt(2) + cdf = scipy.stats.laplace.cdf(t.magnitude, loc=0, scale=tau) + return cdf + + def icdf(self, fraction): + self._check_fraction(fraction) + tau = self.sigma.magnitude / math.sqrt(2) + icdf = scipy.stats.laplace.ppf(fraction, loc=0, scale=tau) + return icdf * self.sigma.units def boundary_enclosing_area_fraction(self, fraction): self._check_fraction(fraction) @@ -537,74 +747,156 @@ def boundary_enclosing_area_fraction(self, fraction): class ExponentialKernel(Kernel): - """ + r""" Class for exponential kernels. .. math:: - K(t) = \\left\\{\\begin{array}{ll} (1 / \\tau) \\exp{(-t / \\tau)}, - & t > 0 \\\\ - 0, & t \\leq 0 \\end{array} \\right. + K(t) = \left\{\begin{array}{ll} (1 / \tau) \exp{(-t / \tau)}, + & t > 0 \\ + 0, & t \leq 0 \end{array} \right. - with :math:`\\tau = \\sigma`. + with :math:`\tau = \sigma`. + + Examples + -------- + + .. plot:: + :include-source: + + from elephant import kernels + import quantities as pq + import numpy as np + import matplotlib.pyplot as plt + + time_array = np.linspace(-1, 4, num=100) * pq.s + kernel = kernels.ExponentialKernel(sigma=1*pq.s) + kernel_time = kernel(time_array) + plt.plot(time_array, kernel_time) + plt.title("ExponentialKernel with sigma=1s") + plt.xlabel("time, s") + plt.ylabel("kernel, 1/s") + plt.show() - Attributes - ---------- - min_cutoff : float """ + @property def min_cutoff(self): min_cutoff = 3.0 return min_cutoff def _evaluate(self, t): - if not self.invert: - kernel = (t >= 0) * (1. / self._sigma_scaled.magnitude) *\ - np.exp((-t / self._sigma_scaled).magnitude) / t.units - elif self.invert: - kernel = (t <= 0) * (1. / self._sigma_scaled.magnitude) *\ - np.exp((t / self._sigma_scaled).magnitude) / t.units + tau = self.sigma.rescale(t.units).magnitude + if self.invert: + t = -t + kernel = scipy.stats.expon.pdf(t.magnitude, loc=0, scale=tau) + kernel = pq.Quantity(kernel, units=1 / t.units) return kernel - def boundary_enclosing_area_fraction(self, fraction): + def cdf(self, t): + self._check_time_input(t) + tau = self.sigma.rescale(t.units).magnitude + t = t.magnitude + if self.invert: + t = np.minimum(t, 0) + return np.exp(t / tau) + t = np.maximum(t, 0) + return 1. - np.exp(-t / tau) + + def icdf(self, fraction): self._check_fraction(fraction) + if self.invert: + return self.sigma * np.log(fraction) return -self.sigma * np.log(1.0 - fraction) + def boundary_enclosing_area_fraction(self, fraction): + # the boundary b, which encloses a 'fraction' of CDF in [-b, b] range, + # does not depend on the invert, if the kernel is cut at zero. + # It's easier to compute 'b' for a kernel that has not been inverted. + kernel = self.__class__(sigma=self.sigma, invert=False) + return kernel.icdf(fraction) + class AlphaKernel(Kernel): - """ + r""" Class for alpha kernels. .. math:: - K(t) = \\left\\{\\begin{array}{ll} (1 / \\tau^2) - \\ t\\ \\exp{(-t / \\tau)}, & t > 0 \\\\ - 0, & t \\leq 0 \\end{array} \\right. + K(t) = \left\{\begin{array}{ll} (1 / \tau^2) + \ t\ \exp{(-t / \tau)}, & t > 0 \\ + 0, & t \leq 0 \end{array} \right. - with :math:`\\tau = \\sigma / \\sqrt{2}`. + with :math:`\tau = \sigma / \sqrt{2}`. - For the alpha kernel an analytical expression for the boundary of the - integral as a function of the area under the alpha kernel function - cannot be given. Hence in this case the value of the boundary is - determined by kernel-approximating numerical integration, inherited - from the `Kernel` class. + Examples + -------- - Attributes - ---------- - min_cutoff : float + .. plot:: + :include-source: + + from elephant import kernels + import quantities as pq + import numpy as np + import matplotlib.pyplot as plt + + time_array = np.linspace(-1, 4, num=100) * pq.s + kernel = kernels.AlphaKernel(sigma=1*pq.s) + kernel_time = kernel(time_array) + plt.plot(time_array, kernel_time) + plt.title("AlphaKernel with sigma=1s") + plt.xlabel("time, s") + plt.ylabel("kernel, 1/s") + plt.show() """ + @property def min_cutoff(self): min_cutoff = 3.0 return min_cutoff def _evaluate(self, t): - if not self.invert: - kernel = (t >= 0) * 2. * (t / self._sigma_scaled**2).magnitude *\ - np.exp(( - -t * np.sqrt(2.) / self._sigma_scaled).magnitude) / t.units - elif self.invert: - kernel = (t <= 0) * -2. * (t / self._sigma_scaled**2).magnitude *\ - np.exp(( - t * np.sqrt(2.) / self._sigma_scaled).magnitude) / t.units + t_units = t.units + tau = self.sigma.rescale(t_units).magnitude / math.sqrt(2) + t = t.magnitude + if self.invert: + t = -t + kernel = (t >= 0) * 1 / tau ** 2 * t * np.exp(-t / tau) + kernel = pq.Quantity(kernel, units=1 / t_units) return kernel + + def cdf(self, t): + self._check_time_input(t) + tau = self.sigma.rescale(t.units).magnitude / math.sqrt(2) + cdf = self._cdf_stripped(t.magnitude, tau) + return cdf + + def _cdf_stripped(self, t, tau): + # CDF without time units + if self.invert: + t = np.minimum(t, 0) + return np.exp(t / tau) * (tau - t) / tau + t = np.maximum(t, 0) + return 1 - np.exp(-t / tau) * (t + tau) / tau + + def icdf(self, fraction): + self._check_fraction(fraction) + tau = self.sigma.magnitude / math.sqrt(2) + + def cdf(x): + # CDF fof the AlphaKernel, subtracted 'fraction' + # evaluates the error of the root of cdf(x) = fraction + return self._cdf_stripped(x, tau) - fraction + + # fraction is a good starting point for CDF approximation + x0 = fraction if not self.invert else fraction - 1 + x_quantile = scipy.optimize.fsolve(cdf, x0=x0, xtol=1e-7)[0] + x_quantile = pq.Quantity(x_quantile, units=self.sigma.units) + return x_quantile + + def boundary_enclosing_area_fraction(self, fraction): + # the boundary b, which encloses a 'fraction' of CDF in [-b, b] range, + # does not depend on the invert, if the kernel is cut at zero. + # It's easier to compute 'b' for a kernel that has not been inverted. + kernel = self.__class__(sigma=self.sigma, invert=False) + return kernel.icdf(fraction) diff --git a/elephant/statistics.py b/elephant/statistics.py index e569cd0e6..5a76a5850 100644 --- a/elephant/statistics.py +++ b/elephant/statistics.py @@ -62,6 +62,7 @@ # (quantities rescale does not work with unicodes) import numpy as np +import math import quantities as pq import scipy.stats import scipy.signal @@ -71,6 +72,8 @@ import elephant.kernels as kernels import warnings +from elephant.utils import is_time_quantity + cv = scipy.stats.variation @@ -127,18 +130,20 @@ def mean_firing_rate(spiketrain, t_start=None, t_stop=None, axis=None): t_start : float or pq.Quantity, optional The start time to use for the interval. If None, retrieved from the `t_start` attribute of `spiketrain`. If - that is not present, default to 0. Any value from `spiketrain` below - this value is ignored. + that is not present, default to 0. All spiketrain's spike times below + this value are ignored. Default: None. t_stop : float or pq.Quantity, optional The stop time to use for the time points. If not specified, retrieved from the `t_stop` attribute of `spiketrain`. If that is not present, default to the maximum value of - `spiketrain`. Any value from `spiketrain` above this value is ignored. + `spiketrain`. All spiketrain's spike times above this value are + ignored. Default: None. axis : int, optional - The axis over which to do the calculation. - If None, do the calculation over the flattened array. + The axis over which to do the calculation; has no effect when the + input is a neo.SpikeTrain, because a neo.SpikeTrain is always a 1-d + vector. If None, do the calculation over the flattened array. Default: None. Returns @@ -149,57 +154,58 @@ def mean_firing_rate(spiketrain, t_start=None, t_stop=None, axis=None): Raises ------ TypeError - If `spiketrain` is a `np.ndarray` and `t_start` or `t_stop` is + If the input spiketrain is a `np.ndarray` but `t_start` or `t_stop` is `pq.Quantity`. - Notes - ----- - If `spiketrain` is a `pq.Quantity` or `neo.SpikeTrain`, and `t_start` or - `t_stop` are not `pq.Quantity`, `t_start` and `t_stop` are assumed to have - the same units as `spiketrain`. + If the input spiketrain is a `neo.SpikeTrain` or `pq.Quantity` but + `t_start` or `t_stop` is not `pq.Quantity`. + ValueError + If the input spiketrain is empty. """ - if t_start is None: - t_start = getattr(spiketrain, 't_start', 0) - - found_t_start = False - if t_stop is None: - if hasattr(spiketrain, 't_stop'): - t_stop = spiketrain.t_stop - else: - t_stop = np.max(spiketrain, axis=axis) - found_t_start = True + if isinstance(spiketrain, pq.Quantity): + # Quantity or neo.SpikeTrain + if not is_time_quantity(t_start, allow_none=True): + raise TypeError("'t_start' must be a Quantity or None") + if not is_time_quantity(t_stop, allow_none=True): + raise TypeError("'t_stop' must be a Quantity or None") - # figure out what units, if any, we are dealing with - if hasattr(spiketrain, 'units'): units = spiketrain.units + if t_start is None: + t_start = getattr(spiketrain, 't_start', 0 * units) + t_start = t_start.rescale(units).magnitude + if t_stop is None: + t_stop = getattr(spiketrain, 't_stop', + np.max(spiketrain, axis=axis)) + t_stop = t_stop.rescale(units).magnitude + + # calculate as a numpy array + rates = mean_firing_rate(spiketrain.magnitude, t_start=t_start, + t_stop=t_stop, axis=axis) + + rates = pq.Quantity(rates, units=1. / units) + return rates + elif isinstance(spiketrain, (np.ndarray, list, tuple)): + if isinstance(t_start, pq.Quantity) or isinstance(t_stop, pq.Quantity): + raise TypeError("'t_start' and 't_stop' cannot be quantities if " + "'spiketrain' is not a Quantity.") + spiketrain = np.asarray(spiketrain) + if len(spiketrain) == 0: + raise ValueError("Empty input spiketrain.") + if t_start is None: + t_start = 0 + if t_stop is None: + t_stop = np.max(spiketrain, axis=axis) + time_interval = t_stop - t_start + if axis and isinstance(t_stop, np.ndarray): + t_stop = np.expand_dims(t_stop, axis) + rates = np.sum((spiketrain >= t_start) & (spiketrain <= t_stop), + axis=axis) / time_interval + return rates else: - units = None - - # convert everything to the same units - if hasattr(t_start, 'units'): - if units is None: - raise TypeError('t_start cannot be a Quantity if ' - 'spiketrain is not a quantity') - t_start = t_start.rescale(units) - elif units is not None: - t_start = pq.Quantity(t_start, units=units) - if hasattr(t_stop, 'units'): - if units is None: - raise TypeError('t_stop cannot be a Quantity if ' - 'spiketrain is not a quantity') - t_stop = t_stop.rescale(units) - elif units is not None: - t_stop = pq.Quantity(t_stop, units=units) - - if not axis or not found_t_start: - return np.sum((spiketrain >= t_start) & (spiketrain <= t_stop), - axis=axis) / (t_stop - t_start) - else: - # this is needed to handle broadcasting between spiketrain and t_stop - t_stop_test = np.expand_dims(t_stop, axis) - return np.sum((spiketrain >= t_start) & (spiketrain <= t_stop_test), - axis=axis) / (t_stop - t_start) + raise TypeError("Invalid input spiketrain type: '{}'. Allowed: " + "neo.SpikeTrain, Quantity, ndarray". + format(type(spiketrain))) def fanofactor(spiketrains): @@ -236,13 +242,34 @@ def fanofactor(spiketrains): spike_counts = np.array([len(t) for t in spiketrains]) # Compute FF - if all([count == 0 for count in spike_counts]): + if all(count == 0 for count in spike_counts): fano = np.nan else: fano = spike_counts.var() / spike_counts.mean() return fano +def __variation_check(v, with_nan): + # ensure the input ia a vector + if v.ndim != 1: + raise ValueError("The input must be a vector, not a {}-dim matrix.". + format(v.ndim)) + + # ensure we have enough entries + if v.size < 2: + if with_nan: + warnings.warn("The input size is too small. Please provide" + "an input with more than 1 entry. Returning `NaN`" + "since the argument `with_nan` is `True`") + return np.NaN + else: + raise ValueError("Input size is too small. Please provide " + "an input with more than 1 entry. Set 'with_nan' " + "to True to replace the error by a warning.") + + return None + + def lv(v, with_nan=False): r""" Calculate the measure of local variation LV for a sequence of time @@ -300,26 +327,11 @@ def lv(v, with_nan=False): """ # convert to array, cast to float v = np.asarray(v) - # ensure the input is a vector - if len(v.shape) > 1: - raise ValueError("Input shape is larger than 1. Please provide " - "a vector as an input.") - - # ensure we have enough entries - if v.size < 2: - if with_nan: - warnings.warn("Input size is too small. Please provide " - "an input with more than 1 entry. lv returns 'NaN'" - "since the argument `with_nan` is True") - return np.NaN - - else: - raise ValueError("Input size is too small. Please provide " - "an input with more than 1 entry. lv returned any" - "value since the argument `with_nan` is False") + np_nan = __variation_check(v, with_nan) + if np_nan is not None: + return np_nan # calculate LV and return result - # raise error if input is multi-dimensional return 3. * np.mean(np.power(np.diff(v) / (v[:-1] + v[1:]), 2)) @@ -382,42 +394,29 @@ def cv2(v, with_nan=False): """ # convert to array, cast to float v = np.asarray(v) - - # ensure the input ia a vector - if len(v.shape) > 1: - raise ValueError("Input shape is larger than 1. Please provide " - "a vector as an input.") - - # ensure we have enough entries - if v.size < 2: - if with_nan: - warnings.warn("Input size is too small. Please provide" - "an input with more than 1 entry. cv2 returns `NaN`" - "since the argument `with_nan` is `True`") - return np.NaN - else: - raise ValueError("Input size is too small. Please provide " - "an input with more than 1 entry. cv2 returns any" - "value since the argument `with_nan` is `False`") + np_nan = __variation_check(v, with_nan) + if np_nan is not None: + return np_nan # calculate CV2 and return result return 2. * np.mean(np.absolute(np.diff(v)) / (v[:-1] + v[1:])) def instantaneous_rate(spiketrain, sampling_period, kernel='auto', - cutoff=5.0, t_start=None, t_stop=None, trim=False): + cutoff=5.0, t_start=None, t_stop=None, trim=False, + center_kernel=True): """ Estimates instantaneous firing rate by kernel convolution. Parameters - ----------- + ---------- spiketrain : neo.SpikeTrain or list of neo.SpikeTrain Neo object(s) that contains spike times, the unit of the time stamps, and `t_start` and `t_stop` of the spike train. sampling_period : pq.Quantity Time stamp resolution of the spike times. The same resolution will be assumed for the kernel. - kernel : str or `kernels.Kernel`, optional + kernel : 'auto' or Kernel, optional The string 'auto' or callable object of class `kernels.Kernel`. The kernel is used for convolution with the spike train and its standard deviation determines the time resolution of the instantaneous @@ -445,16 +444,25 @@ def instantaneous_rate(spiketrain, sampling_period, kernel='auto', `spiketrain`. Default: None. trim : bool, optional + Accounts for the asymmetry of a kernel. If False, the output of the Fast Fourier Transformation being a longer vector than the input vector by the size of the kernel is reduced back to the original size of the considered time interval of the - `spiketrain` using the median of the kernel. + `spiketrain` using the median of the kernel. False (no trimming) is + equivalent to 'same' convolution mode for symmetrical kernels. If True, only the region of the convolved signal is returned, where there is complete overlap between kernel and spike train. This is achieved by reducing the length of the output of the Fast Fourier Transformation by a total of two times the size of the kernel, and - `t_start` and `t_stop` are adjusted. + `t_start` and `t_stop` are adjusted. True (trimming) is equivalent to + 'valid' convolution mode for symmetrical kernels. Default: False. + center_kernel : bool, optional + If set to True, the kernel will be translated such that its median is + centered on the spike, thus putting equal weight before and after the + spike. If False, no adjustment is performed such that the spike sits at + the origin of the kernel. + Default: True Returns ------- @@ -467,7 +475,7 @@ def instantaneous_rate(spiketrain, sampling_period, kernel='auto', Raises ------ - TypeError: + TypeError If `spiketrain` is not an instance of `neo.SpikeTrain`. If `sampling_period` is not a `pq.Quantity`. @@ -481,8 +489,7 @@ def instantaneous_rate(spiketrain, sampling_period, kernel='auto', If `t_start` and `t_stop` are neither None nor a `pq.Quantity`. If `trim` is not `bool`. - - ValueError: + ValueError If `sampling_period` is smaller than zero. If `kernel` is 'auto' and the function was unable to calculate optimal @@ -495,7 +502,7 @@ def instantaneous_rate(spiketrain, sampling_period, kernel='auto', of the kernel is adjusted to a minimally allowed width. If the instantaneous firing rate approximation contains negative values - with respect to a tolerance (less than -1e-8), possibly due to machine + with respect to a tolerance (less than -1e-5), possibly due to machine precision errors. References @@ -515,7 +522,7 @@ def instantaneous_rate(spiketrain, sampling_period, kernel='auto', """ # Merge spike trains if list of spike trains given: if isinstance(spiketrain, list): - _check_consistency_of_spiketrainlist( + _check_consistency_of_spiketrains( spiketrain, t_start=t_start, t_stop=t_stop) if t_start is None: t_start = spiketrain[0].t_start @@ -534,23 +541,23 @@ def instantaneous_rate(spiketrain, sampling_period, kernel='auto', # Checks of input variables: if not isinstance(spiketrain, SpikeTrain): raise TypeError( - "spiketrain must be instance of :class:`SpikeTrain` of Neo!\n" - " Found: %s, value %s" % (type(spiketrain), str(spiketrain))) + "'spiketrain' must be an instance of neo.SpikeTrain. \n" + "Found: '{}'".format(type(spiketrain))) - if not (isinstance(sampling_period, pq.Quantity) and - sampling_period.dimensionality.simplified == - pq.Quantity(1, "s").dimensionality): + if not is_time_quantity(sampling_period): raise TypeError( - "The sampling period must be a time quantity!\n" - " Found: %s, value %s" % ( - type(sampling_period), str(sampling_period))) + "The 'sampling_period' must be a time Quantity. \n" + "Found: {}".format(type(sampling_period))) if sampling_period.magnitude < 0: - raise ValueError("The sampling period must be larger than zero.") + raise ValueError("The 'sampling_period' ({}) must be non-negative.". + format(sampling_period)) if kernel == 'auto': - kernel_width_sigma = sskernel( - spiketrain.magnitude, tin=None, bootstrap=False)['optw'] + kernel_width_sigma = None + if len(spiketrain) > 0: + kernel_width_sigma = sskernel( + spiketrain.magnitude, tin=None, bootstrap=False)['optw'] if kernel_width_sigma is None: raise ValueError( "Unable to calculate optimal kernel width for " @@ -558,25 +565,21 @@ def instantaneous_rate(spiketrain, sampling_period, kernel='auto', kernel = kernels.GaussianKernel(kernel_width_sigma * spiketrain.units) elif not isinstance(kernel, kernels.Kernel): raise TypeError( - "kernel must be either instance of :class:`Kernel` " - "or the string 'auto'!\n" - " Found: %s, value %s" % (type(kernel), str(kernel))) + "'kernel' must be either instance of class elephant.kernels.Kernel" + " or the string 'auto'. Found: %s, value %s" % (type(kernel), + str(kernel))) - if not (isinstance(cutoff, float) or isinstance(cutoff, int)): - raise TypeError("cutoff must be float or integer!") + if not isinstance(cutoff, (float, int)): + raise TypeError("'cutoff' must be float or integer") - if not (t_start is None or (isinstance(t_start, pq.Quantity) and - t_start.dimensionality.simplified == - pq.Quantity(1, "s").dimensionality)): - raise TypeError("t_start must be a time quantity!") + if not is_time_quantity(t_start, allow_none=True): + raise TypeError("'t_start' must be a time Quantity") - if not (t_stop is None or (isinstance(t_stop, pq.Quantity) and - t_stop.dimensionality.simplified == - pq.Quantity(1, "s").dimensionality)): - raise TypeError("t_stop must be a time quantity!") + if not is_time_quantity(t_stop, allow_none=True): + raise TypeError("'t_stop' must be a time Quantity") - if not (isinstance(trim, bool)): - raise TypeError("trim must be bool!") + if not isinstance(trim, bool): + raise TypeError("'trim' must be bool") # main function: units = pq.CompoundUnit( @@ -592,14 +595,12 @@ def instantaneous_rate(spiketrain, sampling_period, kernel='auto', else: t_stop = t_stop.rescale(spiketrain.units) - time_vector = np.zeros(int((t_stop - t_start)) + 1) - - spikes_slice = spiketrain.time_slice(t_start, t_stop) \ - if len(spiketrain) else np.array([]) - - for spike in spikes_slice: - index = int((spike - t_start)) - time_vector[index] += 1 + # float32 makes fftconvolve less precise which may result in nan + time_vector = np.zeros(int(t_stop - t_start) + 1, dtype=np.float64) + spikes_slice = spiketrain.time_slice(t_start, t_stop) + bins_active = (spikes_slice.times - t_start).magnitude.astype(np.int32) + bins_unique, bin_counts = np.unique(bins_active, return_counts=True) + time_vector[bins_unique] = bin_counts if cutoff < kernel.min_cutoff: cutoff = kernel.min_cutoff @@ -611,25 +612,45 @@ def instantaneous_rate(spiketrain, sampling_period, kernel='auto', sampling_period.rescale(units).magnitude, sampling_period.rescale(units).magnitude) * units - r = scipy.signal.fftconvolve(time_vector, - kernel(t_arr).rescale(pq.Hz).magnitude, - 'full') - if np.any(r < -1e-8): # abs tolerance in np.isclose + if center_kernel: + # keep the full convolve range and do the trimming afterwards; + # trimming is performed according to the kernel median index + fft_mode = 'full' + elif trim: + # no median index trimming is involved + fft_mode = 'valid' + else: + # no median index trimming is involved + fft_mode = 'same' + rate = scipy.signal.fftconvolve(time_vector, + kernel(t_arr).rescale(pq.Hz).magnitude, + mode=fft_mode) + + if np.any(rate < -1e-8): # abs tolerance in np.isclose warnings.warn("Instantaneous firing rate approximation contains " "negative values, possibly caused due to machine " "precision errors.") - if not trim: - r = r[kernel.median_index(t_arr):-(kernel(t_arr).size - - kernel.median_index(t_arr))] - elif trim: - r = r[2 * kernel.median_index(t_arr):-2 * (kernel(t_arr).size - - kernel.median_index(t_arr))] - t_start += kernel.median_index(t_arr) * spiketrain.units - t_stop -= (kernel(t_arr).size - - kernel.median_index(t_arr)) * spiketrain.units + median_id = kernel.median_index(t_arr) + # the size of kernel() output matches the input size + kernel_array_size = len(t_arr) + if center_kernel: + # account for the kernel asymmetry + if not trim: + rate = rate[median_id: -kernel_array_size + median_id] + else: + rate = rate[2 * median_id: -2 * (kernel_array_size - median_id)] + t_start = t_start + median_id * spiketrain.units + t_stop = t_stop - (kernel_array_size - median_id + ) * spiketrain.units + else: + # (to be consistent with center_kernel=True) + # n points have n-1 intervals; + # instantaneous rate is a list of intervals; + # hence, the last element is excluded + rate = rate[:-1] - rate = neo.AnalogSignal(signal=r.reshape(r.size, 1), + rate = neo.AnalogSignal(signal=np.expand_dims(rate, axis=1), sampling_period=sampling_period, units=pq.Hz, t_start=t_start, t_stop=t_stop) @@ -715,19 +736,13 @@ def time_histogram(spiketrains, binsize, t_start=None, t_stop=None, if t_stop is None: # Find the internal range for t_stop - if min_tstop: - t_stop = min_tstop - if not all([min_tstop == t.t_stop for t in spiketrains]): - warnings.warn( - "Spiketrains have different t_stop values -- " - "using minimum t_stop as t_stop.") - else: + if not min_tstop: min_tstop = conv._get_start_stop_from_input(spiketrains)[1] - t_stop = min_tstop - if not all([min_tstop == t.t_stop for t in spiketrains]): - warnings.warn( - "Spiketrains have different t_stop values -- " - "using minimum t_stop as t_stop.") + t_stop = min_tstop + if not all([min_tstop == t.t_stop for t in spiketrains]): + warnings.warn( + "Spiketrains have different t_stop values -- " + "using minimum t_stop as t_stop.") sts_cut = [st.time_slice(t_start=t_start, t_stop=t_stop) for st in spiketrains] @@ -755,7 +770,7 @@ def time_histogram(spiketrains, binsize, t_start=None, t_stop=None, else: raise ValueError('Parameter output is not valid.') - return neo.AnalogSignal(signal=bin_hist.reshape(bin_hist.size, 1), + return neo.AnalogSignal(signal=np.expand_dims(bin_hist, axis=1), sampling_period=binsize, units=bin_hist.units, t_start=t_start) @@ -811,7 +826,7 @@ def complexity_pdf(spiketrains, binsize): complexity_hist = complexity_hist / complexity_hist.sum() # Convert the Complexity pdf to an neo.AnalogSignal complexity_distribution = neo.AnalogSignal( - np.array(complexity_hist).reshape(len(complexity_hist), 1) * + np.expand_dims(complexity_hist, axis=1) * pq.dimensionless, t_start=0 * pq.dimensionless, sampling_period=1 * pq.dimensionless) @@ -836,9 +851,9 @@ def nextpow2(x): """ Return the smallest integral power of 2 that is equal or larger than `x`. """ - n = 2 - while n < x: - n = 2 * n + # PYTHON2: math.log2 does not exist + log2_n = int(math.ceil(math.log(x, 2))) + n = 2 ** log2_n return n @@ -1073,22 +1088,17 @@ def sskernel(spiketimes, tin=None, w=None, bootstrap=False): 'yb': yb} -def _check_consistency_of_spiketrainlist(spiketrainlist, t_start=None, - t_stop=None): - for spiketrain in spiketrainlist: - if not isinstance(spiketrain, SpikeTrain): - raise TypeError( - "spike train must be instance of :class:`SpikeTrain` of Neo!\n" - " Found: %s, value %s" % ( - type(spiketrain), str(spiketrain))) - if t_start is None and not spiketrain.t_start == spiketrainlist[ - 0].t_start: - raise ValueError( - "the spike trains must have the same t_start!") - if t_stop is None and not spiketrain.t_stop == spiketrainlist[ - 0].t_stop: - raise ValueError( - "the spike trains must have the same t_stop!") - if not spiketrain.units == spiketrainlist[0].units: - raise ValueError( - "the spike trains must have the same units!") +def _check_consistency_of_spiketrains(spiketrains, t_start=None, + t_stop=None): + for st in spiketrains: + if not isinstance(st, SpikeTrain): + raise TypeError("The spike trains must be instances of " + "neo.SpikeTrain. Found: '{}'". + format(type(st))) + + if t_start is None and not st.t_start == spiketrains[0].t_start: + raise ValueError("The spike trains must have the same t_start.") + if t_stop is None and not st.t_stop == spiketrains[0].t_stop: + raise ValueError("The spike trains must have the same t_stop.") + if not st.units == spiketrains[0].units: + raise ValueError("The spike trains must have the same units.") diff --git a/elephant/test/test_kernels.py b/elephant/test/test_kernels.py index dcc1a3f66..009ca7df5 100644 --- a/elephant/test/test_kernels.py +++ b/elephant/test/test_kernels.py @@ -6,23 +6,26 @@ :license: Modified BSD, see LICENSE.txt for details. """ +import math import unittest +import warnings import numpy as np import quantities as pq import scipy.integrate as spint +from numpy.testing import assert_array_almost_equal + import elephant.kernels as kernels class kernel_TestCase(unittest.TestCase): def setUp(self): - self.kernel_types = [obj for obj in kernels.__dict__.values() - if isinstance(obj, type) and - issubclass(obj, kernels.Kernel) and - hasattr(obj, "_evaluate") and - obj is not kernels.Kernel and - obj is not kernels.SymmetricKernel] - self.fraction = 0.9999 + self.kernel_types = tuple( + kern_cls for kern_cls in kernels.__dict__.values() + if isinstance(kern_cls, type) and + issubclass(kern_cls, kernels.Kernel) and + kern_cls is not kernels.Kernel and + kern_cls is not kernels.SymmetricKernel) def test_error_kernels(self): """ @@ -31,18 +34,18 @@ def test_error_kernels(self): self.assertRaises( TypeError, kernels.RectangularKernel, sigma=2.0) self.assertRaises( - ValueError, kernels.RectangularKernel, sigma=-0.03*pq.s) + ValueError, kernels.RectangularKernel, sigma=-0.03 * pq.s) self.assertRaises( - ValueError, kernels.RectangularKernel, sigma=2.0*pq.ms, + ValueError, kernels.AlphaKernel, sigma=2.0 * pq.ms, invert=2) - rec_kernel = kernels.RectangularKernel(sigma=0.3*pq.ms) + rec_kernel = kernels.RectangularKernel(sigma=0.3 * pq.ms) self.assertRaises( TypeError, rec_kernel, [1, 2, 3]) self.assertRaises( - TypeError, rec_kernel, [1, 2, 3]*pq.V) - kernel = kernels.Kernel(sigma=0.3*pq.ms) + TypeError, rec_kernel, [1, 2, 3] * pq.V) + kernel = kernels.Kernel(sigma=0.3 * pq.ms) self.assertRaises( - NotImplementedError, kernel._evaluate, [1, 2, 3]*pq.V) + NotImplementedError, kernel._evaluate, [1, 2, 3] * pq.V) self.assertRaises( NotImplementedError, kernel.boundary_enclosing_area_fraction, fraction=0.9) @@ -53,22 +56,22 @@ def test_error_kernels(self): self.assertEqual(kernel.is_symmetric(), False) self.assertEqual(rec_kernel.is_symmetric(), True) - @unittest.skip('very time-consuming test') - def test_error_alpha_kernel(self): - alp_kernel = kernels.AlphaKernel(sigma=0.3*pq.ms) - self.assertRaises(ValueError, - alp_kernel.boundary_enclosing_area_fraction, 0.9999999) + def test_alpha_kernel_extreme(self): + alp_kernel = kernels.AlphaKernel(sigma=0.3 * pq.ms) + quantile = alp_kernel.boundary_enclosing_area_fraction(0.9999999) + self.assertAlmostEqual(quantile.magnitude, 4.055922083048838) def test_kernels_normalization(self): """ Test that each kernel normalizes to area one. """ sigma = 0.1 * pq.mV + fraction = 0.9999 kernel_resolution = sigma / 100.0 kernel_list = [kernel_type(sigma, invert=False) for kernel_type in self.kernel_types] for kernel in kernel_list: - b = kernel.boundary_enclosing_area_fraction(self.fraction).magnitude + b = kernel.boundary_enclosing_area_fraction(fraction).magnitude n_points = int(2 * b / kernel_resolution.magnitude) restric_defdomain = np.linspace( -b, b, num=n_points) * sigma.units @@ -83,26 +86,28 @@ def test_kernels_stddev(self): equals the parameter sigma with which the kernel was constructed. """ sigma = 0.5 * pq.s + fraction = 0.9999 kernel_resolution = sigma / 50.0 for invert in (False, True): kernel_list = [kernel_type(sigma, invert) for kernel_type in self.kernel_types] for kernel in kernel_list: - b = kernel.boundary_enclosing_area_fraction(self.fraction).magnitude + b = kernel.boundary_enclosing_area_fraction( + fraction).magnitude n_points = int(2 * b / kernel_resolution.magnitude) restric_defdomain = np.linspace( -b, b, num=n_points) * sigma.units kern = kernel(restric_defdomain) av_integr = kern * restric_defdomain - average = spint.cumtrapz(y=av_integr.magnitude, - x=restric_defdomain.magnitude)[-1] * \ - sigma.units - var_integr = (restric_defdomain-average)**2 * kern - variance = spint.cumtrapz(y=var_integr.magnitude, - x=restric_defdomain.magnitude)[-1] * \ - sigma.units**2 + average = spint.cumtrapz( + y=av_integr.magnitude, + x=restric_defdomain.magnitude)[-1] * sigma.units + var_integr = (restric_defdomain - average) ** 2 * kern + variance = spint.cumtrapz( + y=var_integr.magnitude, + x=restric_defdomain.magnitude)[-1] * sigma.units ** 2 stddev = np.sqrt(variance) - self.assertAlmostEqual(stddev, sigma, delta=0.01*sigma) + self.assertAlmostEqual(stddev, sigma, delta=0.01 * sigma) def test_kernel_boundary_enclosing(self): """ @@ -126,6 +131,253 @@ def test_kernel_boundary_enclosing(self): x=restric_defdomain.magnitude)[-1] self.assertAlmostEqual(frac, fraction, delta=0.002) + def test_kernel_output_same_size(self): + time_array = np.linspace(0, 10, num=20) * pq.s + for kernel_type in self.kernel_types: + kernel = kernel_type(sigma=1 * pq.s) + kernel_points = kernel(time_array) + self.assertEqual(len(kernel_points), len(time_array)) + + def test_element_wise_only(self): + # Test that kernel operation is applied element-wise without any + # recurrent magic (e.g., convolution) + np.random.seed(19) + t_array = np.linspace(-10, 10, num=100) * pq.s + t_shuffled = t_array.copy() + np.random.shuffle(t_shuffled) + for kern_cls in self.kernel_types: + for invert in (False, True): + kernel = kern_cls(sigma=1 * pq.s, invert=invert) + kernel_shuffled = kernel(t_shuffled) + kernel_shuffled.sort() + kernel_expected = kernel(t_array) + kernel_expected.sort() + assert_array_almost_equal(kernel_shuffled, kernel_expected) + + def test_kernel_pdf_range(self): + t_array = np.linspace(-10, 10, num=1000) * pq.s + for kern_cls in self.kernel_types: + for invert in (False, True): + kernel = kern_cls(sigma=1 * pq.s, invert=invert) + kernel_array = kernel(t_array) + in_range = (kernel_array <= 1) & (kernel_array >= 0) + self.assertTrue(in_range.all()) + + def test_boundary_enclosing_area_fraction(self): + # test that test_boundary_enclosing_area_fraction does not depend + # on the invert + sigma = 1 * pq.s + fractions_test = np.linspace(0, 1, num=10, endpoint=False) + for kern_cls in self.kernel_types: + kernel = kern_cls(sigma=sigma, invert=False) + kernel_inverted = kern_cls(sigma=sigma, invert=True) + for fraction in fractions_test: + self.assertAlmostEqual( + kernel.boundary_enclosing_area_fraction(fraction), + kernel_inverted.boundary_enclosing_area_fraction(fraction) + ) + + def test_icdf(self): + sigma = 1 * pq.s + fractions_test = np.linspace(0, 1, num=10, endpoint=False) + for kern_cls in self.kernel_types: + kernel = kern_cls(sigma=sigma, invert=False) + kernel_inverted = kern_cls(sigma=sigma, invert=True) + for fraction in fractions_test: + # ICDF(0) for several kernels produces -inf + # of fsolve complains about stuck at local optima + with warnings.catch_warnings(): + warnings.simplefilter('ignore', RuntimeWarning) + icdf = kernel.icdf(fraction) + icdf_inverted = kernel_inverted.icdf(fraction) + if kernel.is_symmetric(): + self.assertAlmostEqual(icdf, icdf_inverted) + else: + # AlphaKernel, ExponentialKernel + self.assertGreaterEqual(icdf, 0 * pq.s) + self.assertLessEqual(icdf_inverted, 0 * pq.s) + + def test_cdf_icdf(self): + sigma = 1 * pq.s + fractions_test = np.linspace(0, 1, num=10, endpoint=False) + for kern_cls in self.kernel_types: + for invert in (False, True): + kernel = kern_cls(sigma=sigma, invert=invert) + for fraction in fractions_test: + # ICDF(0) for several kernels produces -inf + # of fsolve complains about stuck at local optima + with warnings.catch_warnings(): + warnings.simplefilter('ignore', RuntimeWarning) + self.assertAlmostEqual( + kernel.cdf(kernel.icdf(fraction)), fraction) + + def test_icdf_cdf(self): + sigma = 1 * pq.s + times = np.linspace(-10, 10) * sigma.units + for kern_cls in self.kernel_types: + for invert in (False, True): + kernel = kern_cls(sigma=sigma, invert=invert) + for t in times: + cdf = kernel.cdf(t) + self.assertGreaterEqual(cdf, 0.) + self.assertLessEqual(cdf, 1.) + if 0 < cdf < 1: + self.assertAlmostEqual( + kernel.icdf(cdf), t, places=2) + + def test_icdf_at_1(self): + sigma = 1 * pq.s + for kern_cls in self.kernel_types: + for invert in (False, True): + kernel = kern_cls(sigma=sigma, invert=invert) + if isinstance(kernel, (kernels.RectangularKernel, + kernels.TriangularKernel)): + icdf = kernel.icdf(1.0) + # check finite + self.assertLess(np.abs(icdf.magnitude), np.inf) + else: + self.assertRaises(ValueError, kernel.icdf, 1.0) + + def test_cdf_symmetric(self): + sigma = 1 * pq.s + cutoff = 1e2 * sigma # a large value + times = np.linspace(-cutoff, cutoff, num=10) + kern_symmetric = filter(lambda kern_type: issubclass( + kern_type, kernels.SymmetricKernel), self.kernel_types) + for kern_cls in kern_symmetric: + kernel = kern_cls(sigma=sigma, invert=False) + kernel_inverted = kern_cls(sigma=sigma, invert=True) + for t in times: + self.assertAlmostEqual(kernel.cdf(t), kernel_inverted.cdf(t)) + + +class KernelOldImplementation(unittest.TestCase): + def setUp(self): + self.kernel_types = tuple( + kern_cls for kern_cls in kernels.__dict__.values() + if isinstance(kern_cls, type) and + issubclass(kern_cls, kernels.Kernel) and + kern_cls is not kernels.Kernel and + kern_cls is not kernels.SymmetricKernel) + self.sigma = 1 * pq.s + self.time_input = np.linspace(-10, 10, num=100) * self.sigma.units + + def test_triangular(self): + def evaluate_old(t): + t_units = t.units + t_abs = np.abs(t.magnitude) + tau = math.sqrt(6) * kernel.sigma.rescale(t_units).magnitude + kernel_pdf = (t_abs < tau) * 1 / tau * (1 - t_abs / tau) + kernel_pdf = pq.Quantity(kernel_pdf, units=1 / t_units) + return kernel_pdf + + for invert in (False, True): + kernel = kernels.TriangularKernel(self.sigma, invert=invert) + assert_array_almost_equal(kernel(self.time_input), + evaluate_old(self.time_input)) + + def test_gaussian(self): + def evaluate_old(t): + t_units = t.units + t = t.magnitude + sigma = kernel.sigma.rescale(t_units).magnitude + kernel_pdf = (1.0 / (math.sqrt(2.0 * math.pi) * sigma)) * np.exp( + -0.5 * (t / sigma) ** 2) + kernel_pdf = pq.Quantity(kernel_pdf, units=1 / t_units) + return kernel_pdf + + for invert in (False, True): + kernel = kernels.GaussianKernel(self.sigma, invert=invert) + assert_array_almost_equal(kernel(self.time_input), + evaluate_old(self.time_input)) + + def test_laplacian(self): + def evaluate_old(t): + t_units = t.units + t = t.magnitude + tau = kernel.sigma.rescale(t_units).magnitude / math.sqrt(2) + kernel_pdf = 1 / (2 * tau) * np.exp(-np.abs(t / tau)) + kernel_pdf = pq.Quantity(kernel_pdf, units=1 / t_units) + return kernel_pdf + + for invert in (False, True): + kernel = kernels.LaplacianKernel(self.sigma, invert=invert) + assert_array_almost_equal(kernel(self.time_input), + evaluate_old(self.time_input)) + + def test_exponential(self): + def evaluate_old(t): + t_units = t.units + t = t.magnitude + tau = kernel.sigma.rescale(t_units).magnitude + if not kernel.invert: + kernel_pdf = (t >= 0) * 1 / tau * np.exp(-t / tau) + else: + kernel_pdf = (t <= 0) * 1 / tau * np.exp(t / tau) + kernel_pdf = pq.Quantity(kernel_pdf, units=1 / t_units) + return kernel_pdf + + for invert in (False, True): + kernel = kernels.ExponentialKernel(self.sigma, invert=invert) + assert_array_almost_equal(kernel(self.time_input), + evaluate_old(self.time_input)) + + +class KernelMedianIndex(unittest.TestCase): + def setUp(self): + kernel_types = tuple( + kern_cls for kern_cls in kernels.__dict__.values() + if isinstance(kern_cls, type) and + issubclass(kern_cls, kernels.Kernel) and + kern_cls is not kernels.Kernel and + kern_cls is not kernels.SymmetricKernel) + self.sigma = 1 * pq.s + self.time_input = np.linspace(-10, 10, num=100) * self.sigma.units + self.kernels = [] + for kern_cls in kernel_types: + for invert in (False, True): + self.kernels.append(kern_cls(self.sigma, invert=invert)) + + def test_small_size(self): + time_empty = [] * pq.s + time_size_2 = [0, 1] * pq.s + for kernel in self.kernels: + self.assertRaises(ValueError, kernel.median_index, time_empty) + median_id = kernel.median_index(time_size_2) + self.assertEqual(median_id, 0) + + def test_not_sorted(self): + np.random.seed(9) + np.random.shuffle(self.time_input) + for kernel in self.kernels: + self.assertRaises(ValueError, kernel.median_index, self.time_input) + + def test_non_support(self): + time_negative = np.linspace(-100, -20) * pq.s + for kernel in self.kernels: + if isinstance(kernel, (kernels.GaussianKernel, + kernels.LaplacianKernel)): + continue + kernel.invert = False + median_id = kernel.median_index(time_negative) + self.assertEqual(median_id, len(time_negative) // 2) + self.assertAlmostEqual(kernel.cdf(time_negative[median_id]), 0.) + + def test_old_implementation(self): + def median_index(t): + cumsum = kernel(t).cumsum() + dt = (t[-1] - t[0]) / (len(t) - 1) + quantiles = cumsum * dt + return np.nonzero(quantiles >= 0.5)[0].min() + + for kernel in self.kernels: + median_id = kernel.median_index(self.time_input) + median_id_old = median_index(self.time_input) + # the old implementation was off by 1 index, because the cumsum + # did not start with 0 (the zero element should have been added + # in the cumsum in old implementation). + self.assertLessEqual(abs(median_id - median_id_old), 1) + if __name__ == '__main__': unittest.main() diff --git a/elephant/test/test_spike_train_dissimilarity.py b/elephant/test/test_spike_train_dissimilarity.py index 6f9f3fecf..3b04fb4ff 100644 --- a/elephant/test/test_spike_train_dissimilarity.py +++ b/elephant/test/test_spike_train_dissimilarity.py @@ -210,17 +210,17 @@ def test_victor_purpura_distance_fast(self): self.assertAlmostEqual( stds.victor_purpura_dist([self.st16, self.st14], self.q3)[0, 1], stds.victor_purpura_dist([self.st16, self.st15], self.q3)[0, 1]) - self.assertEqual( + self.assertAlmostEqual( stds.victor_purpura_dist([self.st01, self.st05], self.q3)[0, 1], stds.victor_purpura_dist([self.st01, self.st05], self.q7)[0, 1]) # Tests on algorithmic behaviour for equal spike times - self.assertEqual( + self.assertAlmostEqual( stds.victor_purpura_dist([self.st31, self.st34], self.q3)[0, 1], 0.8 + 1.0) - self.assertEqual( + self.assertAlmostEqual( stds.victor_purpura_dist([self.st31, self.st34], self.q3)[0, 1], stds.victor_purpura_dist([self.st32, self.st33], self.q3)[0, 1]) - self.assertEqual( + self.assertAlmostEqual( stds.victor_purpura_dist( [self.st31, self.st33], self.q3)[0, 1] * 2.0, stds.victor_purpura_dist( diff --git a/elephant/test/test_statistics.py b/elephant/test/test_statistics.py index f0749f866..36d5eecf0 100644 --- a/elephant/test/test_statistics.py +++ b/elephant/test/test_statistics.py @@ -10,7 +10,6 @@ import math import sys import unittest -import warnings import neo import numpy as np @@ -19,7 +18,8 @@ from numpy.testing.utils import assert_array_almost_equal, assert_array_equal import elephant.kernels as kernels -import elephant.statistics as es +from elephant import statistics +from elephant.spike_train_generation import homogeneous_poisson_process python_version_major = sys.version_info.major @@ -29,7 +29,7 @@ def setUp(self): self.test_array_2d = np.array([[0.3, 0.56, 0.87, 1.23], [0.02, 0.71, 1.82, 8.46], [0.03, 0.14, 0.15, 0.92]]) - self.targ_array_2d_0 = np.array([[-0.28, 0.15, 0.95, 7.23], + self.targ_array_2d_0 = np.array([[-0.28, 0.15, 0.95, 7.23], [0.01, -0.57, -1.67, -7.54]]) self.targ_array_2d_1 = np.array([[0.26, 0.31, 0.36], [0.69, 1.11, 6.64], @@ -43,40 +43,40 @@ def test_isi_with_spiketrain(self): st = neo.SpikeTrain( self.test_array_1d, units='ms', t_stop=10.0, t_start=0.29) target = pq.Quantity(self.targ_array_1d, 'ms') - res = es.isi(st) + res = statistics.isi(st) assert_array_almost_equal(res, target, decimal=9) def test_isi_with_quantities_1d(self): st = pq.Quantity(self.test_array_1d, units='ms') target = pq.Quantity(self.targ_array_1d, 'ms') - res = es.isi(st) + res = statistics.isi(st) assert_array_almost_equal(res, target, decimal=9) def test_isi_with_plain_array_1d(self): st = self.test_array_1d target = self.targ_array_1d - res = es.isi(st) + res = statistics.isi(st) assert not isinstance(res, pq.Quantity) assert_array_almost_equal(res, target, decimal=9) def test_isi_with_plain_array_2d_default(self): st = self.test_array_2d target = self.targ_array_2d_default - res = es.isi(st) + res = statistics.isi(st) assert not isinstance(res, pq.Quantity) assert_array_almost_equal(res, target, decimal=9) def test_isi_with_plain_array_2d_0(self): st = self.test_array_2d target = self.targ_array_2d_0 - res = es.isi(st, axis=0) + res = statistics.isi(st, axis=0) assert not isinstance(res, pq.Quantity) assert_array_almost_equal(res, target, decimal=9) def test_isi_with_plain_array_2d_1(self): st = self.test_array_2d target = self.targ_array_2d_1 - res = es.isi(st, axis=1) + res = statistics.isi(st, axis=1) assert not isinstance(res, pq.Quantity) assert_array_almost_equal(res, target, decimal=9) @@ -86,15 +86,15 @@ def setUp(self): self.test_array_regular = np.arange(1, 6) def test_cv_isi_regular_spiketrain_is_zero(self): - st = neo.SpikeTrain(self.test_array_regular, units='ms', t_stop=10.0) + st = neo.SpikeTrain(self.test_array_regular, units='ms', t_stop=10.0) targ = 0.0 - res = es.cv(es.isi(st)) + res = statistics.cv(statistics.isi(st)) self.assertEqual(res, targ) def test_cv_isi_regular_array_is_zero(self): st = self.test_array_regular targ = 0.0 - res = es.cv(es.isi(st)) + res = statistics.cv(statistics.isi(st)) self.assertEqual(res, targ) @@ -119,120 +119,135 @@ def setUp(self): self.targ_array_1d = self.targ_array_2d_1[0] self.max_array_1d = self.max_array_2d_1[0] + def test_invalid_input_spiketrain(self): + # empty spiketrain + self.assertRaises(ValueError, statistics.mean_firing_rate, []) + for st_invalid in (None, 0.1): + self.assertRaises(TypeError, statistics.mean_firing_rate, + st_invalid) + def test_mean_firing_rate_with_spiketrain(self): st = neo.SpikeTrain(self.test_array_1d, units='ms', t_stop=10.0) - target = pq.Quantity(self.targ_array_1d/10., '1/ms') - res = es.mean_firing_rate(st) + target = pq.Quantity(self.targ_array_1d / 10., '1/ms') + res = statistics.mean_firing_rate(st) assert_array_almost_equal(res, target, decimal=9) def test_mean_firing_rate_with_spiketrain_set_ends(self): st = neo.SpikeTrain(self.test_array_1d, units='ms', t_stop=10.0) - target = pq.Quantity(2/0.5, '1/ms') - res = es.mean_firing_rate(st, t_start=0.4, t_stop=0.9) + target = pq.Quantity(2 / 0.5, '1/ms') + res = statistics.mean_firing_rate(st, t_start=0.4 * pq.ms, + t_stop=0.9 * pq.ms) assert_array_almost_equal(res, target, decimal=9) def test_mean_firing_rate_with_quantities_1d(self): st = pq.Quantity(self.test_array_1d, units='ms') - target = pq.Quantity(self.targ_array_1d/self.max_array_1d, '1/ms') - res = es.mean_firing_rate(st) + target = pq.Quantity(self.targ_array_1d / self.max_array_1d, '1/ms') + res = statistics.mean_firing_rate(st) assert_array_almost_equal(res, target, decimal=9) def test_mean_firing_rate_with_quantities_1d_set_ends(self): st = pq.Quantity(self.test_array_1d, units='ms') - target = pq.Quantity(2/0.6, '1/ms') - res = es.mean_firing_rate(st, t_start=400*pq.us, t_stop=1.) - assert_array_almost_equal(res, target, decimal=9) + + # t_stop is not a Quantity + self.assertRaises(TypeError, statistics.mean_firing_rate, st, + t_start=400 * pq.us, t_stop=1.) + + # t_start is not a Quantity + self.assertRaises(TypeError, statistics.mean_firing_rate, st, + t_start=0.4, t_stop=1. * pq.ms) def test_mean_firing_rate_with_plain_array_1d(self): st = self.test_array_1d - target = self.targ_array_1d/self.max_array_1d - res = es.mean_firing_rate(st) + target = self.targ_array_1d / self.max_array_1d + res = statistics.mean_firing_rate(st) assert not isinstance(res, pq.Quantity) assert_array_almost_equal(res, target, decimal=9) def test_mean_firing_rate_with_plain_array_1d_set_ends(self): st = self.test_array_1d - target = self.targ_array_1d/(1.23-0.3) - res = es.mean_firing_rate(st, t_start=0.3, t_stop=1.23) + target = self.targ_array_1d / (1.23 - 0.3) + res = statistics.mean_firing_rate(st, t_start=0.3, t_stop=1.23) assert not isinstance(res, pq.Quantity) assert_array_almost_equal(res, target, decimal=9) def test_mean_firing_rate_with_plain_array_2d_default(self): st = self.test_array_2d - target = self.targ_array_2d_default/self.max_array_2d_default - res = es.mean_firing_rate(st) + target = self.targ_array_2d_default / self.max_array_2d_default + res = statistics.mean_firing_rate(st) assert not isinstance(res, pq.Quantity) assert_array_almost_equal(res, target, decimal=9) def test_mean_firing_rate_with_plain_array_2d_0(self): st = self.test_array_2d - target = self.targ_array_2d_0/self.max_array_2d_0 - res = es.mean_firing_rate(st, axis=0) + target = self.targ_array_2d_0 / self.max_array_2d_0 + res = statistics.mean_firing_rate(st, axis=0) assert not isinstance(res, pq.Quantity) assert_array_almost_equal(res, target, decimal=9) def test_mean_firing_rate_with_plain_array_2d_1(self): st = self.test_array_2d - target = self.targ_array_2d_1/self.max_array_2d_1 - res = es.mean_firing_rate(st, axis=1) + target = self.targ_array_2d_1 / self.max_array_2d_1 + res = statistics.mean_firing_rate(st, axis=1) assert not isinstance(res, pq.Quantity) assert_array_almost_equal(res, target, decimal=9) def test_mean_firing_rate_with_plain_array_3d_None(self): st = self.test_array_3d - target = np.sum(self.test_array_3d, None)/5. - res = es.mean_firing_rate(st, axis=None, t_stop=5.) + target = np.sum(self.test_array_3d, None) / 5. + res = statistics.mean_firing_rate(st, axis=None, t_stop=5.) assert not isinstance(res, pq.Quantity) assert_array_almost_equal(res, target, decimal=9) def test_mean_firing_rate_with_plain_array_3d_0(self): st = self.test_array_3d - target = np.sum(self.test_array_3d, 0)/5. - res = es.mean_firing_rate(st, axis=0, t_stop=5.) + target = np.sum(self.test_array_3d, 0) / 5. + res = statistics.mean_firing_rate(st, axis=0, t_stop=5.) assert not isinstance(res, pq.Quantity) assert_array_almost_equal(res, target, decimal=9) def test_mean_firing_rate_with_plain_array_3d_1(self): st = self.test_array_3d - target = np.sum(self.test_array_3d, 1)/5. - res = es.mean_firing_rate(st, axis=1, t_stop=5.) + target = np.sum(self.test_array_3d, 1) / 5. + res = statistics.mean_firing_rate(st, axis=1, t_stop=5.) assert not isinstance(res, pq.Quantity) assert_array_almost_equal(res, target, decimal=9) def test_mean_firing_rate_with_plain_array_3d_2(self): st = self.test_array_3d - target = np.sum(self.test_array_3d, 2)/5. - res = es.mean_firing_rate(st, axis=2, t_stop=5.) + target = np.sum(self.test_array_3d, 2) / 5. + res = statistics.mean_firing_rate(st, axis=2, t_stop=5.) assert not isinstance(res, pq.Quantity) assert_array_almost_equal(res, target, decimal=9) def test_mean_firing_rate_with_plain_array_2d_1_set_ends(self): st = self.test_array_2d - target = np.array([4, 1, 3])/(1.23-0.14) - res = es.mean_firing_rate(st, axis=1, t_start=0.14, t_stop=1.23) + target = np.array([4, 1, 3]) / (1.23 - 0.14) + res = statistics.mean_firing_rate(st, axis=1, t_start=0.14, + t_stop=1.23) assert not isinstance(res, pq.Quantity) assert_array_almost_equal(res, target, decimal=9) def test_mean_firing_rate_with_plain_array_2d_None(self): st = self.test_array_2d - target = self.targ_array_2d_None/self.max_array_2d_None - res = es.mean_firing_rate(st, axis=None) + target = self.targ_array_2d_None / self.max_array_2d_None + res = statistics.mean_firing_rate(st, axis=None) assert not isinstance(res, pq.Quantity) assert_array_almost_equal(res, target, decimal=9) - def test_mean_firing_rate_with_plain_array_and_units_start_stop_typeerror(self): + def test_mean_firing_rate_with_plain_array_and_units_start_stop_typeerror( + self): st = self.test_array_2d - self.assertRaises(TypeError, es.mean_firing_rate, st, + self.assertRaises(TypeError, statistics.mean_firing_rate, st, t_start=pq.Quantity(0, 'ms')) - self.assertRaises(TypeError, es.mean_firing_rate, st, + self.assertRaises(TypeError, statistics.mean_firing_rate, st, t_stop=pq.Quantity(10, 'ms')) - self.assertRaises(TypeError, es.mean_firing_rate, st, + self.assertRaises(TypeError, statistics.mean_firing_rate, st, t_start=pq.Quantity(0, 'ms'), t_stop=pq.Quantity(10, 'ms')) - self.assertRaises(TypeError, es.mean_firing_rate, st, + self.assertRaises(TypeError, statistics.mean_firing_rate, st, t_start=pq.Quantity(0, 'ms'), t_stop=10.) - self.assertRaises(TypeError, es.mean_firing_rate, st, + self.assertRaises(TypeError, statistics.mean_firing_rate, st, t_start=0., t_stop=pq.Quantity(10, 'ms')) @@ -262,87 +277,87 @@ def test_fanofactor_spiketrains(self): # Test with list of spiketrains self.assertEqual( np.var(self.sp_counts) / np.mean(self.sp_counts), - es.fanofactor(self.test_spiketrains)) + statistics.fanofactor(self.test_spiketrains)) # One spiketrain in list st = self.test_spiketrains[0] - self.assertEqual(es.fanofactor([st]), 0.0) + self.assertEqual(statistics.fanofactor([st]), 0.0) def test_fanofactor_empty(self): # Test with empty list - self.assertTrue(np.isnan(es.fanofactor([]))) - self.assertTrue(np.isnan(es.fanofactor([[]]))) + self.assertTrue(np.isnan(statistics.fanofactor([]))) + self.assertTrue(np.isnan(statistics.fanofactor([[]]))) # Test with empty quantity - self.assertTrue(np.isnan(es.fanofactor([] * pq.ms))) + self.assertTrue(np.isnan(statistics.fanofactor([] * pq.ms))) # Empty spiketrain st = neo.core.SpikeTrain([] * pq.ms, t_start=0 * pq.ms, t_stop=1.5 * pq.ms) - self.assertTrue(np.isnan(es.fanofactor(st))) + self.assertTrue(np.isnan(statistics.fanofactor(st))) def test_fanofactor_spiketrains_same(self): # Test with same spiketrains in list sts = [self.test_spiketrains[0]] * 3 - self.assertEqual(es.fanofactor(sts), 0.0) + self.assertEqual(statistics.fanofactor(sts), 0.0) def test_fanofactor_array(self): - self.assertEqual(es.fanofactor(self.test_array), + self.assertEqual(statistics.fanofactor(self.test_array), np.var(self.sp_counts) / np.mean(self.sp_counts)) def test_fanofactor_array_same(self): lst = [self.test_array[0]] * 3 - self.assertEqual(es.fanofactor(lst), 0.0) + self.assertEqual(statistics.fanofactor(lst), 0.0) def test_fanofactor_quantity(self): - self.assertEqual(es.fanofactor(self.test_quantity), + self.assertEqual(statistics.fanofactor(self.test_quantity), np.var(self.sp_counts) / np.mean(self.sp_counts)) def test_fanofactor_quantity_same(self): lst = [self.test_quantity[0]] * 3 - self.assertEqual(es.fanofactor(lst), 0.0) + self.assertEqual(statistics.fanofactor(lst), 0.0) def test_fanofactor_list(self): - self.assertEqual(es.fanofactor(self.test_list), + self.assertEqual(statistics.fanofactor(self.test_list), np.var(self.sp_counts) / np.mean(self.sp_counts)) def test_fanofactor_list_same(self): lst = [self.test_list[0]] * 3 - self.assertEqual(es.fanofactor(lst), 0.0) + self.assertEqual(statistics.fanofactor(lst), 0.0) class LVTestCase(unittest.TestCase): def setUp(self): - self.test_seq = [1, 28, 4, 47, 5, 16, 2, 5, 21, 12, - 4, 12, 59, 2, 4, 18, 33, 25, 2, 34, - 4, 1, 1, 14, 8, 1, 10, 1, 8, 20, - 5, 1, 6, 5, 12, 2, 8, 8, 2, 8, - 2, 10, 2, 1, 1, 2, 15, 3, 20, 6, - 11, 6, 18, 2, 5, 17, 4, 3, 13, 6, - 1, 18, 1, 16, 12, 2, 52, 2, 5, 7, - 6, 25, 6, 5, 3, 15, 4, 3, 16, 3, - 6, 5, 24, 21, 3, 3, 4, 8, 4, 11, - 5, 7, 5, 6, 8, 11, 33, 10, 7, 4] + self.test_seq = [1, 28, 4, 47, 5, 16, 2, 5, 21, 12, + 4, 12, 59, 2, 4, 18, 33, 25, 2, 34, + 4, 1, 1, 14, 8, 1, 10, 1, 8, 20, + 5, 1, 6, 5, 12, 2, 8, 8, 2, 8, + 2, 10, 2, 1, 1, 2, 15, 3, 20, 6, + 11, 6, 18, 2, 5, 17, 4, 3, 13, 6, + 1, 18, 1, 16, 12, 2, 52, 2, 5, 7, + 6, 25, 6, 5, 3, 15, 4, 3, 16, 3, + 6, 5, 24, 21, 3, 3, 4, 8, 4, 11, + 5, 7, 5, 6, 8, 11, 33, 10, 7, 4] self.target = 0.971826029994 def test_lv_with_quantities(self): seq = pq.Quantity(self.test_seq, units='ms') - assert_array_almost_equal(es.lv(seq), self.target, decimal=9) + assert_array_almost_equal(statistics.lv(seq), self.target, decimal=9) def test_lv_with_plain_array(self): seq = np.array(self.test_seq) - assert_array_almost_equal(es.lv(seq), self.target, decimal=9) + assert_array_almost_equal(statistics.lv(seq), self.target, decimal=9) def test_lv_with_list(self): seq = self.test_seq - assert_array_almost_equal(es.lv(seq), self.target, decimal=9) + assert_array_almost_equal(statistics.lv(seq), self.target, decimal=9) def test_lv_raise_error(self): seq = self.test_seq - self.assertRaises(ValueError, es.lv, []) - self.assertRaises(ValueError, es.lv, 1) - self.assertRaises(ValueError, es.lv, np.array([seq, seq])) + self.assertRaises(ValueError, statistics.lv, []) + self.assertRaises(ValueError, statistics.lv, 1) + self.assertRaises(ValueError, statistics.lv, np.array([seq, seq])) @unittest.skipUnless(python_version_major == 3, "assertWarns requires 3.2") def test_2short_spike_train(self): @@ -352,41 +367,41 @@ def test_2short_spike_train(self): Catches UserWarning: Input size is too small. Please provide an input with more than 1 entry. """ - self.assertTrue(math.isnan(es.lv(seq, with_nan=True))) - + self.assertTrue(math.isnan(statistics.lv(seq, with_nan=True))) + class CV2TestCase(unittest.TestCase): def setUp(self): - self.test_seq = [1, 28, 4, 47, 5, 16, 2, 5, 21, 12, - 4, 12, 59, 2, 4, 18, 33, 25, 2, 34, - 4, 1, 1, 14, 8, 1, 10, 1, 8, 20, - 5, 1, 6, 5, 12, 2, 8, 8, 2, 8, - 2, 10, 2, 1, 1, 2, 15, 3, 20, 6, - 11, 6, 18, 2, 5, 17, 4, 3, 13, 6, - 1, 18, 1, 16, 12, 2, 52, 2, 5, 7, - 6, 25, 6, 5, 3, 15, 4, 3, 16, 3, - 6, 5, 24, 21, 3, 3, 4, 8, 4, 11, - 5, 7, 5, 6, 8, 11, 33, 10, 7, 4] + self.test_seq = [1, 28, 4, 47, 5, 16, 2, 5, 21, 12, + 4, 12, 59, 2, 4, 18, 33, 25, 2, 34, + 4, 1, 1, 14, 8, 1, 10, 1, 8, 20, + 5, 1, 6, 5, 12, 2, 8, 8, 2, 8, + 2, 10, 2, 1, 1, 2, 15, 3, 20, 6, + 11, 6, 18, 2, 5, 17, 4, 3, 13, 6, + 1, 18, 1, 16, 12, 2, 52, 2, 5, 7, + 6, 25, 6, 5, 3, 15, 4, 3, 16, 3, + 6, 5, 24, 21, 3, 3, 4, 8, 4, 11, + 5, 7, 5, 6, 8, 11, 33, 10, 7, 4] self.target = 1.0022235296529176 def test_cv2_with_quantities(self): seq = pq.Quantity(self.test_seq, units='ms') - assert_array_almost_equal(es.cv2(seq), self.target, decimal=9) + assert_array_almost_equal(statistics.cv2(seq), self.target, decimal=9) def test_cv2_with_plain_array(self): seq = np.array(self.test_seq) - assert_array_almost_equal(es.cv2(seq), self.target, decimal=9) + assert_array_almost_equal(statistics.cv2(seq), self.target, decimal=9) def test_cv2_with_list(self): seq = self.test_seq - assert_array_almost_equal(es.cv2(seq), self.target, decimal=9) + assert_array_almost_equal(statistics.cv2(seq), self.target, decimal=9) def test_cv2_raise_error(self): seq = self.test_seq - self.assertRaises(ValueError, es.cv2, []) - self.assertRaises(ValueError, es.cv2, 1) - self.assertRaises(ValueError, es.cv2, np.array([seq, seq])) + self.assertRaises(ValueError, statistics.cv2, []) + self.assertRaises(ValueError, statistics.cv2, 1) + self.assertRaises(ValueError, statistics.cv2, np.array([seq, seq])) class RateEstimationTestCase(unittest.TestCase): @@ -398,28 +413,29 @@ def setUp(self): self.st_margin = 5.0 # seconds self.st_rate = 10.0 # Hertz - st_num_spikes = np.random.poisson( - self.st_rate*(self.st_dur-2*self.st_margin)) - spike_train = np.random.rand( - st_num_spikes) * (self.st_dur-2*self.st_margin) + self.st_margin + np.random.seed(19) + duration_effective = self.st_dur - 2 * self.st_margin + st_num_spikes = np.random.poisson(self.st_rate * duration_effective) + spike_train = np.random.rand(st_num_spikes) * duration_effective + \ + self.st_margin spike_train.sort() # convert spike train into neo objects - self.spike_train = neo.SpikeTrain(spike_train*pq.s, - t_start=self.st_tr[0]*pq.s, - t_stop=self.st_tr[1]*pq.s) + self.spike_train = neo.SpikeTrain(spike_train * pq.s, + t_start=self.st_tr[0] * pq.s, + t_stop=self.st_tr[1] * pq.s) # generation of a multiply used specific kernel - self.kernel = kernels.TriangularKernel(sigma=0.03*pq.s) + self.kernel = kernels.TriangularKernel(sigma=0.03 * pq.s) @unittest.skipUnless(python_version_major == 3, "assertWarns requires 3.2") def test_instantaneous_rate_and_warnings(self): st = self.spike_train - sampling_period = 0.01*pq.s + sampling_period = 0.01 * pq.s with self.assertWarns(UserWarning): # Catches warning: The width of the kernel was adjusted to a # minimally allowed width. - inst_rate = es.instantaneous_rate( + inst_rate = statistics.instantaneous_rate( st, sampling_period, self.kernel, cutoff=0) self.assertIsInstance(inst_rate, neo.core.AnalogSignal) self.assertEqual( @@ -430,116 +446,190 @@ def test_instantaneous_rate_and_warnings(self): def test_error_instantaneous_rate(self): self.assertRaises( - TypeError, es.instantaneous_rate, spiketrain=[1, 2, 3]*pq.s, - sampling_period=0.01*pq.ms, kernel=self.kernel) + TypeError, statistics.instantaneous_rate, + spiketrain=[1, 2, 3] * pq.s, + sampling_period=0.01 * pq.ms, kernel=self.kernel) self.assertRaises( - TypeError, es.instantaneous_rate, spiketrain=[1, 2, 3], - sampling_period=0.01*pq.ms, kernel=self.kernel) + TypeError, statistics.instantaneous_rate, spiketrain=[1, 2, 3], + sampling_period=0.01 * pq.ms, kernel=self.kernel) st = self.spike_train self.assertRaises( - TypeError, es.instantaneous_rate, spiketrain=st, + TypeError, statistics.instantaneous_rate, spiketrain=st, sampling_period=0.01, kernel=self.kernel) self.assertRaises( - ValueError, es.instantaneous_rate, spiketrain=st, - sampling_period=-0.01*pq.ms, kernel=self.kernel) + ValueError, statistics.instantaneous_rate, spiketrain=st, + sampling_period=-0.01 * pq.ms, kernel=self.kernel) self.assertRaises( - TypeError, es.instantaneous_rate, spiketrain=st, - sampling_period=0.01*pq.ms, kernel='NONE') - self.assertRaises(TypeError, es.instantaneous_rate, self.spike_train, - sampling_period=0.01*pq.s, kernel='wrong_string', - t_start=self.st_tr[0]*pq.s, t_stop=self.st_tr[1]*pq.s, + TypeError, statistics.instantaneous_rate, spiketrain=st, + sampling_period=0.01 * pq.ms, kernel='NONE') + self.assertRaises(TypeError, statistics.instantaneous_rate, + self.spike_train, + sampling_period=0.01 * pq.s, kernel='wrong_string', + t_start=self.st_tr[0] * pq.s, + t_stop=self.st_tr[1] * pq.s, trim=False) self.assertRaises( - TypeError, es.instantaneous_rate, spiketrain=st, - sampling_period=0.01*pq.ms, kernel=self.kernel, cutoff=20*pq.ms) + TypeError, statistics.instantaneous_rate, spiketrain=st, + sampling_period=0.01 * pq.ms, kernel=self.kernel, + cutoff=20 * pq.ms) self.assertRaises( - TypeError, es.instantaneous_rate, spiketrain=st, - sampling_period=0.01*pq.ms, kernel=self.kernel, t_start=2) + TypeError, statistics.instantaneous_rate, spiketrain=st, + sampling_period=0.01 * pq.ms, kernel=self.kernel, t_start=2) self.assertRaises( - TypeError, es.instantaneous_rate, spiketrain=st, - sampling_period=0.01*pq.ms, kernel=self.kernel, t_stop=20*pq.mV) + TypeError, statistics.instantaneous_rate, spiketrain=st, + sampling_period=0.01 * pq.ms, kernel=self.kernel, + t_stop=20 * pq.mV) self.assertRaises( - TypeError, es.instantaneous_rate, spiketrain=st, - sampling_period=0.01*pq.ms, kernel=self.kernel, trim=1) + TypeError, statistics.instantaneous_rate, spiketrain=st, + sampling_period=0.01 * pq.ms, kernel=self.kernel, trim=1) def test_rate_estimation_consistency(self): """ Test, whether the integral of the rate estimation curve is (almost) equal to the number of spikes of the spike train. """ - kernel_types = [obj for obj in kernels.__dict__.values() - if isinstance(obj, type) and - issubclass(obj, kernels.Kernel) and - hasattr(obj, "_evaluate") and - obj is not kernels.Kernel and - obj is not kernels.SymmetricKernel] - kernel_list = [kernel_type(sigma=0.5*pq.s, invert=False) - for kernel_type in kernel_types] - kernel_resolution = 0.01*pq.s - for kernel in kernel_list: - rate_estimate_a0 = es.instantaneous_rate( - self.spike_train, - sampling_period=kernel_resolution, - kernel='auto', - t_start=self.st_tr[0] * pq.s, - t_stop=self.st_tr[1] * pq.s, - trim=False) - - rate_estimate0 = es.instantaneous_rate( - self.spike_train, - sampling_period=kernel_resolution, - kernel=kernel) - - rate_estimate1 = es.instantaneous_rate( - self.spike_train, - sampling_period=kernel_resolution, - kernel=kernel, - t_start=self.st_tr[0] * pq.s, - t_stop=self.st_tr[1] * pq.s, - trim=False) - - rate_estimate2 = es.instantaneous_rate( - self.spike_train, - sampling_period=kernel_resolution, - kernel=kernel, - t_start=self.st_tr[0] * pq.s, - t_stop=self.st_tr[1] * pq.s, - trim=True) - # test consistency - rate_estimate_list = [rate_estimate0, rate_estimate1, - rate_estimate2, rate_estimate_a0] - - for rate_estimate in rate_estimate_list: + kernel_types = tuple( + kern_cls for kern_cls in kernels.__dict__.values() + if isinstance(kern_cls, type) and + issubclass(kern_cls, kernels.Kernel) and + kern_cls is not kernels.Kernel and + kern_cls is not kernels.SymmetricKernel) + kernels_available = [kern_cls(sigma=0.5 * pq.s, invert=False) + for kern_cls in kernel_types] + kernels_available.append('auto') + kernel_resolution = 0.01 * pq.s + for kernel in kernels_available: + for center_kernel in (False, True): + rate_estimate = statistics.instantaneous_rate( + self.spike_train, + sampling_period=kernel_resolution, + kernel=kernel, + t_start=self.st_tr[0] * pq.s, + t_stop=self.st_tr[1] * pq.s, + trim=False, + center_kernel=center_kernel) num_spikes = len(self.spike_train) auc = spint.cumtrapz( - y=rate_estimate.magnitude[:, 0], - x=rate_estimate.times.rescale('s').magnitude)[-1] + y=rate_estimate.magnitude.squeeze(), + x=rate_estimate.times.simplified.magnitude)[-1] self.assertAlmostEqual(num_spikes, auc, - delta=0.05 * num_spikes) + delta=0.01 * num_spikes) + + def test_not_center_kernel(self): + # issue 107 + t_spike = 1 * pq.s + st = neo.SpikeTrain([t_spike], t_start=0 * pq.s, t_stop=2 * pq.s, + units=pq.s) + kernel = kernels.AlphaKernel(200 * pq.ms) + fs = 0.1 * pq.ms + rate = statistics.instantaneous_rate(st, + sampling_period=fs, + kernel=kernel, + center_kernel=False) + rate_nonzero_index = np.nonzero(rate > 1e-6)[0] + # where the mass is concentrated + rate_mass = rate.times.rescale(t_spike.units)[rate_nonzero_index] + all_after_response_onset = (rate_mass >= t_spike).all() + self.assertTrue(all_after_response_onset) + + def test_regression_288(self): + np.random.seed(9) + sampling_period = 200 * pq.ms + spiketrain = homogeneous_poisson_process(10 * pq.Hz, + t_start=0 * pq.s, + t_stop=10 * pq.s) + kernel = kernels.AlphaKernel(sigma=5 * pq.ms, invert=True) + rate = statistics.instantaneous_rate(spiketrain, + sampling_period=sampling_period, + kernel=kernel) + self.assertEqual( + len(rate), (spiketrain.t_stop / sampling_period).simplified.item()) + + # 3 Hz is not a target - it's meant to test the non-negativity of the + # result rate; ideally, for smaller sampling rates, the integral + # should match the num. of spikes in the spiketrain + self.assertGreater(rate.mean(), 3 * pq.Hz) + + def test_spikes_on_edges(self): + # this test demonstrates that the trimming (convolve valid mode) + # removes the edge spikes, underestimating the true firing rate and + # thus is not able to reconstruct the number of spikes in a + # spiketrain (see test_rate_estimation_consistency) + cutoff = 5 + sampling_period = 0.01 * pq.s + t_spikes = np.array([-cutoff, cutoff]) * pq.s + spiketrain = neo.SpikeTrain(t_spikes, t_start=t_spikes[0], + t_stop=t_spikes[-1]) + kernel_types = tuple( + kern_cls for kern_cls in kernels.__dict__.values() + if isinstance(kern_cls, type) and + issubclass(kern_cls, kernels.Kernel) and + kern_cls is not kernels.Kernel and + kern_cls is not kernels.SymmetricKernel) + kernels_available = [kern_cls(sigma=1 * pq.s, invert=False) + for kern_cls in kernel_types] + for kernel in kernels_available: + for center_kernel in (False, True): + rate = statistics.instantaneous_rate( + spiketrain, + sampling_period=sampling_period, + kernel=kernel, + cutoff=cutoff, trim=True, + center_kernel=center_kernel) + assert_array_almost_equal(rate.magnitude, 0, decimal=3) + + def test_trim_as_convolve_mode(self): + cutoff = 5 + sampling_period = 0.01 * pq.s + t_spikes = np.linspace(-cutoff, cutoff, num=(2 * cutoff + 1)) * pq.s + spiketrain = neo.SpikeTrain(t_spikes, t_start=t_spikes[0], + t_stop=t_spikes[-1]) + kernel = kernels.RectangularKernel(sigma=1 * pq.s) + assert cutoff > kernel.min_cutoff, "Choose larger cutoff" + kernel_types = tuple( + kern_cls for kern_cls in kernels.__dict__.values() + if isinstance(kern_cls, type) and + issubclass(kern_cls, kernels.SymmetricKernel) and + kern_cls is not kernels.SymmetricKernel) + kernels_symmetric = [kern_cls(sigma=1 * pq.s, invert=False) + for kern_cls in kernel_types] + for kernel in kernels_symmetric: + for trim in (False, True): + rate_centered = statistics.instantaneous_rate( + spiketrain, sampling_period=sampling_period, + kernel=kernel, cutoff=cutoff, trim=trim) + + rate_convolve = statistics.instantaneous_rate( + spiketrain, sampling_period=sampling_period, + kernel=kernel, cutoff=cutoff, trim=trim, + center_kernel=False) + assert_array_almost_equal(rate_centered, rate_convolve) def test_instantaneous_rate_spiketrainlist(self): - st_num_spikes = np.random.poisson( - self.st_rate * (self.st_dur - 2 * self.st_margin)) - spike_train2 = np.random.rand( - st_num_spikes) * (self.st_dur - 2 * self.st_margin) + \ + np.random.seed(19) + duration_effective = self.st_dur - 2 * self.st_margin + st_num_spikes = np.random.poisson(self.st_rate * duration_effective) + spike_train2 = np.random.rand(st_num_spikes) * duration_effective + \ self.st_margin spike_train2.sort() spike_train2 = neo.SpikeTrain(spike_train2 * pq.s, t_start=self.st_tr[0] * pq.s, t_stop=self.st_tr[1] * pq.s) - st_rate_1 = es.instantaneous_rate(self.spike_train, - sampling_period=0.01*pq.s, - kernel=self.kernel) - st_rate_2 = es.instantaneous_rate(spike_train2, - sampling_period=0.01*pq.s, - kernel=self.kernel) - combined_rate = es.instantaneous_rate([self.spike_train, - spike_train2], - sampling_period=0.01*pq.s, - kernel=self.kernel) + st_rate_1 = statistics.instantaneous_rate(self.spike_train, + sampling_period=0.01 * pq.s, + kernel=self.kernel) + st_rate_2 = statistics.instantaneous_rate(spike_train2, + sampling_period=0.01 * pq.s, + kernel=self.kernel) + combined_rate = statistics.instantaneous_rate( + [self.spike_train, spike_train2], + sampling_period=0.01 * pq.s, + kernel=self.kernel) summed_rate = st_rate_1 + st_rate_2 # equivalent for identical kernels + # 'time_vector.dtype' in instantaneous_rate() is changed from float64 + # to float32 which results in 3e-6 abs difference assert_array_almost_equal(combined_rate.magnitude, - summed_rate.magnitude) + summed_rate.magnitude, decimal=5) # Regression test for #144 def test_instantaneous_rate_regression_144(self): @@ -547,24 +637,25 @@ def test_instantaneous_rate_regression_144(self): # other, that the optimal kernel cannot be detected. Therefore, the # function should react with a ValueError. st = neo.SpikeTrain([2.12, 2.13, 2.15] * pq.s, t_stop=10 * pq.s) - self.assertRaises(ValueError, es.instantaneous_rate, st, 1 * pq.ms) + self.assertRaises(ValueError, statistics.instantaneous_rate, st, + 1 * pq.ms) # Regression test for #245 def test_instantaneous_rate_regression_245(self): # This test makes sure that the correct kernel width is chosen when # selecting 'auto' as kernel spiketrain = neo.SpikeTrain( - range(1, 30) * pq.ms, t_start=0*pq.ms, t_stop=30*pq.ms) + range(1, 30) * pq.ms, t_start=0 * pq.ms, t_stop=30 * pq.ms) # This is the correct procedure to attain the kernel: first, the result # of sskernel retrieves the kernel bandwidth of an optimal Gaussian # kernel in terms of its standard deviation sigma, then uses this value # directly in the function for creating the Gaussian kernel - kernel_width_sigma = es.sskernel( + kernel_width_sigma = statistics.sskernel( spiketrain.magnitude, tin=None, bootstrap=False)['optw'] kernel = kernels.GaussianKernel(kernel_width_sigma * spiketrain.units) - result_target = es.instantaneous_rate( - spiketrain, 10*pq.ms, kernel=kernel) + result_target = statistics.instantaneous_rate( + spiketrain, 10 * pq.ms, kernel=kernel) # Here, we check if the 'auto' argument leads to the same operation. In # the regression, it was incorrectly assumed that the sskernel() @@ -573,8 +664,8 @@ def test_instantaneous_rate_regression_245(self): # factor 2.0 connects kernel width with its half width, # factor 2.7 connects half width of Gaussian distribution with # 99% probability mass with its standard deviation. - result_automatic = es.instantaneous_rate( - spiketrain, 10*pq.ms, kernel='auto') + result_automatic = statistics.instantaneous_rate( + spiketrain, 10 * pq.ms, kernel='auto') assert_array_almost_equal(result_target, result_automatic) @@ -595,48 +686,52 @@ def tearDown(self): def test_time_histogram(self): targ = np.array([4, 2, 1, 1, 2, 2, 1, 0, 1, 0]) - histogram = es.time_histogram(self.spiketrains, binsize=pq.s) + histogram = statistics.time_histogram(self.spiketrains, binsize=pq.s) assert_array_equal(targ, histogram.magnitude[:, 0]) def test_time_histogram_binary(self): targ = np.array([2, 2, 1, 1, 2, 2, 1, 0, 1, 0]) - histogram = es.time_histogram(self.spiketrains, binsize=pq.s, - binary=True) + histogram = statistics.time_histogram(self.spiketrains, binsize=pq.s, + binary=True) assert_array_equal(targ, histogram.magnitude[:, 0]) def test_time_histogram_tstart_tstop(self): # Start, stop short range targ = np.array([2, 1]) - histogram = es.time_histogram(self.spiketrains, binsize=pq.s, - t_start=5 * pq.s, t_stop=7 * pq.s) + histogram = statistics.time_histogram(self.spiketrains, binsize=pq.s, + t_start=5 * pq.s, + t_stop=7 * pq.s) assert_array_equal(targ, histogram.magnitude[:, 0]) # Test without t_stop targ = np.array([4, 2, 1, 1, 2, 2, 1, 0, 1, 0]) - histogram = es.time_histogram(self.spiketrains, binsize=1 * pq.s, - t_start=0 * pq.s) + histogram = statistics.time_histogram(self.spiketrains, + binsize=1 * pq.s, + t_start=0 * pq.s) assert_array_equal(targ, histogram.magnitude[:, 0]) # Test without t_start - histogram = es.time_histogram(self.spiketrains, binsize=1 * pq.s, - t_stop=10 * pq.s) + histogram = statistics.time_histogram(self.spiketrains, + binsize=1 * pq.s, + t_stop=10 * pq.s) assert_array_equal(targ, histogram.magnitude[:, 0]) def test_time_histogram_output(self): # Normalization mean - histogram = es.time_histogram(self.spiketrains, binsize=pq.s, - output='mean') + histogram = statistics.time_histogram(self.spiketrains, binsize=pq.s, + output='mean') targ = np.array([4, 2, 1, 1, 2, 2, 1, 0, 1, 0], dtype=float) / 2 assert_array_equal(targ.reshape(targ.size, 1), histogram.magnitude) # Normalization rate - histogram = es.time_histogram(self.spiketrains, binsize=pq.s, - output='rate') + histogram = statistics.time_histogram(self.spiketrains, binsize=pq.s, + output='rate') assert_array_equal(histogram.view(pq.Quantity), targ.reshape(targ.size, 1) * 1 / pq.s) # Normalization unspecified, raises error - self.assertRaises(ValueError, es.time_histogram, self.spiketrains, + self.assertRaises(ValueError, statistics.time_histogram, + self.spiketrains, binsize=pq.s, output=' ') @@ -659,12 +754,13 @@ def tearDown(self): def test_complexity_pdf(self): targ = np.array([0.92, 0.01, 0.01, 0.06]) - complexity = es.complexity_pdf(self.spiketrains, binsize=0.1*pq.s) + complexity = statistics.complexity_pdf(self.spiketrains, + binsize=0.1 * pq.s) assert_array_equal(targ, complexity.magnitude[:, 0]) self.assertEqual(1, complexity.magnitude[:, 0].sum()) - self.assertEqual(len(self.spiketrains)+1, len(complexity)) + self.assertEqual(len(self.spiketrains) + 1, len(complexity)) self.assertIsInstance(complexity, neo.AnalogSignal) - self.assertEqual(complexity.units, 1*pq.dimensionless) + self.assertEqual(complexity.units, 1 * pq.dimensionless) if __name__ == '__main__': diff --git a/elephant/utils.py b/elephant/utils.py index 156f9c8f7..c761790f7 100644 --- a/elephant/utils.py +++ b/elephant/utils.py @@ -1,6 +1,7 @@ from __future__ import division, print_function, unicode_literals import numpy as np +import quantities as pq def is_binary(array): @@ -12,8 +13,31 @@ def is_binary(array): Returns ------- bool - Whether the input array is binary or nor. + Whether the input array is binary or not. """ array = np.asarray(array) return ((array == 0) | (array == 1)).all() + + +def is_time_quantity(x, allow_none=False): + """ + Parameters + ---------- + x : array-like + A scalar or array-like to check for being a Quantity with time units. + allow_none : bool + Allow `x` to be None or not. + + Returns + ------- + bool + Whether the input is a time Quantity (True) or not (False). + If the input is None and `allow_none` is set to True, returns True. + + """ + if x is None and allow_none: + return True + if not isinstance(x, pq.Quantity): + return False + return x.dimensionality.simplified == pq.Quantity(1, "s").dimensionality diff --git a/requirements/requirements-docs.txt b/requirements/requirements-docs.txt index 8c3f1bb26..460baeab5 100644 --- a/requirements/requirements-docs.txt +++ b/requirements/requirements-docs.txt @@ -5,3 +5,4 @@ sphinx>=2.4.3 nbsphinx>=0.5.0 sphinxcontrib-bibtex>=1.0.0 sphinx-tabs>=1.1.13 +matplotlib>=3.1.0