-
Notifications
You must be signed in to change notification settings - Fork 54
/
bcolz_array_iterator.py
103 lines (87 loc) · 4.27 KB
/
bcolz_array_iterator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import numpy as np
import bcolz
import threading
class BcolzArrayIterator(object):
    """
    Endless, chunk-aligned batch iterator over bcolz carrays.

    Each batch is assembled from ``batch_size // X.chunklen`` whole chunks of
    ``X`` (plus, at most once per pass, the partially filled "leftover" chunk),
    so every chunk is read exactly once per pass over the data. When the last
    chunk of a pass has been consumed the iterator starts a new pass
    automatically; it never raises ``StopIteration``.

    Original version by Thiago Ramon Gonçalves Montoya
    Docs (and discovery) by @MPJansen
    Refactoring, performance improvements, fixes by Jeremy Howard

    :Example:
        X = bcolz.open('file_path/feature_file.bc', mode='r')
        y = bcolz.open('file_path/label_file.bc', mode='r')
        trn_batches = BcolzArrayIterator(X, y, batch_size=64, shuffle=True)
        model.fit_generator(generator=trn_batches, samples_per_epoch=trn_batches.N, nb_epoch=1)

    :param X: Input features. Must expose ``chunklen``, ``nchunks``,
        ``chunks``, ``leftover_array``, ``leftover_elements`` and ``shape``.
    :param y: (optional) Input labels, sliceable and the same length as X
    :param w: (optional) Input feature weights, sliceable and the same length
        as X. Only returned when ``y`` is given as well.
    :param batch_size: (optional) Batch size, defaults to 32. Must be a
        positive multiple of ``X.chunklen``.
    :param shuffle: (optional) Shuffle the order in which chunks are visited,
        defaults to false. Rows inside a chunk keep their order.
    :param seed: (optional) Provide a seed to shuffle, defaults to a random
        seed. When given, the global NumPy RNG is reseeded at the start of
        every pass.
    :raises ValueError: if y or w differ in length from X, or if batch_size is
        not a positive multiple of ``X.chunklen``.
    :rtype: BcolzArrayIterator

    >>> A = np.random.random((32*10 + 17, 10, 10))
    >>> c = bcolz.carray(A, rootdir='test.bc', mode='w', expectedlen=A.shape[0], chunklen=16)
    >>> c.flush()
    >>> Bc = bcolz.open('test.bc')
    >>> bc_it = BcolzArrayIterator(Bc, shuffle=True)
    >>> C_list = [next(bc_it) for i in range(11)]
    >>> C = np.concatenate(C_list)
    >>> np.allclose(sorted(A.flatten()), sorted(C.flatten()))
    True
    >>> c.purge()
    """

    def __init__(self, X, y=None, w=None, batch_size=32, shuffle=False, seed=None):
        if y is not None and len(X) != len(y):
            raise ValueError('X (features) and y (labels) should have the same length. '
                             'Found: X.shape = %s, y.shape = %s' % (X.shape, y.shape))
        if w is not None and len(X) != len(w):
            raise ValueError('X (features) and w (weights) should have the same length. '
                             'Found: X.shape = %s, w.shape = %s' % (X.shape, w.shape))
        # Zero and negative values are "multiples" of chunklen as far as `%`
        # is concerned, but they yield batches with no chunks at all; reject
        # them here instead of failing obscurely on the first next().
        if batch_size <= 0:
            raise ValueError('batch_size needs to be a positive multiple of X.chunklen')
        if batch_size % X.chunklen != 0:
            raise ValueError('batch_size needs to be a multiple of X.chunklen')

        self.chunks_per_batch = batch_size // X.chunklen
        self.X = X
        self.y = y
        self.w = w
        self.N = X.shape[0]
        self.batch_size = batch_size
        # Position inside index_array, i.e. a chunk cursor (not a batch count).
        self.batch_index = 0
        # Incremented once per chunk consumed (see next()); it only feeds the
        # per-pass seed offset in loop().
        self.total_batches_seen = 0
        # Serialises cursor updates when several threads pull batches.
        self.lock = threading.Lock()
        self.shuffle = shuffle
        self.seed = seed
        self.loop()

    def reset(self):
        """Rewind the chunk cursor to the start of the current chunk order."""
        self.batch_index = 0

    def loop(self):
        """Start a new pass: rewind the cursor and rebuild the chunk order."""
        self.batch_index = 0
        if self.seed is not None:
            # NOTE: this reseeds the *global* NumPy RNG. The offset makes each
            # pass use a different, yet reproducible, permutation.
            np.random.seed(self.seed + self.total_batches_seen)
        # The partially filled trailing chunk, if any, is addressed by the
        # extra index value `nchunks`.
        has_leftover = self.X.leftover_elements > 0
        index_length = self.X.nchunks + 1 if has_leftover else self.X.nchunks
        if self.shuffle:
            self.index_array = np.random.permutation(index_length)
        else:
            self.index_array = np.arange(index_length)

    def next(self):
        """
        Return the next batch.

        :returns: ``batch_x`` when no labels were given, ``(batch_x, batch_y)``
            when only labels were given, or ``(batch_x, batch_y, batch_w)``
            when labels and weights were given. The last batch of a pass may
            hold fewer than ``batch_size`` rows.
        """
        batches_x, batches_y, batches_w = [], [], []
        with self.lock:
            for _ in range(self.chunks_per_batch):
                current_index = self.index_array[self.batch_index]
                if current_index == self.X.nchunks:
                    # Trailing partial chunk: only the first
                    # `leftover_elements` rows of the buffer are valid.
                    current_batch_size = self.X.leftover_elements
                    batches_x.append(self.X.leftover_array[:current_batch_size])
                else:
                    current_batch_size = self.X.chunklen
                    batches_x.append(self.X.chunks[current_index][:])
                self.batch_index += 1
                self.total_batches_seen += 1

                # Labels/weights are sliced by row offset so they line up with
                # the chunk of X that was just read.
                idx = current_index * self.X.chunklen
                if self.y is not None:
                    batches_y.append(self.y[idx: idx + current_batch_size])
                if self.w is not None:
                    batches_w.append(self.w[idx: idx + current_batch_size])

                if self.batch_index >= len(self.index_array):
                    # Pass exhausted: emit a short batch and begin a new pass.
                    self.loop()
                    break

        batch_x = np.concatenate(batches_x)
        if self.y is None:
            return batch_x

        batch_y = np.concatenate(batches_y)
        if self.w is None:
            return batch_x, batch_y

        batch_w = np.concatenate(batches_w)
        return batch_x, batch_y, batch_w

    def __iter__(self):
        """The iterator is its own iterable."""
        return self

    def __next__(self, *args, **kwargs):
        """Python 3 iterator protocol; delegates to next()."""
        return self.next(*args, **kwargs)