diff --git a/Dockerfile b/Dockerfile index d491a58..e4ff30a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,25 +2,26 @@ FROM ghcr.io/sdr-enthusiasts/docker-baseimage:python SHELL ["/bin/bash", "-o", "pipefail", "-c"] -COPY requirements.txt /tmp - # hadolint ignore=DL3008,SC2086,DL4006,SC2039 RUN set -x && \ -# TEMP_PACKAGES=() && \ -# KEPT_PACKAGES=() && \ -# # temp -# TEMP_PACKAGES+=() && \ -# # keep -# KEPT_PACKAGES+=() && \ -# apt-get update && \ -# apt-get install -y --no-install-recommends \ -# "${KEPT_PACKAGES[@]}" \ -# "${TEMP_PACKAGES[@]}" \ -# && \ - pip install --break-system-packages -r /tmp/requirements.txt && \ + TEMP_PACKAGES=() && \ + KEPT_PACKAGES=() && \ + # temp + TEMP_PACKAGES+=() && \ + # keep + KEPT_PACKAGES+=(python3-prctl) && \ + KEPT_PACKAGES+=(python3-bs4) && \ + KEPT_PACKAGES+=(python3-colorama) && \ + KEPT_PACKAGES+=(python3-requests) && \ + apt-get update && \ + apt-get install -y --no-install-recommends \ + "${KEPT_PACKAGES[@]}" \ + "${TEMP_PACKAGES[@]}" \ + && \ + pip install --break-system-packages icao_nnumber_converter_us && \ # Clean up -# apt-get remove -y "${TEMP_PACKAGES[@]}" && \ -# apt-get autoremove -y && \ + apt-get remove -y "${TEMP_PACKAGES[@]}" && \ + apt-get autoremove -y && \ rm -rf /src/* /tmp/* /var/lib/apt/lists/* COPY rootfs / diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 2acd9ab..0000000 --- a/requirements.txt +++ /dev/null @@ -1,4 +0,0 @@ -beautifulsoup4==4.12.3 -colorama==0.4.6 -icao_nnumber_converter_us==0.1.0 -Requests==2.31.0 diff --git a/rootfs/scripts/acars2pos.py b/rootfs/scripts/acars2pos.py index fa16837..7a59627 100755 --- a/rootfs/scripts/acars2pos.py +++ b/rootfs/scripts/acars2pos.py @@ -1,6 +1,4 @@ -import fcntl import locale -import os import socket import traceback from datetime import datetime @@ -8,8 +6,11 @@ from math import pow from os import getenv from pprint import pprint +import prctl +from queue import SimpleQueue from re import findall, search, split, sub from sys import stderr +from threading import Thread, current_thread from time import sleep import requests @@ -17,163 +18,75 @@ from colorama import Fore, init from icao_nnumber_converter_us import n_to_icao -enc = locale.getpreferredencoding(False) -def readlines_nb(f): - fd = f.fileno() - fl = fcntl.fcntl(fd, fcntl.F_GETFL) - fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) - buf = bytearray() +from acars_decode import Decoder as AD +from util import * + +def rx_thread(host, rxq): + prctl.set_name(f"rx {host[0]}:{host[1]}") + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(host) + rdr = sock2lines(sock) + print(f"Connected to JSON input at {host[0]}:{host[1]}") while True: - try: - block = os.read(fd, 8192) - except BlockingIOError: - yield "" - continue - if not block: - if buf: - yield buf.decode(enc) - buf.clear() - break - buf.extend(block) - while True: - r = buf.find(b'\r') - n = buf.find(b'\n') - if r == -1 and n == -1: break - if r == -1 or r > n: - yield buf[:(n+1)].decode(enc) - buf = buf[(n+1):] - elif n == -1 or n > r: - yield buf[:r].decode(enc) + '\n' - if n == r+1: - buf = buf[(r+2):] - else: - buf = buf[(r+1):] + msg = next(rdr).strip() + if msg: + rxq.put_nowait(msg) + else: + sleep(1) -ACARS_HOST = getenv("ACARS_HOST", "acars_router") -VDLM2_HOST = getenv("VDLM2_HOST", "acars_router") -HFDL_HOST = getenv("HFDL_HOST", "acars_router") -SBS_HOST = getenv("SBS_HOST", "ultrafeeder") +def tx_thread(host, txq): + prctl.set_name(f"tx {host[0]}:{host[1]}") + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(host) + print(f"Connected to SBS output at {host[0]}:{host[1]}") + while True: + msg = txq.get() + sock.sendall(msg.encode(enc)) -ACARS_PORT = int(getenv("ACARS_PORT", 15550)) -VDLM2_PORT = int(getenv("VDLM2_PORT", 15555)) -HFDL_PORT = int(getenv("HFDL_PORT", 15556)) -SBS_PORT = int(getenv("SBS_PORT", 12000)) +# wrapper to catch exceptions and restart threads +def thread_wrapper(func, *args): + slp = 10 + while True: + try: + print(f"[{current_thread().name}] starting thread") + func(*args) + except ConnectionRefusedError: + print(f"[{current_thread().name}] connection refused; restarting thread in {slp} seconds") + except StopIteration: + print(f"[{current_thread().name}] lost connection; restarting thread in {slp} seconds") + except BaseException as exc: + print(traceback.format_exc()) + print(f"[{current_thread().name}] exception {type(exc).__name__}; restarting thread in {slp} seconds") + else: + print(f"[{current_thread().name}] thread function returned; restarting thread in {slp} seconds") + sleep(slp) -acarssock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -acarssock.connect((ACARS_HOST, ACARS_PORT)) -acarsgen = readlines_nb(acarssock) +json_in = getenv("JSON_IN", "acars_router:15550") +json_in = json_in.split(";") +json_in = [x.split(":") for x in json_in] +json_in = [(x,int(y)) for x,y in json_in] -vdlm2sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -vdlm2sock.connect((VDLM2_HOST, VDLM2_PORT)) -vdlm2gen = readlines_nb(vdlm2sock) +sbs_out = getenv("SBS_OUT", "ultrafeeder:12000") +sbs_out = sbs_out.split(";") +sbs_out = [x.split(":") for x in sbs_out] +sbs_out = [(x,int(y)) for x,y in sbs_out] -hfdlsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -hfdlsock.connect((HFDL_HOST, HFDL_PORT)) -hfdlgen = readlines_nb(hfdlsock) +rxq = SimpleQueue() +for i,j in enumerate(json_in): + Thread(name=f"rx {j[0]}:{j[1]}", target=thread_wrapper, args=(rx_thread, j, rxq)).start() -outsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -outsock.connect((SBS_HOST, SBS_PORT)) +txqs = [] +for i,s in enumerate(sbs_out): + txqs.append(SimpleQueue()) + Thread(name=f"tx {s[0]}:{s[1]}", target=thread_wrapper, args=(tx_thread, s, txqs[-1])).start() -acdb = {} -regdb = {} while True: try: - sbs = {} - - raw = next(acarsgen).strip() - if raw: - sbs["type"] = "acars" - else: - raw = next(vdlm2gen).strip() - if raw: - sbs["type"] = "vdlm2" - else: - raw = next(hfdlgen).strip() - if raw: - sbs["type"] = "hfdl" - else: - sleep(1) - continue + raw = rxq.get() data = loads(raw) - if sbs["type"] == "vdlm2": - data = data["vdl2"] - elif sbs["type"] == "hfdl": - data = loads(raw)["hfdl"] - del data["app"] -# pprint(data) - - if sbs["type"] == "acars": - if data.get("text") is None or data["label"] == "SQ": - continue - else: - sbs["reg"] = data["tail"] - sbs["time"] = data["timestamp"] - sbs["flight"] = data.get("flight", "") - sbs["txt"] = data["text"] - sbs["msgtype"] = data["label"] - elif sbs["type"] == "vdlm2": - if data.get("avlc") is None or data["avlc"].get("acars") is None or data["avlc"]["acars"].get("msg_text") is None: - continue - sbs["reg"] = data["avlc"]["acars"]["reg"] - sbs["time"] = data["t"]["sec"] + data["t"]["usec"]/1e6 - sbs["flight"] = data["avlc"]["acars"]["flight"] - sbs["txt"] = data["avlc"]["acars"]["msg_text"] - sbs["msgtype"] = data["avlc"]["acars"]["label"] - elif sbs["type"] == "hfdl": - if not data.get("lpdu"): - if not data.get("spdu"): - pprint(data, stream=stderr) - print("hfdl bad top level key", file=stderr) - continue -# if not data["lpdu"].get("hfnpdu"): -# pprint(data, stream=stderr) -# print("hfdl no keys", file=stderr) -# continue - sbs["time"] = data["t"]["sec"] + data["t"]["usec"]/1e6 - sbs["flight"] = data["lpdu"].get("hfnpdu", {}).get("flight_id", "") - try: - sbs["lat"] = data["lpdu"]["hfnpdu"]["pos"]["lat"] - sbs["lon"] = data["lpdu"]["hfnpdu"]["pos"]["lon"] - if sbs["lat"] == 180 and sbs["lon"] == 180: - continue - except: - pass - if data["lpdu"].get("hfnpdu", {}).get("acars"): - sbs["reg"] = data["lpdu"]["hfnpdu"]["acars"]["reg"] - sbs["flight"] = data["lpdu"]["hfnpdu"]["acars"].get("flight", "") - sbs["txt"] = data["lpdu"]["hfnpdu"]["acars"]["msg_text"] - sbs["msgtype"] = data["lpdu"]["hfnpdu"]["acars"]["label"] - - sbs["id"] = data["lpdu"]["src"]["id"] - sbs["msgtype"] = data["lpdu"]["type"]["id"] - if sbs["msgtype"] == 191 or sbs["msgtype"] == 79: -# pprint(data["lpdu"]["ac_info"], stream=stderr) - sbs["icao"] = data["lpdu"]["ac_info"].get("icao", "") - sbs["reg"] = data["lpdu"]["ac_info"].get("regnr", "") - regdb[sbs["flight"]] = {"icao": sbs["icao"], "reg": sbs["reg"]} - print(f"hfdl logon req/res {sbs['flight']} {regdb[sbs['flight']]}", file=stderr) - elif sbs["msgtype"] == 159: -# pprint(data["lpdu"]["ac_info"], stream=stderr) - sbs["id"] = data["lpdu"]["assigned_ac_id"] - sbs["icao"] = data["lpdu"]["ac_info"].get("icao", "") - sbs["reg"] = data["lpdu"]["ac_info"].get("regnr", "") - acdb[sbs["id"]] = {"icao": sbs["icao"], "reg": sbs["reg"], "flight": sbs["flight"]} - print(f"hfdl logon confirm {sbs['id']} {acdb[sbs['id']]}", file=stderr) - else: - if acdb.get(sbs["id"]): -# print(f"hfdl ac in db: {sbs['id']}", file=stderr) - sbs["reg"] = acdb[sbs["id"]]["reg"] - sbs["icao"] = acdb[sbs["id"]]["icao"] - elif regdb.get(sbs["flight"]): -# print(f"hfdl ac in reg db: {sbs['flight']}", file=stderr) - sbs["reg"] = regdb[sbs["flight"]]["reg"] - sbs["icao"] = regdb[sbs["flight"]]["icao"] - else: -# pprint(data, stream=stderr) -# print(f"hfdl ac not in db: {sbs['id']} {sbs['flight']}", file=stderr) - continue - else: + sbs = AD.decode(data) + if sbs is None: continue if not sbs.get("txt") and not sbs.get("lat"): @@ -185,12 +98,15 @@ def readlines_nb(f): lat = sbs["lat"] lon = sbs["lon"] else: + rgx1 = r"([NS][\s\d\.]{4,15},?\s*/?[WE][\s\d\.]{4,15})" + rgx2 = r"([\s\d\.]{4,15}[NS],?\s*/?[\s\d\.]{4,15}[WE])" # pos = findall("[NS]\s*\d+\.?\d*\s*,?\s*[WE]\s*\d+\.?\d*", sbs["txt"]) - pos1 = findall("/?[N]?[\s\d\.]{4,15},?\s*/?[W-][\s\d\.]{4,15}", sbs["txt"]) + pos1 = findall(rgx1, sbs["txt"]) + pos1b = findall(rgx2, sbs["txt"]) pos2a = findall("LAT", sbs["txt"]) pos2b = findall("LON", sbs["txt"]) if (len(pos1) == 1): - txt = sub(r'(/?[N]?[\s\d\.]{4,15},?\s*/?[W-][\s\d\.]{4,15})', Fore.RED + r'\1' + Fore.RESET, sbs["txt"]) + txt = sub(rgx1, Fore.RED + r'\1' + Fore.RESET, sbs["txt"]) pos = pos1[0] pos = sub(r'/', '', pos) @@ -201,17 +117,45 @@ def readlines_nb(f): issouth = "S" in pos iswest = "W" in pos + isnorth = "N" in pos + iseast = "E" in pos + + if not(isnorth or issouth) and not(iswest or iseast): + continue + + print(txt, file=stderr) + + pos = split(r'[WE]', pos[1:]) - if "N" in pos or "S" in pos: - if "W" not in pos and "E" not in pos: - continue + lat = pos[0].lstrip("0") + lon = pos[1].lstrip("0")[:len(lat)] + + if lat and lon: + lat = int(lat)/pow(10, len(lat)-2) * (-1 if issouth else 1) + lon = int(lon)/pow(10, len(lon)-2) * (-1 if iswest else 1) else: - if "W" in pos or "E" in pos: - continue + continue + elif len(pos1b) == 1: + txt = sub(rgx2, Fore.RED + r'\1' + Fore.RESET, sbs["txt"]) + + pos = pos1b[0] + pos = sub(r'/', '', pos) + pos = sub(r'\s', '', pos) + pos = sub(r',', '', pos) + pos = sub(r'\.', '', pos) + pos = sub(r'-', 'W', pos) + + issouth = "S" in pos + iswest = "W" in pos + isnorth = "N" in pos + iseast = "E" in pos + + if not(isnorth or issouth) and not(iswest or iseast): + continue print(txt, file=stderr) - pos = split(r'[WE-]', pos[1:]) + pos = split(r'[NS]', pos[1:]) lat = pos[0].lstrip("0") lon = pos[1].lstrip("0")[:len(lat)] @@ -255,31 +199,8 @@ def readlines_nb(f): out = f'MSG,3,1,1,{sbs["icao"].upper()},1,{datetime.fromtimestamp(sbs["time"]):%Y/%m/%d,%T.%f},{datetime.now():%Y/%m/%d,%T.%f},{sbs["flight"]},,,,{lat},{lon},,{squawk},,,,\n' print(f'{Fore.BLUE}{out}{Fore.RESET}', file=stderr) - outsock.sendall(out.encode(enc)) - except KeyboardInterrupt: - print("Got ctrl-c, closing", file=stderr) - acarssock.close() - vdlm2sock.close() - hfdlsock.close() - outsock.close() - break - except BrokenPipeError: - print("Reconnecting", file=stderr) - acarssock.close() - vdlm2sock.close() - hfdlsock.close() - outsock.close() - acarssock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - acarssock.connect((ACARS_HOST, ACARS_PORT)) - acarsgen = readlines_nb(acarssock) - vdlm2sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - vdlm2sock.connect((VDLM2_HOST, VDLM2_PORT)) - vdlm2gen = readlines_nb(vdlm2sock) - hfdlsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - hfdlsock.connect((HFDL_HOST, HFDL_PORT)) - hfdlgen = readlines_nb(hfdlsock) - outsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - outsock.connect((SBS_HOST, SBS_PORT)) + for q in txqs: + q.put(out) except BaseException: print("Other exception:", file=stderr) pprint(data, stream=stderr) diff --git a/rootfs/scripts/acars_decode/Decoder.py b/rootfs/scripts/acars_decode/Decoder.py new file mode 100644 index 0000000..a48be06 --- /dev/null +++ b/rootfs/scripts/acars_decode/Decoder.py @@ -0,0 +1,91 @@ +def decode(msg): + if msg.get("vdl2"): + dat = decodeVDLM2(msg["vdl2"]) + elif msg.get("hfdl"): + dat = decodeHFDL(msg["hfdl"]) + else: + dat = decodeACARS(msg) + return dat + +def decodeACARS(msg): + if msg.get("text") is None or msg["label"] == "SQ": + return None + dat = {} + dat["type"] = "acars" + dat["reg"] = msg["tail"] + dat["time"] = msg["timestamp"] + dat["flight"] = msg.get("flight", "") + dat["txt"] = msg["text"] + dat["msgtype"] = msg["label"] + return dat + +def decodeVDLM2(msg): + if msg.get("avlc") is None or msg["avlc"].get("acars") is None or msg["avlc"]["acars"].get("msg_text") is None: + return None + dat = {} + dat["type"] = "vdlm2" + dat["reg"] = msg["avlc"]["acars"]["reg"] + dat["time"] = msg["t"]["sec"] + msg["t"]["usec"]/1e6 + dat["flight"] = msg["avlc"]["acars"]["flight"] + dat["txt"] = msg["avlc"]["acars"]["msg_text"] + dat["msgtype"] = msg["avlc"]["acars"]["label"] + return dat + +acdb = {} +regdb = {} +def decodeHFDL(msg): + if not msg.get("lpdu"): + if not msg.get("spdu"): + pprint(msg) + print("hfdl bad top level key") + return None +# if not msg["lpdu"].get("hfnpdu"): +# pprint(msg) +# print("hfdl no keys") +# return None + dat = {} + dat["type"] = "hfdl" + dat["time"] = msg["t"]["sec"] + msg["t"]["usec"]/1e6 + dat["flight"] = msg["lpdu"].get("hfnpdu", {}).get("flight_id", "") + try: + dat["lat"] = msg["lpdu"]["hfnpdu"]["pos"]["lat"] + dat["lon"] = msg["lpdu"]["hfnpdu"]["pos"]["lon"] + if dat["lat"] == 180 and dat["lon"] == 180: + return None + except: + pass + if msg["lpdu"].get("hfnpdu", {}).get("acars"): + dat["reg"] = msg["lpdu"]["hfnpdu"]["acars"]["reg"] + dat["flight"] = msg["lpdu"]["hfnpdu"]["acars"].get("flight", "") + dat["txt"] = msg["lpdu"]["hfnpdu"]["acars"]["msg_text"] + dat["msgtype"] = msg["lpdu"]["hfnpdu"]["acars"]["label"] + + dat["id"] = msg["lpdu"]["src"]["id"] + dat["msgtype"] = msg["lpdu"]["type"]["id"] + if dat["msgtype"] == 191 or dat["msgtype"] == 79: +# pprint(msg["lpdu"]["ac_info"]) + dat["icao"] = msg["lpdu"]["ac_info"].get("icao", "") + dat["reg"] = msg["lpdu"]["ac_info"].get("regnr", "") + regdb[dat["flight"]] = {"icao": dat["icao"], "reg": dat["reg"]} + print(f"hfdl logon req/res {dat['flight']} {regdb[dat['flight']]}") + elif dat["msgtype"] == 159: +# pprint(msg["lpdu"]["ac_info"]) + dat["id"] = msg["lpdu"]["assigned_ac_id"] + dat["icao"] = msg["lpdu"]["ac_info"].get("icao", "") + dat["reg"] = msg["lpdu"]["ac_info"].get("regnr", "") + acdb[dat["id"]] = {"icao": dat["icao"], "reg": dat["reg"], "flight": dat["flight"]} + print(f"hfdl logon confirm {dat['id']} {acdb[dat['id']]}") + else: + if acdb.get(dat["id"]): +# print(f"hfdl ac in db: {dat['id']}") + dat["reg"] = acdb[dat["id"]]["reg"] + dat["icao"] = acdb[dat["id"]]["icao"] + elif regdb.get(dat["flight"]): +# print(f"hfdl ac in reg db: {dat['flight']}") + dat["reg"] = regdb[dat["flight"]]["reg"] + dat["icao"] = regdb[dat["flight"]]["icao"] + else: +# pprint(msg) +# print(f"hfdl ac not in db: {dat['id']} {dat['flight']}") + return None + return dat diff --git a/rootfs/scripts/acars_decode/__pycache__/AcarsDecoder.cpython-311.pyc b/rootfs/scripts/acars_decode/__pycache__/AcarsDecoder.cpython-311.pyc new file mode 100644 index 0000000..29859bb Binary files /dev/null and b/rootfs/scripts/acars_decode/__pycache__/AcarsDecoder.cpython-311.pyc differ diff --git a/rootfs/scripts/util.py b/rootfs/scripts/util.py new file mode 100644 index 0000000..22a2334 --- /dev/null +++ b/rootfs/scripts/util.py @@ -0,0 +1,36 @@ +import fcntl +import locale +import os + +enc = locale.getpreferredencoding(False) + +def sock2lines(f): + fd = f.fileno() + fl = fcntl.fcntl(fd, fcntl.F_GETFL) + fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) + buf = bytearray() + while True: + try: + block = os.read(fd, 8192) + except BlockingIOError: + yield "" + continue + if not block: + if buf: + yield buf.decode(enc) + buf.clear() + break + buf.extend(block) + while True: + r = buf.find(b'\r') + n = buf.find(b'\n') + if r == -1 and n == -1: break + if r == -1 or r > n: + yield buf[:(n+1)].decode(enc) + buf = buf[(n+1):] + elif n == -1 or n > r: + yield buf[:r].decode(enc) + '\n' + if n == r+1: + buf = buf[(r+2):] + else: + buf = buf[(r+1):]