-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpvp_data_loader.py
192 lines (163 loc) · 7.88 KB
/
pvp_data_loader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import numpy as np
import scipy.io as sio
from torch.utils.data import Dataset, DataLoader
from utils import TT_split, normalize
import torch
import random
def load_data(dataset, neg_prop, align_prop, is_noise):
    """Load a two-view dataset from ./datasets/<dataset>.mat and build training pairs.

    :param dataset: dataset name; selects the .mat parsing branch below
    :param neg_prop: number of negative pairs sampled per anchor (see get_pairs)
    :param align_prop: proportion of samples treated as already aligned; these are
        used for pair construction, the rest are shuffled to simulate unaligned data
    :param is_noise: if truthy, train on the constructed (possibly noisy) 0/1 pair
        labels; otherwise train on the ground-truth pair labels
    :return: (train_pairs, train_pair_labels, train_pair_real_labels, all_data,
        all_label, all_label_X, all_label_Y, divide_seed) where train_pairs and
        all_data are lists of two (feature_dim, n) arrays (note the .T transposes)
        and divide_seed is the random seed passed to TT_split
    """
    all_data = []
    train_pairs = []
    label = []
    mat = sio.loadmat('./datasets/' + dataset + '.mat')
    # Each branch extracts exactly two views into `data` and a label vector.
    if dataset == 'Scene15':
        data = mat['X'][0][0:2]  # 20, 59 dimensions
        label = np.squeeze(mat['Y'])
    elif dataset == 'Caltech101':
        data = mat['X'][0][3:5]
        label = np.squeeze(mat['Y'])
    elif dataset == 'Reuters_dim10':
        data = []  # 18758 samples
        # Train and test splits of the .mat are merged into one pool per view.
        data.append(normalize(np.vstack((mat['x_train'][0], mat['x_test'][0]))))
        data.append(normalize(np.vstack((mat['x_train'][1], mat['x_test'][1]))))
        label = np.squeeze(np.hstack((mat['y_train'], mat['y_test'])))
    elif dataset == 'NoisyMNIST-30000':
        data = []
        data.append(mat['X1'])
        data.append(mat['X2'])
        label = np.squeeze(mat['Y'])
    elif dataset == 'Caltech101-deepfea':
        data = []
        # NOTE(review): 'gt' is not np.squeeze()d here, unlike the branches
        # above — presumably it is already 1-D or indexes correctly with
        # train_idx/test_idx below; confirm against the .mat layout.
        label = mat['gt']
        data.append(mat['X'][0][0].T)
        data.append(mat['X'][0][1].T)
    elif dataset == 'MNIST-USPS':
        data = []
        data.append(mat['X1'])
        data.append(normalize(mat['X2']))
        label = np.squeeze(mat['Y'])
    elif dataset == 'AWA-deepfea':
        data = []
        # NOTE(review): same un-squeezed 'gt' as Caltech101-deepfea — confirm.
        label = mat['gt']
        data.append(mat['X'][0][5].T)
        data.append(mat['X'][0][6].T)
    # Fresh seed per call: the train/test split (and hence the returned seed)
    # differs across runs unless the caller controls the random module's state.
    divide_seed = random.randint(1, 1000)
    train_idx, test_idx = TT_split(len(label), 1 - align_prop, divide_seed)
    train_label, test_label = label[train_idx], label[test_idx]
    train_X, train_Y, test_X, test_Y = data[0][train_idx], data[1][train_idx], data[0][test_idx], data[1][test_idx]
    # Use test_prop*sizeof(all data) to train the MvCLN, and shuffle the rest data to simulate the unaligned data.
    # Note that, MvCLN establishes the correspondence of the all data rather than the unaligned portion in the testing.
    # When test_prop = 0, MvCLN is directly performed on the all data without shuffling.
    if align_prop == 1.:
        all_data.append(train_X.T)
        all_data.append(train_Y.T)
        all_label, all_label_X, all_label_Y = train_label, train_label, train_label
    else:
        # Shuffle only view 1 of the test portion, breaking its alignment with
        # view 0; the per-view label arrays track the permuted order.
        shuffle_idx = random.sample(range(len(test_Y)), len(test_Y))
        test_Y = test_Y[shuffle_idx]
        test_label_X, test_label_Y = test_label, test_label[shuffle_idx]
        all_data.append(np.concatenate((train_X, test_X)).T)
        all_data.append(np.concatenate((train_Y, test_Y)).T)
        all_label = np.concatenate((train_label, test_label))
        all_label_X = np.concatenate((train_label, test_label_X))
        all_label_Y = np.concatenate((train_label, test_label_Y))
    # pair construction. view 0 and 1 refer to pairs constructed for training. noisy and real labels refer to 0/1 label of those pairs
    view0, view1, noisy_labels, real_labels, _, _ = get_pairs(train_X, train_Y, neg_prop, train_label)
    count = 0
    for i in range(len(noisy_labels)):
        if noisy_labels[i] != real_labels[i]:
            count += 1
    # Denominator = number of negative pairs: the first len(train_X) entries of
    # noisy_labels are the positive pairs, which are never mislabeled.
    print('noise rate of the constructed neg. pairs is ', round(count / (len(noisy_labels) - len(train_X)), 2))
    if is_noise:  # training with noisy negative correspondence
        print("----------------------Training with noisy_labels----------------------")
        train_pair_labels = noisy_labels
    else:  # training with gt negative correspondence
        print("----------------------Training with real_labels----------------------")
        train_pair_labels = real_labels
    train_pairs.append(view0.T)
    train_pairs.append(view1.T)
    train_pair_real_labels = real_labels
    return train_pairs, train_pair_labels, train_pair_real_labels, all_data, all_label, all_label_X, all_label_Y, divide_seed
