forked from alxxrg/copula-shirley
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutility_tests.py
280 lines (217 loc) · 10.9 KB
/
utility_tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
import warnings
from itertools import permutations
from math import isinf, isnan
import numpy as np
import pandas as pd
from scipy.stats import entropy, ks_2samp, spearmanr, ConstantInputWarning
from sklearn.neighbors import KernelDensity
from sklearn.metrics import matthews_corrcoef, mean_squared_error
from sklearn.linear_model import LinearRegression
from xgboost import XGBClassifier
def KLDiv(sample_E, sample_T, bandwidth=2):
    """Estimate the Kullback-Leibler divergence between two 1-D samples.

    A kernel density estimate is fitted on each sample, both densities are
    evaluated on the sorted unique values observed in either sample, and the
    divergence of the experimental density from the reference density is
    returned.

    TODO:
        Adjust when sparse density.

    Args:
        sample_E (1-D array-like): The experimental observations.
        sample_T (1-D array-like): The theoretical or reference observations.
        bandwidth (float, optional): Bandwidth passed to the kernel density
            estimators. Defaults to 2.

    Returns:
        float: The Kullback-Leibler divergence.
    """
    # Column vectors, as expected by the density estimator's fit/score API.
    P = np.asarray(sample_E, dtype=float).reshape(-1, 1)
    Q = np.asarray(sample_T, dtype=float).reshape(-1, 1)

    # Series.append was removed in pandas 2.0; building the evaluation grid
    # with NumPy also lets plain arrays be passed in.
    x = np.unique(np.concatenate([P, Q])).reshape(-1, 1)

    prob_P = np.exp(KernelDensity(bandwidth=bandwidth).fit(P).score_samples(x))
    prob_Q = np.exp(KernelDensity(bandwidth=bandwidth).fit(Q).score_samples(x))

    # scipy's entropy normalises both vectors before computing the divergence.
    return entropy(prob_P, prob_Q)
def KSDist(sample_E, sample_T):
    """Return the two-sample Kolmogorov-Smirnov statistic of two samples.

    Args:
        sample_E (1-D array): The experimental observations.
        sample_T (1-D array): The theoretical or reference observations.

    Returns:
        float: The Kolmogorov-Smirnov distance (the test statistic only; the
            p-value is discarded).
    """
    ks_result = ks_2samp(sample_E, sample_T)
    return ks_result.statistic
def KLDivDF(sample_E, sample_T):
    """Average the per-column Kullback-Leibler divergence of two DataFrames.

    The divergence is computed with KLDiv for every column of sample_E against
    the column of the same name in sample_T. Infinite divergences are dropped
    and NaN values are ignored by the mean.

    Args:
        sample_E (DataFrame): The experimental observations.
        sample_T (DataFrame): The theoretical or reference observations.

    Returns:
        float: The mean Kullback-Leibler divergence over the columns.
    """
    finite_divergences = []
    for column in sample_E.columns:
        divergence = KLDiv(sample_E[column], sample_T[column])
        if isinf(divergence):
            # Infinite divergences are excluded from the average.
            continue
        finite_divergences.append(divergence)
    return np.nanmean(finite_divergences)
def KSDistDF(sample_E, sample_T):
    """Average the per-column Kolmogorov-Smirnov distance of two DataFrames.

    The distance is computed with KSDist for every column of sample_E against
    the column of the same name in sample_T.

    Args:
        sample_E (DataFrame): The experimental observations.
        sample_T (DataFrame): The theoretical or reference observations.

    Returns:
        float: The mean Kolmogorov-Smirnov distance over the columns.
    """
    distances = []
    for column in sample_E.columns:
        distances.append(KSDist(sample_E[column], sample_T[column]))
    return np.mean(distances)
def BinaryLabelCheck(df, label):
    """Return df with the binary column `label` encoded as 0/1.

    The smaller of the two distinct label values is mapped to 0 and the larger
    to 1, so encodings such as {1, 2} or {-1, 1} are both handled. (Merely
    subtracting the minimum would turn {-1, 1} into {0, 2}.)

    Args:
        df (DataFrame): The observations.
        label (str): The name of the binary column.

    Returns:
        DataFrame: `df` itself when the labels are already 0/1, otherwise a
            copy whose `label` column holds integer 0/1 values.

    Raises:
        ValueError: If the column does not hold exactly two distinct values.
    """
    classes = np.unique(df[label])
    if classes.size != 2:
        raise ValueError("The binary label must have exactly two distinct values.")

    # np.unique returns its values sorted, so this is (min, max).
    low, high = classes
    if low == 0 and high == 1:
        return df

    # Copy only when a re-encoding is needed, leaving the caller's frame intact.
    df_ret = df.copy()
    df_ret[label] = (df_ret[label] == high).astype(int)
    return df_ret
