-
Notifications
You must be signed in to change notification settings - Fork 1
/
message_processor.py
59 lines (52 loc) · 1.94 KB
/
message_processor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import time
from typing import Dict
from meshtastic.stream_interface import StreamInterface
class MessageProcessor:
    """Base helper for handling messages received over a mesh interface.

    Holds the interface, a snapshot view of its node table, the local node
    number, and a list of "trap" words that ``messageTrap`` checks incoming
    text against.
    """

    URL_TIMEOUT = 10  # wait time for URL requests (presumably seconds - confirm)
    NO_DATA_NOGPS = "No GPS data available"
    ERROR_FETCHING_DATA = "error fetching data"
    # Fixed coordinates; presumably a fallback location when a node reports
    # no position - TODO confirm against the callers.
    LATITUDE: float = 48.50
    LONGITUDE: float = -123.0
    # Class-level placeholder only: __init__ replaces it per instance with
    # the values view of ``interface.nodes``.
    node_list: Dict[str, Dict] = {}

    def __init__(self, interface: StreamInterface):
        """Bind the processor to ``interface`` and cache node information.

        Args:
            interface: Object exposing a ``nodes`` mapping and a
                ``getMyNodeInfo()`` method whose result has a ``"num"`` key.
        """
        self.interface = interface
        self.trap_list = []
        self.node_list = interface.nodes.values()
        self.myNodeNum = interface.getMyNodeInfo()["num"]

    def auto_response(
        self, message, snr, rssi, hop, message_from_id, location: list[float]
    ):
        """Pause briefly before an automatic reply would be produced.

        This base implementation is a stub: it only sleeps and returns
        ``None``; none of the arguments are used here.
        """
        # Wait 700 ms to avoid a message collision with the LoRa ack.
        time.sleep(0.7)

    def messageTrap(self, msg):
        """Return True if any space-separated word of ``msg`` is a trap word.

        The comparison is case-insensitive and matches whole words only
        (words are obtained with ``msg.split(" ")``).
        """
        words = {word.lower() for word in msg.split(" ")}
        return any(trap.lower() in words for trap in self.trap_list)

    @staticmethod
    def get_name_from_number(interface: StreamInterface, number, type="long"):
        """Look up the display name of the node whose ``"num"`` is ``number``.

        Args:
            interface: Object exposing a ``nodes`` mapping of node records.
            number: Numeric node ID to look up.
            type: ``"long"`` for the ``longName`` or ``"short"`` for the
                ``shortName`` of the node's ``"user"`` record.

        Returns:
            The requested name, or the ``!xxxxxxxx`` hex form of ``number``
            when no node matches, the node has no such name recorded, or
            ``type`` is not ``"long"``/``"short"``.
        """
        name_key = {"long": "longName", "short": "shortName"}.get(type)
        if name_key is not None:
            for node in interface.nodes.values():
                if node.get("num") != number:
                    continue
                # A node record may lack user details; fall through to the
                # hex ID instead of raising KeyError.
                name = (node.get("user") or {}).get(name_key)
                if name is not None:
                    return name
                break
        # Always fall back to the ID string, even when the node table is
        # empty (previously that case returned an empty string).
        return MessageProcessor.decimal_to_hex(number)

    @staticmethod
    def decimal_to_hex(decimal_number):
        """Format an integer node number as ``!`` plus 8 zero-padded hex digits."""
        return f"!{decimal_number:08x}"