from strax import testutils
import strax
import numpy as np
from hypothesis import given, strategies, example, settings
import tempfile
# To save ourselves some time, import some dummy arrays from the cut-plugin tests
from .test_cut_plugin import _dtype_name, full_dt_dtype, full_time_dtype, get_some_array

def rechunk_array_to_arrays(array, n: int):
"""Yield successive n-sized chunks from array."""
for i in range(0, len(array), n):
yield array[i:i + n]
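
# A quick sanity check of the helper above (an illustrative addition, not
# part of the original suite; uses a plain integer array):
def test_rechunk_array_to_arrays_example():
    chunks = list(rechunk_array_to_arrays(np.arange(5), 2))
    assert [c.tolist() for c in chunks] == [[0, 1], [2, 3], [4]]
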
def drop_random(chunks: list) -> list:
    """
    Drop some of the entries in each chunk at random.

    :param chunks: list of numpy arrays to modify
    :return: list of chunks with entries randomly dropped
    """
res = []
for chunk in chunks:
if len(chunk) > 1:
# We are going to keep this many items in this chunk
keep_n = np.random.randint(1, len(chunk)+1)
# These are the indices we will keep (only keep unique ones)
            keep_indices = np.random.randint(0, len(chunk), keep_n)
keep_indices = np.unique(keep_indices)
keep_indices.sort()
# This chunk will now be reduced using only keep_indices
d = chunk[keep_indices]
            res.append(d)
        else:
            # Keep empty/single-entry chunks as-is so the number of
            # chunks is preserved
            res.append(chunk)
    return res
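
# An illustrative check of drop_random (an added sketch, not part of the
# original suite): every surviving chunk keeps between 1 and len(chunk)
# entries, all drawn from the original chunk. The RNG is seeded only to
# make the example deterministic.
def test_drop_random_example():
    np.random.seed(42)
    chunks = [np.arange(4), np.arange(3)]
    dropped = drop_random(chunks)
    assert len(dropped) == len(chunks)
    for before, after in zip(chunks, dropped):
        assert 1 <= len(after) <= len(before)
        assert set(after.tolist()) <= set(before.tolist())
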
@given(get_some_array().filter(lambda x: len(x) >= 0),
strategies.integers(min_value=1, max_value=10))
@settings(deadline=None)
@example(
big_data=np.array(
[(0, 0, 1, 1),
(1, 1, 1, 1),
(5, 2, 2, 1),
(11, 4, 2, 4)],
dtype=full_dt_dtype),
nchunks=2)
def test_loop_plugin(big_data, nchunks):
"""Test the loop plugin for random data"""
    _loop_test_inner(big_data, nchunks)

@given(get_some_array().filter(lambda x: len(x) >= 0),
strategies.integers(min_value=1, max_value=10))
@settings(deadline=None)
@example(
big_data=np.array(
[(0, 0, 1, 1),
(1, 1, 1, 1),
(5, 2, 2, 1),
(11, 4, 2, 4)],
dtype=full_dt_dtype),
nchunks=2)
def test_loop_plugin_multi_output(big_data, nchunks):
    """
    Test the loop plugin for random data where it should give multiple
    outputs
    """
    _loop_test_inner(big_data, nchunks, target='other_combined_things')

@given(get_some_array().filter(lambda x: len(x) == 0),
strategies.integers(min_value=2, max_value=10))
@settings(deadline=None)
@example(
big_data=np.array(
[],
dtype=full_dt_dtype),
nchunks=2)
def test_value_error_for_loop_plugin(big_data, nchunks):
    """Make sure that we get the expected ValueError"""
    try:
        _loop_test_inner(big_data, nchunks, force_value_error=True)
        raise RuntimeError(
            'did not run into a ValueError despite having multiple '
            'empty chunks')
    except ValueError:
        # Good, we got the ValueError we wanted
        pass
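
# Equivalently, this check could be written with pytest's context manager
# (a sketch, assuming pytest is available in the test environment):
#
#     with pytest.raises(ValueError):
#         _loop_test_inner(big_data, nchunks, force_value_error=True)
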
def _loop_test_inner(big_data, nchunks, target='added_thing', force_value_error=False):
    """
    Test loop plugins on random data. For this test we set up two
    plugins whose outputs are looped over and combined by a loop
    plugin (depending on the target, this may be a multi-output plugin).
    The setup is as follows:
     - make chunks for a big-data plugin (which we will loop over later)
     - generate some data with similar chunking, called 'small data',
       which we add to the big data in the loop plugin
    """
if len(big_data) or force_value_error:
        # Split the big data into chunks of (at most) nchunks items each
big_chunks = list(rechunk_array_to_arrays(big_data, nchunks))
else:
# If empty, there is no reason to make multiple empty chunks
# unless we want to force the ValueError later
big_chunks = [big_data]
_dtype = big_data.dtype
# TODO smarter test. I want to drop some random data from the
# small_chunks but this does not work yet. Perhaps related to
# https://github.com/AxFoundation/strax/pull/345 (will fix in that
# PR)
# small_chunks = drop_random(big_chunks.copy()) # What I want to do
    small_chunks = big_chunks

class BigThing(strax.Plugin):
"""Plugin that provides data for looping over"""
depends_on = tuple()
dtype = _dtype
provides = 'big_thing'
data_kind = 'big_kinda_data'
def compute(self, chunk_i):
data = big_chunks[chunk_i]
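            # For empty chunks, fall back to dummy boundaries: since
            # np.arange(n)[chunk_i] is just chunk_i, this amounts to
            # start = chunk_i and end = chunk_i + 1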
chunk = self.chunk(
data=data,
start=(
int(data[0]['time']) if len(data)
else np.arange(len(big_chunks))[chunk_i]),
end=(
int(strax.endtime(data[-1])) if len(data)
else np.arange(1, len(big_chunks) + 1)[chunk_i]))
return chunk
def is_ready(self, chunk_i):
            # Hack to make the data generation stop after a few chunks
return chunk_i < len(big_chunks)
def source_finished(self):
            return True

class SmallThing(strax.CutPlugin):
"""Minimal working example of CutPlugin"""
depends_on = tuple()
provides = 'small_thing'
data_kind = 'small_kinda_data'
dtype = _dtype
def compute(self, chunk_i):
data = small_chunks[chunk_i]
chunk = self.chunk(
data=data,
start=(
int(data[0]['time']) if len(data)
else np.arange(len(small_chunks))[chunk_i]),
end=(
int(strax.endtime(data[-1])) if len(data)
else np.arange(1, len(small_chunks) + 1)[chunk_i]))
return chunk
def is_ready(self, chunk_i):
            # Hack to make the data generation stop after a few chunks
return chunk_i < len(small_chunks)
def source_finished(self):
            return True

    class AddBigToSmall(strax.LoopPlugin):
        """
        Test loop plugin: loop over big_thing and add whatever is in
        small_thing
        """
        depends_on = 'big_thing', 'small_thing'
        provides = 'added_thing'
        loop_over = 'big_thing'  # Also explicitly test the loop_over option
def infer_dtype(self):
# Get the dtype from the dependency
return self.deps['big_thing'].dtype
def compute(self, big_kinda_data, small_kinda_data):
res = np.zeros(len(big_kinda_data), dtype=self.dtype)
for k in res.dtype.names:
if k == _dtype_name:
res[k] = big_kinda_data[k]
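                    # Add every small-data value to each entry; this is
                    # equivalent to res[k] += np.sum(small_kinda_data[k])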
for small_bit in small_kinda_data[k]:
for i in range(len(res[k])):
res[k][i] += small_bit
else:
res[k] = big_kinda_data[k]
            return res

    class AddBigToSmallMultiOutput(strax.LoopPlugin):
        depends_on = 'big_thing', 'small_thing'
        provides = 'some_combined_things', 'other_combined_things'
        data_kind = {k: k for k in provides}

        def infer_dtype(self):
            # Get the dtype from the dependency.
            # NB: for a multi-output plugin, this must be a dict keyed
            # by the provides names
            return {k: self.deps['big_thing'].dtype for k in self.provides}
def compute(self, big_kinda_data, small_kinda_data):
res = np.zeros(len(big_kinda_data), _dtype)
for k in res.dtype.names:
if k == _dtype_name:
res[k] = big_kinda_data[k]
for small_bit in small_kinda_data[k]:
for i in range(len(res[k])):
res[k][i] += small_bit
else:
res[k] = big_kinda_data[k]
            return {k: res for k in self.provides}

with tempfile.TemporaryDirectory() as temp_dir:
st = strax.Context(storage=[strax.DataDirectory(temp_dir)])
st.register((BigThing, SmallThing, AddBigToSmall, AddBigToSmallMultiOutput))
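        # get_array processes the requested target through the context and
        # returns the merged result as a single numpy array for this run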
result = st.get_array(run_id='some_run', targets=target)
        assert np.shape(result) == np.shape(big_data), \
            'Looping over big_data resulted in a different data size?!'
        assert np.sum(result[_dtype_name]) >= np.sum(big_data[_dtype_name]), \
            'Result should be at least as big as big_data because we added small_data'
        assert isinstance(result, np.ndarray), 'Result is not a np.ndarray?'
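
# To run just this module (a sketch; the filename/path depends on your
# checkout, and pytest plus hypothesis must be installed):
#
#     pytest -v test_loop_plugins.py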