-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathtstransformers.py
390 lines (315 loc) · 16.9 KB
/
tstransformers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
import time
from typing import Union, Iterable
import numpy as np
import pandas as pd
import psutil
import pywt
from loguru import logger
from pyts import approximation, transformation
from sklearn.base import TransformerMixin
from sklearn.externals import joblib
from tslearn import utils as tsutils
from tslearn.piecewise import SymbolicAggregateApproximation, OneD_SymbolicAggregateApproximation
from tslearn.preprocessing import TimeSeriesScalerMeanVariance
from nilmlab.lab import TimeSeriesTransformer, TransformerType
from utils import chaotic_toolkit
from utils.logger import debug, timing, debug_mem, info
# Number of seconds in a day; multiplied by the sample period to size the chunks used for classification.
SECONDS_PER_DAY = 24 * 60 * 60
# 15 GiB expressed in bytes; machines reporting at least this much total memory use chunks twice as large.
CAPACITY15GB = 15 * 1024 ** 3
class Signal2Vec(TimeSeriesTransformer):
    """
    Transformer that discretises a time series with a stored classifier and maps every resulting token to
    a vector looked up in an embedding table. ``approximate`` afterwards averages the vectors of each batch.
    """

    # Width of a single vector, as expected by the reshape in ``approximate``.
    # NOTE(review): presumably the dimensionality of the embeddings stored in the CSV - confirm.
    _EMBEDDING_SIZE = 300

    def __init__(self, classifier_path: str, embedding_path: str, num_of_representative_vectors: int = 1):
        """
        :param classifier_path: path of the joblib file holding the classifier used to discretise samples.
        :param embedding_path: path of the CSV file holding the embedding table.
        :param num_of_representative_vectors: how many vectors are concatenated per row before the
            averaging performed by ``approximate``.
        """
        super().__init__()
        # NOTE(review): joblib.load unpickles the file, so classifier_path has to come from a trusted source.
        self.clf = joblib.load(classifier_path)
        embedding = pd.read_csv(embedding_path)
        # Maps every column name to the list of its values; tokens are looked up by str(token).
        self.embedding = embedding.reset_index().to_dict('list')
        self.type = TransformerType.transform_and_approximate
        self.num_of_representative_vectors = num_of_representative_vectors

    def __repr__(self):
        return f"Signal2Vec num_of_representative_vectors: {self.num_of_representative_vectors}"

    def transform(self, series: np.ndarray, sample_period: int = 6) -> np.ndarray:
        """
        Discretise ``series`` chunk by chunk and replace every token with its embedding vector.

        :param series: the time series to transform.
        :param sample_period: sample period forwarded to ``discretize_in_chunks`` to size the chunks.
        :return: array built from the sequence of looked-up vectors.
        """
        discrete_series = self.discretize_in_chunks(series, sample_period)
        debug_mem('Time series {} MB', series)
        debug_mem('Discrete series {} MB', discrete_series)
        vector_representation = self.map_into_vectors(discrete_series)
        debug_mem('Sequence of vectors : {} MB', vector_representation)
        return np.array(vector_representation)

    def approximate(self, data_in_batches: np.ndarray, window: int = 1, should_fit: bool = True) -> np.ndarray:
        """
        Collapse every batch of vectors into the sum over axis 1 divided by the window.

        :param data_in_batches: vectors grouped in batches; axis 1 is the one that gets summed.
        :param window: number of vectors per batch, used as the divisor of the sum. When more than one
            representative vector is requested it is first divided by that number.
        :param should_fit: not used by this transformer.
        :return: array with one row per batch.
        :raises ValueError: if the effective window is smaller than 1.
        """
        # TODO: Window is used only by signal2vec, move it to constructor or extract it as len(segment).
        vectors_per_row = self.num_of_representative_vectors
        if vectors_per_row > 1:
            window = int(window / vectors_per_row)
        if window < 1:
            raise ValueError(f'The effective window has to be at least 1 but it is {window} '
                             f'(num_of_representative_vectors={vectors_per_row}).')
        if vectors_per_row > 1:
            data_in_batches = np.reshape(
                data_in_batches,
                (len(data_in_batches), window, self._EMBEDDING_SIZE * vectors_per_row))
        squeezed_seq = np.sum(data_in_batches, axis=1)
        # Plain array division replaces the former np.vectorize(lambda ...) call: same float64 values,
        # no per-element Python call and no failure on empty input.
        return squeezed_seq.astype(np.float64, copy=False) / window

    def reconstruct(self, series: np.ndarray) -> list:
        """Not supported; always raises."""
        raise Exception('Signal2Vec doesn\'t support reconstruct yet.')

    def get_name(self):
        return type(self).__name__

    def get_type(self):
        return self.type

    def set_type(self, method_type: TransformerType):
        """
        Change the transformer type.

        :raises Exception: if ``TransformerType.approximate`` is requested, because the series has to be
            transformed before it can be approximated.
        """
        if method_type == TransformerType.approximate:
            raise Exception('Signal2vec does not support only approximation. The series has to be transformed firstly')
        self.type = method_type

    def discretize(self, data):
        """Predict one token per sample of ``data`` with the stored classifier and log the timing."""
        debug('Length of data {}'.format(len(data)))
        start_time = time.time()
        # The classifier receives the samples as a single-feature column.
        pred = self.clf.predict(data.reshape(-1, 1))
        timing('clf.predict: {}'.format(round(time.time() - start_time, 2)))
        debug('Length of predicted sequence {}'.format(len(pred)))
        debug('Type of discrete sequence {}'.format(type(pred)))
        return pred

    def map_into_vectors(self, sequence):
        """Return the list of embedding entries that correspond to the tokens of ``sequence``."""
        start_time = time.time()
        sequence_of_vectors = [self.embedding[str(i)] for i in sequence]
        timing('Appending vectors to list : {}'.format(round(time.time() - start_time, 2)))
        return sequence_of_vectors

    def discretize_in_chunks(self, sequence, sample_period: int = 6):
        """
        Discretise ``sequence`` in several chunks in order to bound the memory used by the classifier.

        The chunk size is ``sample_period * SECONDS_PER_DAY`` samples and it is doubled when the machine
        reports at least ``CAPACITY15GB`` bytes of total memory.

        :param sequence: the samples to discretise.
        :param sample_period: sample period used to size the chunks.
        :return: the concatenated predictions, one per sample of ``sequence``.
        """
        memory = psutil.virtual_memory()
        debug('Memory: {}'.format(memory))
        chunk_size = sample_period * SECONDS_PER_DAY
        if memory.total >= CAPACITY15GB:
            chunk_size = chunk_size * 2
        split_n = max(int(len(sequence) / chunk_size), 1)
        debug('Spliting data into {} parts for memory efficient classification'.format(split_n))
        seq = []
        # np.array_split tolerates chunks of unequal size, so no trailing samples have to be thrown away
        # to make the length divisible by the number of chunks.
        for chunk in np.array_split(sequence, split_n):
            debug('Discretising time series...')
            seq.append(self.discretize(chunk))
        return np.concatenate(seq)
