-
Notifications
You must be signed in to change notification settings - Fork 1
/
functions.py
245 lines (230 loc) · 9.03 KB
/
functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
import config as lite
import csv
import datetime
import json
import math
import numpy as np
import pymap3d as pm
import urllib.request
def azel_to_hadec(azel):
''' Converts coordinates from AZ, EL to HA, DEC '''
a = azel[1]
z = 90 - a
A = azel[0]
phi = lite.ref_pos[0]
# Python math requires radian values
z_rad = math.radians(z)
A_rad = math.radians(A)
phi_rad = math.radians(phi)
dec_rad = math.asin(math.cos(z_rad) * math.sin(phi_rad)
+ math.sin(z_rad) * math.cos(phi_rad) * math.cos(A_rad))
ha_rad = math.acos(math.cos(z_rad) / (math.cos(dec_rad) * math.cos(phi_rad))
- math.tan(dec_rad) * math.tan(phi_rad))
# Convert back to degrees
ha_deg = math.degrees(ha_rad)
dec_deg = math.degrees(dec_rad)
if azel[0] > 0 and azel[0] < 180:
ha_deg *= -1
return [ha_deg, dec_deg]
def get_ground_value(data, idx):
''' Get value from row in log data '''
try:
value = (data[idx].strip('\n'))
# If no value, then return 0
except ValueError:
value = 0
return value
def get_ground_pos():
''' Get latitude, longitude, altitude, and utime from Ground Station .log file '''
file = None
lat = lng = alt = utime = -404
# Read data from specified .log file
try:
file = open(lite.log_path, 'r')
except FileNotFoundError:
print("File not found")
exit(1)
row = reversed(list(csv.reader(file)))
# Ensure .log data read is from the correct callsign
try:
while True:
data = next(row)
# Only get data if the correct callsign is found
try:
if get_ground_value(data, 3).lower() == lite.ground_callsign.lower():
file.close()
lat = float(get_ground_value(data, 10))
lng = float(get_ground_value(data, 11))
alt = float(get_ground_value(data, 14))
utime = float(get_ground_value(data, 1))
return [lat, lng, alt, utime]
# Do nothing if row has no callsign
except ValueError:
pass
# If no data from callsign is found, return null_pos
except StopIteration:
file.close()
return lite.null_pos
def get_aprs_value(json_data, key):
''' Get value from JSON data from APRS '''
try:
value = json_data['entries'][0][key]
return value
# If no value, then return 0
except KeyError:
return "0"
def get_aprs_pos():
''' Get latitude, longitude, altitude, and utime from APRS '''
url = "https://api.aprs.fi/api/get?name=" + lite.aprs_callsign + \
"&what=loc&apikey=" + lite.aprs_key + "&format=json"
try:
response = urllib.request.urlopen(url)
# If URL is invalid, return null_pos
except ValueError:
return lite.null_pos
json_data = json.loads(response.read().decode())
# Check if JSON from APRS contains data
if json_data['result'] == "ok":
try:
lat = float(get_aprs_value(json_data, 'lat'))
lng = float(get_aprs_value(json_data, 'lng'))
alt = float(get_aprs_value(json_data, 'altitude'))
# Altitude should be stored at ['altitude'], but check for ['alt']
if alt == 0:
alt = float(get_aprs_value(json_data, 'alt'))
utime = get_aprs_value(json_data, 'time')
return [lat, lng, alt, utime]
# If no data from callsign is found, return null_pos
except KeyError:
return lite.null_pos
def is_pos_updated(pos, log_data):
''' Check if position data from source has updated '''
# Check if data is null
if np.array_equal(pos, lite.null_pos[0:3]):
return False
# Check if data for updates against previous data
elif lite.n > 0:
if np.array_equal(pos, log_data):
return False
else:
return True
# First instance of data will always be up to date
else:
return True
def conditional_data_update(ground_pos, aprs_pos):
''' Determine which data source, if any, has updated most recently '''
pos = lite.pred_pos
source = "ground"
# Check for new Ground Station data
if is_pos_updated(lite.ground_pos[0:3], ground_pos[0:3]):
pos = lite.ground_pos
lite.last_ground_update = 0
# Also check for APRS update
if is_pos_updated(lite.aprs_pos[0:3], aprs_pos[0:3]):
lite.last_aprs_update = 0
else:
lite.last_aprs_update += 1
# Else, check for new APRS data
elif is_pos_updated(lite.aprs_pos[0:3], aprs_pos[0:3]):
pos = lite.aprs_pos
lite.last_aprs_update = 0
source = "aprs"
# Also increment for Ground Station update
lite.last_ground_update += 1
# Else if no data is up to date, use prediction
else:
lite.last_ground_update += 1
lite.last_aprs_update += 1
source = "pred"
if len(pos) != 4:
pos.append(-404)
return pos, source
def get_updated_data(log_data):
''' Get updated data from relevant source '''
pos = lite.pred_pos
source = "ground"
# Check current data against null_pos
if lite.n == 0:
pos, source = conditional_data_update(lite.null_pos, lite.null_pos)
# Check current data against previous data
elif lite.n > 1:
pos, source = conditional_data_update(lite.log[lite.n - 1]['ground_pos'],
lite.log[lite.n - 1]['aprs_pos'])
log_data['source'] = source
return pos
def get_hadec(pred_pos, log_data):
''' Convert predicted position from Geodetic to HADEC '''
# Get AZEL and HADEC if data is updated
if is_pos_updated(pred_pos[0:3], lite.null_pos[0:3]):
az, el, range = pm.geodetic2aer(pred_pos[0], pred_pos[1], pred_pos[2],
lite.ref_pos[0], lite.ref_pos[1], lite.ref_pos[2])
[ha, dec] = azel_to_hadec([az, el, range])
ha += lite.offset_ha
dec += lite.offset_dec
# Set AZEL and HADEC to dummy values if no data update
else:
az = el = range = ha = dec = -404
# Push data to lite.log[]
log_data['azel'] = [az, el, range]
log_data['hadec'] = [ha, dec]
log_data['hadec_offset'] = [lite.offset_ha, lite.offset_dec]
return [ha, dec]
def get_last_pred_pos():
''' Return the last predicted geodetic position '''
if lite.n == 0:
return lite.null_pos
else:
return lite.log[lite.n - 1]['pred_pos']
def is_pos_change_invalid(hadec_1, hadec_2):
''' Returns true if position changes by more than 2 degrees for HA and DEC '''
# Check HA change
if abs(abs(hadec_1[0]) - abs(hadec_2[0])) > 2:
return True
# Check DEC change
elif abs(abs(hadec_1[1]) - abs(hadec_2[1])) > 2:
return True
# If both HA and DEC changes is <= 2 degrees
else:
return False
def is_command_valid(geodetic_pos, elevation, hadec):
''' Check if next command is valid '''
# Must be greater than or equal to minimum elevation, non-null, and telescope movement must not be paused
if elevation >= lite.min_el and not np.array_equal(geodetic_pos, lite.null_pos) and not lite.pause:
if lite.n == 0:
return True
# Ensure HA DEC has changed by no more than 2 degrees (removes noise)
elif lite.n > 0 and not is_pos_change_invalid(hadec, lite.log[lite.n - 1]['hadec']):
return True
# Might not need to check np.array_equal(geodetic_pos, lite.null_pos)
else:
return False
else:
return False
def print_data_small():
''' Recurring print funciton called every 10 seconds '''
# Print HA, DEC
str_output = "--------------------------------------------\n"\
" TIME: " + datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S") + "\n"\
" HA: " + str(round(lite.log[lite.n]['hadec'][0], 4)) + " deg\n"\
" DEC: " + str(round(lite.log[lite.n]['hadec'][1], 4)) + " deg\n"
# Indicate if command is sent to telescope during this iteration
if lite.n % 3 == 2:
str_output += "Sending command to telescope ....\n"
else:
str_output += "Next command will be sent in " + str(20 - 10 * (lite.n % 3)) + " seconds\n"
return str_output
"""
The following functions are used in place of input() and print()
They will output to a text file in addition to their standard input/output
Call them using print(input_out(my_str)) or print(print_out(my_str))
"""
def input_out(str_prompt):
''' Get string input from user and write to output file '''
lite.output_file.write(str_prompt + "\n")
str_input = input(str_prompt)
lite.output_file.write(str_input + "\n")
return str_input
def print_out(str_input):
''' Print and write to output file '''
lite.output_file.write(str_input + "\n")
return str_input