-
Notifications
You must be signed in to change notification settings - Fork 0
/
rtk-driver.py
136 lines (115 loc) · 4.13 KB
/
rtk-driver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import rospy
import serial
import threading
import time
# import pynmea2
from std_msgs.msg import Header
from gps_common.msg import GPSFix, GPSStatus
class RTK:
def __init__(self, port='/dev/ttyUSB0', ros_pub=None):
self.serial = serial.Serial(port, 115200)
self.ros_pub = ros_pub
if self.ros_pub is None:
print('No ros_pub !')
# self.speed = 0.
self.yaw = 0.
self.err_yaw = 0.
self.latitude = 0.
self.longitude = 0.
# time
self.last_gps = 0
self.last_heading = 0
# flag
self.new_gps = False
self.new_heading = False
recv_thread = threading.Thread(
target = self.read_serial, args=()
)
recv_thread.start()
def read_serial(self):
while True:
try:
line = self.serial.readline()
line = line.decode()
self.parse(line)
except:
pass
try:
# get both data, publish
if self.new_gps and self.new_heading:
# clear flag
self.new_gps = False
self.new_heading = False
# check timeout
timeout_valid = self.check_signal(timeout = 0.1)
ros_msg = GPSFix()
ros_msg.header = Header()
ros_msg.header.stamp = rospy.Time.now()
ros_msg.header.frame_id = 'gnss_link'
ros_msg.latitude = self.latitude
ros_msg.longitude = self.longitude
ros_msg.dip = self.yaw
ros_msg.err_dip = self.err_yaw
ros_msg.status = GPSStatus()
ros_msg.status.status = 2 if timeout_valid else 0
# ros_msg.speed = self.speed
if self.ros_pub is not None:
self.ros_pub.publish(ros_msg)
except:
pass
def check_signal(self, timeout = 0.1):
now = time.time()
if (now-self.last_gps) < timeout and (now-self.last_heading) < timeout:
return True
else:
return False
def gps60to10(self, la, la_dig, lo, lo_dig):
new_la = la + la_dig/60.
new_lo = lo + lo_dig/60.
return new_la, new_lo
def parse(self, data):
if data[:9] == '#HEADINGA':
sp_line = data.split(',')
self.yaw = float(sp_line[12])
self.err_yaw = float(sp_line[15])
if abs(self.err_yaw) > 0.3:
self.err_yaw = 0.1
print('get error err_yaw', float(sp_line[15]))
self.last_heading = time.time()
self.new_heading = True
if data[:6] == '$GPGGA':
sp_line = data.split(',')
la = 30#(float(sp_line[2])-float(sp_line[2])%100)/100
la_dig = float(sp_line[2]) % 100
lo = 120#(float(sp_line[4])-float(sp_line[4])%100)/100
lo_dig = float(sp_line[4]) % 100
new_la, new_lo = self.gps60to10(la, la_dig, lo, lo_dig)
# rmc = pynmea2.parse(data)
# self.latitude = rmc.latitude
# self.longitude = rmc.longitude
self.latitude = new_la
self.longitude = new_lo
self.last_gps = time.time()
self.new_gps = True
def open_serial():
serial_list = [item for item in os.listdir('/dev/') if item[:6]=='ttyUSB']
if len(serial_list) > 0:
for item in serial_list:
os.system('sudo chmod 777 /dev/' + item)
if len(serial_list) > 1:
print('More than one USB found:')
for item in serial_list:
print(item)
print('Using', serial_list[0])
return serial_list[0]
if __name__ == '__main__':
rospy.init_node('rtk', anonymous=True)
rtk_pub = rospy.Publisher('/jzhw/gps/fix', GPSFix, queue_size=0)
usb = open_serial()
rtk = RTK('/dev/' + usb, rtk_pub)
rate = rospy.Rate(1000)
while not rospy.is_shutdown():
rate.sleep()