class WaveletAdapter(TimeSeriesTransformer):
    """
    Adapter around the discrete wavelet decomposition of PyWavelets.

    http://ataspinar.com/2018/12/21/a-guide-for-using-the-wavelet-transform-in-machine-learning/
    """

    def __init__(self, wavelet_name: str = 'haar', filter_bank: str = None, mode='symmetric', level=None,
                 drop_cA=False):
        """
        :param wavelet_name: wavelet forwarded to ``pywt.wavedec`` / ``pywt.waverec``.
        :param filter_bank: stored and reported by ``__repr__``; not used by the computations of this class.
        :param mode: signal extension mode forwarded to PyWavelets.
        :param level: decomposition level forwarded to ``pywt.wavedec``.
        :param drop_cA: selects which coefficients ``approximate`` returns (see the note in that method).
        """
        super().__init__()
        self.wavelet_name = wavelet_name
        self.filter_bank = filter_bank
        self.mode = mode
        self.level = level
        self.drop_cA = drop_cA
        self.type = TransformerType.approximate

    def __repr__(self):
        return (f"Wavelet {self.wavelet_name}, level {self.level}, filter_bank {self.filter_bank}, "
                f"mode {self.mode}, drop_cA {self.drop_cA}")

    def transform(self, series: np.ndarray, sample_period: int = 6) -> list:
        """
        Decompose ``series`` and immediately rebuild it from the coefficients.

        :param series: the time series to transform.
        :param sample_period: not used by this transformer.
        :return: the flattened first element of the reconstructed array.
        """
        debug('WaveletAdapter series shape {}'.format(series.shape))
        coeffs = pywt.wavedec(data=series, wavelet=self.wavelet_name, level=self.level, mode=self.mode)
        ts_representation = pywt.waverec(coeffs, wavelet=self.wavelet_name, mode=self.mode)
        # Report the shape of the reconstruction; the input shape has already been logged above.
        debug('WaveletAdapter series shape after inverse {}'.format(np.shape(ts_representation)))
        # NOTE(review): indexing with [0] assumes the series carries a leading axis of size one - confirm.
        return ts_representation[0].ravel()

    def approximate(self, series: np.ndarray, window: int = 1, should_fit: bool = True) -> np.ndarray:
        """
        Compute the wavelet coefficients of every segment.

        :param series: iterable of segments; each one is decomposed independently.
        :param window: not used by this transformer.
        :param should_fit: not used by this transformer.
        :return: array holding the coefficients of every segment.
        """
        debug(f'WaveletAdapter.approximate: param series \n{series} ')
        ts_representation = []
        for segment in series:
            coeffs = pywt.wavedec(data=segment, wavelet=self.wavelet_name, level=self.level, mode=self.mode)
            if self.drop_cA:
                # NOTE(review): pywt.wavedec returns [cA_n, cD_n, ..., cD_1], therefore this keeps ONLY the
                # approximation coefficients although the flag is named drop_cA - confirm the intent.
                features = coeffs[0]
            else:
                features = np.concatenate(coeffs)
            ts_representation.append(features)
        debug('WaveletAdapter.approximate: ts_representation shape {}'.format(np.shape(ts_representation)))
        return np.asarray(ts_representation)

    def reconstruct(self, series: np.ndarray) -> list:
        """Not supported; always raises."""
        raise Exception('WaveletAdapter doesn\'t support reconstruct yet.')

    def get_type(self) -> TransformerType:
        return self.type

    def set_type(self, method_type: TransformerType):
        self.type = method_type

    def get_name(self):
        return type(self).__name__