def MultiClassLabelCheck(synth, real, label):
    """Align the class labels of the synthetic and real samples.

    Every integer class between the smallest and the largest label seen in
    either sample must appear at least once in the synthetic sample. For each
    class that is missing, one row is added to the synthetic copy with that
    label and NaN in all other columns. Both label columns are then shifted so
    that the smallest class becomes 0.

    Args:
        synth (DataFrame): The synthetic observations.
        real (DataFrame): The original (raw) observations.
        label (str): The name of the class column (integer-valued).

    Returns:
        DataFrame: A copy of `synth` with the added rows (if any) and shifted
            labels. Its index is reset when rows are added.
        DataFrame: A copy of `real` with shifted labels.
    """
    synth_ret, real_ret = synth.copy(), real.copy()
    min_val = np.min([np.min(real_ret[label]), np.min(synth_ret[label])])
    max_val = np.max([np.max(real_ret[label]), np.max(synth_ret[label])])

    # Sorted so the filler rows come out in a deterministic order.
    missing_values = sorted(
        set(range(min_val, max_val + 1)) - set(np.unique(synth_ret[label]))
    )
    if missing_values:
        label_dtype = synth_ret[label].dtype
        n_rows = len(synth_ret)
        # DataFrame.append was removed in pandas 2.0, and assigning through
        # `frame[label].iloc[-1]` is a chained assignment that may not write
        # back. Extend the index with all-NaN rows and fill the labels with a
        # single .loc assignment instead.
        synth_ret = synth_ret.reset_index(drop=True).reindex(
            index=pd.RangeIndex(n_rows + len(missing_values))
        )
        synth_ret.loc[n_rows:, label] = missing_values
        # The temporary NaN upcast the label column; restore its dtype.
        synth_ret[label] = synth_ret[label].astype(label_dtype)

    synth_ret[label] = synth_ret[label] - min_val
    real_ret[label] = real_ret[label] - min_val
    return synth_ret, real_ret
def BinaryClassif(synth_sample, real_sample, label, n_cores=1):
    """Compute the Matthews Correlation Coefficient (MCC) of a binary classification of label.

    The XGBoost model is trained on synth_sample and tested on real_sample.

    Args:
        synth_sample (DataFrame): A DataFrame of synthetic observations.
        real_sample (DataFrame): A DataFrame of original (raw) observations.
        label (str): The name of the class (must be binary).
        n_cores (int, optional): The number of cores to use. Defaults to 1.

    Returns:
        float: The MCC value of the classification.
    """
    synth_sample = BinaryLabelCheck(synth_sample, label)
    real_sample = BinaryLabelCheck(real_sample, label)

    # Keep the frame's own column order. Deriving the feature list from a set
    # makes the order depend on hash seeding, so the features could be
    # presented to the model in a different order from one run to the next.
    train_col = [col for col in synth_sample.columns if col != label]

    X_test = real_sample[train_col]
    y_test = real_sample[label]
    X_train = synth_sample[train_col]
    y_train = synth_sample[label]

    # NOTE(review): `use_label_encoder` and `maximize` are forwarded unchanged;
    # confirm the installed xgboost version still accepts them.
    model = XGBClassifier(n_estimators=512,
                          use_label_encoder=False,
                          max_depth=64,
                          verbosity=0,
                          objective='binary:logistic',
                          eval_metric='error',
                          maximize=False,
                          n_jobs=n_cores,
                          )
    y_pred = model.fit(X_train, y_train).predict(X_test)
    return matthews_corrcoef(y_test, y_pred)
def MultiClassif(synth_sample, real_sample, label, n_cores=1):
    """Compute the Matthews Correlation Coefficient (MCC) of a multiclass classification of label.

    The XGBoost model is trained on synth_sample and tested on real_sample.

    Args:
        synth_sample (DataFrame): A DataFrame of synthetic observations.
        real_sample (DataFrame): A DataFrame of original (raw) observations.
        label (str): The name of the class.
        n_cores (int, optional): The number of cores to use. Defaults to 1.

    Returns:
        float: The MCC value of the multiclass classification.
    """
    synth_sample, real_sample = MultiClassLabelCheck(synth_sample, real_sample, label)

    # Keep the frame's own column order. Deriving the feature list from a set
    # makes the order depend on hash seeding, so the features could be
    # presented to the model in a different order from one run to the next.
    train_col = [col for col in synth_sample.columns if col != label]

    X_test = real_sample[train_col]
    y_test = real_sample[label]
    X_train = synth_sample[train_col]
    y_train = synth_sample[label]

    # NOTE(review): `use_label_encoder` and `maximize` are forwarded unchanged;
    # confirm the installed xgboost version still accepts them.
    model = XGBClassifier(n_estimators=512,
                          use_label_encoder=False,
                          max_depth=64,
                          verbosity=0,
                          objective='multi:softmax',
                          # One class per distinct label in the training data.
                          num_class=np.unique(y_train).size,
                          eval_metric='merror',
                          maximize=False,
                          n_jobs=n_cores,
                          )
    y_pred = model.fit(X_train, y_train).predict(X_test)
    return matthews_corrcoef(y_test, y_pred)
