forked from sparky8512/starlink-grpc-tools
-
Notifications
You must be signed in to change notification settings - Fork 0
/
dish_obstruction_map.py
186 lines (163 loc) · 6.96 KB
/
dish_obstruction_map.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
#!/usr/bin/python3
"""Write a PNG image representing Starlink obstruction map data.
This script queries obstruction map data from the Starlink user terminal
reachable on the local network and writes a PNG image based on that data.
"""
import argparse
from datetime import datetime
import logging
import os
import png
import sys
import time
import starlink_grpc
# Default colours for full-colour output, as hex strings. parse_args() splits
# them into bytes with alpha in the highest byte, then red, green and blue
# (i.e. AARRGGBB).
DEFAULT_OBSTRUCTED_COLOR = "FFFF0000"
DEFAULT_UNOBSTRUCTED_COLOR = "FFFFFFFF"
DEFAULT_NO_DATA_COLOR = "00000000"
# Default colours for greyscale output (--greyscale), as hex strings.
# parse_args() splits them into alpha (high byte) and grey level (low byte).
DEFAULT_OBSTRUCTED_GREYSCALE = "FF00"
DEFAULT_UNOBSTRUCTED_GREYSCALE = "FFFF"
DEFAULT_NO_DATA_GREYSCALE = "0000"
# Default for --loop-interval, in seconds; 0 means run once without looping.
LOOP_TIME_DEFAULT = 0
def loop_body(opts, context):
    """Fetch the obstruction map once and write it out as a PNG image.

    Args:
        opts: Options namespace as returned by parse_args(). Reads
            ``filename``, ``sequence``, ``greyscale``, ``no_alpha`` and the
            per-channel colour attributes (``obstructed_color_*``,
            ``unobstructed_color_*``, ``no_data_color_*``). ``sequence`` is
            incremented after a successful write.
        context: Passed unchanged to ``starlink_grpc.obstruction_map``.

    Returns:
        0 if an image was written, 1 if the map data was empty.
    """
    snr_data = starlink_grpc.obstruction_map(context)

    # Validate before touching the output: opening first would leak the file
    # handle on this early return and leave an empty/truncated file behind.
    if not snr_data or not snr_data[0]:
        logging.error("Invalid SNR map data: Zero-length")
        return 1

    # Channel order within one pixel: L[A] for greyscale, RGB[A] otherwise.
    channel_names = ["g"] if opts.greyscale else ["r", "g", "b"]
    if not opts.no_alpha:
        channel_names.append("a")

    # Resolve the per-channel colour values once instead of once per pixel.
    # Each entry is (unobstructed, obstructed, no_data) for one channel.
    channels = [(getattr(opts, "unobstructed_color_" + name),
                 getattr(opts, "obstructed_color_" + name),
                 getattr(opts, "no_data_color_" + name))
                for name in channel_names]

    def pixel_bytes(row):
        """Yield the channel byte values for every point in one map row."""
        for point in row:
            if point >= 0.0:
                # Values above 1.0 shouldn't happen, but clamp just in case.
                point = min(point, 1.0)
                # Linear blend: 1.0 is the unobstructed colour, 0.0 the
                # obstructed colour.
                for clear, blocked, _ in channels:
                    yield round(point * clear + (1.0 - point) * blocked)
            else:
                # Negative points are rendered with the "no data" colour.
                for _, _, missing in channels:
                    yield missing

    if opts.filename == "-":
        # Open new stdout file to get binary mode; closefd=False keeps the
        # underlying descriptor open when this wrapper is closed.
        out_file = os.fdopen(sys.stdout.fileno(), "wb", closefd=False)
    else:
        now = int(time.time())
        filename = opts.filename.replace("%u", str(now))
        # time.gmtime gives the same UTC fields as the deprecated
        # datetime.utcfromtimestamp.
        filename = filename.replace(
            "%d", time.strftime("%Y_%m_%d_%H_%M_%S", time.gmtime(now)))
        filename = filename.replace("%s", str(opts.sequence))
        out_file = open(filename, "wb")

    # The with-block guarantees the file is closed even if writing fails.
    with out_file:
        writer = png.Writer(len(snr_data[0]),
                            len(snr_data),
                            alpha=(not opts.no_alpha),
                            greyscale=opts.greyscale)
        writer.write(out_file, (bytes(pixel_bytes(row)) for row in snr_data))

    opts.sequence += 1
    return 0