class TimeDelayEmbeddingAdapter(TimeSeriesTransformer):
    """
    Adapter that extracts delay embeddings through ``chaotic_toolkit.takens_embedding``.

    http://eprints.maths.manchester.ac.uk/175/1/embed.pdf
    """

    def __init__(self, delay_in_seconds: int, dimension: int, sample_period: int = 6):
        """
        :param delay_in_seconds: delay of the embedding, expressed in seconds.
        :param dimension: dimension of the embedding.
        :param sample_period: sample period used by ``approximate`` to convert the delay into samples.
        """
        super().__init__()
        self.type = TransformerType.approximate
        self.sample_period = sample_period
        self.dimension = dimension
        self.delay_in_seconds = delay_in_seconds

    def __repr__(self):
        return "TimeDelayEmbedding delay={} dim={}".format(self.delay_in_seconds, self.dimension)

    def transform(self, series: np.ndarray, sample_period: int = 6) -> list:
        """
        Cut a whole time series into consecutive segments of ``delay_items * self.dimension`` samples and
        extract the delay embeddings of each of them. Samples that do not fill a whole segment are ignored.

        :param series: the whole time series.
        :param sample_period: sample period used to convert the delay from seconds into samples.
        :return: list with the embedding of every segment.
        """
        lag = int(self.delay_in_seconds / sample_period)
        span = lag * self.dimension
        segment_count = int(len(series) / span)
        return [
            chaotic_toolkit.takens_embedding(series[index * span:(index + 1) * span], lag, self.dimension)
            for index in range(segment_count)
        ]

    def approximate(self, series_in_segments: np.ndarray, window: int = 1, should_fit: bool = True) -> np.ndarray:
        """
        Extract the delay embeddings of a time series that is already split into segments.

        :param series_in_segments: the segments; the length of the first one is checked against the span
            covered by the embedding.
        :param window: not used by this transformer.
        :param should_fit: not used by this transformer.
        :return: array with the embedding of every segment.
        :raises Exception: if the embedding needs more samples than the first segment holds.
        """
        lag = int(self.delay_in_seconds / self.sample_period)
        span = lag * self.dimension
        segment_length = len(series_in_segments[0])

        # The three cases are mutually exclusive: too long is an error, the other two are only reported.
        if span > segment_length:
            raise Exception(
                f'Not enough data for the given delay ({self.delay_in_seconds} seconds) and dimension ({self.dimension}).'
                f'\ndelay_items * dimension > len(data): {span} > {segment_length}')
        elif span == segment_length:
            info(f"TimeDelayEmbeddingAdapter is applied with delay embeddings equavalent to the length of each segment"
                 f" {span} == {segment_length}")
        else:
            info(f"TimeDelayEmbeddingAdapter is applied with delay embeddings covering less than the length of each "
                 f"segment. {span} < {segment_length}")

        return np.asarray([
            chaotic_toolkit.takens_embedding(segment, lag, self.dimension)
            for segment in series_in_segments
        ])

    def reconstruct(self, series: np.ndarray) -> list:
        """Not supported; always raises."""
        raise Exception('TimeDelayEmbeddingAdapter doesn\'t support reconstruct yet.')

    def get_type(self) -> TransformerType:
        return self.type

    def set_type(self, method_type: TransformerType):
        self.type = method_type

    def get_name(self):
        return type(self).__name__
