forked from sparky8512/starlink-grpc-tools
-
Notifications
You must be signed in to change notification settings - Fork 0
/
dish_grpc_influx.py
330 lines (278 loc) · 12.4 KB
/
dish_grpc_influx.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
#!/usr/bin/python3
"""Write Starlink user terminal data to an InfluxDB database.
This script pulls the current status info and/or metrics computed from the
history data and writes them to the specified InfluxDB database either once
or in a periodic loop.
Data will be written into the requested database with the following
measurement / series names:
: spacex.starlink.user_terminal.status : Current status data
: spacex.starlink.user_terminal.history : Bulk history data
: spacex.starlink.user_terminal.ping_stats : Ping history statistics
: spacex.starlink.user_terminal.usage : Usage history statistics
NOTE: The Starlink user terminal does not include time values with its
history or status data, so this script uses current system time to compute
the timestamps it sends to InfluxDB. It is recommended to run this script on
a host that has its system clock synced via NTP. Otherwise, the timestamps
may get out of sync with real time.
"""
from datetime import datetime
from datetime import timezone
import logging
import os
import signal
import sys
import time
import warnings
from influxdb import InfluxDBClient
import dish_common
HOST_DEFAULT = "localhost"
DATABASE_DEFAULT = "starlinkstats"
BULK_MEASUREMENT = "spacex.starlink.user_terminal.history"
FLUSH_LIMIT = 6
MAX_BATCH = 5000
MAX_QUEUE_LENGTH = 864000
class Terminated(Exception):
pass
def handle_sigterm(signum, frame):
# Turn SIGTERM into an exception so main loop can clean up
raise Terminated
def parse_args():
parser = dish_common.create_arg_parser(output_description="write it to an InfluxDB database")
group = parser.add_argument_group(title="InfluxDB database options")
group.add_argument("-n",
"--hostname",
default=HOST_DEFAULT,
dest="host",
help="Hostname of MQTT broker, default: " + HOST_DEFAULT)
group.add_argument("-p", "--port", type=int, help="Port number to use on MQTT broker")
group.add_argument("-P", "--password", help="Set password for username/password authentication")
group.add_argument("-U", "--username", help="Set username for authentication")
group.add_argument("-D",
"--database",
default=DATABASE_DEFAULT,
help="Database name to use, default: " + DATABASE_DEFAULT)
group.add_argument("-R", "--retention-policy", help="Retention policy name to use")
group.add_argument("-k",
"--skip-query",
action="store_true",
help="Skip querying for prior sample write point in bulk mode")
group.add_argument("-C",
"--ca-cert",
dest="verify_ssl",
help="Enable SSL/TLS using specified CA cert to verify broker",
metavar="FILENAME")
group.add_argument("-I",
"--insecure",
action="store_false",
dest="verify_ssl",
help="Enable SSL/TLS but disable certificate verification (INSECURE!)")
group.add_argument("-S",
"--secure",
action="store_true",
dest="verify_ssl",
help="Enable SSL/TLS using default CA cert")
env_map = (
("INFLUXDB_HOST", "host"),
("INFLUXDB_PORT", "port"),
("INFLUXDB_USER", "username"),
("INFLUXDB_PWD", "password"),
("INFLUXDB_DB", "database"),
("INFLUXDB_RP", "retention-policy"),
("INFLUXDB_SSL", "verify_ssl"),
)
env_defaults = {}
for var, opt in env_map:
# check both set and not empty string
val = os.environ.get(var)
if val:
if var == "INFLUXDB_SSL" and val == "secure":
env_defaults[opt] = True
elif var == "INFLUXDB_SSL" and val == "insecure":
env_defaults[opt] = False
else:
env_defaults[opt] = val
parser.set_defaults(**env_defaults)
opts = dish_common.run_arg_parser(parser, need_id=True)
if opts.username is None and opts.password is not None:
parser.error("Password authentication requires username to be set")
opts.icargs = {"timeout": 5}
for key in ["port", "host", "password", "username", "database", "verify_ssl"]:
val = getattr(opts, key)
if val is not None:
opts.icargs[key] = val
if opts.verify_ssl is not None:
opts.icargs["ssl"] = True
return opts
def flush_points(opts, gstate):
try:
while len(gstate.points) > MAX_BATCH:
gstate.influx_client.write_points(gstate.points[:MAX_BATCH],
time_precision="s",
retention_policy=opts.retention_policy)
if opts.verbose:
print("Data points written: " + str(MAX_BATCH))
del gstate.points[:MAX_BATCH]
if gstate.points:
gstate.influx_client.write_points(gstate.points,
time_precision="s",
retention_policy=opts.retention_policy)
if opts.verbose:
print("Data points written: " + str(len(gstate.points)))
gstate.points.clear()
except Exception as e:
dish_common.conn_error(opts, "Failed writing to InfluxDB database: %s", str(e))
# If failures persist, don't just use infinite memory. Max queue
# is currently 10 days of bulk data, so something is very wrong
# if it's ever exceeded.
if len(gstate.points) > MAX_QUEUE_LENGTH:
logging.error("Max write queue exceeded, discarding data.")
del gstate.points[:-MAX_QUEUE_LENGTH]
return 1
return 0
def query_counter(gstate, start, end):
try:
# fetch the latest point where counter field was recorded
result = gstate.influx_client.query("SELECT counter FROM \"{0}\" "
"WHERE time>={1}s AND time<{2}s AND id=$id "
"ORDER by time DESC LIMIT 1;".format(
BULK_MEASUREMENT, start, end),
bind_params={"id": gstate.dish_id},
epoch="s")
points = list(result.get_points())
if points:
counter = points[0].get("counter", None)
timestamp = points[0].get("time", 0)
if counter and timestamp:
return int(counter), int(timestamp)
except TypeError as e:
# bind_params was added in influxdb-python v5.2.3. That would be easy
# enough to work around, but older versions had other problems with
# query(), so just skip this functionality.
logging.error(
"Failed running query, probably due to influxdb-python version too old. "
"Skipping resumption from prior counter value. Reported error was: %s", str(e))
return None, 0
def sync_timebase(opts, gstate):
try:
db_counter, db_timestamp = query_counter(gstate, gstate.start_timestamp, gstate.timestamp)
except Exception as e:
# could be temporary outage, so try again next time
dish_common.conn_error(opts, "Failed querying InfluxDB for prior count: %s", str(e))
return
gstate.timebase_synced = True
if db_counter and gstate.start_counter <= db_counter:
del gstate.deferred_points[:db_counter - gstate.start_counter]
if gstate.deferred_points:
delta_timestamp = db_timestamp - (gstate.deferred_points[0]["time"] - 1)
# to prevent +/- 1 second timestamp drift when the script restarts,
# if time base is within 2 seconds of that of the last sample in
# the database, correct back to that time base
if delta_timestamp == 0:
if opts.verbose:
print("Exactly synced with database time base")
elif -2 <= delta_timestamp <= 2:
if opts.verbose:
print("Replacing with existing time base: {0} -> {1}".format(
db_counter, datetime.fromtimestamp(db_timestamp, tz=timezone.utc)))
for point in gstate.deferred_points:
db_timestamp += 1
if point["time"] + delta_timestamp == db_timestamp:
point["time"] = db_timestamp
else:
# lost time sync when recording data, leave the rest
break
else:
gstate.timestamp = db_timestamp
else:
if opts.verbose:
print("Database time base out of sync by {0} seconds".format(delta_timestamp))
gstate.points.extend(gstate.deferred_points)
gstate.deferred_points.clear()
def loop_body(opts, gstate):
fields = {"status": {}, "ping_stats": {}, "usage": {}}
def cb_add_item(key, val, category):
fields[category][key] = val
def cb_add_sequence(key, val, category, start):
for i, subval in enumerate(val, start=start):
fields[category]["{0}_{1}".format(key, i)] = subval
def cb_add_bulk(bulk, count, timestamp, counter):
if gstate.start_timestamp is None:
gstate.start_timestamp = timestamp
gstate.start_counter = counter
points = gstate.points if gstate.timebase_synced else gstate.deferred_points
for i in range(count):
timestamp += 1
points.append({
"measurement": BULK_MEASUREMENT,
"tags": {
"id": gstate.dish_id
},
"time": timestamp,
"fields": {key: val[i] for key, val in bulk.items() if val[i] is not None},
})
if points:
# save off counter value for script restart
points[-1]["fields"]["counter"] = counter + count
now = time.time()
rc = dish_common.get_data(opts, gstate, cb_add_item, cb_add_sequence, add_bulk=cb_add_bulk)
if rc:
return rc
for category in fields:
if fields[category]:
gstate.points.append({
"measurement": "spacex.starlink.user_terminal." + category,
"tags": {
"id": gstate.dish_id
},
"time": int(now),
"fields": fields[category],
})
# This is here and not before the points being processed because if the
# query previously failed, there will be points that were processed in
# a prior loop. This avoids having to handle that as a special case.
if opts.bulk_mode and not gstate.timebase_synced:
sync_timebase(opts, gstate)
if opts.verbose:
print("Data points queued: " + str(len(gstate.points)))
if len(gstate.points) >= FLUSH_LIMIT:
return flush_points(opts, gstate)
return 0
def main():
opts = parse_args()
logging.basicConfig(format="%(levelname)s: %(message)s")
gstate = dish_common.GlobalState(target=opts.target)
gstate.points = []
gstate.deferred_points = []
gstate.timebase_synced = opts.skip_query
gstate.start_timestamp = None
gstate.start_counter = None
if "verify_ssl" in opts.icargs and not opts.icargs["verify_ssl"]:
# user has explicitly said be insecure, so don't warn about it
warnings.filterwarnings("ignore", message="Unverified HTTPS request")
signal.signal(signal.SIGTERM, handle_sigterm)
try:
# attempt to hack around breakage between influxdb-python client and 2.0 server:
gstate.influx_client = InfluxDBClient(**opts.icargs, headers={"Accept": "application/json"})
except TypeError:
# ...unless influxdb-python package version is too old
gstate.influx_client = InfluxDBClient(**opts.icargs)
try:
next_loop = time.monotonic()
while True:
rc = loop_body(opts, gstate)
if opts.loop_interval > 0.0:
now = time.monotonic()
next_loop = max(next_loop + opts.loop_interval, now)
time.sleep(next_loop - now)
else:
break
except Terminated:
pass
finally:
if gstate.points:
rc = flush_points(opts, gstate)
gstate.influx_client.close()
gstate.shutdown()
sys.exit(rc)
if __name__ == '__main__':
main()