Skip to content

Commit

Permalink
install python packages from apt, use threads, split off utils and de…
Browse files Browse the repository at this point in the history
…code, better error handling, add a pos regex
  • Loading branch information
rpatel3001 committed Feb 20, 2024
1 parent 7128d78 commit f4e8561
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 200 deletions.
33 changes: 17 additions & 16 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,26 @@ FROM ghcr.io/sdr-enthusiasts/docker-baseimage:python

SHELL ["/bin/bash", "-o", "pipefail", "-c"]

COPY requirements.txt /tmp

# hadolint ignore=DL3008,SC2086,DL4006,SC2039
RUN set -x && \
# TEMP_PACKAGES=() && \
# KEPT_PACKAGES=() && \
# # temp
# TEMP_PACKAGES+=() && \
# # keep
# KEPT_PACKAGES+=() && \
# apt-get update && \
# apt-get install -y --no-install-recommends \
# "${KEPT_PACKAGES[@]}" \
# "${TEMP_PACKAGES[@]}" \
# && \
pip install --break-system-packages -r /tmp/requirements.txt && \
TEMP_PACKAGES=() && \
KEPT_PACKAGES=() && \
# temp
TEMP_PACKAGES+=() && \
# keep
KEPT_PACKAGES+=(python3-prctl) && \
KEPT_PACKAGES+=(python3-bs4) && \
KEPT_PACKAGES+=(python3-colorama) && \
KEPT_PACKAGES+=(python3-requests) && \
apt-get update && \
apt-get install -y --no-install-recommends \
"${KEPT_PACKAGES[@]}" \
"${TEMP_PACKAGES[@]}" \
&& \
pip install --break-system-packages icao_nnumber_converter_us && \
# Clean up
# apt-get remove -y "${TEMP_PACKAGES[@]}" && \
# apt-get autoremove -y && \
apt-get remove -y "${TEMP_PACKAGES[@]}" && \
apt-get autoremove -y && \
rm -rf /src/* /tmp/* /var/lib/apt/lists/*

COPY rootfs /
4 changes: 0 additions & 4 deletions requirements.txt

This file was deleted.

281 changes: 101 additions & 180 deletions rootfs/scripts/acars2pos.py
Original file line number Diff line number Diff line change
@@ -1,179 +1,92 @@
import fcntl
import locale
import os
import socket
import traceback
from datetime import datetime
from json import loads
from math import pow
from os import getenv
from pprint import pprint
import prctl
from queue import SimpleQueue
from re import findall, search, split, sub
from sys import stderr
from threading import Thread, current_thread
from time import sleep

import requests
from bs4 import BeautifulSoup
from colorama import Fore, init
from icao_nnumber_converter_us import n_to_icao

enc = locale.getpreferredencoding(False)
def readlines_nb(f):
fd = f.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
buf = bytearray()
from acars_decode import Decoder as AD
from util import *

def rx_thread(host, rxq):
prctl.set_name(f"rx {host[0]}:{host[1]}")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(host)
rdr = sock2lines(sock)
print(f"Connected to JSON input at {host[0]}:{host[1]}")
while True:
try:
block = os.read(fd, 8192)
except BlockingIOError:
yield ""
continue
if not block:
if buf:
yield buf.decode(enc)
buf.clear()
break
buf.extend(block)
while True:
r = buf.find(b'\r')
n = buf.find(b'\n')
if r == -1 and n == -1: break
if r == -1 or r > n:
yield buf[:(n+1)].decode(enc)
buf = buf[(n+1):]
elif n == -1 or n > r:
yield buf[:r].decode(enc) + '\n'
if n == r+1:
buf = buf[(r+2):]
else:
buf = buf[(r+1):]
msg = next(rdr).strip()
if msg:
rxq.put_nowait(msg)
else:
sleep(1)

ACARS_HOST = getenv("ACARS_HOST", "acars_router")
VDLM2_HOST = getenv("VDLM2_HOST", "acars_router")
HFDL_HOST = getenv("HFDL_HOST", "acars_router")
SBS_HOST = getenv("SBS_HOST", "ultrafeeder")
def tx_thread(host, txq):
prctl.set_name(f"tx {host[0]}:{host[1]}")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(host)
print(f"Connected to SBS output at {host[0]}:{host[1]}")
while True:
msg = txq.get()
sock.sendall(msg.encode(enc))

ACARS_PORT = int(getenv("ACARS_PORT", 15550))
VDLM2_PORT = int(getenv("VDLM2_PORT", 15555))
HFDL_PORT = int(getenv("HFDL_PORT", 15556))
SBS_PORT = int(getenv("SBS_PORT", 12000))
# wrapper to catch exceptions and restart threads
def thread_wrapper(func, *args):
slp = 10
while True:
try:
print(f"[{current_thread().name}] starting thread")
func(*args)
except ConnectionRefusedError:
print(f"[{current_thread().name}] connection refused; restarting thread in {slp} seconds")
except StopIteration:
print(f"[{current_thread().name}] lost connection; restarting thread in {slp} seconds")
except BaseException as exc:
print(traceback.format_exc())
print(f"[{current_thread().name}] exception {type(exc).__name__}; restarting thread in {slp} seconds")
else:
print(f"[{current_thread().name}] thread function returned; restarting thread in {slp} seconds")
sleep(slp)

acarssock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
acarssock.connect((ACARS_HOST, ACARS_PORT))
acarsgen = readlines_nb(acarssock)
json_in = getenv("JSON_IN", "acars_router:15550")
json_in = json_in.split(";")
json_in = [x.split(":") for x in json_in]
json_in = [(x,int(y)) for x,y in json_in]

vdlm2sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
vdlm2sock.connect((VDLM2_HOST, VDLM2_PORT))
vdlm2gen = readlines_nb(vdlm2sock)
sbs_out = getenv("SBS_OUT", "ultrafeeder:12000")
sbs_out = sbs_out.split(";")
sbs_out = [x.split(":") for x in sbs_out]
sbs_out = [(x,int(y)) for x,y in sbs_out]

hfdlsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
hfdlsock.connect((HFDL_HOST, HFDL_PORT))
hfdlgen = readlines_nb(hfdlsock)
rxq = SimpleQueue()
for i,j in enumerate(json_in):
Thread(name=f"rx {j[0]}:{j[1]}", target=thread_wrapper, args=(rx_thread, j, rxq)).start()

outsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
outsock.connect((SBS_HOST, SBS_PORT))
txqs = []
for i,s in enumerate(sbs_out):
txqs.append(SimpleQueue())
Thread(name=f"tx {s[0]}:{s[1]}", target=thread_wrapper, args=(tx_thread, s, txqs[-1])).start()

acdb = {}
regdb = {}
while True:
try:
sbs = {}

raw = next(acarsgen).strip()
if raw:
sbs["type"] = "acars"
else:
raw = next(vdlm2gen).strip()
if raw:
sbs["type"] = "vdlm2"
else:
raw = next(hfdlgen).strip()
if raw:
sbs["type"] = "hfdl"
else:
sleep(1)
continue
raw = rxq.get()

data = loads(raw)
if sbs["type"] == "vdlm2":
data = data["vdl2"]
elif sbs["type"] == "hfdl":
data = loads(raw)["hfdl"]
del data["app"]
# pprint(data)

if sbs["type"] == "acars":
if data.get("text") is None or data["label"] == "SQ":
continue
else:
sbs["reg"] = data["tail"]
sbs["time"] = data["timestamp"]
sbs["flight"] = data.get("flight", "")
sbs["txt"] = data["text"]
sbs["msgtype"] = data["label"]
elif sbs["type"] == "vdlm2":
if data.get("avlc") is None or data["avlc"].get("acars") is None or data["avlc"]["acars"].get("msg_text") is None:
continue
sbs["reg"] = data["avlc"]["acars"]["reg"]
sbs["time"] = data["t"]["sec"] + data["t"]["usec"]/1e6
sbs["flight"] = data["avlc"]["acars"]["flight"]
sbs["txt"] = data["avlc"]["acars"]["msg_text"]
sbs["msgtype"] = data["avlc"]["acars"]["label"]
elif sbs["type"] == "hfdl":
if not data.get("lpdu"):
if not data.get("spdu"):
pprint(data, stream=stderr)
print("hfdl bad top level key", file=stderr)
continue
# if not data["lpdu"].get("hfnpdu"):
# pprint(data, stream=stderr)
# print("hfdl no keys", file=stderr)
# continue
sbs["time"] = data["t"]["sec"] + data["t"]["usec"]/1e6
sbs["flight"] = data["lpdu"].get("hfnpdu", {}).get("flight_id", "")
try:
sbs["lat"] = data["lpdu"]["hfnpdu"]["pos"]["lat"]
sbs["lon"] = data["lpdu"]["hfnpdu"]["pos"]["lon"]
if sbs["lat"] == 180 and sbs["lon"] == 180:
continue
except:
pass
if data["lpdu"].get("hfnpdu", {}).get("acars"):
sbs["reg"] = data["lpdu"]["hfnpdu"]["acars"]["reg"]
sbs["flight"] = data["lpdu"]["hfnpdu"]["acars"].get("flight", "")
sbs["txt"] = data["lpdu"]["hfnpdu"]["acars"]["msg_text"]
sbs["msgtype"] = data["lpdu"]["hfnpdu"]["acars"]["label"]

sbs["id"] = data["lpdu"]["src"]["id"]
sbs["msgtype"] = data["lpdu"]["type"]["id"]
if sbs["msgtype"] == 191 or sbs["msgtype"] == 79:
# pprint(data["lpdu"]["ac_info"], stream=stderr)
sbs["icao"] = data["lpdu"]["ac_info"].get("icao", "")
sbs["reg"] = data["lpdu"]["ac_info"].get("regnr", "")
regdb[sbs["flight"]] = {"icao": sbs["icao"], "reg": sbs["reg"]}
print(f"hfdl logon req/res {sbs['flight']} {regdb[sbs['flight']]}", file=stderr)
elif sbs["msgtype"] == 159:
# pprint(data["lpdu"]["ac_info"], stream=stderr)
sbs["id"] = data["lpdu"]["assigned_ac_id"]
sbs["icao"] = data["lpdu"]["ac_info"].get("icao", "")
sbs["reg"] = data["lpdu"]["ac_info"].get("regnr", "")
acdb[sbs["id"]] = {"icao": sbs["icao"], "reg": sbs["reg"], "flight": sbs["flight"]}
print(f"hfdl logon confirm {sbs['id']} {acdb[sbs['id']]}", file=stderr)
else:
if acdb.get(sbs["id"]):
# print(f"hfdl ac in db: {sbs['id']}", file=stderr)
sbs["reg"] = acdb[sbs["id"]]["reg"]
sbs["icao"] = acdb[sbs["id"]]["icao"]
elif regdb.get(sbs["flight"]):
# print(f"hfdl ac in reg db: {sbs['flight']}", file=stderr)
sbs["reg"] = regdb[sbs["flight"]]["reg"]
sbs["icao"] = regdb[sbs["flight"]]["icao"]
else:
# pprint(data, stream=stderr)
# print(f"hfdl ac not in db: {sbs['id']} {sbs['flight']}", file=stderr)
continue
else:
sbs = AD.decode(data)
if sbs is None:
continue

if not sbs.get("txt") and not sbs.get("lat"):
Expand All @@ -185,12 +98,15 @@ def readlines_nb(f):
lat = sbs["lat"]
lon = sbs["lon"]
else:
rgx1 = r"([NS][\s\d\.]{4,15},?\s*/?[WE][\s\d\.]{4,15})"
rgx2 = r"([\s\d\.]{4,15}[NS],?\s*/?[\s\d\.]{4,15}[WE])"
# pos = findall("[NS]\s*\d+\.?\d*\s*,?\s*[WE]\s*\d+\.?\d*", sbs["txt"])
pos1 = findall("/?[N]?[\s\d\.]{4,15},?\s*/?[W-][\s\d\.]{4,15}", sbs["txt"])
pos1 = findall(rgx1, sbs["txt"])
pos1b = findall(rgx2, sbs["txt"])
pos2a = findall("LAT", sbs["txt"])
pos2b = findall("LON", sbs["txt"])
if (len(pos1) == 1):
txt = sub(r'(/?[N]?[\s\d\.]{4,15},?\s*/?[W-][\s\d\.]{4,15})', Fore.RED + r'\1' + Fore.RESET, sbs["txt"])
txt = sub(rgx1, Fore.RED + r'\1' + Fore.RESET, sbs["txt"])

