diff --git a/SpatialDE/_internal/score_test.py b/SpatialDE/_internal/score_test.py
index 3f1c260..a02d84c 100644
--- a/SpatialDE/_internal/score_test.py
+++ b/SpatialDE/_internal/score_test.py
@@ -273,3 +273,44 @@ def _grad_negative_negbinom_loglik(params, counts, sizefactors):
         + (counts - mus) / one_alpha_mu
     )  # d/d_alpha
     return -tf.convert_to_tensor((grad0, grad1))
+
+
+class NormalScoreTest(ScoreTest):
+    @dataclass
+    class NullModel(ScoreTest.NullModel):
+        mu: tf.Tensor
+        sigmasq: tf.Tensor
+
+    def _fit_null(self, y: tf.Tensor) -> NullModel:
+        return self.NullModel(tf.reduce_mean(y), tf.math.reduce_variance(y))
+
+    def _test(
+        self, y: tf.Tensor, nullmodel: NullModel
+    ) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
+        return self._do_test(
+            self._K,
+            to_default_float(y),
+            to_default_float(nullmodel.sigmasq),
+            to_default_float(nullmodel.mu),
+        )
+
+    @staticmethod
+    @tf.function(experimental_compile=True)
+    def _do_test(
+        K: tf.Tensor, rawy: tf.Tensor, sigmasq: tf.Tensor, mu: tf.Tensor
+    ) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
+        W = tf.ones_like(rawy) / sigmasq  # precision 1 / sigma^2, broadcast to a vector
+        stat = 0.5 * tf.reduce_sum(
+            (rawy - mu) * W * tf.tensordot(K, W * (rawy - mu), axes=(-1, -1)), axis=-1
+        )  # 0.5 * (y - mu)^T W K W (y - mu)
+
+        P = tf.linalg.diag(W) - W[:, tf.newaxis] * W[tf.newaxis, :] / tf.reduce_sum(W)
+        PK = W[:, tf.newaxis] * K - W[:, tf.newaxis] * ((W[tf.newaxis, :] @ K) / tf.reduce_sum(W))
+        trace_PK = tf.linalg.trace(PK)
+        e_tilde = 0.5 * trace_PK  # null expectation of the score statistic
+        I_tau_tau = 0.5 * tf.reduce_sum(PK * PK, axis=(-2, -1))
+        I_tau_theta = 0.5 * tf.reduce_sum(PK * P, axis=(-2, -1))  # P is symmetric
+        I_theta_theta = 0.5 * tf.reduce_sum(tf.square(P), axis=(-2, -1))
+        I_tau_tau_tilde = I_tau_tau - tf.square(I_tau_theta) / I_theta_theta  # efficient information
+
+        return stat, e_tilde, I_tau_tau_tilde
diff --git a/SpatialDE/de_test.py b/SpatialDE/de_test.py
index 0fd9fb1..92e5004 100644
--- a/SpatialDE/de_test.py
+++ b/SpatialDE/de_test.py
@@ -2,7 +2,7 @@
 from time import time
 import warnings
 from itertools import zip_longest
-from typing import Optional, Dict, Tuple, Union, List
+from typing import Optional, Dict, Tuple, Union, List, Literal
 
 import numpy as np
 import pandas as pd
@@ -16,6 +16,7 @@
 from ._internal.util import bh_adjust, calc_sizefactors, default_kernel_space, kspace_walk
 from ._internal.score_test import (
     NegativeBinomialScoreTest,
+    NormalScoreTest,
     combine_pvalues,
 )
 from ._internal.tf_dataset import AnnDataDataset
@@ -63,6 +64,7 @@ def test(
     kernel_space: Optional[Dict[str, Union[float, List[float]]]] = None,
     sizefactors: Optional[np.ndarray] = None,
     stack_kernels: Optional[bool] = None,
+    obs_dist: Literal["NegativeBinomial", "Normal"] = "NegativeBinomial",
     use_cache: bool = True,
 ) -> Tuple[pd.DataFrame, Union[pd.DataFrame, None]]:
     """
@@ -94,6 +96,7 @@ def test(
             the kernel matrices. This leads to increased memory consumption, but will
             drastically improve runtime on GPUs for smaller data sets. Defaults to ``True``
             for datasets with less than 2000 observations and ``False`` otherwise.
+        obs_dist: Distribution of the observations. If set to ``"Normal"``, a Gaussian observation model with an identity link function is used instead of the default negative binomial model.
         use_cache: Whether to use a pre-computed distance matrix for all kernels instead of
            computing the distance matrix anew for each kernel. Increases memory consumption,
            but is somewhat faster.
@@ -111,7 +114,6 @@ def test(
         sizefactors = calc_sizefactors(adata)
     if kernel_space is None:
         kernel_space = default_kernel_space(dcache)
-    individual_results = None if omnibus else []
 
     if stack_kernels is None and adata.n_obs <= 2000 or stack_kernels or omnibus:
         kernels = []
@@ -119,11 +121,14 @@ def test(
         for k, name in kspace_walk(kernel_space, dcache):
             kernels.append(k)
             kernelnames.append(name)
-        test = NegativeBinomialScoreTest(
-            sizefactors,
-            omnibus,
-            kernels,
-        )
+        if obs_dist == "NegativeBinomial":
+            test = NegativeBinomialScoreTest(
+                sizefactors,
+                omnibus,
+                kernels,
+            )
+        else:
+            test = NormalScoreTest(omnibus, kernels)
 
     results = []
     with tqdm(total=adata.n_vars) as pbar:
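For illustration, a minimal usage sketch of the new `obs_dist` option (assuming an `AnnData` object `adata` with spatial coordinates, and that `test` is re-exported at the package level as `SpatialDE.test`; the variable names are illustrative):

```python
import SpatialDE

# Default: negative binomial observation model on raw counts,
# using size factors computed from the data.
results, individual = SpatialDE.test(adata)

# With this patch: Gaussian observation model with identity link,
# e.g. for expression values that are already normalized and transformed.
results_normal, individual_normal = SpatialDE.test(adata, obs_dist="Normal")
```

Note that `NormalScoreTest` does not take size factors: its null model fits only a per-gene mean and variance (`_fit_null`), whereas the negative binomial test additionally requires `sizefactors`.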