forked from winds-mobi/winds-mobi-providers
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun_scheduler.py
78 lines (70 loc) · 2.48 KB
/
run_scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import os
from datetime import datetime, timedelta
from apscheduler.schedulers.blocking import BlockingScheduler
from pydantic import parse_obj_as
def run_scheduler():
    """Configure a ``BlockingScheduler``, register all jobs and start it.

    Two process-pool executors are declared: ``"admin"`` (1 worker) for the
    two cron-triggered admin jobs, and ``"providers"`` (2 workers) for the
    interval-triggered provider jobs.

    A provider job is registered only when the environment variable
    ``DISABLE_PROVIDER_<NAME>`` does not parse as a true boolean, where
    ``<NAME>`` is the upper-cased part of the job reference after the ``:``
    (e.g. ``DISABLE_PROVIDER_HOLFUY``). When the variable is unset, the
    default ``False`` is used and the provider stays enabled.

    The function ends by calling ``scheduler.start()`` on a blocking
    scheduler, so it is expected not to return while the scheduler runs.
    """
    executors = {
        "admin": {"type": "processpool", "max_workers": 1},
        "providers": {"type": "processpool", "max_workers": 2},
    }
    job_defaults = {
        # A run that is late by at most 3 minutes is still executed
        "misfire_grace_time": 3 * 60,
        # Several pending runs of the same job are merged into a single run
        "coalesce": True,
        # Never execute more than one instance of a given job at the same time
        "max_instances": 1,
    }

    scheduler = BlockingScheduler()
    scheduler.configure(executors=executors, job_defaults=job_defaults)

    # Admin jobs: cron-triggered at hour 3 and hour 4 on the "admin" executor.
    # NOTE(review): the meaning of the positional args (60, "") and (50,) is
    # defined by the target functions, which are not visible here.
    scheduler.add_job(
        "admin_stations:delete_stations",
        args=(60, ""),
        trigger="cron",
        hour="3",
        executor="admin",
    )
    scheduler.add_job(
        "admin_clusters:save_clusters",
        args=(50,),
        trigger="cron",
        hour="4",
        executor="admin",
    )

    # The first run date has to lie in the future at the time the scheduler
    # starts, hence the 10 second offset from "now" (timezone-aware).
    start_date = datetime.now().astimezone() + timedelta(seconds=10)

    # (job reference "module:function", interval in minutes), alphabetical order
    provider_jobs = (
        ("providers.borntofly:borntofly", 5),
        ("providers.ffvl:ffvl", 5),
        ("providers.fluggruppe_aletsch:fluggruppe_aletsch", 5),
        ("providers.gxaircom:gxaircom", 5),
        ("providers.holfuy:holfuy", 5),
        ("providers.iweathar:iweathar", 5),
        ("providers.metar_noaa:metar_noaa", 10),
        ("providers.meteoswiss_opendata:meteoswiss", 5),
        ("providers.pdcs:pdcs", 5),
        ("providers.pioupiou:pioupiou", 5),
        ("providers.romma:romma", 5),
        ("providers.slf:slf", 5),
        ("providers.thunerwetter:thunerwetter", 5),
        ("providers.windline:windline", 5),
        ("providers.windspots:windspots", 5),
        ("providers.windy:windy", 5),
        ("providers.yvbeach:yvbeach", 5),
        ("providers.zermatt:zermatt", 5),
    )

    for func, interval in provider_jobs:
        # The env var suffix is the function name, i.e. the part after ":"
        func_name = func.split(":")[1]
        env_value = os.environ.get(f"DISABLE_PROVIDER_{func_name.upper()}", False)
        if parse_obj_as(bool, env_value):
            # Provider explicitly disabled through the environment
            continue

        scheduler.add_job(
            func,
            trigger="interval",
            start_date=start_date,
            minutes=interval,
            jitter=5 * 60,  # randomize each run within a 5 minute window
            executor="providers",
        )

    scheduler.start()
# Script entry point: start the scheduler only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    run_scheduler()