-
Notifications
You must be signed in to change notification settings - Fork 1
/
NMEA-GPS_Serial_local.py
95 lines (72 loc) · 2.3 KB
/
NMEA-GPS_Serial_local.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import serial
import time
# import requests
import pynmea2
from datetime import datetime
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
# You can generate a Token from the "Tokens Tab" in the UI
token = "TOKEN"
org = "ORG"
bucket = "BUCKET"
client = InfluxDBClient(url="http://127.0.0.1:8086", token=token)
def GPS_RMC(msg):
if msg.status == "A":
send("RMCFix", "1")
else:
send("RMCFix", "0")
def GPS_GSA(msg):
# print(repr(msg))
# print(dir(msg))
send('GSA3d', msg.mode_fix_type)
def GPS_GSV(msg):
# print(repr(msg))
send('GSView', msg.num_sv_in_view)
def GPS_GGA(msg):
# print(repr(msg))
send('GGA', msg.num_sats)
dispatch = {
'GGA' : GPS_GGA,
# b'$PRWIZC' : do_sink,
'GSA' : GPS_GSA,
'GSV' : GPS_GSV,
'RMC' : GPS_RMC,
}
def send(datum, value):
write_api = client.write_api(write_options=SYNCHRONOUS)
data = f"{datum} value={value}"
# print(data)
write_api.write(bucket, org, data)
def main(argv):
while(True):
read = b'...'
while(read != b''):
time.sleep(5)
try:
with serial.Serial('/dev/ttyS1', 4800, timeout=2) as ser:
while(read != b''):
read = ser.readline().strip()
# print(read)
if read.startswith(b'$') and not read.startswith(b'$PRWIZCH'):
try:
msg = pynmea2.parse(read.decode())
except UnicodeDecodeError:
pass
# print(dir(msg))
# print(msg.sentence_type)
try:
dispatch[msg.sentence_type](msg)
except KeyError:
pass
except pynmea2.nmea.ChecksumError:
print("Checksum Error")
except serial.serialutil.SerialException:
print("Connection Failed")
if __name__ == "__main__":
try:
main(sys.argv)
except KeyboardInterrupt:
print('Received Ctrl-c')