def parse_args():
    """Parse the command line and return the resulting options namespace.

    In addition to the raw argparse options, each colour option
    (``obstructed_color``, ``unobstructed_color``, ``no_data_color``) is
    defaulted if not given, decoded from hex, and split into per-channel
    integer attributes (0-255) on the returned namespace: ``<option>_a`` and
    ``<option>_g`` in greyscale mode, or ``<option>_a``, ``<option>_r``,
    ``<option>_g`` and ``<option>_b`` otherwise.

    Logs an error and exits the process with status 1 if a colour is not a
    valid hex number or does not fit in the available channels.
    """
    parser = argparse.ArgumentParser(
        description="Collect directional obstruction map data from a Starlink user terminal and "
        "emit it as a PNG image")
    parser.add_argument(
        "filename",
        help="The image file to write, or - to write to stdout; may be a template with the "
        "following to be filled in per loop iteration: %%s for sequence number, %%d for UTC date "
        "and time, %%u for seconds since Unix epoch.")
    parser.add_argument(
        "-o",
        "--obstructed-color",
        help="Color of obstructed areas, in RGB, ARGB, L, or AL hex notation, default: " +
        DEFAULT_OBSTRUCTED_COLOR + " or " + DEFAULT_OBSTRUCTED_GREYSCALE)
    parser.add_argument(
        "-u",
        "--unobstructed-color",
        help="Color of unobstructed areas, in RGB, ARGB, L, or AL hex notation, default: " +
        DEFAULT_UNOBSTRUCTED_COLOR + " or " + DEFAULT_UNOBSTRUCTED_GREYSCALE)
    parser.add_argument(
        "-n",
        "--no-data-color",
        help="Color of areas with no data, in RGB, ARGB, L, or AL hex notation, default: " +
        DEFAULT_NO_DATA_COLOR + " or " + DEFAULT_NO_DATA_GREYSCALE)
    parser.add_argument(
        "-g",
        "--greyscale",
        action="store_true",
        help="Emit a greyscale image instead of the default full color image; greyscale images "
        "use L or AL hex notation for the color options")
    parser.add_argument(
        "-z",
        "--no-alpha",
        action="store_true",
        help="Emit an image without alpha (transparency) channel instead of the default that "
        "includes alpha channel")
    parser.add_argument("-e",
                        "--target",
                        help="host:port of dish to query, default is the standard IP address "
                        "and port (192.168.100.1:9200)")
    parser.add_argument("-t",
                        "--loop-interval",
                        type=float,
                        default=float(LOOP_TIME_DEFAULT),
                        help="Loop interval in seconds or 0 for no loop, default: " +
                        str(LOOP_TIME_DEFAULT))
    parser.add_argument("-s",
                        "--sequence",
                        type=int,
                        default=1,
                        help="Starting sequence number for templatized filenames, default: 1")

    opts = parser.parse_args()

    # Per-mode tables: the default for each colour option, and the
    # (attribute suffix, bit shift) of every channel packed into the value.
    if opts.greyscale:
        defaults = (("obstructed_color", DEFAULT_OBSTRUCTED_GREYSCALE),
                    ("unobstructed_color", DEFAULT_UNOBSTRUCTED_GREYSCALE),
                    ("no_data_color", DEFAULT_NO_DATA_GREYSCALE))
        channels = (("_a", 8), ("_g", 0))
    else:
        defaults = (("obstructed_color", DEFAULT_OBSTRUCTED_COLOR),
                    ("unobstructed_color", DEFAULT_UNOBSTRUCTED_COLOR),
                    ("no_data_color", DEFAULT_NO_DATA_COLOR))
        channels = (("_a", 24), ("_r", 16), ("_g", 8), ("_b", 0))
    # Largest value that fits in the channels above (one byte per channel).
    max_color = (1 << (8 * len(channels))) - 1

    for option, default in defaults:
        if getattr(opts, option) is None:
            setattr(opts, option, default)

        # Keep the try body minimal: only int() can raise ValueError here.
        try:
            color = int(getattr(opts, option), 16)
        except ValueError:
            logging.error("Invalid hex number for %s", option)
            sys.exit(1)

        # Negative or over-wide values would otherwise be silently truncated
        # by the masking below and produce an unintended colour.
        if not 0 <= color <= max_color:
            logging.error("Color value out of range for %s", option)
            sys.exit(1)

        for suffix, shift in channels:
            setattr(opts, option + suffix, (color >> shift) & 255)

    return opts
def main():
    """Script entry point: parse options, then write one image or loop.

    Calls loop_body() once, or repeatedly every ``--loop-interval`` seconds
    when that interval is greater than zero. The gRPC channel context is
    always closed on the way out. Exits the process with the status returned
    by the last loop_body() call.
    """
    # Configure logging before parsing arguments: parse_args() reports bad
    # colour values through logging.error(), which would otherwise fall back
    # to logging's implicit default format instead of the one set here.
    logging.basicConfig(format="%(levelname)s: %(message)s")

    opts = parse_args()

    context = starlink_grpc.ChannelContext(target=opts.target)

    try:
        next_loop = time.monotonic()
        while True:
            rc = loop_body(opts, context)
            if opts.loop_interval <= 0.0:
                # Single-shot mode.
                break
            now = time.monotonic()
            # Schedule relative to the previous target time, but never in
            # the past if an iteration overran the interval.
            next_loop = max(next_loop + opts.loop_interval, now)
            time.sleep(next_loop - now)
    finally:
        context.close()

    sys.exit(rc)
# Run main() only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()