-
Notifications
You must be signed in to change notification settings - Fork 11
/
base_tsf_runner.py
329 lines (265 loc) · 12.9 KB
/
base_tsf_runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
import math
import functools
from typing import Tuple, Union, Optional
import torch
import numpy as np
from easytorch.utils.dist import master_only
from .base_runner import BaseRunner
from ..data import SCALER_REGISTRY
from ..utils import load_pkl
from ..metrics import masked_mae, masked_mape, masked_rmse
class BaseTimeSeriesForecastingRunner(BaseRunner):
"""
Runner for short term multivariate time series forecasting datasets.
Typically, models predict the future 12 time steps based on historical time series.
Features:
- Evaluate at horizon 3, 6, 12, and overall.
- Metrics: MAE, RMSE, MAPE. The best model is the one with the smallest mae at validation.
- Loss: MAE (masked_mae). Allow customization.
- Support curriculum learning.
- Users only need to implement the `forward` function.
"""
def __init__(self, cfg: dict):
super().__init__(cfg)
self.dataset_name = cfg["DATASET_NAME"]
# different datasets have different null_values, e.g., 0.0 or np.nan.
self.null_val = cfg["TRAIN"].get("NULL_VAL", np.nan) # consist with metric functions
self.dataset_type = cfg["DATASET_TYPE"]
# read scaler for re-normalization
self.scaler = load_pkl("datasets/" + self.dataset_name + "/scaler_in{0}_out{1}.pkl".format(
cfg["DATASET_INPUT_LEN"], cfg["DATASET_OUTPUT_LEN"]))
# define loss
self.loss = cfg["TRAIN"]["LOSS"]
# define metric
self.metrics = {"MAE": masked_mae, "RMSE": masked_rmse, "MAPE": masked_mape}
# curriculum learning for output. Note that this is different from the CL in Seq2Seq archs.
self.cl_param = cfg.TRAIN.get("CL", None)
if self.cl_param is not None:
self.warm_up_epochs = cfg.TRAIN.CL.get("WARM_EPOCHS", 0)
self.cl_epochs = cfg.TRAIN.CL.get("CL_EPOCHS")
self.prediction_length = cfg.TRAIN.CL.get("PREDICTION_LENGTH")
# evaluation horizon
self.evaluation_horizons = [_ - 1 for _ in cfg["TEST"].get("EVALUATION_HORIZONS", range(1, 13))]
assert min(self.evaluation_horizons) >= 0, "The horizon should start counting from 0."
def init_training(self, cfg: dict):
"""Initialize training.
Including loss, training meters, etc.
Args:
cfg (dict): config
"""
super().init_training(cfg)
for key, _ in self.metrics.items():
self.register_epoch_meter("train_"+key, "train", "{:.4f}")
def init_validation(self, cfg: dict):
"""Initialize validation.
Including validation meters, etc.
Args:
cfg (dict): config
"""
super().init_validation(cfg)
for key, _ in self.metrics.items():
self.register_epoch_meter("val_"+key, "val", "{:.4f}")
def init_test(self, cfg: dict):
"""Initialize test.
Including test meters, etc.
Args:
cfg (dict): config
"""
super().init_test(cfg)
for key, _ in self.metrics.items():
self.register_epoch_meter("test_"+key, "test", "{:.4f}")
def build_train_dataset(self, cfg: dict):
"""Build MNIST train dataset
Args:
cfg (dict): config
Returns:
train dataset (Dataset)
"""
data_file_path = "{0}/data_in{1}_out{2}.pkl".format(cfg["TRAIN"]["DATA"]["DIR"], cfg["DATASET_INPUT_LEN"], cfg["DATASET_OUTPUT_LEN"])
index_file_path = "{0}/index_in{1}_out{2}.pkl".format(cfg["TRAIN"]["DATA"]["DIR"], cfg["DATASET_INPUT_LEN"], cfg["DATASET_OUTPUT_LEN"])
# build dataset args
dataset_args = cfg.get("DATASET_ARGS", {})
# three necessary arguments, data file path, corresponding index file path, and mode (train, valid, or test)
dataset_args["data_file_path"] = data_file_path
dataset_args["index_file_path"] = index_file_path
dataset_args["mode"] = "train"
dataset = cfg["DATASET_CLS"](**dataset_args)
print("train len: {0}".format(len(dataset)))
batch_size = cfg["TRAIN"]["DATA"]["BATCH_SIZE"]
self.iter_per_epoch = math.ceil(len(dataset) / batch_size)
return dataset
@staticmethod
def build_val_dataset(cfg: dict):
"""Build MNIST val dataset
Args:
cfg (dict): config
Returns:
validation dataset (Dataset)
"""
data_file_path = "{0}/data_in{1}_out{2}.pkl".format(cfg["VAL"]["DATA"]["DIR"], cfg["DATASET_INPUT_LEN"], cfg["DATASET_OUTPUT_LEN"])
index_file_path = "{0}/index_in{1}_out{2}.pkl".format(cfg["VAL"]["DATA"]["DIR"], cfg["DATASET_INPUT_LEN"], cfg["DATASET_OUTPUT_LEN"])
# build dataset args
dataset_args = cfg.get("DATASET_ARGS", {})
# three necessary arguments, data file path, corresponding index file path, and mode (train, valid, or test)
dataset_args["data_file_path"] = data_file_path
dataset_args["index_file_path"] = index_file_path
dataset_args["mode"] = "valid"
dataset = cfg["DATASET_CLS"](**dataset_args)
print("val len: {0}".format(len(dataset)))
return dataset
@staticmethod
def build_test_dataset(cfg: dict):
"""Build MNIST val dataset
Args:
cfg (dict): config
Returns:
train dataset (Dataset)
"""
data_file_path = "{0}/data_in{1}_out{2}.pkl".format(cfg["TEST"]["DATA"]["DIR"], cfg["DATASET_INPUT_LEN"], cfg["DATASET_OUTPUT_LEN"])
index_file_path = "{0}/index_in{1}_out{2}.pkl".format(cfg["TEST"]["DATA"]["DIR"], cfg["DATASET_INPUT_LEN"], cfg["DATASET_OUTPUT_LEN"])
# build dataset args
dataset_args = cfg.get("DATASET_ARGS", {})
# three necessary arguments, data file path, corresponding index file path, and mode (train, valid, or test)
dataset_args["data_file_path"] = data_file_path
dataset_args["index_file_path"] = index_file_path
dataset_args["mode"] = "test"
dataset = cfg["DATASET_CLS"](**dataset_args)
print("test len: {0}".format(len(dataset)))
return dataset
def curriculum_learning(self, epoch: int = None) -> int:
"""Calculate task level in curriculum learning.
Args:
epoch (int, optional): current epoch if in training process, else None. Defaults to None.
Returns:
int: task level
"""
if epoch is None:
return self.prediction_length
epoch -= 1
# generate curriculum length
if epoch < self.warm_up_epochs:
# still warm up
cl_length = self.prediction_length
else:
_ = (epoch - self.warm_up_epochs) // self.cl_epochs + 1
cl_length = min(_, self.prediction_length)
return cl_length
def forward(self, data: tuple, epoch: int = None, iter_num: int = None, train: bool = True, **kwargs) -> tuple:
"""Feed forward process for train, val, and test. Note that the outputs are NOT re-scaled.
Args:
data (tuple): data (future data, history data). [B, L, N, C] for each of them
epoch (int, optional): epoch number. Defaults to None.
iter_num (int, optional): iteration number. Defaults to None.
train (bool, optional): if in the training process. Defaults to True.
Returns:
tuple: (prediction, real_value). [B, L, N, C] for each of them.
"""
raise NotImplementedError()
def metric_forward(self, metric_func, args):
"""Computing metrics.
Args:
metric_func (function, functools.partial): metric function.
args (list): arguments for metrics computation.
"""
if isinstance(metric_func, functools.partial) and list(metric_func.keywords.keys()) == ["null_val"]:
# support partial(metric_func, null_val = something)
metric_item = metric_func(*args)
elif callable(metric_func):
# is a function
metric_item = metric_func(*args, null_val=self.null_val)
else:
raise TypeError("Unknown metric type: {0}".format(type(metric_func)))
return metric_item
def train_iters(self, epoch: int, iter_index: int, data: Union[torch.Tensor, Tuple]) -> torch.Tensor:
"""Training details.
Args:
data (Union[torch.Tensor, Tuple]): Data provided by DataLoader
epoch (int): current epoch.
iter_index (int): current iter.
Returns:
loss (torch.Tensor)
"""
iter_num = (epoch-1) * self.iter_per_epoch + iter_index
forward_return = list(self.forward(data=data, epoch=epoch, iter_num=iter_num, train=True))
# re-scale data
prediction_rescaled = SCALER_REGISTRY.get(self.scaler["func"])(forward_return[0], **self.scaler["args"])
real_value_rescaled = SCALER_REGISTRY.get(self.scaler["func"])(forward_return[1], **self.scaler["args"])
# loss
if self.cl_param:
cl_length = self.curriculum_learning(epoch=epoch)
forward_return[0] = prediction_rescaled[:, :cl_length, :, :]
forward_return[1] = real_value_rescaled[:, :cl_length, :, :]
else:
forward_return[0] = prediction_rescaled
forward_return[1] = real_value_rescaled
loss = self.metric_forward(self.loss, forward_return)
# metrics
for metric_name, metric_func in self.metrics.items():
metric_item = self.metric_forward(metric_func, forward_return[:2])
self.update_epoch_meter("train_"+metric_name, metric_item.item())
return loss
def val_iters(self, iter_index: int, data: Union[torch.Tensor, Tuple]):
"""Validation details.
Args:
data (Union[torch.Tensor, Tuple]): Data provided by DataLoader
train_epoch (int): current epoch if in training process. Else None.
iter_index (int): current iter.
"""
forward_return = self.forward(data=data, epoch=None, iter_num=None, train=False)
# re-scale data
prediction_rescaled = SCALER_REGISTRY.get(self.scaler["func"])(forward_return[0], **self.scaler["args"])
real_value_rescaled = SCALER_REGISTRY.get(self.scaler["func"])(forward_return[1], **self.scaler["args"])
# metrics
for metric_name, metric_func in self.metrics.items():
metric_item = self.metric_forward(metric_func, [prediction_rescaled, real_value_rescaled])
self.update_epoch_meter("val_"+metric_name, metric_item.item())
@torch.no_grad()
@master_only
def test(self):
"""Evaluate the model.
Args:
train_epoch (int, optional): current epoch if in training process.
"""
# test loop
prediction = []
real_value = []
for _, data in enumerate(self.test_data_loader):
forward_return = self.forward(data, epoch=None, iter_num=None, train=False)
prediction.append(forward_return[0]) # preds = forward_return[0]
real_value.append(forward_return[1]) # testy = forward_return[1]
prediction = torch.cat(prediction, dim=0)
real_value = torch.cat(real_value, dim=0)
# re-scale data
prediction = SCALER_REGISTRY.get(self.scaler["func"])(
prediction, **self.scaler["args"])
real_value = SCALER_REGISTRY.get(self.scaler["func"])(
real_value, **self.scaler["args"])
# summarize the results.
# test performance of different horizon
for i in self.evaluation_horizons:
# For horizon i, only calculate the metrics **at that time** slice here.
pred = prediction[:, i, :, :]
real = real_value[:, i, :, :]
# metrics
metric_results = {}
for metric_name, metric_func in self.metrics.items():
metric_item = self.metric_forward(metric_func, [pred, real])
metric_results[metric_name] = metric_item.item()
log = "Evaluate best model on test data for horizon " + \
"{:d}, Test MAE: {:.4f}, Test RMSE: {:.4f}, Test MAPE: {:.4f}"
log = log.format(
i+1, metric_results["MAE"], metric_results["RMSE"], metric_results["MAPE"])
self.logger.info(log)
# test performance overall
for metric_name, metric_func in self.metrics.items():
metric_item = self.metric_forward(metric_func, [prediction, real_value])
self.update_epoch_meter("test_"+metric_name, metric_item.item())
metric_results[metric_name] = metric_item.item()
@master_only
def on_validating_end(self, train_epoch: Optional[int]):
"""Callback at the end of validating.
Args:
train_epoch (Optional[int]): current epoch if in training process.
"""
if train_epoch is not None:
self.save_best_model(train_epoch, "val_MAE", greater_best=False)