forked from chen0040/keras-anomaly-detection
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconvolutional.py
109 lines (88 loc) · 4.36 KB
/
convolutional.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
from keras.layers import Conv1D, GlobalMaxPool1D, Dense, Flatten
from keras.models import Sequential
from keras.callbacks import ModelCheckpoint
import numpy as np
class Conv1DAutoEncoder(object):
model_name = 'con1d-auto-encoder'
VERBOSE = 1
def __init__(self):
self.model = None
self.time_window_size = None
self.metric = None
self.threshold = 5.0
self.config = None
@staticmethod
def create_model(time_window_size, metric):
model = Sequential()
model.add(Conv1D(filters=256, kernel_size=5, padding='same', activation='relu',
input_shape=(time_window_size, 1)))
model.add(GlobalMaxPool1D())
model.add(Dense(units=time_window_size, activation='linear'))
model.compile(optimizer='adam', loss='mean_squared_error', metrics=[metric])
print(model.summary())
return model
@staticmethod
def get_config_file(model_dir_path):
return model_dir_path + '/' + Conv1DAutoEncoder.model_name + '-config.npy'
@staticmethod
def get_weight_file(model_dir_path):
return model_dir_path + '/' + Conv1DAutoEncoder.model_name + '-weights.h5'
@staticmethod
def get_architecture_file(model_dir_path):
return model_dir_path + '/' + Conv1DAutoEncoder.model_name + '-architecture.json'
def load_model(self, model_dir_path):
config_file_path = Conv1DAutoEncoder.get_config_file(model_dir_path)
self.config = np.load(config_file_path).item()
self.metric = self.config['metric']
self.time_window_size = self.config['time_window_size']
self.threshold = self.config['threshold']
self.model = Conv1DAutoEncoder.create_model(self.time_window_size, self.metric)
weight_file_path = Conv1DAutoEncoder.get_weight_file(model_dir_path)
self.model.load_weights(weight_file_path)
def fit(self, dataset, model_dir_path, batch_size=None, epochs=None, validation_split=None, metric=None,
estimated_negative_sample_ratio=None):
if batch_size is None:
batch_size = 8
if epochs is None:
epochs = 100
if validation_split is None:
validation_split = 0.2
if metric is None:
metric = 'mean_absolute_error'
if estimated_negative_sample_ratio is None:
estimated_negative_sample_ratio = 0.9
self.time_window_size = dataset.shape[1]
self.metric = metric
input_timeseries_dataset = np.expand_dims(dataset, axis=2)
weight_file_path = Conv1DAutoEncoder.get_weight_file(model_dir_path=model_dir_path)
architecture_file_path = Conv1DAutoEncoder.get_architecture_file(model_dir_path)
checkpoint = ModelCheckpoint(weight_file_path)
self.model = Conv1DAutoEncoder.create_model(self.time_window_size, metric=self.metric)
open(architecture_file_path, 'w').write(self.model.to_json())
history = self.model.fit(x=input_timeseries_dataset, y=dataset,
batch_size=batch_size, epochs=epochs,
verbose=Conv1DAutoEncoder.VERBOSE, validation_split=validation_split,
callbacks=[checkpoint]).history
self.model.save_weights(weight_file_path)
scores = self.predict(dataset)
scores.sort()
cut_point = int(estimated_negative_sample_ratio * len(scores))
self.threshold = scores[cut_point]
print('estimated threshold is ' + str(self.threshold))
self.config = dict()
self.config['time_window_size'] = self.time_window_size
self.config['metric'] = self.metric
self.config['threshold'] = self.threshold
config_file_path = Conv1DAutoEncoder.get_config_file(model_dir_path=model_dir_path)
np.save(config_file_path, self.config)
return history
def predict(self, timeseries_dataset):
input_timeseries_dataset = np.expand_dims(timeseries_dataset, axis=2)
target_timeseries_dataset = self.model.predict(x=input_timeseries_dataset)
dist = np.linalg.norm(timeseries_dataset - target_timeseries_dataset, axis=-1)
return dist
def anomaly(self, timeseries_dataset, threshold=None):
if threshold is not None:
self.threshold = threshold
dist = self.predict(timeseries_dataset)
return zip(dist >= self.threshold, dist)