-
Notifications
You must be signed in to change notification settings - Fork 0
/
wPCA.py
90 lines (77 loc) · 3.08 KB
/
wPCA.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
from scipy.sparse.linalg import svds
from scipy.sparse import csr_matrix
import numpy as np
import scanpy as sc
def weighted_pca(anndata,weights,n_comps, corr=True,
max_value=10):
'''
This function calculate a weighted version of PCA as suggested in https://doi.org/10.1016/j.medj.2022.05.002
by Korsunsky et al
input:
anndata: a log10CPM normalized count matrix. This should not be scaled in any ways
n_comps: number of components for wPCA
output:
the V and U matrices from SVD
'''
assert anndata.shape[0] == weights.shape[0]
if max_value is not None:
assert max_value > 0
def vars(a, axis=None):
""" Variance of sparse matrix a
var = mean(a**2) - mean(a)**2
"""
a_squared = a.copy()
a_squared.data **= 2
return a_squared.mean(axis) - np.square(a.mean(axis))
def stds(a, axis=None):
""" Standard deviation of sparse matrix a
std = sqrt(var(a))
"""
return np.sqrt(vars(a, axis))
wanndataX=scipy.sparse.csc_matrix(anndata.X.copy())
sklearn.utils.sparsefuncs.inplace_row_scale(wanndataX, weights)
mu=wanndataX.mean(axis=0)
sig=np.array((1/stds(wanndataX,axis=0)).T)
##### We want to scale the old data (not the new data using mean and std of the new data):
print("shifting by weighted mean:")
wanndataX=scipy.sparse.csr_matrix(anndata.X.copy()-mu)
print("row scaling with weighted standard deviation")
sklearn.utils.sparsefuncs.inplace_column_scale(wanndataX, sig)
del mu,sig
if corr==True:
#####And then run scaling on this matrix one more time observation-wise:
mu=wanndataX.mean(axis=1)
print(mu.shape)
sig=np.array((1/stds(wanndataX,axis=1)))
print(sig.shape)
wanndataX=scipy.sparse.csr_matrix(wanndataX-mu)
print(" scaling with weighted standard deviation")
sklearn.utils.sparsefuncs.inplace_row_scale(wanndataX, sig)
if max_value is not None:
wanndataX[wanndataX>=max_value]=max_value
wanndataX[wanndataX<=-1*max_value]=-1*max_value
##### and then multiply this column sqrt of weights:
print("adding weights:")
sklearn.utils.sparsefuncs.inplace_row_scale(wanndataX, np.sqrt(weights))
del mu,sig
print("running SVDs:")
u, s, v = svds( wanndataX.T, k=n_comps)
v=scipy.sparse.csr_matrix(v)
############
print("outputing V and U:")
w=np.array((1/np.sqrt(weights)).T)
sklearn.utils.sparsefuncs.inplace_column_scale(v,w)
sklearn.utils.sparsefuncs.inplace_row_scale(v,np.array(s))
gc.collect()
return np.array(v.todense().T),np.array(u)
def generate_weights(anndata,Batch_key):
'''input:
anndata: an anndata object
Batch_key: the Batch key in .obs
output:
weights with w.shape[0] ==anndata.obs.shape[0]
'''
assert Batch_key in anndata.obs.columns
w= 1/anndata.obs[Batch_key].value_counts()
w=w[anndata.obs[Batch_key]]/len(np.unique(anndata.obs[Batch_key]))
return(np.array(w))