-
Notifications
You must be signed in to change notification settings - Fork 8
/
openaps_reports.py
124 lines (93 loc) · 3.06 KB
/
openaps_reports.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
"""
Interface for openaps reports
"""
from datetime import datetime
from datetime import time
from dateutil.parser import parse
from glob import glob
import json
import os
from settings import Settings
class Schedule(object):
    """
    Time-of-day lookup over a list of schedule entries.

    Every entry is a dict whose ``'start'`` value is a string readable by
    ``dateutil.parser.parse``. Scans stop at the first entry that starts too
    late, so the entries are expected in ascending start order.
    """

    def __init__(self, entries):
        """
        :param entries: schedule entries, each carrying a ``'start'`` key
        :type entries: list(dict)
        """
        self.entries = entries

    def at(self, time):
        """
        Find the entry in effect at a given time of day.

        :param time: time of day to look up
        :type time: datetime.time
        :return: the last entry scanned whose start is not after ``time``,
            or an empty dict when no such entry exists
        :rtype: dict
        """
        active = {}
        for candidate in self.entries:
            begins = parse(candidate['start']).timetz()
            if begins > time:
                # The first entry that starts later ends the scan.
                return active
            active = candidate
        return active

    def between(self, start_time, end_time):
        """
        Collect the entries that apply within a time-of-day window.

        The result opens with the entry in effect at ``start_time`` (an empty
        dict when there is none), followed by every entry whose start lies
        after ``start_time`` and not after ``end_time``. A window whose start
        is later than its end is treated as crossing midnight: the result is
        the part up to ``time.max`` followed by the part from ``time.min``.

        :param start_time: beginning of the window
        :type start_time: datetime.time
        :param end_time: end of the window
        :type end_time: datetime.time
        :return: entries covering the window, in schedule order
        :rtype: list(dict)
        """
        if start_time > end_time:
            until_midnight = self.between(start_time, time.max)
            from_midnight = self.between(time.min, end_time)
            return until_midnight + from_midnight

        matches = [self.at(start_time)]
        for candidate in self.entries:
            begins = parse(candidate['start']).timetz()
            if begins > end_time:
                break
            if begins > start_time:
                matches.append(candidate)
        return matches
class OpenAPS(object):
    """
    Read-only access to the report and configuration files stored in one
    openaps directory.
    """

    def __init__(self, path):
        """
        :param path: directory holding the ``*.json`` and ``*.ini`` files
        :type path: str
        """
        self.path = path
        # Normalize first so a trailing separator ("/data/aps/") still gives
        # "aps" instead of an empty name. An empty path keeps its old result.
        self.name = os.path.basename(os.path.normpath(path) if path else path)

    # Filesystem utils ####################

    def _all_files(self):
        """
        Yield the full paths of the directory's files.

        ``*.json`` files come first, most recently modified first, followed
        by ``*.ini`` files in the order ``glob`` returns them.

        :rtype: collections.Iterable(str)
        """
        json_paths = glob('{}/*.json'.format(self.path))
        for f in sorted(json_paths, key=os.path.getmtime, reverse=True):
            yield f

        for f in glob('{}/*.ini'.format(self.path)):
            yield f

    def all_filenames(self):
        """
        Yield the base name of every file produced by ``_all_files``.

        :rtype: collections.Iterable(str)
        """
        for f in self._all_files():
            yield os.path.basename(f)

    def all_filenames_and_data(self):
        """
        Yield ``(base name, modification time, content)`` for every file
        produced by ``_all_files``.

        The modification time is a ``datetime`` built from the file's mtime
        with ``datetime.fromtimestamp``; the content is the file's full text.

        :rtype: collections.Iterable(tuple(str, datetime, str))
        """
        for f in self._all_files():
            # Read and close before yielding: a generator suspended inside
            # the ``with`` block would keep the handle open until the consumer
            # resumes it, and leak it if iteration is abandoned.
            with open(f) as fp:
                content = fp.read()
            modified = datetime.fromtimestamp(os.path.getmtime(f))
            yield os.path.basename(f), modified, content

    def _get_report_path(self, report_filename):
        """
        :param report_filename: file name relative to the directory
        :type report_filename: str
        :return: ``report_filename`` joined onto ``self.path``
        :rtype: str
        """
        return os.path.join(self.path, report_filename)

    def _read_json(self, filename, default=None):
        """
        Load a JSON file from the directory.

        :param filename: file name relative to the directory
        :type filename: str
        :param default: value returned when the file cannot be opened or read
            (``IOError``) or does not contain valid JSON (``ValueError``)
        :return: the decoded JSON content, or ``default``
        """
        try:
            with open(self._get_report_path(filename)) as fp:
                return json.load(fp)
        except (IOError, ValueError):
            return default

    # Reports #############################

    def read_bg_targets(self):
        """
        :return: content of the ``Settings.READ_BG_TARGETS`` report, or
            ``{'targets': []}`` when it cannot be loaded
        """
        return self._read_json(Settings.READ_BG_TARGETS, {'targets': []})

    def predicted_glucose(self):
        """
        :return: content of the ``Settings.PREDICT_GLUCOSE`` report, or an
            empty list when it cannot be loaded
        """
        return self._read_json(Settings.PREDICT_GLUCOSE, [])

    def predicted_glucose_without_dose(self):
        """
        :return: content of the ``Settings.PREDICT_GLUCOSE_WITHOUT_DOSE``
            report, or an empty list when it cannot be loaded
        """
        return self._read_json(Settings.PREDICT_GLUCOSE_WITHOUT_DOSE, [])

    def recent_glucose(self):
        """
        :return: content of the ``Settings.CLEAN_GLUCOSE`` report, or an
            empty list when it cannot be loaded
        """
        return self._read_json(Settings.CLEAN_GLUCOSE, [])

    def normalized_history(self):
        """
        :return: content of the ``Settings.NORMALIZE_HISTORY`` report, or an
            empty list when it cannot be loaded
        """
        return self._read_json(Settings.NORMALIZE_HISTORY, [])

    def iob(self):
        """
        :return: content of the ``Settings.IOB`` report, or an empty list
            when it cannot be loaded
        """
        return self._read_json(Settings.IOB, [])

    def cob(self):
        """
        :return: content of the ``Settings.COB`` report, or an empty list
            when it cannot be loaded
        """
        return self._read_json(Settings.COB, [])