class TSLearnTransformerWrapper(TimeSeriesTransformer):
    """Wrapper that exposes a tslearn transformer through the ``TimeSeriesTransformer`` interface."""

    def __init__(self, transformer: TransformerMixin, supports_approximation: bool = True):
        """
        :param transformer: the wrapped transformer; it has to be an instance of ``TransformerMixin``.
        :param supports_approximation: whether the transformer may be used as an approximator. It selects
            the initial type and it is enforced by ``set_type``.
        :raises Exception: if ``transformer`` is not an instance of ``TransformerMixin``.
        """
        super().__init__()
        if not isinstance(transformer, TransformerMixin):
            raise Exception('Invalid type of approximator. It should be an instance of TransformerMixin.')
        self.transformer = transformer
        self.supports_approximation = supports_approximation
        self.type = TransformerType.approximate if supports_approximation else TransformerType.transform

    def __repr__(self):
        return str(self.transformer)

    def transform(self, series: np.ndarray, sample_period: int = 6) -> list:
        """
        Fit the wrapped transformer on ``series``, transform it and map the result back with
        ``inverse_transform``.

        :param series: the time series to transform.
        :param sample_period: not used by this transformer.
        :return: the flattened first element of the inverse-transformed data.
        """
        debug('TSLearnApproximatorWrapper series shape {}'.format(series.shape))
        ts_representation = self.transformer.inverse_transform(self.transformer.fit_transform(series))
        return ts_representation[0].ravel()

    def approximate(self, series: np.ndarray, window: int = 1, should_fit: bool = True) -> np.ndarray:
        """
        Apply ``fit_transform`` of the wrapped transformer to every segment and flatten each result.

        Segments are rescaled to zero mean and unit variance beforehand when the transformer is a
        (1d-)SAX instance.

        :param series: the time series, already split into segments.
        :param window: not used by this transformer.
        :param should_fit: not used; the transformer is always fitted on every segment.
        :return: two dimensional array with one row per segment.
        """
        debug('TSLearnApproximatorWrapper.approximate: series shape {}'.format(series.shape))
        debug(f'TSLearnApproximatorWrapper.approximate: param series \n{series} ')

        # The kind of transformer does not change between segments, so decide once whether to rescale
        # instead of checking, logging and building a scaler for every single segment.
        scaler = None
        if isinstance(self.transformer, (SymbolicAggregateApproximation, OneD_SymbolicAggregateApproximation)):
            logger.info("Scaling the data so that they consist a normal distribution.")
            scaler = TimeSeriesScalerMeanVariance(mu=0., std=1.)  # Rescale time series

        ts_representation = []
        for segment in series:
            if scaler is not None:
                segment = scaler.fit_transform(segment)
            ts_representation.append(self.transformer.fit_transform(segment))

        ts_representation = np.asarray(ts_representation)
        debug('TSLearnApproximatorWrapper.approximate: ts_representation shape {}'.format(ts_representation.shape))
        # One flat row per segment. Letting numpy infer the row width gives the same result as multiplying
        # axes 1 and 2 by hand, and it also works when the trailing axes hold more than one value.
        ts_representation = ts_representation.reshape(len(ts_representation), -1)
        debug('TSLearnApproximatorWrapper.approximate: ts_representation \n{}'.format(ts_representation))
        debug('TSLearnApproximatorWrapper.approximate: ts_representation shape {}'.format(ts_representation.shape))
        return ts_representation

    def reconstruct(self, series: np.ndarray) -> list:
        """Not supported; always raises."""
        raise Exception('TSLearnTransformerWrapper doesn\'t support reconstruct.')

    def get_type(self):
        return self.type

    def get_name(self):
        """Return the class name of the wrapped transformer."""
        return type(self.transformer).__name__

    def set_type(self, method_type: TransformerType):
        """
        Change the transformer type.

        :raises Exception: if approximation is requested but the wrapper was created without support for it.
        """
        if not self.supports_approximation and method_type == TransformerType.approximate:
            raise Exception('{} does not support approximation.'.format(type(self.transformer).__name__))
        self.type = method_type
