-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathDataload_eeg.py
259 lines (209 loc) · 9.8 KB
/
Dataload_eeg.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
import os

import numpy as np
import scipy.io
from scipy import signal
from scipy.signal import butter

from EAV_datasplit import *
# Inert string literal (never assigned or used) listing numeric codes next to
# *_SPE names. NOTE(review): presumably the marker/label codes of the
# "speaking" emotion conditions (neutral, sad, anger, happy, relax) - confirm
# against the dataset documentation before relying on them.
'''
NEU_SPE = 108, 0
S_SPE = 1
A_SPE = 2
H_SPE = 3
R_SPE = 4 #####
'''
class DataLoadEEG:
    """Load, resample, band-pass filter and segment one subject's EEG data.

    ``data_prepare()`` runs the whole pipeline:
    ``data_mat -> downsampling -> bandpass -> data_div``.

    Attributes:
        subject: integer subject id used to build the ``subjectNN`` folder name.
        band: ``[low, high]`` pass band in Hz handed to ``scipy.signal.butter``.
        parent_directory: dataset root containing the ``subjectNN`` folders.
        fs_orig: sampling rate of the stored recording in Hz.
        fs_target: sampling rate after ``downsampling`` in Hz.
        seg: EEG as (channels, samples, trials) once loaded / resampled.
        seg_f: band-pass filtered EEG, same layout as ``seg``.
        seg_f_div: after ``data_div``: (segments, channels, samples).
        label: array read from the ``label`` variable of the label file;
            assumed one-hot with classes on axis 0 and trials on axis 1
            (``data_div`` takes ``argmax`` over axis 0) - confirm against the
            label files.
        label_div: after ``data_div``: class index (0..4) per kept segment.
    """

    # Rows of the label matrix that are kept ("listening" classes).
    LISTENING_CLASSES = (1, 3, 5, 7, 9)

    def __init__(self, subject='all', band=None, fs_orig=500, fs_target=100,
                 parent_directory=r'C:\Users\minho.lee\Dropbox\Datasets\EAV'):
        """Store the configuration; no file is touched until ``data_mat``.

        Args:
            subject: integer subject id. The historical default ``'all'`` is
                kept for compatibility but cannot be loaded by ``data_mat``.
            band: ``[low, high]`` pass band in Hz; ``None`` means ``[0.3, 50]``.
            fs_orig: sampling rate of the stored recording in Hz.
            fs_target: sampling rate to resample to in Hz.
            parent_directory: dataset root directory.
        """
        self.subject = subject
        # A fresh list per instance: the old ``band=[0.3, 50]`` default was a
        # single list object shared by every instance.
        self.band = [0.3, 50] if band is None else list(band)
        self.parent_directory = parent_directory
        self.fs_orig = fs_orig
        self.fs_target = fs_target
        self.seg = []
        self.label = []
        self.label_div = []
        self.seg_f = []
        self.seg_f_div = []

    def data_mat(self):
        """Load ``subjectNN_eeg.mat`` and ``subjectNN_eeg_label.mat``.

        Fills ``self.seg`` (transposed to channels x samples x trials) and
        ``self.label``.

        Returns:
            True when the EEG file was found and loaded, False when it does
            not exist (a message is printed in both cases).

        Raises:
            ValueError: if ``self.subject`` cannot be formatted as an integer
                id, or the file holds no 3-D ``seg1`` / ``seg`` variable.
        """
        try:
            subject = f'subject{self.subject:02d}'
        except ValueError as err:
            raise ValueError(
                f'subject must be an integer id, got {self.subject!r}') from err

        eeg_folder = os.path.join(self.parent_directory, subject, 'EEG')
        eeg_file_path = os.path.join(eeg_folder, f'{subject}_eeg.mat')
        label_file_path = os.path.join(eeg_folder, f'{subject}_eeg_label.mat')

        if not os.path.exists(eeg_file_path):
            print(f'EEG data not found for {subject}')
            return False

        mat = scipy.io.loadmat(eeg_file_path)
        # The recording is stored under 'seg1' in some files and 'seg' in others.
        cnt = mat.get('seg1')
        if np.ndim(cnt) != 3:
            cnt = mat.get('seg')
        if np.ndim(cnt) != 3:
            raise ValueError(
                f"no 3-D 'seg1' or 'seg' variable in {eeg_file_path}")

        mat_y = scipy.io.loadmat(label_file_path)
        label = np.array(mat_y.get('label'))

        # (10000, 30, 200) -> (30ch, 10000t, 200trial)
        self.seg = np.transpose(np.asarray(cnt), [1, 0, 2])
        self.label = label
        print(f'Loaded EEG data for {subject}')
        return True

    def downsampling(self, fs_target=None):
        """Resample ``self.seg`` from ``fs_orig`` down to ``fs_target``.

        All trials are concatenated along time, decimated with
        ``scipy.signal.resample_poly`` and cut back into trials.

        Args:
            fs_target: target rate in Hz. ``None`` uses ``self.fs_target``.
                An explicit value is also stored in ``self.fs_target`` so that
                ``bandpass`` designs its filter for the rate the data really
                has.

        Raises:
            ValueError: if ``fs_orig`` is not an integer multiple of the
                target rate or the trial length is not divisible by the
                decimation factor.
        """
        if fs_target is None:
            fs_target = self.fs_target
        ch, t, tri = self.seg.shape

        down = int(self.fs_orig / fs_target)
        if down < 1 or down * fs_target != self.fs_orig:
            raise ValueError(
                f'fs_orig ({self.fs_orig}) must be an integer multiple of '
                f'fs_target ({fs_target})')
        if t % down:
            raise ValueError(
                f'trial length {t} is not divisible by the decimation '
                f'factor {down}')

        flat = np.reshape(self.seg, (ch, t * tri), order='F')
        resampled = signal.resample_poly(flat, up=1, down=down, axis=1)
        self.seg = np.reshape(resampled, (ch, t // down, tri), order='F')
        self.fs_target = fs_target

    def bandpass(self):
        """Band-pass filter ``self.seg`` into ``self.seg_f``.

        A 5th-order Butterworth filter (second-order sections) for
        ``self.band`` at ``self.fs_target`` is applied causally to each
        channel, with all trials concatenated along time as one signal.
        """
        ch, t, tri = self.seg.shape
        flat = np.reshape(self.seg, (ch, t * tri), order='F')
        # bandpass after the downsample -> fs_target
        sos = butter(5, self.band, btype='bandpass', fs=self.fs_target,
                     output='sos')
        filtered = signal.sosfilt(sos, flat, axis=1)
        self.seg_f = np.reshape(filtered, (ch, t, tri), order='F')

    def data_div(self, n_div=4):
        """Cut every trial into ``n_div`` segments and keep listening classes.

        With the default data (2000 samples = 20 s per trial) each trial
        becomes 4 segments of 500 samples. Afterwards ``self.seg_f_div`` is
        (segments, channels, samples) and ``self.label_div`` holds one class
        index in ``0..4`` per kept segment.

        Args:
            n_div: number of equal-length segments per trial.

        Raises:
            ValueError: if the trial length is not divisible by ``n_div``.
        """
        ch, t, tri = self.seg_f.shape
        if t % n_div:
            raise ValueError(
                f'trial length {t} is not divisible into {n_div} segments')
        seg_len = t // n_div

        # Fortran-order reshape: segment j of trial k lands at index
        # j + n_div * k, matching np.repeat on the label columns below.
        pieces = self.seg_f.reshape((ch, seg_len, n_div * tri), order='F')
        labels_rep = np.repeat(self.label, repeats=n_div, axis=1)

        # Here we only select the listening classes
        classes = list(self.LISTENING_CLASSES)
        keep = np.isin(np.argmax(labels_rep, axis=0), classes)

        # (30, 500, 400) -> (400, 30, 500)
        self.seg_f_div = np.transpose(pieces[:, :, keep], (2, 0, 1))
        self.label_div = np.argmax(labels_rep[classes, :][:, keep], axis=0)

    def data_split(self):
        """Split the segments produced by ``data_div`` 50/50 within each class.

        For every class the first half of its segments (in stored order) goes
        to the training set and the rest to the test set.

        Returns:
            ``(x_train, y_train, x_test, y_test)`` where the ``x`` arrays are
            (segments, channels, samples) and the ``y`` arrays are class
            indices.

        Raises:
            ValueError: if ``data_div`` has not produced matching
                ``seg_f_div`` / ``label_div`` arrays yet.
        """
        x = np.asarray(self.seg_f_div)
        y = np.asarray(self.label_div)
        if x.ndim != 3 or y.ndim != 1 or x.shape[0] != y.shape[0]:
            raise ValueError('data_split requires data_div to be run first')

        train_idx = []
        test_idx = []
        for cls in range(len(self.LISTENING_CLASSES)):
            members = np.flatnonzero(y == cls)
            midpoint = len(members) // 2  # 50% split
            train_idx.append(members[:midpoint])
            test_idx.append(members[midpoint:])
        train_idx = np.concatenate(train_idx)
        test_idx = np.concatenate(test_idx)

        return x[train_idx], y[train_idx], x[test_idx], y[test_idx]

    def data_prepare(self):
        """Run load -> downsample -> band-pass -> segment.

        Returns:
            ``(seg_f_div, label_div)`` as left by ``data_div``.

        Raises:
            FileNotFoundError: if the subject's EEG file does not exist.
        """
        if not self.data_mat():
            raise FileNotFoundError(
                f'EEG data not found for subject {self.subject!r} under '
                f'{self.parent_directory}')
        self.downsampling()
        self.bandpass()
        self.data_div()
        return self.seg_f_div, self.label_div
# Script entry point: for every subject, prepare the EEG split and train /
# evaluate three models on it (Transformer, EEGNet in TensorFlow, EEGNet in
# PyTorch). The split can optionally be cached as a pickle file (see below).
if __name__ == "__main__":
    # Heavy, script-only dependencies stay under the __main__ guard so that
    # importing this module for DataLoadEEG alone does not load them. They are
    # listed in the order the script originally first imported them.
    from Transformer_torch import Transformer_EEG
    from CNN_tensorflow.CNN_EEG_tf import EEGNet
    from sklearn.metrics import accuracy_score, confusion_matrix
    from CNN_torch.EEGNet_tor import EEGNet_tor, Trainer_uni
    # Bound explicitly: the script calls torch.device / torch.tensor, which
    # previously only worked if the star import happened to expose `torch`.
    import torch

    for sub in range(1, 43):
        print(sub)
        file_path = "C:/Users/minho.lee/Dropbox/Datasets/EAV/Input_images/EEG/"
        file_name = f"subject_{sub:02d}_eeg.pkl"
        file_ = os.path.join(file_path, file_name)

        eeg_loader = DataLoadEEG(subject=sub, band=[0.5, 45], fs_orig=500, fs_target=100,
                                 parent_directory='C://Users//minho.lee//Dropbox//Datasets//EAV')
        data_eeg, data_eeg_y = eeg_loader.data_prepare()

        division_eeg = EAVDataSplit(data_eeg, data_eeg_y)
        [tr_x_eeg, tr_y_eeg, te_x_eeg, te_y_eeg] = division_eeg.get_split(h_idx=56)
        EEG_list = [tr_x_eeg, tr_y_eeg, te_x_eeg, te_y_eeg]

        # Optional cache of the split (original note: tr:{280}(30, 500),
        # te:{120}(30, 500)). To write it:
        #     import pickle
        #     with open(file_, 'wb') as f:
        #         pickle.dump(EEG_list, f)
        # and to work directly from the cached file:
        #     with open(file_, 'rb') as f:
        #         eeg_list = pickle.load(f)
        #     tr_x_eeg, tr_y_eeg, te_x_eeg, te_y_eeg = eeg_list
        data = [tr_x_eeg, tr_y_eeg, te_x_eeg, te_y_eeg]

        # Transformer for EEG
        model = Transformer_EEG.EEGClassificationModel(eeg_channel=30)
        trainer = Transformer_EEG.EEGModelTrainer(data, model=model, lr=0.001, batch_size=64)
        trainer.train(epochs=100, lr=None, freeze=False)
        [accuracy, predictions] = trainer.evaluate()

        # CNN_tensorflow for EEG
        model = EEGNet(nb_classes=5, D=8, F2=64, Chans=30, kernLength=300, Samples=500,
                       dropoutRate=0.5)
        model.compile(loss='categorical_crossentropy', optimizer='adam',
                      metrics=['accuracy'])

        # One-hot encode the integer class labels for categorical_crossentropy.
        y_train = np.zeros((tr_y_eeg.shape[0], 5))
        y_train[np.arange(tr_y_eeg.shape[0]), tr_y_eeg.flatten()] = 1
        y_test = np.zeros((te_y_eeg.shape[0], 5))
        y_test[np.arange(te_y_eeg.shape[0]), te_y_eeg.flatten()] = 1

        # Add the trailing channel axis; -1 infers the number of segments
        # instead of hard-coding 280 / 120.
        x_train = np.reshape(tr_x_eeg, (-1, 30, 500, 1))
        x_test = np.reshape(te_x_eeg, (-1, 30, 500, 1))

        model.fit(x_train, y_train, batch_size=32, epochs=200, shuffle=True,
                  validation_data=(x_test, y_test))

        pred = model.predict(x_test)
        pred = np.argmax(pred, axis=1)
        y_test2 = np.argmax(y_test, axis=1)

        # scikit-learn expects (y_true, y_pred); the swapped order transposed
        # the confusion matrix (rows must be the true classes).
        cm = confusion_matrix(y_test2, pred)
        accuracy = accuracy_score(y_test2, pred)

        # CNN_pytorch for EEG, fix the error, and make the accuracy same
        model = EEGNet_tor(nb_classes=5, D=8, F2=64, Chans=30, kernLength=300, Samples=500,
                           dropoutRate=0.5)
        trainer = Trainer_uni(model=model, data=data, lr=1e-5, batch_size=32, num_epochs=200)
        trainer.train()

        model.eval()
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        # Separate names keep the NumPy test arrays intact for later use.
        te_x_tensor = torch.tensor(te_x_eeg, dtype=torch.float32).to(device)
        te_y_tensor = torch.tensor(te_y_eeg, dtype=torch.long).to(device)
        model.to(device)

        with torch.no_grad():
            scores = model(te_x_tensor)
            predictions = scores.argmax(dim=1)
            correct = (predictions == te_y_tensor).sum().item()
            total = te_y_tensor.size(0)
            accuracy = correct / total
        print(accuracy)
# Disabled alternative entry point, kept as an inert string literal: prepares
# subject 1 and trains the Transformer model directly on its split.
''' Direct evaluation
if __name__ == "__main__":
eeg_loader = DataLoadEEG(subject=1, band=[0.5, 45], fs_orig=500, fs_target=100,
parent_directory='C://Users//minho.lee//Dropbox//Datasets//EAV')
data_eeg, data_eeg_y = eeg_loader.data_prepare()
division_eeg = EAVDataSplit(data_eeg, data_eeg_y)
[tr_x_eeg, tr_y_eeg, te_x_eeg, te_y_eeg] = division_eeg.get_split()
data = [tr_x_eeg, tr_y_eeg, te_x_eeg, te_y_eeg]
trainer = Transformer_EEG.EEGModelTrainer(data, lr=0.001, batch_size = 64)
trainer.train(epochs=200, lr=None, freeze=False)
'''
# Disabled alternative entry point, kept as an inert string literal: loads each
# subject's cached pickle split, trains the Transformer model on it and
# collects per-subject accuracies and predictions.
'''
from Transformer_EEG import EEGClassificationModel
accuracy_all = list()
prediction_all = list()
if __name__ == "__main__": # from pickle data
import pickle
for sub in range(1, 43):
file_path = "C:/Users/minho.lee/Dropbox/Datasets/EAV/Input_images/EEG/"
file_name = f"subject_{sub:02d}_eeg.pkl"
file_ = os.path.join(file_path, file_name)
with open(file_, 'rb') as f:
eeg_list2 = pickle.load(f)
tr_x_eeg, tr_y_eeg, te_x_eeg, te_y_eeg = eeg_list2
data = [tr_x_eeg, tr_y_eeg, te_x_eeg, te_y_eeg]
model = EEGClassificationModel(eeg_channel=30)
trainer = Transformer_EEG.EEGModelTrainer(data, model = model, lr=0.001, batch_size = 64)
trainer.train(epochs=100, lr=None, freeze=False)
[accuracy, predictions] = trainer.evaluate()
accuracy_all.append(accuracy)
prediction_all.append(predictions)
'''