-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathdata_generator.py
373 lines (314 loc) · 9.65 KB
/
data_generator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
#! /usr/bin/python3
import numpy as np
import matplotlib
matplotlib.use("agg")
import matplotlib.pyplot as plt # noqa: E402
SEED = 67
START_YEAR = 1900
END_YEAR = 2018
DAYS_PER_YEAR = 365.25
BASELINE_TEMP = 11
COMPONENTS_FIG = "components.png"
COMPONENTS_ZOOM_FIG = "components_zoom.png"
TEMPS_FIG = "temps.png"
TEMPS_ZOOM_FIG = "temps_zoom.png"
ANNUAL_TEMPS_FIG = "annual_temps.png"
def simulate_annual() -> np.array:
"""
Modify the simulated daily temperatures to get annual medians.
"""
temps = simulate()
n_years = END_YEAR - START_YEAR + 1
annual_temps = np.zeros(n_years)
for i_year in range(n_years):
i_start_day = int(np.floor(i_year * DAYS_PER_YEAR))
i_end_day = int(np.floor((i_year + 1) * DAYS_PER_YEAR))
annual_temps[i_year] = np.nanmedian(temps[i_start_day: i_end_day])
return annual_temps
def simulate() -> np.ndarray:
"""
Create some imaginary longitudinal temperature data.
Units are degrees Celsius.
Missing values are represented by nans.
"""
# Initialize the random number generator so that the data
# generated is the same each time.
np.random.seed(SEED)
n_days = int((END_YEAR - START_YEAR + 1) * DAYS_PER_YEAR)
warming_trend = create_warming_trend(n_days)
solar_cycle = create_solar_cycle(n_days)
annual_trend = create_annual_trend(n_days)
meanders = create_meanders(n_days)
jitter = create_jitter(n_days)
temps = (
BASELINE_TEMP
+ warming_trend
+ solar_cycle
+ annual_trend
+ meanders
+ jitter
)
temps = add_gaps(temps)
return temps
def create_warming_trend(
n_days: int,
pivot_year: int = 90,
time_constant_year: int = 30,
) -> np.ndarray:
"""
Simulate a warming trend as an exponential curve.
pivot_year is the year at which the exponential is at 1.
At pivot_year + time_constant_year it is at e.
"""
pivot = pivot_year * DAYS_PER_YEAR
time_constant = time_constant_year * DAYS_PER_YEAR
i_days = np.cumsum(np.ones(n_days))
warming_trend = np.exp((i_days - pivot) / time_constant)
return warming_trend
def create_solar_cycle(
n_days: int = None,
typical_duration: float = 10,
typical_amplitude: float = 2,
variation: float = .5,
) -> np.ndarray:
"""
Approximate the El Nino / La Nina solar radiation cycles.
"""
# Solar cycle
solar_cycle = np.zeros(n_days)
i_next = 0
done = False
while not done:
cycle_length = np.random.normal(loc=typical_duration)
n_days_cycle = int(cycle_length * DAYS_PER_YEAR)
i_days_cycle = np.cumsum(np.ones(n_days_cycle))
cycle_amplitude = np.maximum(
typical_amplitude / 4,
np.random.normal(loc=typical_amplitude, scale=variation))
cycle = (cycle_amplitude / 2) * (
np.sin(2 * np.pi * (i_days_cycle / n_days_cycle)))
if i_next + n_days_cycle < n_days:
solar_cycle[i_next: i_next + n_days_cycle] = cycle
i_next += n_days_cycle
else:
solar_cycle[i_next:] = cycle[:(n_days - i_next)]
done = True
return solar_cycle
def create_annual_trend(
n_days: int,
offset: int = 30,
seasonal_amplitude: float = 25,
) -> np.ndarray:
"""
Generate the annual trend in temperatures.
The first day is January 1.
The coldest day is January 1 + offset.
The hottest day is July 1 + offset.
There is a difference between summer and winter of seasonal_amplitude.
"""
i_days = np.cumsum(np.ones(n_days))
annual_trend = (seasonal_amplitude / 2) * (
-np.cos(2 * np.pi * (i_days - offset) / DAYS_PER_YEAR))
return annual_trend
def create_meanders(
n_days: int,
typical_meander_amplitude: float = 4,
typical_meander_interval: int = 4,
typical_meander_length: int = 6,
) -> np.ndarray:
"""
Create brief temperature meanders. Warm spells. Cold snaps.
"""
meanders = np.zeros(n_days)
i_next = 0
done = False
while not done:
n_days_meander = np.random.normal(loc=typical_meander_length)
n_days_meander = int(np.maximum(2, n_days_meander))
i_days_meander = np.cumsum(np.ones(n_days_meander))
meander_amplitude = np.random.normal(scale=typical_meander_amplitude)
meander = meander_amplitude * (
np.sin(np.pi * (i_days_meander / n_days_meander))**2)
if i_next + n_days_meander < n_days:
meanders[i_next: i_next + n_days_meander] = meander
meander_interval = np.random.normal(loc=typical_meander_interval)
meander_interval = int(np.maximum(1, meander_interval))
i_next += meander_interval
if i_next >= n_days:
done = True
else:
meanders[i_next:] += meander[:(n_days - i_next)]
done = True
return meanders
def create_jitter(
n_days: int,
typical_jitter_amplitude: float = 2,
) -> np.ndarray:
"""
Random jitter, day-by-day.
"""
jitter = np.random.normal(scale=typical_jitter_amplitude, size=n_days)
return jitter
def create_gaps(
temps: np.ndarray,
gap_candidate_period: int = 70,
n_gaps: int = None,
typical_length: float = None,
variation: float = None,
) -> np.ndarray:
"""
Add periods of missing values in the temperature data.
All gaps will occur within the first
gap_candidate_years of the data.
"""
for i_gap in range(n_gaps):
start = int(np.random.random_sample()
* gap_candidate_period
* DAYS_PER_YEAR)
gap_length = int(np.random.normal(
loc=typical_length,
scale=variation,
))
gap_length = np.maximum(typical_length // 4, gap_length)
temps[start: start + gap_length] = np.nan
return temps
def add_gaps(
temps: np.ndarray,
n_long_gaps: int = 4,
n_med_gaps: int = 12,
n_short_gaps: int = 50,
typical_long_gap_length: float = 60, # days
typical_med_gap_length: float = 14, # days
typical_short_gap_length: float = 2, # days
long_gap_variation: float = 20, # days
med_gap_variation: float = 4, # days
short_gap_variation: float = 1, # days
) -> np.ndarray:
"""
Add in segments of missing data of varying durations.
"""
temps = create_gaps(
temps,
n_gaps=n_long_gaps,
typical_length=typical_long_gap_length,
variation=long_gap_variation,
)
temps = create_gaps(
temps,
n_gaps=n_med_gaps,
typical_length=typical_med_gap_length,
variation=med_gap_variation,
)
temps = create_gaps(
temps,
n_gaps=n_short_gaps,
typical_length=typical_short_gap_length,
variation=short_gap_variation,
)
return temps
def visualize(
dpi: int = 300,
) -> None:
"""
Express the data in pictures.
"""
temps = simulate()
n_days = int((END_YEAR - START_YEAR + 1) * DAYS_PER_YEAR)
warming_trend = create_warming_trend(n_days)
solar_cycle = create_solar_cycle(n_days)
annual_trend = create_annual_trend(n_days)
meanders = create_meanders(n_days)
jitter = create_jitter(n_days)
# Choose a random year to zoom into.
i_year = int(np.random.random_sample() * (END_YEAR - START_YEAR))
year_label = str(START_YEAR + i_year)
start_day = int(i_year * DAYS_PER_YEAR)
end_day = int((i_year + 1) * DAYS_PER_YEAR)
def plot_temp(
temp_data: np.ndarray,
ylabel: str = "temps",
linewidth: float = 0,
marker: str = ".",
markersize: float = 1,
color: str = "black",
alpha: float = .1,
) -> None:
"""
Show time series temperature data in an easy-to-grasp manner.
"""
plt.plot(
temp_data,
linewidth=linewidth,
marker=marker,
markersize=markersize,
color=color,
alpha=alpha,
)
plt.ylabel(ylabel)
return
plt.figure(num=111)
plot_temp(temps)
plt.savefig(TEMPS_FIG, dpi=dpi)
plt.figure(num=11111)
plot_temp(
temps[start_day: end_day],
ylabel="temps " + year_label,
alpha=1,
)
plt.savefig(TEMPS_ZOOM_FIG, dpi=dpi)
plt.figure(num=222)
plt.subplot(5, 1, 1)
plot_temp(warming_trend, ylabel="warming_trend")
plt.subplot(5, 1, 2)
plot_temp(solar_cycle, ylabel="solar_cycle")
plt.subplot(5, 1, 3)
plot_temp(annual_trend, ylabel="annual_trend")
plt.subplot(5, 1, 4)
plot_temp(meanders, ylabel="meanders")
plt.subplot(5, 1, 5)
plot_temp(jitter, ylabel="jitter")
plt.savefig(COMPONENTS_FIG, dpi=dpi)
plt.figure(num=22222)
plt.subplot(5, 1, 1)
plot_temp(
warming_trend[start_day: end_day],
ylabel="warming_trend " + year_label,
alpha=1,
)
plt.subplot(5, 1, 2)
plot_temp(
solar_cycle[start_day: end_day],
ylabel="solar_cycle " + year_label,
alpha=1,
)
plt.subplot(5, 1, 3)
plot_temp(
annual_trend[start_day: end_day],
ylabel="annual_trend " + year_label,
alpha=1,
)
plt.subplot(5, 1, 4)
plot_temp(
meanders[start_day: end_day],
ylabel="meanders " + year_label,
alpha=1,
)
plt.subplot(5, 1, 5)
plot_temp(
jitter[start_day: end_day],
ylabel="jitter " + year_label,
alpha=1,
)
plt.savefig(COMPONENTS_ZOOM_FIG, dpi=dpi)
def test() -> None:
print(" ".join([
"Inspect",
COMPONENTS_FIG,
"and",
COMPONENTS_ZOOM_FIG,
"and check for plausibility.",
]))
visualize()
return
if __name__ == "__main__":
test()