def get_pairs(train_X, train_Y, neg_prop, train_label):
    """Construct positive and negative cross-view pairs for contrastive training.

    Positive pairs are the aligned (i-th, i-th) samples of the two views. For
    each view-0 anchor, neg_prop random view-1 partners form negative pairs;
    when a randomly drawn partner shares the anchor's class, the assigned
    0-label is "noisy" while the real label records the true relation.

    :param train_X: view-0 samples, indexable by row
    :param train_Y: view-1 samples, indexable by row
    :param neg_prop: number of negative partners drawn per anchor
    :param train_label: class label per sample (shared by both views)
    :return: (view0, view1, labels, real_labels, class_labels0, class_labels1)
        as numpy arrays — float32 features, int64 labels
    """
    view0, view1 = [], []
    labels, real_labels = [], []
    class_labels0, class_labels1 = [], []

    # Positive (aligned) pairs: both the assigned and real labels are 1.
    for x, y, cls in zip(train_X, train_Y, train_label):
        view0.append(x)
        view1.append(y)
        labels.append(1)
        real_labels.append(1)
        class_labels0.append(cls)
        class_labels1.append(cls)

    # Negative pairs: one random.sample call per anchor, in anchor order.
    pool_size = len(train_Y)
    for anchor_idx, anchor in enumerate(train_X):
        partner_idxs = random.sample(range(pool_size), neg_prop)
        anchor_cls = train_label[anchor_idx]
        for p in partner_idxs:
            view0.append(anchor)
            view1.append(train_Y[p])
            labels.append(0)
            class_labels0.append(anchor_cls)
            class_labels1.append(train_label[p])
            # Real label is 1 when the "negative" pair is actually same-class.
            real_labels.append(1 if anchor_cls == train_label[p] else 0)

    return (np.array(view0, dtype=np.float32),
            np.array(view1, dtype=np.float32),
            np.array(labels, dtype=np.int64),
            np.array(real_labels, dtype=np.int64),
            np.array(class_labels0, dtype=np.int64),
            np.array(class_labels1, dtype=np.int64))
class GetDataset(Dataset):
    """Pair dataset yielding (view0, view1, pair_label[, real_label]) tuples.

    `data` is a list of two feature matrices shaped (feature_dim, num_pairs);
    samples are stored column-wise. When `real_labels` is empty, items are
    3-tuples without the real label.
    """

    def __init__(self, data, labels, real_labels):
        self.data = data
        self.labels = labels
        self.real_labels = real_labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, index):
        # Columns are samples; unsqueeze adds a leading channel dim -> (1, dim).
        fea0 = torch.from_numpy(self.data[0][:, index]).float().unsqueeze(0)
        fea1 = torch.from_numpy(self.data[1][:, index]).float().unsqueeze(0)
        label = np.int64(self.labels[index])
        if len(self.real_labels) == 0:
            return fea0, fea1, label
        return fea0, fea1, label, np.int64(self.real_labels[index])
class GetAllDataset(Dataset):
    """Full-data dataset yielding (view0, view1, label, class0, class1) tuples.

    `data` is a list of two feature matrices shaped (feature_dim, num_samples);
    samples are stored column-wise. class0/class1 are the per-view class labels
    (they differ for the shuffled, unaligned portion of the data).
    """

    def __init__(self, data, labels, class_labels0, class_labels1):
        self.data = data
        self.labels = labels
        self.class_labels0 = class_labels0
        self.class_labels1 = class_labels1

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, index):
        # Columns are samples; unsqueeze adds a leading channel dim -> (1, dim).
        fea0 = torch.from_numpy(self.data[0][:, index]).float().unsqueeze(0)
        fea1 = torch.from_numpy(self.data[1][:, index]).float().unsqueeze(0)
        return (fea0,
                fea1,
                np.int64(self.labels[index]),
                np.int64(self.class_labels0[index]),
                np.int64(self.class_labels1[index]))
def loader(train_bs, neg_prop, aligned_prop, complete_prop, is_noise, dataset):
    """Build the two DataLoaders used to train and test MvCLN.

    :param train_bs: batch size for training, default is 1024
    :param neg_prop: negative / positive pairs' ratio
    :param aligned_prop: known aligned proportion for training SURE
    :param complete_prop: accepted for interface compatibility; not used in this body
    :param is_noise: training with noisy labels or not, 0 --- not, 1 --- yes
    :param dataset: choice of dataset
    :return: (train_pair_loader, all_loader, divide_seed) — the pair loader holds
        the constructed pos./neg. pairs for training MvCLN; all_loader holds the
        originally aligned plus unaligned data for testing MvCLN
    """
    (train_pairs, pair_labels, pair_real_labels,
     all_data, all_label, all_label_X, all_label_Y,
     divide_seed) = load_data(dataset, neg_prop, aligned_prop, is_noise)

    train_pair_loader = DataLoader(
        GetDataset(train_pairs, pair_labels, pair_real_labels),
        batch_size=train_bs,
        shuffle=True,
        drop_last=True,
    )
    all_loader = DataLoader(
        GetAllDataset(all_data, all_label, all_label_X, all_label_Y),
        batch_size=1024,  # fixed evaluation batch size, independent of train_bs
        shuffle=True,
    )
    return train_pair_loader, all_loader, divide_seed