Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add working implementation of 1D and nD potential landscape computation #28

Merged
merged 9 commits into from
Nov 26, 2024
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,7 @@ dmypy.json
.pytype/

# Cython debug symbols
cython_debug/
cython_debug/

# rendered videos
*.mp4
2 changes: 2 additions & 0 deletions neuro_py/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ __all__ = [
"spikes",
"stats",
"tuning",
"util",
]


Expand All @@ -24,4 +25,5 @@ from . import (
spikes,
stats,
tuning,
util,
)
6 changes: 6 additions & 0 deletions neuro_py/ensemble/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ __all__ = [
"computeAssemblyActivity",
"AssemblyReact",
"ExplainedVariance",
"potential_landscape",
"potential_landscape_nd",
"similarity_index",
"similaritymat",
"WeightedCorr",
Expand All @@ -32,6 +34,10 @@ from .assembly import (
)
from .assembly_reactivation import AssemblyReact
from .explained_variance import ExplainedVariance
from .geodyn import (
potential_landscape,
potential_landscape_nd,
)
from .similarity_index import similarity_index
from .similaritymat import similaritymat
from .replay import (
Expand Down
200 changes: 200 additions & 0 deletions neuro_py/ensemble/geodyn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
import numpy as np

from scipy.stats import binned_statistic_dd

from ..util.array import (
find_terminal_masked_indices,
replace_border_zeros_with_nan,
)


def potential_landscape(X_dyn, projbins, domainbins=None):
kushaangupta marked this conversation as resolved.
Show resolved Hide resolved
"""Compute numerical approximation of potential energy landscape across
1D state and domain (e.g. time, position, etc.).

Potential landscape is defined as the integral of the flow vectors.

Parameters
----------
X_dyn : np.ndarray
State vectors of shape: trials x bins
projbins : int or array-like
Number of bins for projection axis or bin edges
domainbins : int or array-like, optional
Number of bins for domain axis or bin edges, by default None

Returns
-------
np.ndarray
Potential energy landscape across state and domain
np.ndarray
Temporal gradient of potential energy landscape across state and domain
np.ndarray
Histogram of state vectors across state and domain
np.ndarray
Bin edges of state vectors
np.ndarray
Bin edges of domain

References
----------
.. [1] Wang, S., Falcone, R., Richmond, B. et al. Attractor dynamics reflect
decision confidence in macaque prefrontal cortex. Nat Neurosci 26,
1970–1980 (2023).
"""
# _t suffix is following notation of paper but applicable across any domain
nnrns = 1
ntrials, nbins = X_dyn.shape
delta_t = np.diff(X_dyn, axis=1) # time derivatives: ntrials x nbins-1 x nnrns

X_t_flat = np.reshape(X_dyn[:, :-1], (-1, nnrns), order='F').ravel() # skip last bin as no displacement exists for last time point
delta_t_flat = np.reshape(delta_t, (-1, nnrns), order='F').ravel() # column-major order
norm_tpts = np.repeat(np.arange(nbins-1), ntrials)

nbins_domain = nbins - 1 if domainbins is None else domainbins # downsample domain bins

# 1D state space binning of time derivatives across domain
# assumes landscape may morph across domain
H, bin_edges, _ = binned_statistic_dd( # posbins x time
np.asarray((X_t_flat, norm_tpts)).T, delta_t_flat, statistic='count',
bins=(projbins, nbins_domain))
latentedges, domainedges = bin_edges

grad_pos_t_svm = binned_statistic_dd(
np.asarray((X_t_flat, norm_tpts)).T, delta_t_flat, statistic='sum',
bins=(projbins, nbins_domain)).statistic
# average derivative, a.k.a. flow/vector field for dynamics underlying
# population activity
grad_pos_t_svm = np.divide(grad_pos_t_svm, H, where=H != 0)
grad_pos_t_svm[H == 0] = np.nan # crucial to handle division by zero
# spatial integration via nnancumsum treats nan as zero for cumulative sum
potential_pos_t = -np.nancumsum(grad_pos_t_svm, axis=0) # projbins x domainbins

idx_zero_X_t = np.searchsorted(latentedges, 0)
offset = potential_pos_t[idx_zero_X_t, :] # use potential at X_t = 0 as reference
potential_pos_t = potential_pos_t - offset # potential difference

nonzero_mask = H != 0
idx_first_nonzero, idx_last_nonzero = \
find_terminal_masked_indices(nonzero_mask, axis=0) # each have shape: time
# along axis 0 set all values from start to idx_first_nonzero to nan
for t in range(H.shape[1]):
potential_pos_t[:idx_first_nonzero[t], t] = np.nan
potential_pos_t[idx_last_nonzero[t] + 1:, t] = np.nan

return potential_pos_t, grad_pos_t_svm, H, latentedges, domainedges

def potential_landscape_nd(X_dyn, projbins, domainbins=None, nanborderempty=True):
kushaangupta marked this conversation as resolved.
Show resolved Hide resolved
"""Compute numerical approximation of potential energy landscape across
n-dimensional state and domain (e.g. time, position, etc.).

Potential landscape is defined as the integral of the flow vectors.

Parameters
----------
X_dyn : np.ndarray
State vectors of shape: trials x bins x neurons
projbins : int or array-like
Number of bins for projection axis or bin edges for each neuron
domainbins : int or array-like, optional
Number of bins for domain axis or bin edges, by default None
nanborderempty : bool, optional
Whether to set border values to nan if they are empty, by default True

Returns
-------
np.ndarray
Potential energy landscape across state and domain for each neuron.
Shape: projbins x domainbins x nnrns
np.ndarray
Temporal gradient of potential energy landscape across state and domain
for each neuron. Shape: projbins x domainbins x nnrns
np.ndarray
Histogram of state vectors across state and domain for each neuron.
Shape: projbins x domainbins x nnrns
np.ndarray
Bin edges of state vectors for each neuron
np.ndarray
Bin edges of domain for each neuron

References
----------
.. [1] Wang, S., Falcone, R., Richmond, B. et al. Attractor dynamics reflect
decision confidence in macaque prefrontal cortex. Nat Neurosci 26,
1970–1980 (2023).
"""
# _t suffix is following notation of paper but applicable across any domain
ntrials, nbins, nnrns = X_dyn.shape
delta_t = np.diff(X_dyn, axis=1) # time derivatives: ntrials x ndomainbins-1 x nnrns

X_t_flat = np.reshape(X_dyn[:, :-1], (-1, nnrns), order='F') # skip last bin as no displacement exists for last time point
delta_t_flat = np.reshape(delta_t, (-1, nnrns), order='F') # column-major order
norm_tpts = np.repeat(np.arange(nbins-1), ntrials)

nbins_domain = nbins - 1 if domainbins is None else domainbins # downsample domain bins

potential_pos_t_nrns = []
grad_pos_t_svm_nrns = []
hist_nrns = []
latentedges_nrns = []
domainedges_nrns = []
for nnrn in range(nnrns):
# 1D state space binning of time derivatives across domain
# assumes landscape may morph across domain
H, bin_edges, _ = binned_statistic_dd( # (nnrns times projbins) x time
np.asarray((*X_t_flat.T, norm_tpts)).T, delta_t_flat[:, nnrn], statistic='count',
bins=(*[projbins if isinstance(projbins, int) else projbins[idx] for idx in range(nnrns)], nbins_domain))
latentedges= bin_edges[nnrn]
domainedges = bin_edges[-1]

grad_pos_t_svm = binned_statistic_dd(
np.asarray((*X_t_flat.T, norm_tpts)).T, delta_t_flat[:, nnrn], statistic='sum',
bins=(*[projbins if isinstance(projbins, int) else projbins[idx] for idx in range(nnrns)], nbins_domain)).statistic
# average derivative, a.k.a. flow/vector field for dynamics underlying
# population activity
grad_pos_t_svm = np.divide(grad_pos_t_svm, H, where=H != 0)
grad_pos_t_svm[H == 0] = np.nan # crucial to handle division by zero
# spatial integration via nnancumsum treats nan as zero for cumulative sum
potential_pos_t = -np.nancumsum(grad_pos_t_svm, axis=nnrn) # (nnrns times projbins) x domainbins

# idx_zero_X_t = np.apply_along_axis(
kushaangupta marked this conversation as resolved.
Show resolved Hide resolved
# np.searchsorted, 1, np.asarray(bin_edges[:nnrns]), 0)
# offset = potential_pos_t[tuple(idx_zero_X_t)] # use potential at X_t = 0 as reference
# potential_pos_t = potential_pos_t - offset # potential difference

if nanborderempty:
nonzero_mask = H != 0

for t in range(nbins_domain):
nrndimslices = [slice(None)] * nnrns
nrndimslices.append(t)
peripheral_zeros_nanmask = ~np.isnan(
replace_border_zeros_with_nan(nonzero_mask[tuple(nrndimslices)]))
peripheral_zeros_nanmask = np.where(
peripheral_zeros_nanmask,
peripheral_zeros_nanmask,
np.nan
)
potential_pos_t[tuple(nrndimslices)] *= peripheral_zeros_nanmask

potential_pos_t_nrns.append(potential_pos_t)
grad_pos_t_svm_nrns.append(grad_pos_t_svm)
hist_nrns.append(H)
latentedges_nrns.append(latentedges)
domainedges_nrns.append(domainedges)

potential_pos_t_nrns = np.stack(potential_pos_t_nrns, axis=-1) # projbins x domainbins x nnrns
grad_pos_t_svm_nrns = np.stack(grad_pos_t_svm_nrns, axis=-1) # projbins x domainbins x nnrns
hist = np.stack(hist_nrns, axis=-1) # projbins x domainbins x nnrns
latentedges_nrns = np.stack(latentedges_nrns, axis=-1) # projbins x nnrns
domainedges_nrns = np.stack(domainedges_nrns, axis=-1) # domainbins x nnrns
potential_pos = np.nanmean(np.asarray([
np.nanmean(potential_pos_t_nrns[:, :, :, nrn], axis=-1)
for nrn in range(nnrns)
]), axis=0)

return (
potential_pos, potential_pos_t_nrns, grad_pos_t_svm_nrns, hist,
latentedges_nrns,
domainedges_nrns,
)
4 changes: 4 additions & 0 deletions neuro_py/util/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import lazy_loader as _lazy

(__getattr__, __dir__, __all__) = _lazy.attach_stub(__name__, __file__)
del _lazy
9 changes: 9 additions & 0 deletions neuro_py/util/__init__.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
__all__ = (
"find_terminal_masked_indices",
"replace_border_zeros_with_nan",
)

from .array import (
find_terminal_masked_indices,
replace_border_zeros_with_nan,
)
83 changes: 83 additions & 0 deletions neuro_py/util/array.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import numpy as np


def find_terminal_masked_indices(mask, axis):
kushaangupta marked this conversation as resolved.
Show resolved Hide resolved
"""
Find the first and last indices of non-masked values along an axis.

Only tested upto 2D arrays.

Parameters
----------
mask : np.ndarray
Mask of `arr`.
axis : int
Axis along which to find the first and last indices.

Returns
-------
np.ndarray
First index of non-masked values along `axis`.
np.ndarray
Last index of non-masked values along `axis`.
"""
first_idx = np.argmax(mask, axis=axis)
reversed_mask = np.flip(mask, axis=axis)
last_idx = mask.shape[axis] - np.argmax(reversed_mask, axis=axis) - 1

return first_idx, last_idx

def replace_border_zeros_with_nan(arr):
kushaangupta marked this conversation as resolved.
Show resolved Hide resolved
"""Replace zero values at the borders of each dimension of a n-dimensional
array with NaN.

Parameters
----------
arr : np.ndarray
Input array.

Returns
-------
np.ndarray
Array with zero values at the borders replaced with NaN.

Examples
--------
>>> arr = np.arange(27).reshape(3, 3, 3)
>>> arr[0, 2] = arr[2, 2] = arr[2, 0, 0] = arr[1, 1, 1] = 0
>>> replace_border_zeros_with_nan(arr)
array([[[nan, 1., 2.],
[ 3., 4., 5.],
[nan, nan, nan]],

[[ 9., 10., 11.],
[12., 0., 14.],
[15., 16., 17.]],

[[nan, 19., 20.],
[21., 22., 23.],
[nan, nan, nan]]])
"""
arr = np.array(arr, dtype=float)
dims = arr.shape

for axis in range(len(dims)):
# Find indices where zero values start and stop
for idx in np.ndindex(*[dims[i] for i in range(len(dims)) if i != axis]):
slicer = list(idx)
slicer.insert(axis, slice(None)) # Insert the full slice along the current axis

# Check for first sequence of zeros
subarray = arr[tuple(slicer)]
first_zero_indices = np.where(np.cumsum(subarray != 0) == 0)[0]
if len(first_zero_indices) > 0:
subarray[first_zero_indices] = np.nan

# Check for last sequence of zeros
last_zero_indices = np.where(np.cumsum(subarray[::-1] != 0) == 0)[0]
if len(last_zero_indices) > 0:
subarray[-last_zero_indices-1] = np.nan

arr[tuple(slicer)] = subarray # Replace modified subarray

return arr
Loading
Loading