"""
Code modified from modAL project: https://github.com/modAL-python/modAL
Uncertainty measures and uncertainty-based sampling strategies for active learning models.
"""
from typing import Tuple, Union, Callable, List
import numpy as np
from scipy.stats import entropy
import tensorflow as tf
from tensorflow.keras import Model
from sklearn.metrics.pairwise import euclidean_distances
import lightgbm as lgb
from sklearn.model_selection import train_test_split
from model import create_dnn, create_dnn2


def shuffled_argmax(values: np.ndarray, n_instances: int = 1) -> np.ndarray:
"""
Shuffles the values and sorts them afterwards. This can be used to break
the tie when the highest utility score is not unique. The shuffle randomizes
order, which is preserved by the mergesort algorithm.
Args:
values: Contains the values to be selected from.
n_instances: Specifies how many indices to return.
Returns:
The indices of the n_instances largest values.
"""
    assert n_instances <= values.shape[0], 'n_instances must be less than or equal to the size of utility'
# shuffling indices and corresponding values
shuffled_idx = np.random.permutation(len(values))
shuffled_values = values[shuffled_idx]
# getting the n_instances best instance
# since mergesort is used, the shuffled order is preserved
sorted_query_idx = np.argsort(shuffled_values, kind='mergesort')[len(shuffled_values)-n_instances:]
# inverting the shuffle
query_idx = shuffled_idx[sorted_query_idx]
return query_idx
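

# A minimal sketch (hypothetical helper, not part of the original module) showing
# how shuffled_argmax randomizes which of several tied maxima is returned, whereas
# a plain argmax would always land on the same index.
def _demo_shuffled_argmax():
    values = np.array([0.1, 0.9, 0.9, 0.9, 0.2])  # three-way tie at 0.9
    picks = [shuffled_argmax(values, n_instances=1)[0] for _ in range(10)]
    print(picks)  # a mix of indices 1, 2 and 3 rather than always the same one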


def multi_argmax(values: np.ndarray, n_instances: int = 1) -> np.ndarray:
"""
Selects the indices of the n_instances highest values.
Args:
values: Contains the values to be selected from.
n_instances: Specifies how many indices to return.
Returns:
The indices of the n_instances largest values.
"""
    assert n_instances <= values.shape[0], 'n_instances must be less than or equal to the size of utility'
max_idx = np.argpartition(-values, n_instances-1, axis=0)[:n_instances]
return max_idx
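

# A minimal sketch (hypothetical values, for illustration only): multi_argmax
# returns the indices of the n_instances largest utilities, but np.argpartition
# guarantees no particular order among them.
def _demo_multi_argmax():
    utilities = np.array([0.3, 0.8, 0.1, 0.9, 0.5])
    idx = multi_argmax(utilities, n_instances=2)
    print(np.sort(idx))  # [1 3]: the two highest-utility instances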


def classifier_entropy(classifier: Model, X: np.ndarray, y: np.ndarray,
                       binary_labels: bool = True, dual: bool = False) -> np.ndarray:
    """
    Entropy of the classifier's predictions for the provided samples.
    Args:
        classifier: The classifier for which the prediction entropy is to be measured.
        X: The samples for which the prediction entropy is to be measured.
        y: Auxiliary labels passed to the classifier when dual is True.
        binary_labels: Unused here; kept so all strategies share the same interface.
        dual: If True, the classifier expects (X, y) pairs as input.
    Returns:
        Entropy of the class probabilities, one value per sample.
    """
    if dual:
        positive_prob = np.asarray(classifier(X, y)).reshape(-1, 1)
    else:
        positive_prob = np.asarray(classifier(X)).reshape(-1, 1)
    # Build an (n_samples, 2) matrix of [P(class 0), P(class 1)]
    classwise_uncertainty = np.hstack((1 - positive_prob, positive_prob))
    # scipy's entropy reduces over the first axis, so transpose to get one value per sample
    return entropy(np.transpose(classwise_uncertainty))
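

# A minimal sketch (hypothetical stand-in classifier) of what classifier_entropy
# computes: binary entropy peaks at p = 0.5 and shrinks as p approaches 0 or 1.
def _demo_classifier_entropy():
    fake_classifier = lambda X: np.array([0.5, 0.9, 0.99])  # predicted P(class 1)
    X = np.zeros((3, 4))  # inputs are ignored by the stand-in
    ent = classifier_entropy(fake_classifier, X, y=None)
    print(ent)  # roughly [0.69, 0.33, 0.06]: the 0.5 prediction is most uncertain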


def entropy_sampling(classifier: Model, X: np.ndarray, y: np.ndarray, binary_labels: bool = True,
                     n_instances: int = 1, dual: bool = False, random_tie_break: bool = False) -> np.ndarray:
    """
    Entropy sampling query strategy. Selects the instances where the class probabilities
    have the largest entropy.
    Args:
        classifier: The classifier for which the labels are to be queried.
        X: The pool of samples to query from.
        y: Auxiliary labels passed to the classifier when dual is True.
        binary_labels: Forwarded to classifier_entropy, which currently ignores it.
        n_instances: Number of samples to be queried.
        dual: If True, the classifier expects (X, y) pairs as input.
        random_tie_break: If True, shuffles utility scores to randomize the order. This
            can be used to break the tie when the highest utility score is not unique.
    Returns:
        The indices of the instances from X chosen to be labelled.
    """
    ent = classifier_entropy(classifier, X, y, binary_labels, dual)
    if not random_tie_break:
        query_idx = multi_argmax(ent, n_instances=n_instances)
    else:
        query_idx = shuffled_argmax(ent, n_instances=n_instances)
    return query_idx
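

# A minimal sketch (hypothetical pool and stand-in classifier) of one
# entropy-sampling query round: the least confident samples go to the oracle.
def _demo_entropy_sampling():
    rng = np.random.default_rng(0)
    X_pool = rng.normal(size=(100, 4))
    fake_classifier = lambda X: rng.uniform(size=X.shape[0])  # stand-in P(class 1)
    query_idx = entropy_sampling(fake_classifier, X_pool, y=None, n_instances=5)
    print(query_idx, X_pool[query_idx].shape)  # 5 indices and the queried samples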


def random_sampling(X: np.ndarray, rg: np.random.Generator, n_instances: int = 1) -> np.ndarray:
    """
    Random sampling query strategy. Selects random instances from X.
    Args:
        X: The pool of samples to query from.
        rg: NumPy random generator used to draw the indices.
        n_instances: Number of samples to be queried.
    Returns:
        The indices of the instances from X chosen to be labelled.
    """
    # Draw without replacement so a query batch never repeats an instance and
    # every pool index, including the last, can be selected
    query_idx = rg.choice(X.shape[0], size=n_instances, replace=False)
    return query_idx
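

# A minimal usage sketch: random sampling is the usual active-learning baseline,
# drawing a batch of distinct pool indices from a seeded generator.
def _demo_random_sampling():
    X_pool = np.zeros((50, 4))
    rg = np.random.default_rng(42)
    print(random_sampling(X_pool, rg, n_instances=5))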


def mc_dropout(X_seed: np.ndarray, y_seed: np.ndarray, y_seed_true: np.ndarray,
X_val: np.ndarray, y_val: np.ndarray, y_val_true: np.ndarray,
X: np.ndarray, y: np.ndarray,
n_models: int = 1,
variance = False,
n_instances: int = 1,
dual: bool = False) -> np.ndarray:
"""
MC-dropout implementattion
"""
    # Keep the weights with the best validation accuracy so they can be restored after training
    checkpoint_filepath = '/tmp/checkpoint3'
    model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_filepath,
        save_weights_only=True,
        monitor='val_accuracy',
        mode='max',
        save_best_only=True)
    early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=30)
if dual:
model = create_dnn2(mc=True)
# Fit the substitute model with the initial seed data
model.fit((X_seed, y_seed_true), y_seed,
batch_size=128,
epochs=100,
validation_data=((X_val, y_val_true), y_val),
callbacks=[model_checkpoint_callback, early_stopping])
    else:
        model = create_dnn(mc=True)
        # Fit the substitute model with the initial seed data
        model.fit(X_seed, y_seed,
                  batch_size=128,
                  epochs=100,
                  validation_data=(X_val, y_val),
                  callbacks=[model_checkpoint_callback, early_stopping])
    # Restore the best checkpoint before collecting stochastic predictions
    model.load_weights(checkpoint_filepath)
    predictions = []
    # With mc=True the dropout layers stay active at prediction time, so each
    # forward pass samples a different dropout mask and yields a different prediction
    for _ in range(n_models):
        if dual:
            y_pred = model.predict((X, y)).reshape(-1, 1)
        else:
            y_pred = model.predict(X).reshape(-1, 1)
        predictions.append(y_pred)
    if variance:
        # Rank pool samples by the variance of the stochastic predictions
        var = np.var(predictions, axis=0)
        query_idx = multi_argmax(var, n_instances=n_instances).squeeze()
        del var, predictions, model
    else:
        # Rank pool samples by the entropy of the mean class probabilities
        mean_pred = np.mean(predictions, axis=0)
        del predictions
        class_predictions = np.hstack((1 - mean_pred, mean_pred))
        ent = np.transpose(entropy(np.transpose(class_predictions)))
        query_idx = multi_argmax(ent, n_instances=n_instances)
        del ent, class_predictions, mean_pred, model
return query_idx
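

# A hedged usage sketch for mc_dropout. It assumes the project-local model module
# builds networks whose dropout layers stay active at prediction time when mc=True,
# which is what makes the repeated model.predict calls stochastic. Not runnable
# without that module, so it is left as a comment:
#
#   query_idx = mc_dropout(X_seed, y_seed, y_seed_true,
#                          X_val, y_val, y_val_true,
#                          X_pool, y_pool,
#                          n_models=20, variance=True, n_instances=32)
#   X_query = X_pool[query_idx]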


def k_center(X_cluster: np.ndarray,
             X_med: np.ndarray,
             n_instances: int = 1) -> Tuple[List, np.ndarray]:
    """
    Greedy K-center query strategy. Repeatedly picks the pool point that is
    farthest from its nearest cluster center and promotes it to a new center.
    Args:
        X_cluster: The current cluster centers (e.g. the labelled samples).
        X_med: The pool of candidate samples to query from.
        n_instances: Number of samples to be queried.
    Returns:
        The indices (into X_med) of the chosen samples and the updated centers.
    """
    query_idx = []
    for _ in range(n_instances):
        # Calculate distances between every pool point and every current center
        dist = euclidean_distances(X_med, X_cluster)
        # Distance of each pool point to its nearest center
        D_min = np.min(dist, axis=1)
        # The point farthest from all centers is the worst covered; pick it
        D_min_argmax = np.argmax(D_min)
        # Promote that point to a cluster center
        X_cluster = np.vstack([X_cluster, X_med[D_min_argmax]])
        query_idx.append(D_min_argmax)
    assert len(query_idx) == n_instances
    return query_idx, X_cluster
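

# A minimal sketch (synthetic data, hypothetical sizes) of greedy k-center: the
# returned indices point into the pool X_med, and the selected rows are appended
# to the running set of centers.
def _demo_k_center():
    rng = np.random.default_rng(0)
    centers = rng.normal(size=(10, 4))  # already-labelled samples act as centers
    pool = rng.normal(size=(200, 4))    # unlabelled pool
    query_idx, centers = k_center(centers, pool, n_instances=5)
    print(query_idx, centers.shape)     # 5 pool indices; centers grew to (15, 4)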


def ensemble(X_seed: np.ndarray, y_seed: np.ndarray, X: np.ndarray,
             num_models: int = 3, n_instances: int = 1) -> np.ndarray:
    """
    Query-by-committee strategy. Trains num_models LightGBM models on different
    random splits of the seed data and selects the pool samples on which the
    committee's predicted probabilities disagree the most (highest variance).
    Args:
        X_seed: The labelled seed samples.
        y_seed: The labels of the seed samples.
        X: The pool of samples to query from.
        num_models: Number of committee members to train.
        n_instances: Number of samples to be queried.
    Returns:
        The indices of the instances from X chosen to be labelled.
    """
    uncertainties = []
    for _ in range(num_models):
        # A fresh random split per member diversifies the committee
        X_train, X_test, y_train, y_test = train_test_split(
            X_seed, y_seed, test_size=0.1, random_state=np.random.randint(0, 1000))
        train_data = lgb.Dataset(X_train, label=y_train)
        val_data = lgb.Dataset(X_test, label=y_test)
        lgb_params = {
            "boosting_type": "gbdt",
            "objective": "binary",
            "learning_rate": 0.05,
            "num_leaves": 2048,
            "max_depth": 15,
            "min_child_samples": 30,
            "verbose": -1,
        }
        # Early stopping via the callback API (the early_stopping_rounds and
        # verbose_eval keyword arguments were removed in LightGBM 4.0)
        model = lgb.train(lgb_params, train_data,
                          num_boost_round=500,
                          valid_sets=[val_data],
                          callbacks=[lgb.early_stopping(50, verbose=False)])
        classwise_uncertainty = model.predict(X).reshape(-1, 1)
        uncertainties.append(classwise_uncertainty)
    # Disagreement: variance of the members' predicted probabilities per sample
    var = np.var(uncertainties, axis=0)
    query_idx = multi_argmax(var, n_instances=n_instances).squeeze()
    del var, uncertainties
    return query_idx
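

# A minimal sketch (synthetic binary data; sizes are hypothetical and chosen only
# to keep the demo fast) of the LightGBM query-by-committee strategy.
def _demo_ensemble():
    rng = np.random.default_rng(0)
    X_seed = rng.normal(size=(500, 8))
    y_seed = (X_seed[:, 0] > 0).astype(int)  # a trivially learnable label
    X_pool = rng.normal(size=(1000, 8))
    query_idx = ensemble(X_seed, y_seed, X_pool, num_models=3, n_instances=10)
    print(query_idx)  # the 10 pool samples the committee disagrees on most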