def LinearRegr(synth_sample, real_sample, label, n_cores=1):
    """Compute the Root Mean Square Error (RMSE) of a regression on label.

    A linear regression is fitted on synth_sample and evaluated on real_sample,
    using every column except `label` as a feature.

    Args:
        synth_sample (DataFrame): A DataFrame of synthetic observations.
        real_sample (DataFrame): A DataFrame of original (raw) observations.
        label (str): The name of the target column (must be continuous).
        n_cores (int, optional): The number of cores to use. Defaults to 1.

    Returns:
        float: The RMSE value of the regression.
    """
    features = list(set(synth_sample.columns) - {label})

    test_X, test_y = real_sample[features], real_sample[label]
    train_X, train_y = synth_sample[features], synth_sample[label]

    regressor = LinearRegression(n_jobs=n_cores)
    regressor.fit(train_X, train_y)
    predictions = regressor.predict(test_X)

    mse = mean_squared_error(test_y, predictions)
    return np.sqrt(mse)
def LocalCorr(sample, ref_col, q_col, percentile=0.05):
    """Compute the Spearman correlation between ref_col and q_col in both tails of ref_col.

    The rows whose ref_col value lies at or below the `percentile` quantile of
    ref_col form the low tail; the rows at or above the `1 - percentile`
    quantile form the high tail. The Spearman rank correlation between ref_col
    and q_col is computed separately on each tail.

    Args:
        sample (DataFrame): A DataFrame.
        ref_col (str): The reference column to use.
        q_col (str): The query column to use.
        percentile (float, optional): The size of each tail, as a fraction of
            the distribution of ref_col. Defaults to 0.05.

    Returns:
        float: The Spearman correlation between ref_col and q_col in the low tail.
        float: The Spearman correlation between ref_col and q_col in the high tail.
    """
    # NOTE(review): ECDF is not defined in the visible part of this file;
    # presumably `inv(p)` returns the p-quantile of the fitted data -- confirm.
    cdf = ECDF(sample[ref_col])
    low_tail = sample[sample[ref_col] <= cdf.inv(percentile)]
    high_tail = sample[sample[ref_col] >= cdf.inv(1 - percentile)]

    # A tail may be constant in one of the columns. Silence that warning only
    # for these two calls instead of installing a process-wide filter.
    with warnings.catch_warnings():
        warnings.simplefilter(action='ignore', category=ConstantInputWarning)
        low_corr = spearmanr(low_tail[ref_col], low_tail[q_col])[0]
        high_corr = spearmanr(high_tail[ref_col], high_tail[q_col])[0]
    return low_corr, high_corr
def BestPairForLocalCorr(sample, percentile=0.05):
    """Find the ordered pair of columns whose tail correlations differ the most.

    For every ordered pair (ref_col, q_col) of distinct columns, LocalCorr gives
    the correlation in the low and in the high tail. The pair with the largest
    absolute difference between the magnitudes of these two correlations is
    returned; NaN scores are ignored.

    Args:
        sample (DataFrame): A DataFrame.
        percentile (float, optional): The size of the area to use for the
            computation of the correlation coefficients. Defaults to 0.05.

    Returns:
        tuple: A pair of attributes.
    """
    tail_gap = {}
    for pair in permutations(sample.columns, r=2):
        low_corr, high_corr = LocalCorr(sample, pair[0], pair[1], percentile)
        tail_gap[pair] = abs(abs(low_corr) - abs(high_corr))

    best_position = np.nanargmax(list(tail_gap.values()))
    return list(tail_gap)[best_position]
def _nudge_if_constant(values, col, delta):
    """Lower one random entry of column `col` in place if the column is single-valued."""
    column = values[:, col]
    # Compare elements directly: a computed standard deviation can come out
    # slightly above zero for a constant float column because of rounding.
    if column.size and np.all(column == column[0]):
        rand_row = np.random.choice(values.shape[0])
        original = values[rand_row, col]
        # For large magnitudes `original - delta` rounds back to `original`;
        # fall back to the next representable float below it.
        values[rand_row, col] = min(original - delta,
                                    np.nextafter(original, -np.inf))


def GlobalCorr(synth_sample, real_sample):
    """Compare the Spearman correlation matrices of synth_sample and real_sample.

    If a column in synth_sample or real_sample is single-valued, a small noise
    is subtracted from one randomly chosen value of that column so that its
    rank correlation is defined. The noise is applied to private float copies;
    the input DataFrames are never modified.

    Args:
        synth_sample (DataFrame): A DataFrame of synthetic observations.
        real_sample (DataFrame): A DataFrame of original (raw) observations.

    Returns:
        float: The maximal absolute difference between the correlation
            coefficients of synth_sample and real_sample.
        float: The mean absolute difference between the correlation
            coefficients of synth_sample and real_sample.
    """
    delta = 1e-12
    # Float copies: `.values` may be a view on the caller's data, and an
    # integer array would truncate the noise.
    synth = synth_sample.to_numpy(dtype=float, copy=True)
    real = real_sample.to_numpy(dtype=float, copy=True)

    for col in range(synth.shape[1]):
        _nudge_if_constant(synth, col, delta)
        _nudge_if_constant(real, col, delta)

    real_corr = spearmanr(real)[0]
    synth_corr = spearmanr(synth)[0]
    abs_diff = np.abs(real_corr - synth_corr)
    return np.max(abs_diff), np.mean(abs_diff)