pos = pos1[0]
pos = sub(r'/', '', pos)
Expand All @@ -201,17 +117,45 @@ def readlines_nb(f):

issouth = "S" in pos
iswest = "W" in pos
isnorth = "N" in pos
iseast = "E" in pos

if not(isnorth or issouth) and not(iswest or iseast):
continue

print(txt, file=stderr)

pos = split(r'[WE]', pos[1:])

if "N" in pos or "S" in pos:
if "W" not in pos and "E" not in pos:
continue
lat = pos[0].lstrip("0")
lon = pos[1].lstrip("0")[:len(lat)]

if lat and lon:
lat = int(lat)/pow(10, len(lat)-2) * (-1 if issouth else 1)
lon = int(lon)/pow(10, len(lon)-2) * (-1 if iswest else 1)
else:
if "W" in pos or "E" in pos:
continue
continue
elif len(pos1b) == 1:
txt = sub(rgx2, Fore.RED + r'\1' + Fore.RESET, sbs["txt"])

pos = pos1b[0]
pos = sub(r'/', '', pos)
pos = sub(r'\s', '', pos)
pos = sub(r',', '', pos)
pos = sub(r'\.', '', pos)
pos = sub(r'-', 'W', pos)

issouth = "S" in pos
iswest = "W" in pos
isnorth = "N" in pos
iseast = "E" in pos

