From 9a8dcc689354a2a1ebd499e92273308df1b72473 Mon Sep 17 00:00:00 2001 From: Clayton Barnes Date: Thu, 10 Mar 2022 18:40:53 -0500 Subject: [PATCH 1/3] Modules for multitesting Added modules for multiple hypothesis testing and stratified multiple hypothesis testing --- permute/multitest_core.py | 604 ++++++++++++++++++++++++++++++++ permute/multitest_stratified.py | 450 ++++++++++++++++++++++++ 2 files changed, 1054 insertions(+) create mode 100644 permute/multitest_core.py create mode 100644 permute/multitest_stratified.py diff --git a/permute/multitest_core.py b/permute/multitest_core.py new file mode 100644 index 0000000..cde7d87 --- /dev/null +++ b/permute/multitest_core.py @@ -0,0 +1,604 @@ +# -*- coding: utf-8 -*- +""" +Created on Tue Mar 8 10:38:11 2022 + +@author: Clayton +""" + + +import numpy as np +from scipy.stats import ttest_ind, ttest_1samp + +from .utils import get_prng, permute + + + +""" TODO for multi testing core package + +two_sample_conf_int once it is finalized + +""" + +def multitest_corr(x, y, alternative='greater', reps=10**4, seed=None, plus1=True): + r""" + Simulate permutation p-value for multiple Pearson correlation coefficients + + Parameters + ---------- + x : array-like with shape (observations,tests) + y : array-like with shape (observations,tests) + alternative : {'greater', 'less', 'two-sided'} + The alternative hypothesis to test + reps : int + seed : RandomState instance or {None, int, RandomState instance} + If None, the pseudorandom number generator is the RandomState + instance used by `np.random`; + If int, seed is the seed used by the random number generator; + If RandomState instance, seed is the pseudorandom number generator + plus1 : bool + flag for whether to add 1 to the numerator and denominator of the + p-value based on the empirical permutation distribution. + Default is True. 
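For orientation while reading the diff, a minimal usage sketch of multitest_corr as documented above (illustrative only, not part of the patch; it assumes the patched module is importable as permute.multitest_core and uses made-up data):

    import numpy as np
    from permute.multitest_core import multitest_corr

    rng = np.random.RandomState(42)
    x = rng.normal(size=(100, 3))             # 100 observations, 3 hypotheses
    y = 0.5 * x + rng.normal(size=(100, 3))   # columns 0 and 1 correlated with x
    y[:, 2] = rng.normal(size=100)            # column 2 unrelated to x

    tst, pvalue, sims = multitest_corr(x, y, alternative='two-sided',
                                       reps=10**3, seed=42)
    # tst and pvalue each have shape (3,); sims holds one length-3 array of
    # permuted correlations per repetition.

With data like this, the p-values for the first two columns should typically be small and the third noticeably larger.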
+ + Returns + ------- + tuple + Returns test statistic, p-value, simulated distribution + """ + if x.shape != y.shape: + raise ValueError('x and y must have the same shape') + num_tests = x.shape[1] + prng = get_prng(seed) + corr_mat_mask = np.zeros((2*num_tests,2*num_tests),dtype=bool) + corr_mat_mask[x.shape[1]+np.arange(num_tests),np.arange(num_tests)] = True + tst = np.corrcoef(x, y,rowvar=False)[corr_mat_mask] + sims = [np.corrcoef(permute(x, prng), y,rowvar=False)[corr_mat_mask] for i in range(reps)] + left_pv = (np.sum(sims <= tst,axis=0)+plus1) / (reps+plus1) + right_pv = (np.sum(sims >= tst,axis=0)+plus1) / (reps+plus1) + if alternative == 'greater': + pvalue = right_pv + elif alternative == 'less': + pvalue = left_pv + elif alternative == 'two-sided': + pvalue = np.min([1*np.ones(num_tests), 2 * np.min([left_pv, right_pv],axis=0)],axis=0) + return tst, pvalue, sims + +def multitest_spearman_corr(x, y, alternative='greater', reps=10**4, seed=None, plus1=True): + r""" + Simulate permutation p-value for multiple Spearman correlation coefficients + + Parameters + ---------- + x : array-like with shape (observations,tests) + y : array-like with shape (observations,tests) + alternative : {'greater', 'less', 'two-sided'} + The alternative hypothesis to test + reps : int + seed : RandomState instance or {None, int, RandomState instance} + If None, the pseudorandom number generator is the RandomState + instance used by `np.random`; + If int, seed is the seed used by the random number generator; + If RandomState instance, seed is the pseudorandom number generator + plus1 : bool + flag for whether to add 1 to the numerator and denominator of the + p-value based on the empirical permutation distribution. + Default is True. + + Returns + ------- + tuple + Returns test statistic, p-value, simulated distribution + """ + + xnew = np.argsort(x,0)+1 + ynew = np.argsort(y,0)+1 + return multitest_corr(xnew, ynew, alternative=alternative, reps=reps, seed=seed) + + + +def multitest_two_sample_core(potential_outcomes_all, nx, tst_stat, alternative='greater', + reps=10**5, keep_dist=False, seed=None, plus1=True, max_correct=False): + r""" + Main workhorse function for two_sample and two_sample_shift + + Parameters + ---------- + potential_outcomes_all : array-like + 3D array [observations,tests,conditions] of multiple potential + outcomes under treatment [:,:,0] and control [:,:,1]. + To be passed in from potential_outcomes + nx : int + Size of the treatment group x + reps : int + number of repetitions + tst_stat: function + The test statistic + alternative : {'greater', 'less', 'two-sided'} + The alternative hypothesis to test + keep_dist : bool + flag for whether to store and return the array of values + of the test statistic. Default is False. + seed : RandomState instance or {None, int, RandomState instance} + If None, the pseudorandom number generator is the RandomState + instance used by `np.random`; + If int, seed is the seed used by the random number generator; + If RandomState instance, seed is the pseudorandom number generator + plus1 : bool + flag for whether to add 1 to the numerator and denominator of the + p-value based on the empirical permutation distribution. + Default is True. + max_correct : bool + flag for whether to perform max statistic multiple testing + correction. Builds the null distribution from the most extreme value + across tests for each iteration of the permutation. Default is False. 
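To make the max_correct option just described concrete, here is a small stand-alone sketch (toy numbers, not part of the patch) of how a single max-statistic null distribution is built from per-test statistics, using the same max(curr_tst.min(), curr_tst.max(), key=abs) idiom as the implementation below:

    import numpy as np

    # Statistics from three hypothetical permutations of a 4-test problem.
    perm_stats = np.array([[ 0.1, -0.5,  0.3,  0.2],
                           [-0.7,  0.2,  0.1, -0.1],
                           [ 0.4,  0.6, -0.2,  0.0]])

    # Keep only the most extreme (largest magnitude, sign preserved) value from
    # each permutation, as max(curr_tst.min(), curr_tst.max(), key=abs) does.
    max_null = np.array([max(row.min(), row.max(), key=abs) for row in perm_stats])
    print(max_null)   # [-0.5 -0.7  0.6]

Every test's observed statistic is then compared against this single, more extreme null distribution, which is what makes the correction conservative across the family of tests.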
+ + Returns + ------- + float + the estimated p-value + list + The distribution of test statistics. + These values are only returned if `keep_dist` == True + """ + prng = get_prng(seed) + num_tests = potential_outcomes_all.shape[1] + rr = list(range(potential_outcomes_all.shape[0])) # create indexing vect? + tst = tst_stat(potential_outcomes_all[:nx,:,0], # get tst_stat of each paired outcome (difference control vs test condition) + potential_outcomes_all[nx:,:, 1]) + + thePvalue = { + 'greater': lambda pUp, pDn: pUp+plus1/(reps+plus1), + 'less': lambda pUp, pDn: pDn+plus1/(reps+plus1), + 'two-sided': lambda pUp, pDn: 2 * np.min(np.stack([0.5*np.ones(num_tests), \ + pUp+plus1/(reps+plus1), \ + pDn+plus1/(reps+plus1)],1),1) + } + + if keep_dist: + if max_correct: + dist = np.empty(reps) + for i in range(reps): + prng.shuffle(rr) + pp = np.take(potential_outcomes_all, rr, axis=0) + curr_tst = tst_stat(pp[:nx, :, 0], pp[nx:,: , 1]) + dist[i] = max(curr_tst.min(), curr_tst.max(), key=abs) + pUp = np.empty(num_tests) + pDn = np.empty(num_tests) + for i in range(num_tests): + pUp[i] = np.sum(dist >= tst[i])/(reps+plus1) + pDn[i] = np.sum(dist <= tst[i])/(reps+plus1) + return thePvalue[alternative](pUp, pDn), dist + else: + dist = np.empty((reps,num_tests)) + for i in range(reps): + prng.shuffle(rr) + pp = np.take(potential_outcomes_all, rr, axis=0) + dist[i,:] = tst_stat(pp[:nx, :, 0], pp[nx:,: , 1]) + pUp = np.sum(dist >= tst,axis=0)/(reps+plus1) + pDn = np.sum(dist <= tst,axis=0)/(reps+plus1) + return thePvalue[alternative](pUp, pDn), dist + else: + hitsUp = np.zeros(num_tests) + hitsDn = np.zeros(num_tests) + if max_correct: + for i in range(reps): + prng.shuffle(rr) + pp = np.take(potential_outcomes_all, rr, axis=0) + curr_tst = tst_stat(pp[:nx, :, 0], pp[nx:, :, 1]) + curr_max = max(curr_tst.min(), curr_tst.max(), key=abs) + hitsUp += curr_max >= tst + hitsDn += curr_max <= tst + pUp = hitsUp/(reps+plus1) + pDn = hitsDn/(reps+plus1) + return thePvalue[alternative](pUp, pDn) + else: + for i in range(reps): + prng.shuffle(rr) + pp = np.take(potential_outcomes_all, rr, axis=0) + hitsUp += tst_stat(pp[:nx, :, 0], pp[nx:, :, 1]) >= tst + hitsDn += tst_stat(pp[:nx, :, 0], pp[nx:, :, 1]) <= tst + pUp = hitsUp/(reps+plus1) + pDn = hitsDn/(reps+plus1) + return thePvalue[alternative](pUp, pDn) + + + +def multitest_one_sample(x, y=None, reps=10**5, stat='mean', alternative="greater", + keep_dist=False, seed=None, plus1=True,max_correct=False): + r""" + One-sided or two-sided, one-sample permutation test for the mean, + with p-value estimated by simulated random sampling with + reps replications. + + Alternatively, a permutation test for equality of means of two paired + samples. + + Tests the hypothesis that x is distributed symmetrically symmetric about 0 + (or x and y have the same center) against the alternative that x comes from + a population with mean + + (a) greater than 0 (greater than that of the population from which y comes), + if side = 'greater' + (b) less than 0 (less than that of the population from which y comes), + if side = 'less' + (c) different from 0 (different from that of the population from which y comes), + if side = 'two-sided' + + If ``keep_dist``, return the distribution of values of the test statistic; + otherwise, return only the number of permutations for which the value of + the test statistic and p-value. + + Parameters + ---------- + x : array-like + Sample 1 with shape (observations,tests) + y : array-like + Sample 2 with shape (observations,tests). 
Must preserve the order of pairs with x. + If None, x is taken to be the one sample. + reps : int + number of repetitions + stat : {'mean', 't'} + The test statistic. The statistic is computed based on either z = x or + z = x - y, if y is specified. + + (a) If stat == 'mean', the test statistic is mean(z). + (b) If stat == 't', the test statistic is the t-statistic-- + but the p-value is still estimated by the randomization, + approximating the permutation distribution. + (c) If stat is a function (a callable object), the test statistic is + that function. The function should take a permutation of the + data and compute the test function from it. For instance, if the + test statistic is the maximum absolute value, $\max_i |z_i|$, + the test statistic could be written: + + f = lambda u: np.max(abs(u)) + alternative : {'greater', 'less', 'two-sided'} + The alternative hypothesis to test + keep_dist : bool + flag for whether to store and return the array of values + of the irr test statistic + seed : RandomState instance or {None, int, RandomState instance} + If None, the pseudorandom number generator is the RandomState + instance used by `np.random`; + If int, seed is the seed used by the random number generator; + If RandomState instance, seed is the pseudorandom number generator + plus1 : bool + flag for whether to add 1 to the numerator and denominator of the + p-value based on the empirical permutation distribution. + Default is True. + + Returns + ------- + float + the estimated p-value + float + the test statistic + list + The distribution of test statistics. + These values are only returned if `keep_dist` == True + """ + prng = get_prng(seed) + if y is None: + z = x + elif x.shape != y.shape: + raise ValueError('x and y must have the same shape') + else: + z = np.array(x) - np.array(y) + num_tests = z.shape[1] + thePvalue = { + 'greater': lambda pUp, pDn: pUp+plus1/(reps+plus1), + 'less': lambda pUp, pDn: pDn+plus1/(reps+plus1), + 'two-sided': lambda pUp, pDn: 2 * np.min(np.stack([0.5*np.ones(num_tests), \ + pUp+plus1/(reps+plus1), \ + pDn+plus1/(reps+plus1)],1),axis=1) + } + stats = { + 'mean': lambda u: np.mean(u,axis=0), + 't': lambda u: ttest_1samp(u, 0, axis=0)[0] + } + if callable(stat): + tst_fun = stat + else: + tst_fun = stats[stat] + tst = tst_fun(z) + if keep_dist: + if max_correct: + dist = np.empty(reps) + for i in range(reps): + curr_tst = tst_fun(z * (1 - 2 * prng.randint(0, 2, z.shape))) + dist[i] = max(curr_tst.min(), curr_tst.max(), key=abs) + pUp = np.empty(num_tests) + pDn = np.empty(num_tests) + for i in range(num_tests): + pUp[i] = np.sum(dist >= tst[i])/(reps+plus1) + pDn[i] = np.sum(dist <= tst[i])/(reps+plus1) + return thePvalue[alternative](pUp, pDn), tst, dist + else: + dist = np.empty((reps,num_tests)) + for i in range(reps): + dist[i,:] = tst_fun(z * (1 - 2 * prng.randint(0, 2, z.shape))) + pUp = np.sum(dist >= tst,axis=0)/(reps+plus1) + pDn = np.sum(dist <= tst,axis=0)/(reps+plus1) + return thePvalue[alternative](pUp, pDn), tst + else: + hitsUp = np.zeros(num_tests) + hitsDn = np.zeros(num_tests) + if max_correct: + for i in range(reps): + curr_tst = tst_fun(z * (1 - 2 * prng.randint(0, 2, z.shape))) + curr_max = max(curr_tst.min(), curr_tst.max(), key=abs) + hitsUp += curr_max >= tst + hitsDn += curr_max <= tst + pUp = hitsUp/(reps+plus1) + pDn = hitsDn/(reps+plus1) + return thePvalue[alternative](pUp, pDn), tst, dist + else: + for i in range(reps): + curr_tst = tst_fun(z * (1 - 2 * prng.randint(0, 2, z.shape))) + hitsUp += curr_tst >= tst + hitsDn += 
curr_tst <= tst + pUp = hitsUp/(reps+plus1) + pDn = hitsDn/(reps+plus1) + return thePvalue[alternative](pUp, pDn), tst + + +def multitest_two_sample(x, y, reps=10**5, stat='mean', alternative="greater", + keep_dist=False, seed=None, plus1=True, max_correct=False): + r""" + One-sided or two-sided, two-sample permutation multi-test for equality of + two means, with p-value estimated by simulated random sampling with + reps replications. + + Tests the hypothesis that x and y are a random partition of x,y + against the alternative that x comes from a population with mean + + (a) greater than that of the population from which y comes, + if side = 'greater' + (b) less than that of the population from which y comes, + if side = 'less' + (c) different from that of the population from which y comes, + if side = 'two-sided' + + If ``keep_dist``, return the distribution of values of the test statistic; + otherwise, return only the number of permutations for which the value of + the test statistic and p-value. + + Parameters + ---------- + x : array-like + Sample 1 with shape (observations,tests) + y : array-like + Sample 2 with shape (observations,tests) + reps : int + number of repetitions + stat : {'mean', 't'} + The test statistic. + + (a) If stat == 'mean', the test statistic is (mean(x) - mean(y)) + (equivalently, sum(x), since those are monotonically related) + (b) If stat == 't', the test statistic is the two-sample t-statistic-- + but the p-value is still estimated by the randomization, + approximating the permutation distribution. + The t-statistic is computed using scipy.stats.ttest_ind + (c) If stat is a function (a callable object), the test statistic is + that function. The function should take two arguments: + given a permutation of the pooled data, the first argument is the + "new" x and the second argument is the "new" y. + For instance, if the test statistic is the Kolmogorov-Smirnov distance + between the empirical distributions of the two samples, + $\max_t |F_x(t) - F_y(t)|$, the test statistic could be written: + + f = lambda u, v: np.max( \ + [abs(sum(u<=val)/len(u)-sum(v<=val)/len(v)) for val in np.concatenate([u, v])]\ + ) + + alternative : {'greater', 'less', 'two-sided'} + The alternative hypothesis to test + keep_dist : bool + flag for whether to store and return the array of values + of the irr test statistic + seed : RandomState instance or {None, int, RandomState instance} + If None, the pseudorandom number generator is the RandomState + instance used by `np.random`; + If int, seed is the seed used by the random number generator; + If RandomState instance, seed is the pseudorandom number generator + plus1 : bool + flag for whether to add 1 to the numerator and denominator of the + p-value based on the empirical permutation distribution. + Default is True. + + Returns + ------- + float + the estimated p-value + float + the test statistic + list + The distribution of test statistics. + These values are only returned if `keep_dist` == True + """ + if x.shape != y.shape: + raise ValueError('x and y must have the same shape') + # Set up potential outcomes; under the null, all units are exchangeable + pot_out_all = np.stack( + [np.concatenate([x, y]), np.concatenate([x, y])],2) + + + # If stat is callable, use it as the test function. 
Otherwise, look in the + # dictionary + stats = { + 'mean': lambda u, v: np.mean(u,axis=0) - np.mean(v,axis=0), + 't': lambda u, v: ttest_ind(u, v, axis=0, equal_var=True)[0] + } + + if callable(stat): + tst_fun = stat + else: + tst_fun = stats[stat] + + nx = x.shape[0] + observed_tst = tst_fun(pot_out_all[:nx, :, 0], pot_out_all[nx:, :, 1]) + + res = multitest_two_sample_core(pot_out_all, nx, tst_fun, alternative=alternative, + reps=reps, keep_dist=keep_dist, seed=seed, plus1=plus1, max_correct=max_correct) + + if keep_dist: + return res[0], observed_tst, res[1] + else: + return res, observed_tst + + +def multitest_potential_outcomes(x, y, f, finverse): + """ + Given observations $x$ under treatment and $y$ under control conditions, + returns the potential outcomes for units under their unobserved condition + under the hypothesis that $x_i = f(y_i)$ for all units. + + Parameters + ---------- + x : array-like + Outcomes under treatment + y : array-like + Outcomes under control + f : function + An invertible function + finverse : function + The inverse function to f. + + Returns + ------- + potential_outcomes : 2D array + The first column contains all potential outcomes under the treatment, + the second column contains all potential outcomes under the control. + """ + + tester = np.array(range(5)) + 1 + assert np.allclose(finverse(f(tester)), + tester), "f and finverse aren't inverses" + assert np.allclose(f(finverse(tester)), + tester), "f and finverse aren't inverses" + + pot_treat = np.concatenate([x, f(y)]) + pot_ctrl = np.concatenate([finverse(x), y]) + + return np.stack([pot_treat, pot_ctrl],2) + + +def multitest_two_sample_shift(x, y, reps=10**5, stat='mean', alternative="greater", + keep_dist=False, seed=None, shift=None, plus1=True): + r""" + One-sided or two-sided, two-sample permutation multi-test for equality of + two means, with p-value estimated by simulated random sampling with + reps replications. + + Tests the hypothesis that x and y are a random partition of x,y + against the alternative that x comes from a population with mean + + (a) greater than that of the population from which y comes, + if side = 'greater' + (b) less than that of the population from which y comes, + if side = 'less' + (c) different from that of the population from which y comes, + if side = 'two-sided' + + If ``keep_dist``, return the distribution of values of the test statistic; + otherwise, return only the number of permutations for which the value of + the test statistic and p-value. + + Parameters + ---------- + x : array-like + Sample 1 + y : array-like + Sample 2 + reps : int + number of repetitions + stat : {'mean', 't'} + The test statistic. + + (a) If stat == 'mean', the test statistic is (mean(x) - mean(y)) + (equivalently, sum(x), since those are monotonically related) + (b) If stat == 't', the test statistic is the two-sample t-statistic-- + but the p-value is still estimated by the randomization, + approximating the permutation distribution. + The t-statistic is computed using scipy.stats.ttest_ind + (c) If stat is a function (a callable object), the test statistic is + that function. The function should take two arguments: + given a permutation of the pooled data, the first argument is the + "new" x and the second argument is the "new" y. 
+ For instance, if the test statistic is the Kolmogorov-Smirnov distance + between the empirical distributions of the two samples, + $\max_t |F_x(t) - F_y(t)|$, the test statistic could be written: + + f = lambda u, v: np.max( \ + [abs(sum(u<=val)/len(u)-sum(v<=val)/len(v)) for val in np.concatenate([u, v])]\ + ) + + alternative : {'greater', 'less', 'two-sided'} + The alternative hypothesis to test + keep_dist : bool + flag for whether to store and return the array of values + of the irr test statistic + seed : RandomState instance or {None, int, RandomState instance} + If None, the pseudorandom number generator is the RandomState + instance used by `np.random`; + If int, seed is the seed used by the random number generator; + If RandomState instance, seed is the pseudorandom number generator + shift : float + The relationship between x and y under the null hypothesis. + + (a) A constant scalar shift in the distribution of y. That is, x is equal + in distribution to y + shift. + (b) A tuple containing the function and its inverse $(f, f^{-1})$, so + $x_i = f(y_i)$ and $y_i = f^{-1}(x_i)$ + plus1 : bool + flag for whether to add 1 to the numerator and denominator of the + p-value based on the empirical permutation distribution. + Default is True. + + Returns + ------- + float + the estimated p-value + float + the test statistic + list + The distribution of test statistics. + These values are only returned if `keep_dist` == True + """ + # Set up potential outcomes according to shift + if isinstance(shift, float) or isinstance(shift, int): + # Potential outcomes for all units under treatment + pot_outx = np.concatenate([x, y + shift]) + # Potential outcomes for all units under control + pot_outy = np.concatenate([x - shift, y]) + pot_out_all = np.stack([pot_outx, pot_outy],2) + elif isinstance(shift, tuple): + assert (callable(shift[0])), "Supply f and finverse in shift tuple" + assert (callable(shift[1])), "Supply f and finverse in shift tuple" + pot_out_all = multitest_potential_outcomes(x, y, shift[0], shift[1]) + else: + raise ValueError("Bad input for shift") + # If stat is callable, use it as the test function. Otherwise, look in the + # dictionary + stats = { + 'mean': lambda u, v: np.mean(u,axis=0) - np.mean(v,axis=0), + 't': lambda u, v: ttest_ind(u, v, axis=0, equal_var=True)[0] + } + if callable(stat): + tst_fun = stat + else: + tst_fun = stats[stat] + nx = x.shape[0] + observed_tst = tst_fun(pot_out_all[:nx,:, 0], pot_out_all[nx:,:, 1]) + res = multitest_two_sample_core(pot_out_all, nx, tst_fun, alternative=alternative, + reps=reps, keep_dist=keep_dist, seed=seed, plus1=plus1) + if keep_dist: + return res[0], observed_tst, res[1] + else: + return res, observed_tst + + +""" start multitest version of 'stratified' """ + + diff --git a/permute/multitest_stratified.py b/permute/multitest_stratified.py new file mode 100644 index 0000000..d9d337f --- /dev/null +++ b/permute/multitest_stratified.py @@ -0,0 +1,450 @@ +# -*- coding: utf-8 -*- +""" +Created on Wed Mar 9 15:31:17 2022 + +@author: Clayton +""" + +import numpy as np +from scipy.stats import ttest_ind + +from .utils import get_prng, permute_within_groups + +def multitest_stratified_corrcoef(x, y, group): + r""" + Calculates sum of Spearman correlations between x and y, + computed separately in each group. 
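As a quick illustration of the statistic just described (a sketch with made-up data, not part of the patch; it assumes the new module is importable as permute.multitest_stratified):

    import numpy as np
    from permute.multitest_stratified import multitest_stratified_corrcoef

    rng = np.random.RandomState(0)
    group = np.repeat([0, 1], 50)             # two strata of 50 observations each
    x = rng.normal(size=(100, 2))             # 100 observations, 2 hypotheses
    y = 0.8 * x + rng.normal(size=(100, 2))

    tst = multitest_stratified_corrcoef(x, y, group)
    # tst has shape (2,): for each test column, the correlation computed within
    # stratum 0 plus the correlation computed within stratum 1.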
+ + Parameters + ---------- + x : array-like + Variable 1 + y : array-like + Variable 2, of the same length as x + group : array-like + Group memberships, of the same length as x + + Returns + ------- + float + The sum of Spearman correlations + """ + if x.shape != y.shape: + raise ValueError('x and y must have the same shape') + num_tests = x.shape[1] + corr_mat_mask = np.zeros((2*num_tests,2*num_tests),dtype=bool) + corr_mat_mask[x.shape[1]+np.arange(num_tests),np.arange(num_tests)] = True + tst = np.zeros(num_tests) + for g in np.unique(group): + gg = group == g + tst += np.corrcoef(x[gg,:], y[gg,:],rowvar=False)[corr_mat_mask] + return tst + + + +def multitest_stratified_sim_corr(x, y, group, reps=10**4, alternative='greater', seed=None, plus1=True, max_correct=False): + r""" + Simulate permutation p-value of stratified Spearman correlation test. + + Parameters + ---------- + x : array-like + Variable 1 + y : array-like + Variable 2, of the same length as x + group : array-like + Group memberships, of the same length as x + alternative : {'greater', 'less', 'two-sided'} + The alternative hypothesis to test + reps : int + Number of repetitions + seed : RandomState instance or {None, int, RandomState instance} + If None, the pseudorandom number generator is the RandomState + instance used by `np.random`; + If int, seed is the seed used by the random number generator; + If RandomState instance, seed is the pseudorandom number generator. + plus1 : bool + flag for whether to add 1 to the numerator and denominator of the + p-value based on the empirical permutation distribution. + Default is True. + max_correct : bool + flag for whether to perform max statistic multiple testing + correction. Builds the null distribution from the most extreme value + across tests for each iteration of the permutation. Default is False. + + Returns + ------- + float + the estimated p-value + float + the observed test statistic + list + the null distribution + """ + if x.shape != y.shape: + raise ValueError('x and y must have the same shape') + num_tests = x.shape[1] + prng = get_prng(seed) + x = x.astype(float) + y = y.astype(float) + tst = multitest_stratified_corrcoef(x, y, group) + if max_correct: + dist = np.empty(reps) + for i in range(reps): + curr_tst = multitest_stratified_corrcoef(permute_within_groups(x, group, prng), y, group) + dist[i] = max(curr_tst.min(), curr_tst.max(), key=abs) + right_pv = np.empty(num_tests) + for i in range(num_tests): + right_pv[i] = np.sum(dist >= tst[i]) / (reps+plus1) + else: + dist = [multitest_stratified_corrcoef(permute_within_groups(x, group, prng), y, group) + for i in range(reps)] + right_pv = np.sum(dist >= tst,axis=0) / (reps+plus1) + thePvalue = { + 'greater': lambda p: p + plus1/(reps+plus1), + 'less': lambda p: 1 - (p + plus1/(reps+plus1)), + 'two-sided': lambda p: 2 * np.min([p + plus1/(reps+plus1), + 1 - (p + plus1/(reps+plus1))],axis=0) + } + return thePvalue[alternative](right_pv), tst, dist + + +def multitest_stratified_permutationtest_mean(group, condition, response, + groups=None, conditions=None): + r""" + Calculates variability in sample means between treatment conditions, + within groups. + + If there are two treatment conditions, the test statistic is the + difference in means, aggregated across groups. + If there are more than two treatment conditions, the test statistic + is the standard deviation of the means, aggregated across groups. 
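A small numeric sketch of that statistic for the two-condition case (hypothetical values, a single test column, calling the helper as defined in this patch):

    import numpy as np
    from permute.multitest_stratified import multitest_stratified_permutationtest_mean

    group     = np.array([0, 0, 0, 0, 1, 1, 1, 1])
    condition = np.array([0, 0, 1, 1, 0, 0, 1, 1])
    response  = np.array([[1.0], [2.0], [3.0], [4.0],    # stratum 0
                          [5.0], [6.0], [7.0], [8.0]])   # stratum 1

    tst = multitest_stratified_permutationtest_mean(group, condition, response)
    # Within each stratum the two condition means differ by 2, so the statistic
    # aggregated over both strata has magnitude 4 for the single test column.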
+ + Parameters + ---------- + group : array-like + Group memberships + condition : array-like + Treatment conditions, of the same length as group + response : array-like + Responses, of the same length as group. Has shape [observations,tests].. + groups : array-like + Group labels. By default, it is the unique values of group + conditions : array-like + Condition labels. By default, it is the unique values of condition + + + Returns + ------- + tst : float + The observed test statistic + """ + num_tests = response.shape[1] + if groups is None: + groups = np.unique(group) + if conditions is None: + conditions = np.unique(condition) + tst = np.zeros(num_tests) + if len(groups) < 2: + raise ValueError('Number of groups must be at least 2.') + elif len(groups) == 2: + stat = lambda u: np.fabs(u[0] - u[1]) + for g in groups: + gg = group == g + x = [gg & (condition == c) for c in conditions] + tst += stat([response[x[j],:].mean(axis=0) for j in range(len(x))]) + elif len(groups) > 2: + for g in groups: + gg = group == g + x = [gg & (condition == c) for c in conditions] + tst += np.std([response[x[j],:].mean(axis=0) for j in range(len(x))],0) + return tst + + +def multitest_stratified_permutationtest( + group, + condition, + response, + alternative='greater', + reps=10**5, + testStatistic='mean', + seed=None, + plus1=True, + max_correct=False): + r""" + Stratified permutation test based on differences in means. + + The test statistic is + + .. math:: \sum_{g \in \text{groups}} [ + f(mean(\text{response for cases in group $g$ + assigned to each condition}))]. + + The function f is the difference if there are two conditions, and + the standard deviation if there are more than two conditions. + + There should be at least one group and at least two conditions. + Under the null hypothesis, all assignments to the two conditions that + preserve the number of cases assigned to the conditions are equally + likely. + + Groups in which all cases are assigned to the same condition are + skipped; they do not contribute to the p-value since all randomizations + give the same contribution to the difference in means. + + Parameters + ---------- + group : array-like + Group memberships + condition : array-like + Treatment conditions, of the same length as group + response : array-like + Responses, of the same length as group + alternative : {'greater', 'less', 'two-sided'} + The alternative hypothesis to test + reps : int + Number of repetitions + testStatistic : function + Function to compute test statistic. By default, + stratified_permutationtest_mean + The test statistic. Either a string or function. + + (a) If stat == 'mean', the test statistic is + stratified_permutationtest_mean (default). + (b) If stat is a function (a callable object), the test statistic is + that function. The function should take a permutation of the + data and compute the test function from it. For instance, if the + test statistic is the maximum absolute value, $\max_i |z_i|$, + the test statistic could be written: + + f = lambda u: np.max(abs(u)) + seed : RandomState instance or {None, int, RandomState instance} + If None, the pseudorandom number generator is the RandomState + instance used by `np.random`; + If int, seed is the seed used by the random number generator; + If RandomState instance, seed is the pseudorandom number generator + plus1 : bool + flag for whether to add 1 to the numerator and denominator of the + p-value based on the empirical permutation distribution. + Default is True. 
+ max_correct : bool + flag for whether to perform max statistic multiple testing + correction. Builds the null distribution from the most extreme value + across tests for each iteration of the permutation. Default is False. + + Returns + ------- + float + the estimated p-value + float + the observed test statistic + list + the null distribution + """ + prng = get_prng(seed) + num_tests = response.shape[1] + groups = np.unique(group) + conditions = np.unique(condition) + stats = { + 'mean': lambda u: multitest_stratified_permutationtest_mean( + group, + u, + response, + groups, + conditions)} + if callable(testStatistic): + tst_fun = testStatistic + else: + tst_fun = stats[testStatistic] + thePvalue = { + 'greater': lambda p: p + plus1/(reps+plus1), + 'less': lambda p: 1 - (p + plus1/(reps+plus1)), + 'two-sided': lambda p: 2 * np.min([p + plus1/(reps+plus1), + 1 - (p + plus1/(reps+plus1))],axis=0) + } + + if len(conditions) < 2: + return 1.0, np.nan, None + else: + tst = tst_fun(condition) + if max_correct: + dist = np.zeros(reps) + for i in range(int(reps)): + curr_tst = tst_fun(permute_within_groups(condition, group, prng)) + dist[i] = max(curr_tst.min(), curr_tst.max(), key=abs) + right_pv = np.empty(num_tests) + for i in range(num_tests): + right_pv[i] = np.sum(dist >= tst[i])/(reps+plus1) + return thePvalue[alternative](right_pv), tst, dist + else: + dist = np.zeros((reps,num_tests)) + for i in range(int(reps)): + dist[i,:] = tst_fun(permute_within_groups(condition, group, prng)) + + right_pv = np.sum(dist >= tst,axis=0) / (reps+plus1) + return thePvalue[alternative](right_pv), tst, dist + + +def multitest_stratified_two_sample( + group, + condition, + response, + stat='mean', + alternative="greater", + reps=10**5, + keep_dist=False, + seed=None, + plus1=True, + max_correct=False): + r""" + One-sided or two-sided, two-sample permutation test for equality of + two means, with p-value estimated by simulated random sampling with + reps replications. + + Tests the hypothesis that x and y are a random partition of x,y + against the alternative that x comes from a population with mean + + (a) greater than that of the population from which y comes, + if side = 'greater' + (b) less than that of the population from which y comes, + if side = 'less' + (c) different from that of the population from which y comes, + if side = 'two-sided' + + Permutations are carried out within the given groups. Under the null + hypothesis, observations within each group are exchangeable. + + If ``keep_dist``, return the distribution of values of the test statistic; + otherwise, return only the number of permutations for which the value of + the test statistic and p-value. + + Parameters + ---------- + group : array-like + Group memberships + condition : array-like + Treatment conditions, of the same length as group + response : array-like + Responses, of the same length as group + stat : {'mean', 't'} + The test statistic. + + (a) If stat == 'mean', the test statistic is (mean(x) - mean(y)) + (equivalently, sum(x), since those are monotonically related), + omitting NaNs, which therefore can be used to code non-responders + (b) If stat == 't', the test statistic is the two-sample t-statistic-- + but the p-value is still estimated by the randomization, + approximating the permutation distribution. + The t-statistic is computed using scipy.stats.ttest_ind + (c) If stat == 'mean_within_strata', the test statistic is the + difference in means within each stratum, added across strata. 
+ (d) If stat is a function (a callable object), the test statistic is + that function. The function should take a permutation of the + pooled data and compute the test function from it. For instance, + if the test statistic is the Kolmogorov-Smirnov distance between + the empirical distributions of the two samples, + $max_t |F_x(t) - F_y(t)|$, the test statistic could be written: + + f = lambda u: np.max( \ + [abs(sum(u[:len(x)]<=v)/len(x)-sum(u[len(x):]<=v)/len(y)) + for v in u]\ + ) + alternative : {'greater', 'less', 'two-sided'} + The alternative hypothesis to test + reps : int + Number of permutations + keep_dist : bool + flag for whether to store and return the array of values + of the test statistic + seed : RandomState instance or {None, int, RandomState instance} + If None, the pseudorandom number generator is the RandomState + instance used by `np.random`; + If int, seed is the seed used by the random number generator; + If RandomState instance, seed is the pseudorandom number generator. + plus1 : bool + flag for whether to add 1 to the numerator and denominator of the + p-value based on the empirical permutation distribution. + Default is True. + max_correct : bool + flag for whether to perform max statistic multiple testing + correction. Builds the null distribution from the most extreme value + across tests for each iteration of the permutation. Default is False. + + Returns + ------- + float + the estimated p-value + float + the test statistic + list + The distribution of test statistics. + These values are only returned if `keep_dist` == True + """ + + prng = get_prng(seed) + num_tests = response.shape[1] + ordering = condition.argsort() + response = response[ordering] + condition = condition[ordering] + group = group[ordering] + ntreat = np.sum(condition == condition[0]) + + groups = np.unique(group) + conditions = np.unique(condition) + # If stat is callable, use it as the test function. 
Otherwise, look in the + # dictionary + stats = { + 'mean': lambda u: np.nanmean(u[:ntreat],axis=0) - np.nanmean(u[ntreat:],axis=0), + 't': lambda u: ttest_ind( + u[:len(x)][~np.isnan(u[:ntreat])], + u[len(x):][~np.isnan(u[ntreat:])], + axis=0,equal_var=True)[0], + 'mean_within_strata': lambda u: multitest_stratified_permutationtest_mean(group, + condition, + u, + groups, + conditions) + } + if callable(stat): + tst_fun = stat + else: + tst_fun = stats[stat] + + thePvalue = { + 'greater': lambda p: p + plus1/(reps+plus1), + 'less': lambda p: 1 - (p + plus1/(reps+plus1)), + 'two-sided': lambda p: 2 * np.min([p + plus1/(reps+plus1), + 1 - (p + plus1/(reps+plus1))],axis=0) + } + observed_tst = tst_fun(response) + + if keep_dist: + if max_correct: + dist = np.empty(reps) + for i in range(reps): + curr_tst = tst_fun(permute_within_groups( + response, group, seed=prng)) + dist[i] = max(curr_tst.min(), curr_tst.max(), key=abs) + hits = np.empty(num_tests) + for i in range(num_tests): + hits[i] = np.sum(dist >= observed_tst[i]) + return thePvalue[alternative](hits / (reps+plus1)), observed_tst, dist + else: + dist = np.empty((reps,num_tests)) + for i in range(reps): + dist[i,:] = tst_fun(permute_within_groups( + response, group, seed=prng)) + hits = np.sum(dist >= observed_tst,axis=0) + return thePvalue[alternative](hits / (reps+plus1)), observed_tst, dist + else: + if max_correct: + hits = np.sum([(tst_fun(permute_within_groups( + response, group, seed=prng)) >= observed_tst) + for i in range(reps)],axis=0) + return thePvalue[alternative](hits / (reps+plus1)), observed_tst + else: + hits = np.zeros(num_tests) + for i in range(reps): + curr_tst = tst_fun(permute_within_groups(response, group, seed=prng)) + hits += curr_tst >= observed_tst + return thePvalue[alternative](hits / (reps+plus1)), observed_tst From 292d1925f27f91f1971baac4b27d10731716be2d Mon Sep 17 00:00:00 2001 From: Clayton Barnes Date: Fri, 25 Mar 2022 15:05:17 -0400 Subject: [PATCH 2/3] Added comments and tests Added comments for clarity and tests for multitest modules. 
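The new test files themselves (permute/tests/test_multitest_core.py and permute/tests/test_multitest_stratified.py, listed in the diffstat below) are not reproduced in this excerpt. Purely to illustrate their intent, a test for these modules might look roughly like the following hypothetical sketch; the function name, data, and assertions here are examples, not the actual test contents:

    import numpy as np
    from permute.multitest_core import multitest_one_sample

    def test_multitest_one_sample_flags_only_the_shifted_column():
        rng = np.random.RandomState(55)
        x = rng.normal(size=(60, 2))
        x[:, 0] += 1.0                      # column 0 has a true positive mean
        p, tst = multitest_one_sample(x, reps=10**3, stat='mean',
                                      alternative='greater', seed=55)
        assert p[0] < 0.05                  # shifted column is detected
        assert p[1] > p[0]                  # null column has a larger p-value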
--- permute/multitest_core.py | 313 +++++++++++++-------- permute/multitest_stratified.py | 96 ++++++- permute/tests/test_multitest_core.py | 220 +++++++++++++++ permute/tests/test_multitest_stratified.py | 127 +++++++++ 4 files changed, 623 insertions(+), 133 deletions(-) create mode 100644 permute/tests/test_multitest_core.py create mode 100644 permute/tests/test_multitest_stratified.py diff --git a/permute/multitest_core.py b/permute/multitest_core.py index cde7d87..4a32fe7 100644 --- a/permute/multitest_core.py +++ b/permute/multitest_core.py @@ -45,16 +45,23 @@ def multitest_corr(x, y, alternative='greater', reps=10**4, seed=None, plus1=Tru tuple Returns test statistic, p-value, simulated distribution """ + # make sure the two samples have the same shape if x.shape != y.shape: raise ValueError('x and y must have the same shape') + # get number of hypotheses being tested num_tests = x.shape[1] prng = get_prng(seed) + # create mask to grab the corr values we care about from np.corrcoef function (majority of what it returns we aren't interested in) corr_mat_mask = np.zeros((2*num_tests,2*num_tests),dtype=bool) corr_mat_mask[x.shape[1]+np.arange(num_tests),np.arange(num_tests)] = True + # calculate correlations and grab the pairs we are interested in tst = np.corrcoef(x, y,rowvar=False)[corr_mat_mask] + # permute to get null distribution sims = [np.corrcoef(permute(x, prng), y,rowvar=False)[corr_mat_mask] for i in range(reps)] + # get percentiles of statistic left_pv = (np.sum(sims <= tst,axis=0)+plus1) / (reps+plus1) right_pv = (np.sum(sims >= tst,axis=0)+plus1) / (reps+plus1) + # assign pvalue based on hypothesis if alternative == 'greater': pvalue = right_pv elif alternative == 'less': @@ -89,7 +96,7 @@ def multitest_spearman_corr(x, y, alternative='greater', reps=10**4, seed=None, tuple Returns test statistic, p-value, simulated distribution """ - + # sort observations per each test xnew = np.argsort(x,0)+1 ynew = np.argsort(y,0)+1 return multitest_corr(xnew, ynew, alternative=alternative, reps=reps, seed=seed) @@ -141,11 +148,14 @@ def multitest_two_sample_core(potential_outcomes_all, nx, tst_stat, alternative= These values are only returned if `keep_dist` == True """ prng = get_prng(seed) + # get number of hypotheses being tested num_tests = potential_outcomes_all.shape[1] - rr = list(range(potential_outcomes_all.shape[0])) # create indexing vect? 
- tst = tst_stat(potential_outcomes_all[:nx,:,0], # get tst_stat of each paired outcome (difference control vs test condition) + # create indexing vector + rr = list(range(potential_outcomes_all.shape[0])) + # get observed statistic + tst = tst_stat(potential_outcomes_all[:nx,:,0], potential_outcomes_all[nx:,:, 1]) - + # create dictionary to store functions for calculating p values thePvalue = { 'greater': lambda pUp, pDn: pUp+plus1/(reps+plus1), 'less': lambda pUp, pDn: pDn+plus1/(reps+plus1), @@ -153,15 +163,22 @@ def multitest_two_sample_core(potential_outcomes_all, nx, tst_stat, alternative= pUp+plus1/(reps+plus1), \ pDn+plus1/(reps+plus1)],1),1) } - + # account for all combinations of keep_dist (keep distribution) + # and max_correct (create max distribution to correct for multiple + # hypothesis testing) if keep_dist: if max_correct: dist = np.empty(reps) for i in range(reps): + # permute indexing vector prng.shuffle(rr) + # grab shuffled values pp = np.take(potential_outcomes_all, rr, axis=0) + # calculate statistic curr_tst = tst_stat(pp[:nx, :, 0], pp[nx:,: , 1]) + # grab the most extreme statistic observed across all tests dist[i] = max(curr_tst.min(), curr_tst.max(), key=abs) + # calculate percentile pUp = np.empty(num_tests) pDn = np.empty(num_tests) for i in range(num_tests): @@ -171,32 +188,48 @@ def multitest_two_sample_core(potential_outcomes_all, nx, tst_stat, alternative= else: dist = np.empty((reps,num_tests)) for i in range(reps): + # permute indexing vector prng.shuffle(rr) + # grab shuffled values pp = np.take(potential_outcomes_all, rr, axis=0) + # calculate statistic dist[i,:] = tst_stat(pp[:nx, :, 0], pp[nx:,: , 1]) + # calculate percentile pUp = np.sum(dist >= tst,axis=0)/(reps+plus1) pDn = np.sum(dist <= tst,axis=0)/(reps+plus1) return thePvalue[alternative](pUp, pDn), dist else: + # preallocate vectors to store number of times our observed statistic + # is greater than the permuted statistic(s). 
Keeps memory requirement low hitsUp = np.zeros(num_tests) hitsDn = np.zeros(num_tests) if max_correct: for i in range(reps): + # permute indexing vector prng.shuffle(rr) + # grab shuffled values pp = np.take(potential_outcomes_all, rr, axis=0) + # calculate statistic curr_tst = tst_stat(pp[:nx, :, 0], pp[nx:, :, 1]) + # grab the most extreme statistic observed across all tests curr_max = max(curr_tst.min(), curr_tst.max(), key=abs) + # count if observed statistic is larger or smaller than permuted hitsUp += curr_max >= tst hitsDn += curr_max <= tst + # calculate percentile pUp = hitsUp/(reps+plus1) pDn = hitsDn/(reps+plus1) return thePvalue[alternative](pUp, pDn) else: for i in range(reps): + # permute indexing vector prng.shuffle(rr) + # grab shuffled values pp = np.take(potential_outcomes_all, rr, axis=0) + # count if observed statistic is larger or smaller than permuted hitsUp += tst_stat(pp[:nx, :, 0], pp[nx:, :, 1]) >= tst hitsDn += tst_stat(pp[:nx, :, 0], pp[nx:, :, 1]) <= tst + # calculate percentile pUp = hitsUp/(reps+plus1) pDn = hitsDn/(reps+plus1) return thePvalue[alternative](pUp, pDn) @@ -278,13 +311,18 @@ def multitest_one_sample(x, y=None, reps=10**5, stat='mean', alternative="greate These values are only returned if `keep_dist` == True """ prng = get_prng(seed) + # since one sample test, if we get both x and y turn into one sample + # (and ensure they have the same shape) if y is None: z = x + # make sure elif x.shape != y.shape: raise ValueError('x and y must have the same shape') else: z = np.array(x) - np.array(y) + # get number of hypotheses being tested num_tests = z.shape[1] + # create dictionary to store functions for calculating p values thePvalue = { 'greater': lambda pUp, pDn: pUp+plus1/(reps+plus1), 'less': lambda pUp, pDn: pDn+plus1/(reps+plus1), @@ -292,21 +330,32 @@ def multitest_one_sample(x, y=None, reps=10**5, stat='mean', alternative="greate pUp+plus1/(reps+plus1), \ pDn+plus1/(reps+plus1)],1),axis=1) } + # create dictionary to store common statistics ensuring correct axis stats = { 'mean': lambda u: np.mean(u,axis=0), 't': lambda u: ttest_1samp(u, 0, axis=0)[0] } + # if we were given a custom statistic function, use that if callable(stat): tst_fun = stat else: tst_fun = stats[stat] + # calculate observed statistic tst = tst_fun(z) + # account for all combinations of keep_dist (keep distribution) + # and max_correct (create max distribution to correct for multiple + # hypothesis testing) if keep_dist: if max_correct: + # preallocate space to build null distribution + # (1D since only taking extreme values) dist = np.empty(reps) for i in range(reps): + # calculate statistic of current permutation curr_tst = tst_fun(z * (1 - 2 * prng.randint(0, 2, z.shape))) + # grab the most extreme value across tests dist[i] = max(curr_tst.min(), curr_tst.max(), key=abs) + # calculate percentile for each test pUp = np.empty(num_tests) pDn = np.empty(num_tests) for i in range(num_tests): @@ -314,29 +363,42 @@ def multitest_one_sample(x, y=None, reps=10**5, stat='mean', alternative="greate pDn[i] = np.sum(dist <= tst[i])/(reps+plus1) return thePvalue[alternative](pUp, pDn), tst, dist else: + # preallocate space to build null distribution + # (2D since each test will have its own distribution) dist = np.empty((reps,num_tests)) for i in range(reps): + # calculate statistic of current permutation dist[i,:] = tst_fun(z * (1 - 2 * prng.randint(0, 2, z.shape))) + # calculate percentile for each test pUp = np.sum(dist >= tst,axis=0)/(reps+plus1) pDn = np.sum(dist <= 
tst,axis=0)/(reps+plus1) - return thePvalue[alternative](pUp, pDn), tst + return thePvalue[alternative](pUp, pDn), tst, dist else: + # preallocate vectors to store number of times our observed statistic + # is greater than the permuted statistic(s). Keeps memory requirement low. hitsUp = np.zeros(num_tests) hitsDn = np.zeros(num_tests) if max_correct: for i in range(reps): + # calculate statistic for current permutation curr_tst = tst_fun(z * (1 - 2 * prng.randint(0, 2, z.shape))) + # grab the most extreme statistic across all tests curr_max = max(curr_tst.min(), curr_tst.max(), key=abs) + # iterate counters accordingly hitsUp += curr_max >= tst hitsDn += curr_max <= tst + # calculate percentiles pUp = hitsUp/(reps+plus1) pDn = hitsDn/(reps+plus1) - return thePvalue[alternative](pUp, pDn), tst, dist + return thePvalue[alternative](pUp, pDn), tst else: for i in range(reps): + # calculate statistic for current permutation curr_tst = tst_fun(z * (1 - 2 * prng.randint(0, 2, z.shape))) + # iterate counters accordingly hitsUp += curr_tst >= tst hitsDn += curr_tst <= tst + # calculate percentiles pUp = hitsUp/(reps+plus1) pDn = hitsDn/(reps+plus1) return thePvalue[alternative](pUp, pDn), tst @@ -417,6 +479,7 @@ def multitest_two_sample(x, y, reps=10**5, stat='mean', alternative="greater", The distribution of test statistics. These values are only returned if `keep_dist` == True """ + # ensure x and y have the same shape if x.shape != y.shape: raise ValueError('x and y must have the same shape') # Set up potential outcomes; under the null, all units are exchangeable @@ -435,13 +498,15 @@ def multitest_two_sample(x, y, reps=10**5, stat='mean', alternative="greater", tst_fun = stat else: tst_fun = stats[stat] - + + # get number of observations nx = x.shape[0] + # calculate observed statistic for all tests observed_tst = tst_fun(pot_out_all[:nx, :, 0], pot_out_all[nx:, :, 1]) - + # call main worker function res = multitest_two_sample_core(pot_out_all, nx, tst_fun, alternative=alternative, reps=reps, keep_dist=keep_dist, seed=seed, plus1=plus1, max_correct=max_correct) - + # accomadate user request for returning distribution if keep_dist: return res[0], observed_tst, res[1] else: @@ -484,121 +549,125 @@ def multitest_potential_outcomes(x, y, f, finverse): return np.stack([pot_treat, pot_ctrl],2) -def multitest_two_sample_shift(x, y, reps=10**5, stat='mean', alternative="greater", - keep_dist=False, seed=None, shift=None, plus1=True): - r""" - One-sided or two-sided, two-sample permutation multi-test for equality of - two means, with p-value estimated by simulated random sampling with - reps replications. - - Tests the hypothesis that x and y are a random partition of x,y - against the alternative that x comes from a population with mean - - (a) greater than that of the population from which y comes, - if side = 'greater' - (b) less than that of the population from which y comes, - if side = 'less' - (c) different from that of the population from which y comes, - if side = 'two-sided' - - If ``keep_dist``, return the distribution of values of the test statistic; - otherwise, return only the number of permutations for which the value of - the test statistic and p-value. - - Parameters - ---------- - x : array-like - Sample 1 - y : array-like - Sample 2 - reps : int - number of repetitions - stat : {'mean', 't'} - The test statistic. 
- - (a) If stat == 'mean', the test statistic is (mean(x) - mean(y)) - (equivalently, sum(x), since those are monotonically related) - (b) If stat == 't', the test statistic is the two-sample t-statistic-- - but the p-value is still estimated by the randomization, - approximating the permutation distribution. - The t-statistic is computed using scipy.stats.ttest_ind - (c) If stat is a function (a callable object), the test statistic is - that function. The function should take two arguments: - given a permutation of the pooled data, the first argument is the - "new" x and the second argument is the "new" y. - For instance, if the test statistic is the Kolmogorov-Smirnov distance - between the empirical distributions of the two samples, - $\max_t |F_x(t) - F_y(t)|$, the test statistic could be written: - - f = lambda u, v: np.max( \ - [abs(sum(u<=val)/len(u)-sum(v<=val)/len(v)) for val in np.concatenate([u, v])]\ - ) +# def multitest_two_sample_shift(x, y, reps=10**5, stat='mean', alternative="greater", +# keep_dist=False, seed=None, shift=None, plus1=True): +# r""" +# One-sided or two-sided, two-sample permutation multi-test for equality of +# two means, with p-value estimated by simulated random sampling with +# reps replications. + +# Tests the hypothesis that x and y are a random partition of x,y +# against the alternative that x comes from a population with mean + +# (a) greater than that of the population from which y comes, +# if side = 'greater' +# (b) less than that of the population from which y comes, +# if side = 'less' +# (c) different from that of the population from which y comes, +# if side = 'two-sided' + +# If ``keep_dist``, return the distribution of values of the test statistic; +# otherwise, return only the number of permutations for which the value of +# the test statistic and p-value. + +# Parameters +# ---------- +# x : array-like +# Sample 1 +# y : array-like +# Sample 2 +# reps : int +# number of repetitions +# stat : {'mean', 't'} +# The test statistic. + +# (a) If stat == 'mean', the test statistic is (mean(x) - mean(y)) +# (equivalently, sum(x), since those are monotonically related) +# (b) If stat == 't', the test statistic is the two-sample t-statistic-- +# but the p-value is still estimated by the randomization, +# approximating the permutation distribution. +# The t-statistic is computed using scipy.stats.ttest_ind +# (c) If stat is a function (a callable object), the test statistic is +# that function. The function should take two arguments: +# given a permutation of the pooled data, the first argument is the +# "new" x and the second argument is the "new" y. +# For instance, if the test statistic is the Kolmogorov-Smirnov distance +# between the empirical distributions of the two samples, +# $\max_t |F_x(t) - F_y(t)|$, the test statistic could be written: + +# f = lambda u, v: np.max( \ +# [abs(sum(u<=val)/len(u)-sum(v<=val)/len(v)) for val in np.concatenate([u, v])]\ +# ) - alternative : {'greater', 'less', 'two-sided'} - The alternative hypothesis to test - keep_dist : bool - flag for whether to store and return the array of values - of the irr test statistic - seed : RandomState instance or {None, int, RandomState instance} - If None, the pseudorandom number generator is the RandomState - instance used by `np.random`; - If int, seed is the seed used by the random number generator; - If RandomState instance, seed is the pseudorandom number generator - shift : float - The relationship between x and y under the null hypothesis. 
- - (a) A constant scalar shift in the distribution of y. That is, x is equal - in distribution to y + shift. - (b) A tuple containing the function and its inverse $(f, f^{-1})$, so - $x_i = f(y_i)$ and $y_i = f^{-1}(x_i)$ - plus1 : bool - flag for whether to add 1 to the numerator and denominator of the - p-value based on the empirical permutation distribution. - Default is True. +# alternative : {'greater', 'less', 'two-sided'} +# The alternative hypothesis to test +# keep_dist : bool +# flag for whether to store and return the array of values +# of the irr test statistic +# seed : RandomState instance or {None, int, RandomState instance} +# If None, the pseudorandom number generator is the RandomState +# instance used by `np.random`; +# If int, seed is the seed used by the random number generator; +# If RandomState instance, seed is the pseudorandom number generator +# shift : float +# The relationship between x and y under the null hypothesis. + +# (a) A constant scalar shift in the distribution of y. That is, x is equal +# in distribution to y + shift. +# (b) A tuple containing the function and its inverse $(f, f^{-1})$, so +# $x_i = f(y_i)$ and $y_i = f^{-1}(x_i)$ +# plus1 : bool +# flag for whether to add 1 to the numerator and denominator of the +# p-value based on the empirical permutation distribution. +# Default is True. + +# Returns +# ------- +# float +# the estimated p-value +# float +# the test statistic +# list +# The distribution of test statistics. +# These values are only returned if `keep_dist` == True +# """ +# # Set up potential outcomes according to shift +# if isinstance(shift, float) or isinstance(shift, int): +# # Potential outcomes for all units under treatment +# pot_outx = np.concatenate([x, y + shift]) +# # Potential outcomes for all units under control +# pot_outy = np.concatenate([x - shift, y]) +# pot_out_all = np.stack([pot_outx, pot_outy],2) +# elif isinstance(shift, tuple): +# assert (callable(shift[0])), "Supply f and finverse in shift tuple" +# assert (callable(shift[1])), "Supply f and finverse in shift tuple" +# pot_out_all = multitest_potential_outcomes(x, y, shift[0], shift[1]) +# else: +# raise ValueError("Bad input for shift") +# # If stat is callable, use it as the test function. Otherwise, look in the +# # dictionary +# stats = { +# 'mean': lambda u, v: np.mean(u,axis=0) - np.mean(v,axis=0), +# 't': lambda u, v: ttest_ind(u, v, axis=0, equal_var=True)[0] +# } +# if callable(stat): +# tst_fun = stat +# else: +# tst_fun = stats[stat] +# # get number of observations +# nx = x.shape[0] +# # calculate observed statistics for all tests +# observed_tst = tst_fun(pot_out_all[:nx,:, 0], pot_out_all[nx:,:, 1]) +# # call main worker function +# res = multitest_two_sample_core(pot_out_all, nx, tst_fun, alternative=alternative, +# reps=reps, keep_dist=keep_dist, seed=seed, plus1=plus1) +# # accomadate user request for returning distribution +# if keep_dist: +# return res[0], observed_tst, res[1] +# else: +# return res, observed_tst - Returns - ------- - float - the estimated p-value - float - the test statistic - list - The distribution of test statistics. 
- These values are only returned if `keep_dist` == True - """ - # Set up potential outcomes according to shift - if isinstance(shift, float) or isinstance(shift, int): - # Potential outcomes for all units under treatment - pot_outx = np.concatenate([x, y + shift]) - # Potential outcomes for all units under control - pot_outy = np.concatenate([x - shift, y]) - pot_out_all = np.stack([pot_outx, pot_outy],2) - elif isinstance(shift, tuple): - assert (callable(shift[0])), "Supply f and finverse in shift tuple" - assert (callable(shift[1])), "Supply f and finverse in shift tuple" - pot_out_all = multitest_potential_outcomes(x, y, shift[0], shift[1]) - else: - raise ValueError("Bad input for shift") - # If stat is callable, use it as the test function. Otherwise, look in the - # dictionary - stats = { - 'mean': lambda u, v: np.mean(u,axis=0) - np.mean(v,axis=0), - 't': lambda u, v: ttest_ind(u, v, axis=0, equal_var=True)[0] - } - if callable(stat): - tst_fun = stat - else: - tst_fun = stats[stat] - nx = x.shape[0] - observed_tst = tst_fun(pot_out_all[:nx,:, 0], pot_out_all[nx:,:, 1]) - res = multitest_two_sample_core(pot_out_all, nx, tst_fun, alternative=alternative, - reps=reps, keep_dist=keep_dist, seed=seed, plus1=plus1) - if keep_dist: - return res[0], observed_tst, res[1] - else: - return res, observed_tst -""" start multitest version of 'stratified' """ diff --git a/permute/multitest_stratified.py b/permute/multitest_stratified.py index d9d337f..c163a95 100644 --- a/permute/multitest_stratified.py +++ b/permute/multitest_stratified.py @@ -29,14 +29,20 @@ def multitest_stratified_corrcoef(x, y, group): float The sum of Spearman correlations """ + # ensure x and y are the same shape (same number of observations and tests) if x.shape != y.shape: raise ValueError('x and y must have the same shape') + # get number of hypotheses tested num_tests = x.shape[1] + # create mask to grab correlations from corrcoeff we care about (don't care about all pairs) corr_mat_mask = np.zeros((2*num_tests,2*num_tests),dtype=bool) corr_mat_mask[x.shape[1]+np.arange(num_tests),np.arange(num_tests)] = True + # preallocate vector to store aggregate correlations for each test tst = np.zeros(num_tests) for g in np.unique(group): + # create mask for current group gg = group == g + # calculate and grab correlation coefficients for current group tst += np.corrcoef(x[gg,:], y[gg,:],rowvar=False)[corr_mat_mask] return tst @@ -81,25 +87,37 @@ def multitest_stratified_sim_corr(x, y, group, reps=10**4, alternative='greater' list the null distribution """ + # ensure x and y have the same shape (same number of observations and tests) if x.shape != y.shape: raise ValueError('x and y must have the same shape') + # get the number of hypotheses to test num_tests = x.shape[1] prng = get_prng(seed) x = x.astype(float) y = y.astype(float) + # calculate observed statistic tst = multitest_stratified_corrcoef(x, y, group) + # account for user wanting to perform max correction if max_correct: + # preallocate space to build null distribution + # (1D since going to take extreme value across tests) dist = np.empty(reps) for i in range(reps): + # calculate statistic of current permutation curr_tst = multitest_stratified_corrcoef(permute_within_groups(x, group, prng), y, group) + # grab the most extreme value across tests dist[i] = max(curr_tst.min(), curr_tst.max(), key=abs) + # calculate the percentile for each test right_pv = np.empty(num_tests) for i in range(num_tests): right_pv[i] = np.sum(dist >= tst[i]) / (reps+plus1) else: + # 
calculate statistic on each permutation to build null distribution dist = [multitest_stratified_corrcoef(permute_within_groups(x, group, prng), y, group) for i in range(reps)] + # calculate percentile for each test right_pv = np.sum(dist >= tst,axis=0) / (reps+plus1) + # create dictionary to store p value calculations thePvalue = { 'greater': lambda p: p + plus1/(reps+plus1), 'less': lambda p: 1 - (p + plus1/(reps+plus1)), @@ -139,24 +157,38 @@ def multitest_stratified_permutationtest_mean(group, condition, response, tst : float The observed test statistic """ + # get number of hypotheses to test num_tests = response.shape[1] + # get the ID for each group if groups is None: groups = np.unique(group) + # get the ID for each condition if conditions is None: conditions = np.unique(condition) + # preallocate vector to store the aggregate statistic for each test tst = np.zeros(num_tests) + # check there are at least 2 groups if len(groups) < 2: raise ValueError('Number of groups must be at least 2.') - elif len(groups) == 2: - stat = lambda u: np.fabs(u[0] - u[1]) + # if 2 conditions, calculate mean. If more than 2, calculate std of outcomes + # TODO ensure this is intended behavior, in stratified.py this is done + # with the variable groups, but that doesn't seem right to me + elif len(conditions) == 2: + stat = lambda u: u[0] - u[1] for g in groups: + # create mask for current group gg = group == g + # create conjugate mask for group and condition x = [gg & (condition == c) for c in conditions] + # aggregate statistic for each group and condition tst += stat([response[x[j],:].mean(axis=0) for j in range(len(x))]) - elif len(groups) > 2: + elif len(conditions) > 2: for g in groups: + # create mask for current group gg = group == g + # create conjugate mask for group and condition x = [gg & (condition == c) for c in conditions] + # aggregate statistic for each group and condition tst += np.std([response[x[j],:].mean(axis=0) for j in range(len(x))],0) return tst @@ -242,9 +274,13 @@ def multitest_stratified_permutationtest( the null distribution """ prng = get_prng(seed) + # get the number of hypotheses to test num_tests = response.shape[1] + # get the group IDs groups = np.unique(group) + # get the condition IDs conditions = np.unique(condition) + # create a dictionary to store common statistic calculation stats = { 'mean': lambda u: multitest_stratified_permutationtest_mean( group, @@ -256,31 +292,43 @@ def multitest_stratified_permutationtest( tst_fun = testStatistic else: tst_fun = stats[testStatistic] + # create dictionary to store p values calculatoins thePvalue = { 'greater': lambda p: p + plus1/(reps+plus1), 'less': lambda p: 1 - (p + plus1/(reps+plus1)), 'two-sided': lambda p: 2 * np.min([p + plus1/(reps+plus1), 1 - (p + plus1/(reps+plus1))],axis=0) } - + # if len(conditions) < 2: + # TODO would it be more appropriate to raise error? 
+ # raise ValueError('Number of conditions must be at least 2.') return 1.0, np.nan, None else: + # calculate observed statistic tst = tst_fun(condition) if max_correct: + # preallocate vector to store null distribution + # (1D because going to take most extreme value across all tests) dist = np.zeros(reps) for i in range(int(reps)): + # calculate statistic for current permutation curr_tst = tst_fun(permute_within_groups(condition, group, prng)) + # grab the most extreme value across tests dist[i] = max(curr_tst.min(), curr_tst.max(), key=abs) + # calculate percentile for each test right_pv = np.empty(num_tests) for i in range(num_tests): right_pv[i] = np.sum(dist >= tst[i])/(reps+plus1) return thePvalue[alternative](right_pv), tst, dist else: + # preallocate vector to store null distribution + # (2D because each test will have its own distribution) dist = np.zeros((reps,num_tests)) for i in range(int(reps)): + # calculate statistic for current permutation dist[i,:] = tst_fun(permute_within_groups(condition, group, prng)) - + # calculate percentile for each test right_pv = np.sum(dist >= tst,axis=0) / (reps+plus1) return thePvalue[alternative](right_pv), tst, dist @@ -382,17 +430,23 @@ def multitest_stratified_two_sample( """ prng = get_prng(seed) + # get number of hypotheses to test num_tests = response.shape[1] + # get indexing to sort by condition (not sure why this is necessary) ordering = condition.argsort() response = response[ordering] condition = condition[ordering] group = group[ordering] + # get number of samples that received condition with lowest ID + # TODO should we ensure each condition has the same number of samples? ntreat = np.sum(condition == condition[0]) + # get the IDs for each group and condition groups = np.unique(group) conditions = np.unique(condition) # If stat is callable, use it as the test function. 
Otherwise, look in the # dictionary + # TODO there is no x, not sure what desired behavior is here stats = { 'mean': lambda u: np.nanmean(u[:ntreat],axis=0) - np.nanmean(u[ntreat:],axis=0), 't': lambda u: ttest_ind( @@ -409,42 +463,62 @@ def multitest_stratified_two_sample( tst_fun = stat else: tst_fun = stats[stat] - + # create dictionary to store p value calculations thePvalue = { 'greater': lambda p: p + plus1/(reps+plus1), 'less': lambda p: 1 - (p + plus1/(reps+plus1)), 'two-sided': lambda p: 2 * np.min([p + plus1/(reps+plus1), 1 - (p + plus1/(reps+plus1))],axis=0) } + # get observed statistic observed_tst = tst_fun(response) - + # account for all combinations of keep_dist (keep distribution) + # and max_correct (create max distribution to correct for multiple + # hypothesis testing) if keep_dist: if max_correct: + # preallocate vector for null distribution + # (1D because going to take most extreme statistic across tests) dist = np.empty(reps) for i in range(reps): + # calculate statistic for current permutation curr_tst = tst_fun(permute_within_groups( response, group, seed=prng)) + # grab most extreme statistic across tests dist[i] = max(curr_tst.min(), curr_tst.max(), key=abs) + # calculate percentile for each test hits = np.empty(num_tests) for i in range(num_tests): hits[i] = np.sum(dist >= observed_tst[i]) - return thePvalue[alternative](hits / (reps+plus1)), observed_tst, dist + return thePvalue[alternative](hits / (reps+plus1)), observed_tst, dist else: + # preallocate vector for null distribution + # (2D because build null distribution for each test) dist = np.empty((reps,num_tests)) for i in range(reps): + # calculate statistic for current permutation dist[i,:] = tst_fun(permute_within_groups( response, group, seed=prng)) + # calculate percentile for each test hits = np.sum(dist >= observed_tst,axis=0) return thePvalue[alternative](hits / (reps+plus1)), observed_tst, dist else: if max_correct: - hits = np.sum([(tst_fun(permute_within_groups( - response, group, seed=prng)) >= observed_tst) - for i in range(reps)],axis=0) + # create vector to store number of times each hypothesis is less + # than the most extreme value across all tests per permutation + hits = np.zeros(num_tests) + for i in range(reps): + # calculate statistic of current permutation + curr_tst = tst_fun(permute_within_groups(response, group, seed=prng)) + # take most extreme value and compare with observed statistic + hits += max(curr_tst.min(), curr_tst.max(), key=abs) >= observed_tst return thePvalue[alternative](hits / (reps+plus1)), observed_tst else: + # create vector to store number of times each hypothesis is less + # than the corresponding statistic of the permuted values hits = np.zeros(num_tests) for i in range(reps): + # calculate current statistic curr_tst = tst_fun(permute_within_groups(response, group, seed=prng)) hits += curr_tst >= observed_tst return thePvalue[alternative](hits / (reps+plus1)), observed_tst diff --git a/permute/tests/test_multitest_core.py b/permute/tests/test_multitest_core.py new file mode 100644 index 0000000..830188b --- /dev/null +++ b/permute/tests/test_multitest_core.py @@ -0,0 +1,220 @@ +# -*- coding: utf-8 -*- +""" +Created on Mon Mar 21 12:23:07 2022 + +@author: Clayton +""" + +import pytest + +import numpy as np +from numpy.random import RandomState +from cryptorandom.cryptorandom import SHA256 + +from ..multitest_core import (multitest_corr, + multitest_spearman_corr, + multitest_two_sample, + multitest_two_sample_shift, + multitest_two_sample_conf_int, + 
multitest_one_sample) + + + +def multitest_test_corr(): + prng = SHA256(42) + num_tests = 2 + x = prng.randint(0, 5, size=(10,num_tests)) + y = x + res1 = multitest_corr(x, y, seed=prng) + res2 = multitest_corr(x, y) + assert len(res1) == 3 + assert len(res2) == 3 + np.testing.assert_almost_equal(res1[0], np.ones(num_tests), decimal=1) + np.testing.assert_almost_equal(res2[0], np.ones(num_tests), decimal=1) + np.testing.assert_almost_equal(res1[1], res2[1], decimal=1) + print("finished test 1 in test_corr()") + + y = prng.randint(0, 5, size=(10,num_tests)) + res1 = multitest_corr(x, y, alternative="less", seed=prng) + res2 = multitest_corr(x, y, alternative="less") + assert len(res1) == 3 + assert len(res2) == 3 + np.testing.assert_almost_equal(res1[0], res2[0], decimal=1) + np.testing.assert_almost_equal(res1[1], res2[1], decimal=1) + print("finished test 2 in test_corr()") + + res1 = multitest_corr(x, y, alternative="two-sided", seed=prng) + res2 = multitest_corr(x, y, alternative="greater") + assert len(res1) == 3 + assert len(res2) == 3 + np.testing.assert_almost_equal(res1[0], res2[0], decimal=1) + np.testing.assert_almost_equal(res1[1], res2[1]*2, decimal=1) + print("finished test 3 in test_corr()") + + +def multitest_test_spearman_corr(): + prng = SHA256(42) + num_tests = 2 + x = (np.array([2, 4, 6, 8, 10])*np.ones((num_tests,5))).T + y = (np.array([1, 3, 5, 6, 9])*np.ones((num_tests,5))).T + xorder = np.array([[1, 2, 3, 4, 5] ,[1, 2, 3, 4, 5]]).T + res1 = multitest_corr(xorder, xorder, seed=prng) + print("finished test 1 in test_spearman_corr()") + + prng = SHA256(42) + res2 = multitest_spearman_corr(x, y, seed=prng) + np.testing.assert_almost_equal(res1[0], res2[0], decimal=1) + np.testing.assert_almost_equal(res1[1], res2[1], decimal=1) + np.testing.assert_array_equal(res1[2], res2[2]) + print("finished test 2 in test_spearman_corr()") + + +@pytest.mark.slow +def multitest_test_two_sample(): + prng = RandomState(42) + num_samples = 200 + num_tests = 2 + # Normal-normal, different means examples + x = prng.normal(1, size=(num_samples,num_tests)) + y = prng.normal(4, size=(num_samples,num_tests)) + expected = ([[1.0, 1.0],[0,0]], [[-3,-3],[-30,-30]]) # define expected probabilites + plus1 = (True,False) + max_correct = (True,False) + keep_dist = (True,False) + alternative = ('greater','less','two-sided') + stats = ('mean','t') # TODO add custom stat lambda here + num_cases = str(len(plus1)*len(max_correct)*len(keep_dist)*len(alternative)*len(stats)) + case_count = 1 + # go through all combinations of parameters + for p in plus1: + for m in max_correct: + for k in keep_dist: + for a in alternative: + for s in stats: + res = multitest_two_sample(x,y,reps=10**4,seed=42,plus1=p,max_correct=m,keep_dist=k,alternative=a,stat=s) + # check pvals + if a == 'greater': + np.testing.assert_almost_equal(res[0], expected[0][0], decimal = 1) # compare p vals + elif a == 'less' or a =='two-sided': + np.testing.assert_almost_equal(res[0], expected[0][1], decimal = 1) # compare p vals + # check observed statistic + if s == 'mean': + np.testing.assert_almost_equal(res[1], expected[1][0], decimal = 1) # compare observed statistic + elif s == 't': + np.testing.assert_almost_equal(res[1], expected[1][1], decimal = 0) # compare observed statistic + #check returned distribution + if k: + assert len(res) == 3 # if keep keep dist, expect to res to be length 3 + if m: + assert len(res[2].shape) == 1 # if max correct, should get 1D dist + else: + assert len(res[2].shape) == 2 # if not max correct, should get 
2D dist + assert res[2].shape[1] == num_tests # second D should have same number of elements as number of tests + else: + assert len(res) == 2 + print("finished test " + str(case_count) + " of " + num_cases + " in test_two_sample()") + case_count += 1 + +@pytest.mark.slow +def multitest_test_one_sample(): + # same code as multitest_test_two_sample(), but switched tested function to one sample version + prng = RandomState(42) + num_samples = 200 + num_tests = 2 + # Normal-normal, different means examples + x = prng.normal(1, size=(num_samples,num_tests)) + y = prng.normal(4, size=(num_samples,num_tests)) + expected = ([[1.0, 1.0],[0,0]], [[-3,-3],[-28,-28]]) # define expected probabilites for different alternative hypotheses and observed statistics (not sure why observed statistic is different for one and two sample tests) + plus1 = (True,False) + max_correct = (True,False) + keep_dist = (True,False) + alternative = ('greater','less','two-sided') + stats = ('mean','t') # TODO add custom stat lambda here + num_cases = str(len(plus1)*len(max_correct)*len(keep_dist)*len(alternative)*len(stats)) + case_count = 1 + # go through all combinations of parameters + for p in plus1: + for m in max_correct: + for k in keep_dist: + for a in alternative: + for s in stats: + res = multitest_one_sample(x,y,seed=42,plus1=p,max_correct=m,keep_dist=k,alternative=a,stat=s) + # check pvals + if a == 'greater': + np.testing.assert_almost_equal(res[0], expected[0][0], decimal = 1) # compare p vals + elif a == 'less' or a =='two-sided': + np.testing.assert_almost_equal(res[0], expected[0][1], decimal = 1) # compare p vals + # check observed statistic + if s == 'mean': + np.testing.assert_almost_equal(res[1], expected[1][0], decimal = 1) # compare observed statistic + elif s == 't': + np.testing.assert_almost_equal(res[1], expected[1][1], decimal = 0) # compare observed statistic + #check returned distribution + if k: + assert len(res) == 3 # if keep keep dist, expect to res to be length 3 + if m: + assert len(res[2].shape) == 1 # if max correct, should get 1D dist + else: + assert len(res[2].shape) == 2 # if not max correct, should get 2D dist + assert res[2].shape[1] == num_tests # second D should have same number of elements as number of tests + else: + assert len(res) == 2 + print("finished test " + str(case_count) + " of " + num_cases + " in test_one_sample()") + case_count += 1 + + +# @pytest.mark.slow +# def multitest_test_two_sample_shift(): +# f = lambda u: u - 3 +# finv = lambda u: u + 3 +# f_err = lambda u: 2 * u +# f_err_inv = lambda u: u / 2 +# prng = RandomState(42) +# num_samples = 200 +# num_tests = 2 +# # Normal-normal, different means examples +# x = prng.normal(1, size=(num_samples,num_tests)) +# y = prng.normal(4, size=(num_samples,num_tests)) +# expected = ([[1.0, 1.0],[0,0]], [[-3,-3],[-28,-28]]) # define expected probabilites for different alternative hypotheses and observed statistics (not sure why observed statistic is different for one and two sample tests) +# plus1 = (True,False) +# max_correct = (True,False) +# keep_dist = (True,False) +# alternative = ('greater','less','two-sided') +# stats = ('mean','t') # TODO add custom stat lambda here +# shift = (2,(f, finv),(f_err, f_err_inv)) +# num_cases = str(len(plus1)*len(keep_dist)*len(alternative)*len(stats)*len(shift)) +# case_count = 1 +# # go through all combinations of parameters +# for p in plus1: +# for k in keep_dist: +# for a in alternative: +# for s in stats: +# for sh in shift: +# res = multitest_two_sample_shift(x,y,seed=42,shift 
= sh, plus1=p,keep_dist=k,alternative=a,stat=s) +# # check pvals +# if a == 'greater': +# np.testing.assert_almost_equal(res[0], expected[0][0], decimal = 1) # compare p vals +# elif a == 'less' or a =='two-sided': +# np.testing.assert_almost_equal(res[0], expected[0][1], decimal = 1) # compare p vals +# # check observed statistic +# if s == 'mean': +# np.testing.assert_almost_equal(res[1], expected[1][0], decimal = 1) # compare observed statistic +# elif s == 't': +# np.testing.assert_almost_equal(res[1], expected[1][1], decimal = 0) # compare observed statistic +# #check returned distribution +# if k: +# assert len(res[2].shape) == 2 # if not max correct, should get 2D dist +# assert res[2].shape[1] == num_tests # second D should have same number of elements as number of tests +# else: +# assert len(res) == 2 +# print("finished test " + str(case_count) + " of " + num_cases + " in test_one_sample()") +# case_count += 1 + + +# def test_two_sample_bad_shift(): +# # Break it with a bad shift +# x = np.array(range(5)) +# y = np.array(range(1, 6)) +# shift = lambda u: u + 3 +# pytest.raises(ValueError, multitest_two_sample_shift, x, y, seed=5, shift=shift) + diff --git a/permute/tests/test_multitest_stratified.py b/permute/tests/test_multitest_stratified.py new file mode 100644 index 0000000..e205b2b --- /dev/null +++ b/permute/tests/test_multitest_stratified.py @@ -0,0 +1,127 @@ +# -*- coding: utf-8 -*- +""" +Created on Mon Mar 21 20:55:37 2022 + +@author: Clayton +""" + +import numpy as np +from numpy.random import RandomState + +import pytest +from cryptorandom.cryptorandom import SHA256 + +from ..stratified import stratified_permutationtest as spt +from ..stratified import stratified_permutationtest_mean as sptm +from ..stratified import corrcoef, sim_corr, stratified_two_sample + +def test_multitest_stratified_permutationtest(): + num_tests = 2 + group = np.repeat([1, 2, 3], 9) + condition = np.repeat([1, 2, 3] * 3, 3) + response = np.zeros((group.shape[0],num_tests)) + response[[0, 1, 3, 9, 10, 11, 18, 19, 20],:] = 1 + + res = spt(group, condition, response, reps=1000, seed=42) + res1 = spt(group, condition, response, alternative='less', reps=1000, seed=42) + assert np.all(res[0] < 0.01) + assert np.all(res[1] == res1[1]) + np.testing.assert_almost_equal(res[0], 1-res1[0]) + res2 = spt(group, condition, response, alternative='two-sided', reps=1000, seed=42) + assert np.all(res2[0] < 0.02) + + group = np.array([1, 1, 1]) + condition = np.array([2, 2, 2]) + response = np.zeros_like(group) + res2 = spt(group, condition, response, reps=1000, seed=42) + assert res2 == (1.0, np.nan, None) + + +def test_multitest_stratified_permutationtest_mean(): + num_tests = 2 + group = np.array([1, 2, 1, 2]) + condition = np.array([1, 1, 2, 2]) + response = np.zeros((group.shape[0],num_tests)) + groups = np.unique(group) + conditions = np.unique(condition) + res = sptm(group, condition, response, groups, conditions) + assert np.all(res == (0.0,0.0)) + res2 = sptm(group, condition, response) # check defaults work + assert np.all(res2 == (0.0,0.0)) + + +def test_multitest_stratified_permutationtest_mean_error(): + num_tests = 2 + group = np.array([1, 1, 1]) + condition = np.array([2, 2, 2]) + response = np.zeros((group.shape[0],num_tests)) + groups = np.unique(group) + conditions = np.unique(condition) + pytest.raises(ValueError, sptm, group, condition, response, groups, conditions) + + +def test_multitest_stratified_corrcoef(): + num_tests = 2 + prng = RandomState(42) + x = prng.rand(10,num_tests) + y = x + 
group = prng.randint(3, size=10)
+    res1 = multitest_stratified_corrcoef(x, y, group)
+    res2 = multitest_stratified_corrcoef(x, y, group)
+    assert np.all(res1 == res2)
+
+
+def test_multitest_stratified_sim_corr():
+    num_tests = 2
+    prng = SHA256(42)
+    x = prng.rand(10,num_tests)
+    y = x
+    group = prng.randint(0, 3, size=10)
+    res1 = multitest_stratified_sim_corr(x, y, group, seed=prng, reps=100)
+    res2 = multitest_stratified_sim_corr(x, y, group, seed=prng, alternative='less', reps=100)
+    res3 = multitest_stratified_sim_corr(x, y, group, seed=prng, alternative='two-sided', reps=100)
+
+    np.testing.assert_almost_equal(res1[0], 1-res2[0])
+    assert np.all(res1[1] == res2[1])
+    assert np.all(res1[1] == res3[1])
+    assert np.all(2*res1[0] == res3[0])
+
+
+def test_multitest_stratified_strat_tests_equal():
+    num_tests = 2
+    group = np.repeat([1, 2, 3], 10)
+    condition = np.repeat([1, 2] * 3, 5)
+    response = np.zeros((group.shape[0],num_tests))
+    response[[0, 1, 3, 9, 10, 11, 18, 19, 20],:] = 1
+
+    res1 = spt(group, condition, response, reps=1000, seed=42)
+    res2 = multitest_stratified_two_sample(group, condition, response, reps=1000,
+                                           stat='mean_within_strata', seed=42)
+    assert np.all(res1[1] == res2[1])
+    assert np.all(np.fabs(res1[0]-res2[0]) < 0.05)
+
+def test_multitest_stratified_two_sample():
+    num_tests = 2
+    group = np.repeat([1, 2, 3], 10)
+    condition = np.repeat([1, 2] * 3, 5)
+    response = np.zeros((group.shape[0],num_tests))
+    response[[0, 1, 3, 9, 10, 11, 18, 19, 20],:] = 1
+
+    res = multitest_stratified_two_sample(group, condition, response, reps=1000,
+                                          stat='mean', seed=42)
+    np.testing.assert_almost_equal(res[0], 0.245, 2)
+    assert np.all(res[1] == 0.2)
+
+    (p, t, dist) = multitest_stratified_two_sample(group, condition, response, reps=1000,
+                                                   stat='mean', seed=42, keep_dist=True)
+    assert np.all(res[0] == p)
+    assert np.all(res[1] == t)
+
+    stat_fun = lambda u: sptm(group, condition, u, np.unique(group), np.unique(condition))
+    res = multitest_stratified_two_sample(group, condition, response, reps=1000,
+                                          stat=stat_fun, seed=42)
+    # below differs from test_stratified because changed dependence of stat to use
+    # in multitest_stratified_two_sample from number of unique groups to conditions.
+    # TODO ensure this is appropriate
+    np.testing.assert_almost_equal(res[0], 0.6733, 3)
+    np.testing.assert_almost_equal(res[1], -0.2, 3)

From 8f216cce3cb04c1b40cc46757991fce6e65b86bd Mon Sep 17 00:00:00 2001
From: Clayton Barnes
Date: Wed, 10 Aug 2022 10:16:41 -0400
Subject: [PATCH 3/3] Removed max_correct and touched up tests

Removed max statistic multiple hypothesis testing correction and made
corresponding adjustments to the multitest_core and multitest_stratified
tests. Also touched up some of the tests to make them run smoother.
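For reviewers: the per-test null distribution is still returned when
keep_dist=True, so a max-statistic correction can be recovered outside the
library if needed. A minimal sketch, assuming the (p-value, statistic,
distribution) return and the (reps, num_tests)-shaped distribution documented
in this series:

    # sketch only, not part of this patch: max-statistic correction recovered
    # from the 2D null distribution returned with keep_dist=True
    import numpy as np
    from permute.multitest_core import multitest_two_sample

    rng = np.random.RandomState(42)
    x = rng.normal(1, size=(200, 2))   # (observations, tests)
    y = rng.normal(4, size=(200, 2))

    p, tst, dist = multitest_two_sample(x, y, reps=10**4, seed=42,
                                        alternative='greater', keep_dist=True)
    # most extreme statistic (by absolute value) across tests, per permutation
    idx = np.argmax(np.abs(dist), axis=1)
    max_dist = dist[np.arange(dist.shape[0]), idx]
    # right-tailed, max-corrected p-value for each test (plus-one adjusted)
    p_max = (np.sum(max_dist[:, None] >= tst[None, :], axis=0) + 1) / (dist.shape[0] + 1)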
--- permute/multitest_core.py | 175 ++++++--------------- permute/multitest_stratified.py | 148 +++++------------ permute/tests/test_multitest_core.py | 112 ++++++------- permute/tests/test_multitest_stratified.py | 17 +- 4 files changed, 145 insertions(+), 307 deletions(-) diff --git a/permute/multitest_core.py b/permute/multitest_core.py index 4a32fe7..0c28270 100644 --- a/permute/multitest_core.py +++ b/permute/multitest_core.py @@ -12,7 +12,6 @@ from .utils import get_prng, permute - """ TODO for multi testing core package two_sample_conf_int once it is finalized @@ -104,7 +103,7 @@ def multitest_spearman_corr(x, y, alternative='greater', reps=10**4, seed=None, def multitest_two_sample_core(potential_outcomes_all, nx, tst_stat, alternative='greater', - reps=10**5, keep_dist=False, seed=None, plus1=True, max_correct=False): + reps=10**5, keep_dist=False, seed=None, plus1=True): r""" Main workhorse function for two_sample and two_sample_shift @@ -134,10 +133,6 @@ def multitest_two_sample_core(potential_outcomes_all, nx, tst_stat, alternative= flag for whether to add 1 to the numerator and denominator of the p-value based on the empirical permutation distribution. Default is True. - max_correct : bool - flag for whether to perform max statistic multiple testing - correction. Builds the null distribution from the most extreme value - across tests for each iteration of the permutation. Default is False. Returns ------- @@ -163,81 +158,42 @@ def multitest_two_sample_core(potential_outcomes_all, nx, tst_stat, alternative= pUp+plus1/(reps+plus1), \ pDn+plus1/(reps+plus1)],1),1) } - # account for all combinations of keep_dist (keep distribution) - # and max_correct (create max distribution to correct for multiple - # hypothesis testing) + # account for keep_dist (keep distribution) if keep_dist: - if max_correct: - dist = np.empty(reps) - for i in range(reps): - # permute indexing vector - prng.shuffle(rr) - # grab shuffled values - pp = np.take(potential_outcomes_all, rr, axis=0) - # calculate statistic - curr_tst = tst_stat(pp[:nx, :, 0], pp[nx:,: , 1]) - # grab the most extreme statistic observed across all tests - dist[i] = max(curr_tst.min(), curr_tst.max(), key=abs) - # calculate percentile - pUp = np.empty(num_tests) - pDn = np.empty(num_tests) - for i in range(num_tests): - pUp[i] = np.sum(dist >= tst[i])/(reps+plus1) - pDn[i] = np.sum(dist <= tst[i])/(reps+plus1) - return thePvalue[alternative](pUp, pDn), dist - else: - dist = np.empty((reps,num_tests)) - for i in range(reps): - # permute indexing vector - prng.shuffle(rr) - # grab shuffled values - pp = np.take(potential_outcomes_all, rr, axis=0) - # calculate statistic - dist[i,:] = tst_stat(pp[:nx, :, 0], pp[nx:,: , 1]) - # calculate percentile - pUp = np.sum(dist >= tst,axis=0)/(reps+plus1) - pDn = np.sum(dist <= tst,axis=0)/(reps+plus1) - return thePvalue[alternative](pUp, pDn), dist + dist = np.empty((reps,num_tests)) + for i in range(reps): + # permute indexing vector + prng.shuffle(rr) + # grab shuffled values + pp = np.take(potential_outcomes_all, rr, axis=0) + # calculate statistic + dist[i,:] = tst_stat(pp[:nx, :, 0], pp[nx:,: , 1]) + # calculate percentile + pUp = np.sum(dist >= tst,axis=0)/(reps+plus1) + pDn = np.sum(dist <= tst,axis=0)/(reps+plus1) + return thePvalue[alternative](pUp, pDn), dist else: # preallocate vectors to store number of times our observed statistic # is greater than the permuted statistic(s). 
Keeps memory requirement low hitsUp = np.zeros(num_tests) hitsDn = np.zeros(num_tests) - if max_correct: - for i in range(reps): - # permute indexing vector - prng.shuffle(rr) - # grab shuffled values - pp = np.take(potential_outcomes_all, rr, axis=0) - # calculate statistic - curr_tst = tst_stat(pp[:nx, :, 0], pp[nx:, :, 1]) - # grab the most extreme statistic observed across all tests - curr_max = max(curr_tst.min(), curr_tst.max(), key=abs) - # count if observed statistic is larger or smaller than permuted - hitsUp += curr_max >= tst - hitsDn += curr_max <= tst - # calculate percentile - pUp = hitsUp/(reps+plus1) - pDn = hitsDn/(reps+plus1) - return thePvalue[alternative](pUp, pDn) - else: - for i in range(reps): - # permute indexing vector - prng.shuffle(rr) - # grab shuffled values - pp = np.take(potential_outcomes_all, rr, axis=0) - # count if observed statistic is larger or smaller than permuted - hitsUp += tst_stat(pp[:nx, :, 0], pp[nx:, :, 1]) >= tst - hitsDn += tst_stat(pp[:nx, :, 0], pp[nx:, :, 1]) <= tst - # calculate percentile - pUp = hitsUp/(reps+plus1) - pDn = hitsDn/(reps+plus1) - return thePvalue[alternative](pUp, pDn) + for i in range(reps): + # permute indexing vector + prng.shuffle(rr) + # grab shuffled values + pp = np.take(potential_outcomes_all, rr, axis=0) + # count if observed statistic is larger or smaller than permuted + hitsUp += tst_stat(pp[:nx, :, 0], pp[nx:, :, 1]) >= tst + hitsDn += tst_stat(pp[:nx, :, 0], pp[nx:, :, 1]) <= tst + # calculate percentile + pUp = hitsUp/(reps+plus1) + pDn = hitsDn/(reps+plus1) + return thePvalue[alternative](pUp, pDn) def multitest_one_sample(x, y=None, reps=10**5, stat='mean', alternative="greater", - keep_dist=False, seed=None, plus1=True,max_correct=False): + keep_dist=False, seed=None, plus1=True): r""" One-sided or two-sided, one-sample permutation test for the mean, with p-value estimated by simulated random sampling with @@ -342,70 +298,37 @@ def multitest_one_sample(x, y=None, reps=10**5, stat='mean', alternative="greate tst_fun = stats[stat] # calculate observed statistic tst = tst_fun(z) - # account for all combinations of keep_dist (keep distribution) - # and max_correct (create max distribution to correct for multiple - # hypothesis testing) + # account for keep_dist (keep distribution) if keep_dist: - if max_correct: - # preallocate space to build null distribution - # (1D since only taking extreme values) - dist = np.empty(reps) - for i in range(reps): - # calculate statistic of current permutation - curr_tst = tst_fun(z * (1 - 2 * prng.randint(0, 2, z.shape))) - # grab the most extreme value across tests - dist[i] = max(curr_tst.min(), curr_tst.max(), key=abs) - # calculate percentile for each test - pUp = np.empty(num_tests) - pDn = np.empty(num_tests) - for i in range(num_tests): - pUp[i] = np.sum(dist >= tst[i])/(reps+plus1) - pDn[i] = np.sum(dist <= tst[i])/(reps+plus1) - return thePvalue[alternative](pUp, pDn), tst, dist - else: - # preallocate space to build null distribution - # (2D since each test will have its own distribution) - dist = np.empty((reps,num_tests)) - for i in range(reps): - # calculate statistic of current permutation - dist[i,:] = tst_fun(z * (1 - 2 * prng.randint(0, 2, z.shape))) - # calculate percentile for each test - pUp = np.sum(dist >= tst,axis=0)/(reps+plus1) - pDn = np.sum(dist <= tst,axis=0)/(reps+plus1) - return thePvalue[alternative](pUp, pDn), tst, dist + # preallocate space to build null distribution + # (2D since each test will have its own distribution) + dist = 
np.empty((reps,num_tests)) + for i in range(reps): + # calculate statistic of current permutation + dist[i,:] = tst_fun(z * (1 - 2 * prng.randint(0, 2, z.shape))) + # calculate percentile for each test + pUp = np.sum(dist >= tst,axis=0)/(reps+plus1) + pDn = np.sum(dist <= tst,axis=0)/(reps+plus1) + return thePvalue[alternative](pUp, pDn), tst, dist else: # preallocate vectors to store number of times our observed statistic # is greater than the permuted statistic(s). Keeps memory requirement low. hitsUp = np.zeros(num_tests) hitsDn = np.zeros(num_tests) - if max_correct: - for i in range(reps): - # calculate statistic for current permutation - curr_tst = tst_fun(z * (1 - 2 * prng.randint(0, 2, z.shape))) - # grab the most extreme statistic across all tests - curr_max = max(curr_tst.min(), curr_tst.max(), key=abs) - # iterate counters accordingly - hitsUp += curr_max >= tst - hitsDn += curr_max <= tst - # calculate percentiles - pUp = hitsUp/(reps+plus1) - pDn = hitsDn/(reps+plus1) - return thePvalue[alternative](pUp, pDn), tst - else: - for i in range(reps): - # calculate statistic for current permutation - curr_tst = tst_fun(z * (1 - 2 * prng.randint(0, 2, z.shape))) - # iterate counters accordingly - hitsUp += curr_tst >= tst - hitsDn += curr_tst <= tst - # calculate percentiles - pUp = hitsUp/(reps+plus1) - pDn = hitsDn/(reps+plus1) + for i in range(reps): + # calculate statistic for current permutation + curr_tst = tst_fun(z * (1 - 2 * prng.randint(0, 2, z.shape))) + # iterate counters accordingly + hitsUp += curr_tst >= tst + hitsDn += curr_tst <= tst + # calculate percentiles + pUp = hitsUp/(reps+plus1) + pDn = hitsDn/(reps+plus1) return thePvalue[alternative](pUp, pDn), tst def multitest_two_sample(x, y, reps=10**5, stat='mean', alternative="greater", - keep_dist=False, seed=None, plus1=True, max_correct=False): + keep_dist=False, seed=None, plus1=True): r""" One-sided or two-sided, two-sample permutation multi-test for equality of two means, with p-value estimated by simulated random sampling with @@ -505,7 +428,7 @@ def multitest_two_sample(x, y, reps=10**5, stat='mean', alternative="greater", observed_tst = tst_fun(pot_out_all[:nx, :, 0], pot_out_all[nx:, :, 1]) # call main worker function res = multitest_two_sample_core(pot_out_all, nx, tst_fun, alternative=alternative, - reps=reps, keep_dist=keep_dist, seed=seed, plus1=plus1, max_correct=max_correct) + reps=reps, keep_dist=keep_dist, seed=seed, plus1=plus1) # accomadate user request for returning distribution if keep_dist: return res[0], observed_tst, res[1] diff --git a/permute/multitest_stratified.py b/permute/multitest_stratified.py index c163a95..ba8d6d2 100644 --- a/permute/multitest_stratified.py +++ b/permute/multitest_stratified.py @@ -48,7 +48,7 @@ def multitest_stratified_corrcoef(x, y, group): -def multitest_stratified_sim_corr(x, y, group, reps=10**4, alternative='greater', seed=None, plus1=True, max_correct=False): +def multitest_stratified_sim_corr(x, y, group, reps=10**4, alternative='greater', seed=None, plus1=True): r""" Simulate permutation p-value of stratified Spearman correlation test. @@ -73,10 +73,6 @@ def multitest_stratified_sim_corr(x, y, group, reps=10**4, alternative='greater' flag for whether to add 1 to the numerator and denominator of the p-value based on the empirical permutation distribution. Default is True. - max_correct : bool - flag for whether to perform max statistic multiple testing - correction. 
Builds the null distribution from the most extreme value - across tests for each iteration of the permutation. Default is False. Returns ------- @@ -90,33 +86,17 @@ def multitest_stratified_sim_corr(x, y, group, reps=10**4, alternative='greater' # ensure x and y have the same shape (same number of observations and tests) if x.shape != y.shape: raise ValueError('x and y must have the same shape') - # get the number of hypotheses to test - num_tests = x.shape[1] prng = get_prng(seed) x = x.astype(float) y = y.astype(float) # calculate observed statistic tst = multitest_stratified_corrcoef(x, y, group) # account for user wanting to perform max correction - if max_correct: - # preallocate space to build null distribution - # (1D since going to take extreme value across tests) - dist = np.empty(reps) - for i in range(reps): - # calculate statistic of current permutation - curr_tst = multitest_stratified_corrcoef(permute_within_groups(x, group, prng), y, group) - # grab the most extreme value across tests - dist[i] = max(curr_tst.min(), curr_tst.max(), key=abs) - # calculate the percentile for each test - right_pv = np.empty(num_tests) - for i in range(num_tests): - right_pv[i] = np.sum(dist >= tst[i]) / (reps+plus1) - else: - # calculate statistic on each permutation to build null distribution - dist = [multitest_stratified_corrcoef(permute_within_groups(x, group, prng), y, group) - for i in range(reps)] - # calculate percentile for each test - right_pv = np.sum(dist >= tst,axis=0) / (reps+plus1) + # calculate statistic on each permutation to build null distribution + dist = [multitest_stratified_corrcoef(permute_within_groups(x, group, prng), y, group) + for i in range(reps)] + # calculate percentile for each test + right_pv = np.sum(dist >= tst,axis=0) / (reps+plus1) # create dictionary to store p value calculations thePvalue = { 'greater': lambda p: p + plus1/(reps+plus1), @@ -172,7 +152,7 @@ def multitest_stratified_permutationtest_mean(group, condition, response, raise ValueError('Number of groups must be at least 2.') # if 2 conditions, calculate mean. If more than 2, calculate std of outcomes # TODO ensure this is intended behavior, in stratified.py this is done - # with the variable groups, but that doesn't seem right to me + # with the variable "groups", but that doesn't seem right to me elif len(conditions) == 2: stat = lambda u: u[0] - u[1] for g in groups: @@ -201,8 +181,7 @@ def multitest_stratified_permutationtest( reps=10**5, testStatistic='mean', seed=None, - plus1=True, - max_correct=False): + plus1=True): r""" Stratified permutation test based on differences in means. @@ -259,10 +238,6 @@ def multitest_stratified_permutationtest( flag for whether to add 1 to the numerator and denominator of the p-value based on the empirical permutation distribution. Default is True. - max_correct : bool - flag for whether to perform max statistic multiple testing - correction. Builds the null distribution from the most extreme value - across tests for each iteration of the permutation. Default is False. 
Returns ------- @@ -307,30 +282,15 @@ def multitest_stratified_permutationtest( else: # calculate observed statistic tst = tst_fun(condition) - if max_correct: - # preallocate vector to store null distribution - # (1D because going to take most extreme value across all tests) - dist = np.zeros(reps) - for i in range(int(reps)): - # calculate statistic for current permutation - curr_tst = tst_fun(permute_within_groups(condition, group, prng)) - # grab the most extreme value across tests - dist[i] = max(curr_tst.min(), curr_tst.max(), key=abs) - # calculate percentile for each test - right_pv = np.empty(num_tests) - for i in range(num_tests): - right_pv[i] = np.sum(dist >= tst[i])/(reps+plus1) - return thePvalue[alternative](right_pv), tst, dist - else: - # preallocate vector to store null distribution - # (2D because each test will have its own distribution) - dist = np.zeros((reps,num_tests)) - for i in range(int(reps)): - # calculate statistic for current permutation - dist[i,:] = tst_fun(permute_within_groups(condition, group, prng)) - # calculate percentile for each test - right_pv = np.sum(dist >= tst,axis=0) / (reps+plus1) - return thePvalue[alternative](right_pv), tst, dist + # preallocate vector to store null distribution + # (2D because each test will have its own distribution) + dist = np.zeros((reps,num_tests)) + for i in range(int(reps)): + # calculate statistic for current permutation + dist[i,:] = tst_fun(permute_within_groups(condition, group, prng)) + # calculate percentile for each test + right_pv = np.sum(dist >= tst,axis=0) / (reps+plus1) + return thePvalue[alternative](right_pv), tst, dist def multitest_stratified_two_sample( @@ -342,8 +302,7 @@ def multitest_stratified_two_sample( reps=10**5, keep_dist=False, seed=None, - plus1=True, - max_correct=False): + plus1=True): r""" One-sided or two-sided, two-sample permutation test for equality of two means, with p-value estimated by simulated random sampling with @@ -413,10 +372,6 @@ def multitest_stratified_two_sample( flag for whether to add 1 to the numerator and denominator of the p-value based on the empirical permutation distribution. Default is True. - max_correct : bool - flag for whether to perform max statistic multiple testing - correction. Builds the null distribution from the most extreme value - across tests for each iteration of the permutation. Default is False. 
Returns ------- @@ -472,53 +427,24 @@ def multitest_stratified_two_sample( } # get observed statistic observed_tst = tst_fun(response) - # account for all combinations of keep_dist (keep distribution) - # and max_correct (create max distribution to correct for multiple - # hypothesis testing) + # account for keep_dist (keep distribution) if keep_dist: - if max_correct: - # preallocate vector for null distribution - # (1D because going to take most extreme statistic across tests) - dist = np.empty(reps) - for i in range(reps): - # calculate statistic for current permutation - curr_tst = tst_fun(permute_within_groups( - response, group, seed=prng)) - # grab most extreme statistic across tests - dist[i] = max(curr_tst.min(), curr_tst.max(), key=abs) - # calculate percentile for each test - hits = np.empty(num_tests) - for i in range(num_tests): - hits[i] = np.sum(dist >= observed_tst[i]) - return thePvalue[alternative](hits / (reps+plus1)), observed_tst, dist - else: - # preallocate vector for null distribution - # (2D because build null distribution for each test) - dist = np.empty((reps,num_tests)) - for i in range(reps): - # calculate statistic for current permutation - dist[i,:] = tst_fun(permute_within_groups( - response, group, seed=prng)) - # calculate percentile for each test - hits = np.sum(dist >= observed_tst,axis=0) - return thePvalue[alternative](hits / (reps+plus1)), observed_tst, dist + # preallocate vector for null distribution + # (2D because build null distribution for each test) + dist = np.empty((reps,num_tests)) + for i in range(reps): + # calculate statistic for current permutation + dist[i,:] = tst_fun(permute_within_groups( + response, group, seed=prng)) + # calculate percentile for each test + hits = np.sum(dist >= observed_tst,axis=0) + return thePvalue[alternative](hits / (reps+plus1)), observed_tst, dist else: - if max_correct: - # create vector to store number of times each hypothesis is less - # than the most extreme value across all tests per permutation - hits = np.zeros(num_tests) - for i in range(reps): - # calculate statistic of current permutation - curr_tst = tst_fun(permute_within_groups(response, group, seed=prng)) - # take most extreme value and compare with observed statistic - hits += max(curr_tst.min(), curr_tst.max(), key=abs) >= observed_tst - return thePvalue[alternative](hits / (reps+plus1)), observed_tst - else: - # create vector to store number of times each hypothesis is less - # than the corresponding statistic of the permuted values - hits = np.zeros(num_tests) - for i in range(reps): - # calculate current statistic - curr_tst = tst_fun(permute_within_groups(response, group, seed=prng)) - hits += curr_tst >= observed_tst - return thePvalue[alternative](hits / (reps+plus1)), observed_tst + # create vector to store number of times each hypothesis is less + # than the corresponding statistic of the permuted values + hits = np.zeros(num_tests) + for i in range(reps): + # calculate current statistic + curr_tst = tst_fun(permute_within_groups(response, group, seed=prng)) + hits += curr_tst >= observed_tst + return thePvalue[alternative](hits / (reps+plus1)), observed_tst diff --git a/permute/tests/test_multitest_core.py b/permute/tests/test_multitest_core.py index 830188b..382b0e1 100644 --- a/permute/tests/test_multitest_core.py +++ b/permute/tests/test_multitest_core.py @@ -33,7 +33,7 @@ def multitest_test_corr(): np.testing.assert_almost_equal(res2[0], np.ones(num_tests), decimal=1) np.testing.assert_almost_equal(res1[1], res2[1], decimal=1) 
print("finished test 1 in test_corr()") - + y = prng.randint(0, 5, size=(10,num_tests)) res1 = multitest_corr(x, y, alternative="less", seed=prng) res2 = multitest_corr(x, y, alternative="less") @@ -42,7 +42,7 @@ def multitest_test_corr(): np.testing.assert_almost_equal(res1[0], res2[0], decimal=1) np.testing.assert_almost_equal(res1[1], res2[1], decimal=1) print("finished test 2 in test_corr()") - + res1 = multitest_corr(x, y, alternative="two-sided", seed=prng) res2 = multitest_corr(x, y, alternative="greater") assert len(res1) == 3 @@ -60,7 +60,7 @@ def multitest_test_spearman_corr(): xorder = np.array([[1, 2, 3, 4, 5] ,[1, 2, 3, 4, 5]]).T res1 = multitest_corr(xorder, xorder, seed=prng) print("finished test 1 in test_spearman_corr()") - + prng = SHA256(42) res2 = multitest_spearman_corr(x, y, seed=prng) np.testing.assert_almost_equal(res1[0], res2[0], decimal=1) @@ -79,41 +79,36 @@ def multitest_test_two_sample(): y = prng.normal(4, size=(num_samples,num_tests)) expected = ([[1.0, 1.0],[0,0]], [[-3,-3],[-30,-30]]) # define expected probabilites plus1 = (True,False) - max_correct = (True,False) keep_dist = (True,False) alternative = ('greater','less','two-sided') stats = ('mean','t') # TODO add custom stat lambda here - num_cases = str(len(plus1)*len(max_correct)*len(keep_dist)*len(alternative)*len(stats)) + num_cases = str(len(plus1)*len(keep_dist)*len(alternative)*len(stats)) case_count = 1 # go through all combinations of parameters for p in plus1: - for m in max_correct: - for k in keep_dist: - for a in alternative: - for s in stats: - res = multitest_two_sample(x,y,reps=10**4,seed=42,plus1=p,max_correct=m,keep_dist=k,alternative=a,stat=s) - # check pvals - if a == 'greater': - np.testing.assert_almost_equal(res[0], expected[0][0], decimal = 1) # compare p vals - elif a == 'less' or a =='two-sided': - np.testing.assert_almost_equal(res[0], expected[0][1], decimal = 1) # compare p vals - # check observed statistic - if s == 'mean': - np.testing.assert_almost_equal(res[1], expected[1][0], decimal = 1) # compare observed statistic - elif s == 't': - np.testing.assert_almost_equal(res[1], expected[1][1], decimal = 0) # compare observed statistic - #check returned distribution - if k: - assert len(res) == 3 # if keep keep dist, expect to res to be length 3 - if m: - assert len(res[2].shape) == 1 # if max correct, should get 1D dist - else: - assert len(res[2].shape) == 2 # if not max correct, should get 2D dist - assert res[2].shape[1] == num_tests # second D should have same number of elements as number of tests - else: - assert len(res) == 2 - print("finished test " + str(case_count) + " of " + num_cases + " in test_two_sample()") - case_count += 1 + for k in keep_dist: + for a in alternative: + for s in stats: + res = multitest_two_sample(x,y,reps=10**4,seed=42,plus1=p,keep_dist=k,alternative=a,stat=s) + # check pvals + if a == 'greater': + np.testing.assert_almost_equal(res[0], expected[0][0], decimal = 1) # compare p vals + elif a == 'less' or a =='two-sided': + np.testing.assert_almost_equal(res[0], expected[0][1], decimal = 1) # compare p vals + # check observed statistic + if s == 'mean': + np.testing.assert_almost_equal(res[1], expected[1][0], decimal = 1) # compare observed statistic + elif s == 't': + np.testing.assert_almost_equal(res[1], expected[1][1], decimal = 0) # compare observed statistic + #check returned distribution + if k: + assert len(res) == 3 # if keep keep dist, expect to res to be length 3 + assert len(res[2].shape) == 2 # should get 2D dist + assert 
res[2].shape[1] == num_tests # second D should have same number of elements as number of tests + else: + assert len(res) == 2 + print("finished test " + str(case_count) + " of " + num_cases + " in test_two_sample()") + case_count += 1 @pytest.mark.slow def multitest_test_one_sample(): @@ -126,41 +121,36 @@ def multitest_test_one_sample(): y = prng.normal(4, size=(num_samples,num_tests)) expected = ([[1.0, 1.0],[0,0]], [[-3,-3],[-28,-28]]) # define expected probabilites for different alternative hypotheses and observed statistics (not sure why observed statistic is different for one and two sample tests) plus1 = (True,False) - max_correct = (True,False) keep_dist = (True,False) alternative = ('greater','less','two-sided') stats = ('mean','t') # TODO add custom stat lambda here - num_cases = str(len(plus1)*len(max_correct)*len(keep_dist)*len(alternative)*len(stats)) + num_cases = str(len(plus1)*len(keep_dist)*len(alternative)*len(stats)) case_count = 1 # go through all combinations of parameters for p in plus1: - for m in max_correct: - for k in keep_dist: - for a in alternative: - for s in stats: - res = multitest_one_sample(x,y,seed=42,plus1=p,max_correct=m,keep_dist=k,alternative=a,stat=s) - # check pvals - if a == 'greater': - np.testing.assert_almost_equal(res[0], expected[0][0], decimal = 1) # compare p vals - elif a == 'less' or a =='two-sided': - np.testing.assert_almost_equal(res[0], expected[0][1], decimal = 1) # compare p vals - # check observed statistic - if s == 'mean': - np.testing.assert_almost_equal(res[1], expected[1][0], decimal = 1) # compare observed statistic - elif s == 't': - np.testing.assert_almost_equal(res[1], expected[1][1], decimal = 0) # compare observed statistic - #check returned distribution - if k: - assert len(res) == 3 # if keep keep dist, expect to res to be length 3 - if m: - assert len(res[2].shape) == 1 # if max correct, should get 1D dist - else: - assert len(res[2].shape) == 2 # if not max correct, should get 2D dist - assert res[2].shape[1] == num_tests # second D should have same number of elements as number of tests - else: - assert len(res) == 2 - print("finished test " + str(case_count) + " of " + num_cases + " in test_one_sample()") - case_count += 1 + for k in keep_dist: + for a in alternative: + for s in stats: + res = multitest_one_sample(x,y,seed=42,reps=10**4,plus1=p,keep_dist=k,alternative=a,stat=s) + # check pvals + if a == 'greater': + np.testing.assert_almost_equal(res[0], expected[0][0], decimal = 1) # compare p vals + elif a == 'less' or a =='two-sided': + np.testing.assert_almost_equal(res[0], expected[0][1], decimal = 1) # compare p vals + # check observed statistic + if s == 'mean': + np.testing.assert_almost_equal(res[1], expected[1][0], decimal = 1) # compare observed statistic + elif s == 't': + np.testing.assert_almost_equal(res[1], expected[1][1], decimal = 0) # compare observed statistic + #check returned distribution + if k: + assert len(res) == 3 # if keep keep dist, expect to res to be length 3 + assert len(res[2].shape) == 2 # should get 2D dist + assert res[2].shape[1] == num_tests # second D should have same number of elements as number of tests + else: + assert len(res) == 2 + print("finished test " + str(case_count) + " of " + num_cases + " in test_one_sample()") + case_count += 1 # @pytest.mark.slow diff --git a/permute/tests/test_multitest_stratified.py b/permute/tests/test_multitest_stratified.py index e205b2b..36be451 100644 --- a/permute/tests/test_multitest_stratified.py +++ 
b/permute/tests/test_multitest_stratified.py
@@ -9,11 +9,10 @@
 from numpy.random import RandomState
 
 import pytest
-from cryptorandom.cryptorandom import SHA256
 
-from ..stratified import stratified_permutationtest as spt
-from ..stratified import stratified_permutationtest_mean as sptm
-from ..stratified import corrcoef, sim_corr, stratified_two_sample
+from ..multitest_stratified import multitest_stratified_permutationtest as spt
+from ..multitest_stratified import multitest_stratified_permutationtest_mean as sptm
+from ..multitest_stratified import multitest_stratified_corrcoef, multitest_stratified_sim_corr, multitest_stratified_two_sample
 
 def test_multitest_stratified_permutationtest():
     num_tests = 2
@@ -29,10 +28,10 @@ def test_multitest_stratified_permutationtest():
     np.testing.assert_almost_equal(res[0], 1-res1[0])
     res2 = spt(group, condition, response, alternative='two-sided', reps=1000, seed=42)
     assert np.all(res2[0] < 0.02)
-    
+
     group = np.array([1, 1, 1])
     condition = np.array([2, 2, 2])
-    response = np.zeros_like(group)
+    response = np.zeros((group.shape[0],num_tests))
     res2 = spt(group, condition, response, reps=1000, seed=42)
     assert res2 == (1.0, np.nan, None)
@@ -73,7 +72,7 @@ def test_multitest_stratified_corrcoef():
 
 def test_multitest_stratified_sim_corr():
     num_tests = 2
-    prng = SHA256(42)
+    prng = RandomState(42)
     x = prng.rand(10,num_tests)
     y = x
     group = prng.randint(0, 3, size=10)
@@ -93,7 +92,7 @@ def test_multitest_stratified_strat_tests_equal():
     condition = np.repeat([1, 2] * 3, 5)
     response = np.zeros((group.shape[0],num_tests))
     response[[0, 1, 3, 9, 10, 11, 18, 19, 20],:] = 1
-    
+
     res1 = spt(group, condition, response, reps=1000, seed=42)
     res2 = multitest_stratified_two_sample(group, condition, response, reps=1000,
                                            stat='mean_within_strata', seed=42)
@@ -106,7 +105,7 @@ def test_multitest_stratified_two_sample():
     condition = np.repeat([1, 2] * 3, 5)
     response = np.zeros((group.shape[0],num_tests))
     response[[0, 1, 3, 9, 10, 11, 18, 19, 20],:] = 1
-    
+
     res = multitest_stratified_two_sample(group, condition, response, reps=1000,
                                           stat='mean', seed=42)
     np.testing.assert_almost_equal(res[0], 0.245, 2)
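A minimal usage sketch of the stratified multi-test entry point, assuming the
module path and signatures introduced in this series (group and condition are
1D per-observation labels; response is 2D with one column per hypothesis):

    # sketch only: expected input shapes for the stratified multi-test API
    import numpy as np
    from permute.multitest_stratified import multitest_stratified_two_sample

    num_tests = 2
    group = np.repeat([1, 2, 3], 10)          # stratum label per observation
    condition = np.repeat([1, 2] * 3, 5)      # treatment label per observation
    response = np.zeros((group.shape[0], num_tests))
    response[[0, 1, 3, 9, 10, 11, 18, 19, 20], :] = 1

    p, tst = multitest_stratified_two_sample(group, condition, response,
                                             reps=1000, stat='mean', seed=42)
    # p and tst each hold one value per hypothesis (column of response)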