class PytsTransformerWrapper(TimeSeriesTransformer):
    """Wrapper that exposes a pyts transformer through the ``TimeSeriesTransformer`` interface."""

    def __init__(self, transformer, supports_approximation: bool = True):
        """
        :param transformer: the wrapped transformer; it has to be an instance of ``TransformerMixin``.
        :param supports_approximation: whether the transformer may be used as an approximator. It selects
            the initial type and it is enforced by ``set_type``.
        :raises Exception: if ``transformer`` is not an instance of ``TransformerMixin``.
        """
        super().__init__()
        if not isinstance(transformer, TransformerMixin):
            raise Exception('Invalid type of approximator. It should be an instance of TransformerMixin.')
        self.transformer = transformer
        self.supports_approximation = supports_approximation
        self.type = TransformerType.approximate if supports_approximation else TransformerType.transform

    def __repr__(self):
        return str(self.transformer)

    def transform(self, series: np.ndarray, sample_period: int = 6) -> Union[np.ndarray, Iterable, int, float]:
        """
        Approximate ``series`` with its first Fourier coefficients and rebuild it with the inverse FFT.

        Only ``DiscreteFourierTransform`` is supported.

        :param series: the time series to transform; it is handled as one single sample.
        :param sample_period: not used by this transformer.
        :return: the flattened reconstruction, with as many values as the reshaped input.
        :raises Exception: if the wrapped transformer is not a ``DiscreteFourierTransform``.
        """
        if not isinstance(self.transformer, approximation.DiscreteFourierTransform):
            raise Exception('Pyts doesn\'t support trasform')

        n_coefs = self.transformer.n_coefs
        series = tsutils.to_time_series(series)
        series = np.reshape(series, (1, -1))
        n_samples, n_timestamps = series.shape
        # NOTE(review): the reconstruction below takes column 0 as a standalone real coefficient followed by
        # (real, imaginary) pairs. Confirm that this layout still holds with drop_sum forced to True.
        self.transformer.drop_sum = True
        X_dft = self.transformer.fit_transform(series)

        # Compute the inverse transformation
        if n_coefs % 2 == 0:
            real_idx = np.arange(1, n_coefs, 2)
            imag_idx = np.arange(2, n_coefs, 2)
            # With an even number of coefficients there is one imaginary part less than real parts,
            # so a column of zeros is appended to pair them up.
            X_dft_new = np.c_[
                X_dft[:, :1],
                X_dft[:, real_idx] + 1j * np.c_[X_dft[:, imag_idx],
                                                np.zeros((n_samples,))]
            ]
        else:
            real_idx = np.arange(1, n_coefs, 2)
            imag_idx = np.arange(2, n_coefs + 1, 2)
            X_dft_new = np.c_[
                X_dft[:, :1],
                X_dft[:, real_idx] + 1j * X_dft[:, imag_idx]
            ]
        X_irfft = np.fft.irfft(X_dft_new, n_timestamps)
        debug('PytsTransformerWrapper ts_representation shape {}'.format(np.shape(X_irfft)))
        return np.ravel(X_irfft)

    def approximate(self, series: np.ndarray, window: int = 1, target=None, should_fit: bool = True) -> list:
        """
        Compute the representation of a time series that is already split into segments.

        WEASEL is fitted with labels derived from ``target`` when ``should_fit`` is true and it only
        transforms otherwise. BOSS only transforms when ``should_fit`` is false. Every other transformer
        is always fitted.

        :param series: the time series in segments.
        :param window: not used by this transformer.
        :param target: two dimensional array of labels, one row per segment; needed only to fit WEASEL.
        :param should_fit: whether the transformer has to be fitted on ``series``.
        :return: whatever the wrapped transformer returns.
        :raises ValueError: if WEASEL has to be fitted and ``target`` is missing.
        """
        # series is already in batches
        debug('PytsTransformerWrapper series shape {}'.format(series.shape))
        if isinstance(self.transformer, transformation.WEASEL):
            if should_fit:
                # Labels are needed only for fitting, hence they are not built (nor required) otherwise.
                if target is None:
                    raise ValueError('WEASEL cannot be fitted without target labels.')
                labels = self._rows_to_labels(target)
                ts_representation = self.transformer.fit_transform(series, labels)
            else:
                ts_representation = self.transformer.transform(series)
        else:
            if should_fit:
                ts_representation = self.transformer.fit_transform(series)
            elif isinstance(self.transformer, transformation.BOSS):
                debug("BOSS instance, only transform")
                ts_representation = self.transformer.transform(series)
            else:
                debug("Fit transform.")
                ts_representation = self.transformer.fit_transform(series)
        debug('PytsTransformerWrapper ts_representation shape {}'.format(np.shape(ts_representation)))
        return ts_representation

    @staticmethod
    def _rows_to_labels(target) -> list:
        """Turn every row of ``target`` into one string by joining its values cast to int."""
        n_columns = target.shape[1]
        return [''.join(str(int(row[i])) for i in range(n_columns)) for row in target]

    def reconstruct(self, series: np.ndarray) -> list:
        """Not supported; always raises."""
        raise Exception('Pyts doesn\'t support reconstruct.')

    def get_type(self):
        return self.type

    def set_type(self, method_type: TransformerType):
        """
        Change the transformer type.

        :raises Exception: if approximation is requested but the wrapper was created without support for it.
        """
        if not self.supports_approximation and method_type == TransformerType.approximate:
            raise Exception('{} does not support approximation.'.format(type(self.transformer).__name__))
        self.type = method_type

    def get_name(self):
        """Return the class name of the wrapped transformer."""
        return type(self.transformer).__name__

    def uses_labels(self):
        """Return True when the wrapped transformer is WEASEL, the only one that is fitted with labels."""
        return isinstance(self.transformer, transformation.WEASEL)