if not(isnorth or issouth) and not(iswest or iseast):
continue

print(txt, file=stderr)

pos = split(r'[WE-]', pos[1:])
pos = split(r'[NS]', pos[1:])

lat = pos[0].lstrip("0")
lon = pos[1].lstrip("0")[:len(lat)]
Expand Down Expand Up @@ -255,31 +199,8 @@ def readlines_nb(f):
out = f'MSG,3,1,1,{sbs["icao"].upper()},1,{datetime.fromtimestamp(sbs["time"]):%Y/%m/%d,%T.%f},{datetime.now():%Y/%m/%d,%T.%f},{sbs["flight"]},,,,{lat},{lon},,{squawk},,,,\n'

print(f'{Fore.BLUE}{out}{Fore.RESET}', file=stderr)
outsock.sendall(out.encode(enc))
except KeyboardInterrupt:
print("Got ctrl-c, closing", file=stderr)
acarssock.close()
vdlm2sock.close()
hfdlsock.close()
outsock.close()
break
except BrokenPipeError:
print("Reconnecting", file=stderr)
acarssock.close()
vdlm2sock.close()
hfdlsock.close()
outsock.close()
acarssock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
acarssock.connect((ACARS_HOST, ACARS_PORT))
acarsgen = readlines_nb(acarssock)
vdlm2sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
vdlm2sock.connect((VDLM2_HOST, VDLM2_PORT))
vdlm2gen = readlines_nb(vdlm2sock)
hfdlsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
hfdlsock.connect((HFDL_HOST, HFDL_PORT))
hfdlgen = readlines_nb(hfdlsock)
outsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
outsock.connect((SBS_HOST, SBS_PORT))
for q in txqs:
q.put(out)
except BaseException:
print("Other exception:", file=stderr)
pprint(data, stream=stderr)
Expand Down
Loading

0 comments on commit f4e8561

Please sign in to comment.