# preprocess.py
import json
import numpy as np
import pandas as pd
import os
import re
from collections import defaultdict
import cv2 as cv
from pathlib import Path
import matplotlib.pyplot as plt
from keras_preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split


def __calculate_bbx(bbx_list):
    # convert a COCO-style [x, y, w, h] box into integer (start, end) corner points
    x1, y1, w, h = tuple(bbx_list)
    x1 = int(x1)
    y1 = int(y1)
    x2 = int(x1 + w)
    y2 = int(y1 + h)
    start = (x1, y1)
    end = (x2, y2)
    return start, end


class cocoParser:
    """Parse a COCO-format label file and expose its annotations, segmentations,
    bounding boxes and (image id, file name) pairs."""
    def __init__(self, cocopath) -> None:
        self.cocopath = cocopath
        #self.imagespath = imagespath
        with open(cocopath, 'rb') as f:
            coco_labels = json.load(f)
        self.annotations = coco_labels['annotations']
        self.label_ids = [dct['id'] for dct in coco_labels['annotations']]
        self.image_ids = [dct['image_id'] for dct in coco_labels['annotations']]
        self.segs = [dct.get('segmentation') for dct in coco_labels['annotations']]
        self.bbxs = [dct.get('bbox') for dct in coco_labels['annotations']]
        self.names_ids = [(dct['id'], dct['file_name']) for dct in coco_labels['images']]

    @staticmethod
    def __formatsegs(lst):
        # reshape each flat COCO segmentation list into an (N, 2) integer polygon
        segs = [np.array(seg, dtype=np.int32) for seg in lst]
        segs = [seg.reshape(-1, 2) for seg in segs]
        return segs

    @staticmethod
    def __createmask(segs, shape):
        # rasterise the polygons onto a blank canvas to build a binary mask
        canvas = np.zeros(shape)
        mask = cv.fillPoly(canvas, pts=segs, color=(255, 255, 255))
        return mask

    @staticmethod
    def __create_border(mask, thickness):
        # border = mask minus its eroded version, i.e. a ring of the given thickness
        image = np.copy(mask)
        kernel = cv.getStructuringElement(cv.MORPH_ELLIPSE, (thickness, thickness))
        image = cv.erode(image, kernel)
        border = mask - image
        return border

    def postive_data(self):
        # retrieve the unique image names and ids that have a valid annotation poly in the coco labels file
        ids_, names = zip(*self.names_ids)
        valid_names_ids = [(img_id, name) for img_id in set(self.image_ids) for (id_, name) in self.names_ids if img_id == id_]
        valid_ids, valid_names = zip(*valid_names_ids)
        # pair each valid image id with its annotation polys --> (id, [seg_poly])
        valid_segs = defaultdict(list)
        segs_ids = [(img_id, ann['segmentation']) for img_id in valid_ids for ann in self.annotations if img_id == ann['image_id']]
        # collect the multiple segmentation polys that belong to the same id in a single 2d list per id --> {id: [poly_1, poly_2, ..., poly_n]}
        for img_id, seg in segs_ids:
            valid_segs[img_id].append(seg[0])
        # record in a data frame
        segs = list(valid_segs.values())  # one 2d seg list per image id, in the same order as valid_ids
        df_dct = {'img_id': valid_ids, 'name': valid_names, 'seg': segs}
        pos_data = pd.DataFrame(df_dct)
        # format the polygon arrays, then create masks and borders
        shape = (256, 256)
        pos_data['seg'] = pos_data['seg'].apply(self.__formatsegs)
        pos_data['mask'] = pos_data['seg'].apply(lambda x: self.__createmask(x, shape))
        pos_data['border'] = pos_data['mask'].apply(lambda x: self.__create_border(x, thickness=15))
        return pos_data
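

# Usage sketch (illustrative only): the annotation path below is hypothetical. It shows
# how cocoParser would typically be used to build the positive dataframe, whose rows
# hold an image id, file name, polygon list, 256x256 mask array and 256x256 border array.
def _example_positive_frame(cocopath='labels/annotations.json'):
    parser = cocoParser(cocopath)
    pos_df = parser.postive_data()
    return pos_df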


class buildSplits:
    '''
    Builds train/val/test splits from the positive dataframe.
    Expects the train/val/test split images each in their own folder,
    takes in the positive dataframe and
    returns a dataframe with cols: [index, segmentation coords, mask label array, border label array].
    '''
    def __init__(self, postive_df, test_imgpath, seed=32) -> None:
        self.postive_df = postive_df
        self.test_imgpath = test_imgpath
        self.seed = seed

    @staticmethod
    def compose_arrays(signal):
        # stack a series of 256x256 label images into a single (N, 256, 256, 1) float array
        masks = np.ndarray(shape=(len(signal), 256, 256, 1), dtype=np.float32)
        for indx in range(len(signal)):
            mask = signal.loc[indx]
            mask = mask.reshape((256, 256, 1))
            masks[indx] = mask
        return masks

    def sklearn_splits(self, neg_test=True):
        names = self.postive_df[['name']]
        signal = self.postive_df[['mask', 'border']]
        # 80/20 train/test split, then a further 80/20 train/val split of the training rows
        xtrain, xtest, ytrain, ytest = train_test_split(names, signal, test_size=.2, random_state=self.seed)
        xtrain, xval, ytrain, yval = train_test_split(xtrain, ytrain, test_size=.2, random_state=self.seed)
        splits = [xtrain, ytrain, xval, yval, xtest, ytest]
        splits = [split.reset_index(inplace=False, drop=True) for split in splits]
        train_df = pd.concat([splits[0], splits[1]], axis=1)  # xtrain, ytrain
        val_df = pd.concat([splits[2], splits[3]], axis=1)    # xval, yval
        test_df = pd.concat([splits[4], splits[5]], axis=1)   # xtest, ytest
        if neg_test:
            negtest_df = self.neg_test_df(test_df)
            test_df = negtest_df
        return train_df, val_df, test_df

    def neg_test_df(self, test_df, neg_lenght=1000):
        # add negative (non-school) test images with blank masks and borders to the test split
        pattern = r'\d+_\w\.png'  # escape the dot so it only matches the .png extension
        nonschool_names = [f for f in os.listdir(self.test_imgpath) if re.search(pattern, f)]
        split_df = pd.DataFrame(nonschool_names, columns=['name'])
        split_df = pd.merge(split_df.iloc[:neg_lenght], test_df, on='name', how='outer')
        blank_seg = np.zeros_like(self.postive_df.loc[1, 'seg'])
        blank_canvas = np.zeros_like(self.postive_df.loc[1, 'mask'])
        split_df = split_df.replace(np.nan, 0)
        #split_df['seg'] = split_df['seg'].map(lambda x: blank_seg if type(x) is int else x)
        split_df['mask'] = split_df['mask'].map(lambda x: blank_canvas if type(x) is int else x)
        split_df['border'] = split_df['border'].map(lambda x: blank_canvas if type(x) is int else x)
        split_df = split_df.sample(frac=1, random_state=self.seed)  # shuffle the dataframe rows
        split_df.reset_index(inplace=True, drop=True)
        return split_df

    def get_split_df(self, split='train'):
        # NOTE: relies on a self.split_imgpath attribute (the root folder that holds the
        # per-split image folders); only test_imgpath is set in __init__, so it must be set elsewhere.
        split_df = pd.DataFrame(os.listdir(self.split_imgpath + f'{split}/'), columns=['name'])
        split_df = pd.merge(split_df, self.postive_df, on='name', how='left')
        blank_seg = np.zeros_like(self.postive_df.loc[1, 'seg'])
        blank_canvas = np.zeros_like(self.postive_df.loc[1, 'mask'])
        split_df = split_df.replace(np.nan, 0)
        split_df['seg'] = split_df['seg'].map(lambda x: blank_seg if type(x) is int else x)
        split_df['mask'] = split_df['mask'].map(lambda x: blank_canvas if type(x) is int else x)
        split_df['border'] = split_df['border'].map(lambda x: blank_canvas if type(x) is int else x)
        split_df.reset_index(inplace=True, drop=True)
        return split_df
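

# Usage sketch (illustrative only): the test-image folder is hypothetical. sklearn_splits
# returns train/val/test dataframes; with neg_test=True the test split is padded with
# negative images from test_imgpath, which receive all-zero masks and borders.
def _example_splits(pos_df, test_imgpath='images/test/'):
    splitter = buildSplits(pos_df, test_imgpath, seed=32)
    train_df, val_df, test_df = splitter.sklearn_splits(neg_test=True)
    return train_df, val_df, test_df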


class augmentation:
    # default settings; any of these can be overridden through user_dct, e.g.:
    # kwargs = {'seed': 32,
    #           'bright_range': (.4, 1.),
    #           'hue_range': 2.0,
    #           'batch_size': 1,
    #           'rotation': None,
    #           'zoom': None,
    #           'interpolation': None}
    def __init__(self, user_dct=None) -> None:
        self.seed = 32
        self.bright_range = (.4, 1.)
        self.hue_range = 2.0
        self.batch_size = 12
        self.rotation = None
        self.zoom = None
        self.interpolation = None
        if user_dct:
            for k, v in user_dct.items():
                if k in self.__dict__:
                    setattr(self, k, v)
                else:
                    raise KeyError(k)
        aug_args = dict(
            horizontal_flip=True,
            vertical_flip=True,
            brightness_range=self.bright_range,
            channel_shift_range=self.hue_range,
            rescale=1./255
        )
        self.gen = ImageDataGenerator(**aug_args)           # augmenting generator for training
        self.testgen = ImageDataGenerator(rescale=1./255)   # rescale-only generator for evaluation
        keys = ['seed', 'bright_range', 'hue_range', 'batch_size', 'rotation', 'zoom', 'interpolation']
        self.aug_dct = {key: self.__dict__.get(key) for key in keys}

    @staticmethod
    def __data_gen(x_gen, mask_gen, border_gen):
        # zip the image flow with the mask and border flows into (x, [y_mask, y_border]) batches
        while True:
            image = next(x_gen)
            ymask = next(mask_gen)
            yborder = next(border_gen)
            yield image, [ymask, yborder]

    @staticmethod
    def __get_targetarrays(split_df, target_col='mask'):
        # stack the per-row label images into a single (N, 256, 256, 1) float array
        size = len(split_df)
        labels = np.ndarray(shape=(size, 256, 256, 1), dtype=np.float32)
        for indx in range(size):
            y = split_df.loc[indx, target_col]
            y = y.reshape((256, 256, 1))
            labels[indx] = y
        return labels

    @staticmethod
    def filter_df(split_df, imgpath):
        # keep only the rows whose image file actually exists in imgpath
        fltr = split_df['name'].map(lambda x: x in os.listdir(imgpath))
        split_df = split_df[fltr].reset_index(drop=True)
        return split_df

    def get_splitgen(self, split_df, imgpath, test=False):
        split_df = self.filter_df(split_df, imgpath)
        gen = self.testgen if test else self.gen
        masks_arrs = self.__get_targetarrays(split_df)
        borders_arrs = self.__get_targetarrays(split_df, target_col='border')
        # a shared seed keeps the image, mask and border flows synchronised
        x_flow = gen.flow_from_dataframe(split_df, x_col='name', class_mode=None, validate_filenames=True,
                                         directory=imgpath, batch_size=self.batch_size, seed=self.seed)
        y_maskflow = gen.flow(masks_arrs, batch_size=self.batch_size, seed=self.seed)
        y_borderflow = gen.flow(borders_arrs, batch_size=self.batch_size, seed=self.seed)
        split_gen = self.__data_gen(x_flow, y_maskflow, y_borderflow)
        return split_gen
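

# Usage sketch (illustrative only): wires the three classes together to build a training
# generator. The annotation path and image folders are hypothetical; get_splitgen yields
# (image_batch, [mask_batch, border_batch]) tuples suitable for a two-output Keras model.
def _example_train_generator(cocopath='labels/annotations.json', train_imgpath='images/train/'):
    pos_df = cocoParser(cocopath).postive_data()
    splitter = buildSplits(pos_df, test_imgpath='images/test/')
    train_df, _, _ = splitter.sklearn_splits()
    aug = augmentation({'batch_size': 8})
    return aug.get_splitgen(train_df, train_imgpath)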


def label_overlay(ximg, mask, border, pred=False):
    # overlay a mask and a border on top of an image; pred=True switches to the
    # colour pair used for predictions instead of ground-truth labels
    pixel_clip = np.vectorize(lambda pixel: 0 if pixel < 0.5 else 1)  # threshold to a hard 0/1 label
    mask_clr, brdr_clr = (255, 0, 0), (0, 255, 0)
    if pred:
        mask_clr, brdr_clr = (0, 0, 255), (255, 255, 255)
    mask, border = (pixel_clip(label).squeeze() for label in (mask, border))
    image = np.copy(ximg)
    image /= np.max(image)
    image *= 255
    image = image.astype(dtype=np.uint8).squeeze()
    mask *= 255
    border *= 255
    mask = mask.astype(dtype=np.uint8).squeeze()
    border = border.astype(dtype=np.uint8).squeeze()
    mask_canvas = np.full(image.shape, mask_clr, image.dtype)
    border_canvas = np.full(image.shape, brdr_clr, image.dtype)
    colored_mask = cv.bitwise_and(mask_canvas, mask_canvas, mask=mask)
    colored_border = cv.bitwise_and(border_canvas, border_canvas, mask=border)
    out = cv.addWeighted(colored_mask, .5, image, 1, 0, image)
    out = cv.addWeighted(colored_border, .5, out, 1, 0, out)
    return out
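

# Usage sketch (illustrative only): display a single ground-truth overlay pulled from a
# generator built with augmentation.get_splitgen; indexing [0] takes the first item of
# the batch so that label_overlay receives one image, one mask and one border.
def _example_show_overlay(split_gen):
    x, [ymask, yborder] = next(split_gen)
    overlay = label_overlay(x[0], ymask[0], yborder[0])
    plt.imshow(overlay)
    plt.show()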


def inspect_trues(test_gen, length):
    # pull `length` batches from a split generator and overlay their ground-truth labels
    ovts = []
    for i in range(length):
        x, [ymask, yborder] = next(test_gen)
        ov = label_overlay(x, ymask, yborder)
        ovts.append(ov)
    return ovts


def single_overlay(ximg, mask):
    # overlay a single mask on top of an image
    image = np.copy(ximg)
    image /= np.max(image)
    image *= 255
    image = image.astype(dtype=np.uint8).squeeze()
    mask *= 255
    mask = mask.astype(dtype=np.uint8).squeeze()
    red_canvas = np.full(image.shape, (0, 0, 255), image.dtype)
    redMask = cv.bitwise_and(red_canvas, red_canvas, mask=mask)
    out = cv.addWeighted(redMask, .5, image, 1, 0, image)
    return out


def cocodraw(self, size, index=0, *, image_num=None):
    # plot an annotation's segmentation polygons and bounding boxes side by side;
    # expects a cocoParser-like object as `self` with an `imagespath` attribute set
    if image_num is not None:
        image_name = str(image_num)
        image_id = [img_id for (img_id, name) in self.names_ids if image_name in name][0]
        annotation = [dct for dct in self.annotations if dct['image_id'] == image_id]
    else:
        image_id = [dct['image_id'] for dct in self.annotations if dct['id'] == index][0]
        annotation = [dct for dct in self.annotations if dct['image_id'] == image_id]
    label_id = annotation[0]['id']
    img_id = annotation[0]['image_id']
    segs = [dct['segmentation'] for dct in annotation]
    bbxes = [dct['bbox'] for dct in annotation]
    img_name = [name for id_, name in self.names_ids if id_ == img_id][0]
    img_path = Path(self.imagespath + img_name)
    segs = [np.array(seg, dtype=np.int32) for seg in segs]
    segs = [seg.reshape(-1, 2) for seg in segs]
    bbxes = [__calculate_bbx(box) for box in bbxes]
    try:
        img_path = img_path.resolve(strict=True)
    except FileNotFoundError as err:
        print(err)
    else:
        img = cv.imread(img_path.as_posix())
        img_seg = np.copy(img)
        img_bbx = np.copy(img)
        img_seg = cv.polylines(img_seg, segs, True, (36, 255, 12), 2)
        for box in bbxes:
            start, end = box
            cv.rectangle(img_bbx, start, end, (36, 255, 12), 2)
        plt.figure(figsize=size)
        plt.subplot(121)
        plt.imshow(img_seg)
        plt.title(f"Id: {label_id} | Img_id: {img_id} | {img_name}")
        plt.subplot(122)
        plt.imshow(img_bbx)
        plt.show()
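

# Usage sketch (illustrative only): cocodraw reads its data from a cocoParser-like object
# passed as the first argument; the `imagespath` attribute is assumed here and has to be
# set manually, since the corresponding line in cocoParser.__init__ is commented out.
def _example_cocodraw(cocopath='labels/annotations.json', imagespath='images/train/'):
    parser = cocoParser(cocopath)
    parser.imagespath = imagespath
    cocodraw(parser, size=(12, 6), index=0)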