-
Notifications
You must be signed in to change notification settings - Fork 0
/
XBee.py
184 lines (150 loc) · 5.66 KB
/
XBee.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# code credit: https://github.com/niolabs/python-xbee
# has been slightly modified for my use
import serial
from collections import deque
class XBee():
RxBuff = bytearray()
RxMessages = deque()
def __init__(self, serialport, baudrate=9600):
self.serial = serial.Serial(port=serialport, baudrate=baudrate)
def Receive(self):
"""
Receives data from serial and checks buffer for potential messages.
Returns the next message in the queue if available.
"""
remaining = self.serial.inWaiting()
while remaining:
chunk = self.serial.read(remaining)
remaining -= len(chunk)
self.RxBuff.extend(chunk)
msgs = self.RxBuff.split(bytes(b'\x7E'))
received_msg = ""
for msg in msgs[:-1]:
received_msg = self.Validate(msg)
self.RxBuff = (bytearray() if self.Validate(msgs[-1]) else msgs[-1])
if self.RxMessages:
return self.RxMessages.popleft()
else:
return None
def Validate(self, msg):
"""
Parses a byte or bytearray object to verify the contents are a
properly formatted XBee message.
Inputs: An incoming XBee message
Outputs: True or False, indicating message validity
"""
# 9 bytes is Minimum length to be a valid Rx frame
# LSB, MSB, Type, Source Address(2), RSSI,
# Options, 1 byte data, checksum
if (len(msg) - msg.count(bytes(b'0x7D'))) < 9:
return False
# All bytes in message must be unescaped before validating content
frame = self.Unescape(msg)
LSB = frame[1]
# Frame (minus checksum) must contain at least length equal to LSB
if LSB > (len(frame[2:]) - 1):
return False
# Validate checksum
if (sum(frame[2:3+LSB]) & 0xFF) != 0xFF:
return False
# print("Rx: " + self.format(bytearray(b'\x7E') + msg))
partially_decoded = str(msg.decode("utf-8", errors="ignore"))
f = open("zigbee_messages.txt", "a")
f.write(str(partially_decoded[10]))
f.close()
print(partially_decoded[10])
self.RxMessages.append(frame)
# print(frame)
return True
def SendStr(self, msg):
"""
Inputs:
msg: A message, in string format, to be sent
addr: The 16 bit address of the destination XBee
(default: 0xFFFF broadcast)
options: Optional byte to specify transmission options
(default 0x01: disable acknowledge)
frameid: Optional frameid, only used if Tx status is desired
Returns:
Number of bytes sent
"""
return self.Send(msg.encode('utf-8'))
def Send(self, msg):
"""
Inputs:
msg: A message, in bytes or bytearray format, to be sent to an XBee
addr: The 16 bit address of the destination XBee
(default broadcast)
options: Optional byte to specify transmission options
(default 0x01: disable ACK)
frameod: Optional frameid, only used if transmit status is desired
Returns:
Number of bytes sent
"""
if not msg:
return 0
hexs = '7E 00 {:02X} 10 01 00 00 00 00 00 00 00 00 00 00 00 00'.format(
len(msg) + 14 # LSB (length)
)
frame = bytearray.fromhex(hexs)
# Append message content
frame.extend(msg)
# Calculate checksum byte
frame.append(0xFF - (sum(frame[3:]) & 0xFF))
# Escape any bytes containing reserved characters
frame = self.Escape(frame)
# print("Tx: " + self.format(frame))
return self.serial.write(frame)
def Unescape(self, msg):
"""
Helper function to unescaped an XBee API message.
Inputs:
msg: An byte or bytearray object containing a raw XBee message
minus the start delimeter
Outputs:
XBee message with original characters.
"""
if msg[-1] == 0x7D:
# Last byte indicates an escape, can't unescape that
return None
out = bytearray()
skip = False
for i in range(len(msg)):
if skip:
skip = False
continue
if msg[i] == 0x7D:
out.append(msg[i+1] ^ 0x20)
skip = True
else:
out.append(msg[i])
return out
def Escape(self, msg):
"""
Escapes reserved characters before an XBee message is sent.
Inputs:
msg: A bytes or bytearray object containing an original message to
be sent to an XBee
Outputs:
A bytearray object prepared to be sent to an XBee in API mode
"""
escaped = bytearray()
reserved = bytearray(b"\x7E\x7D\x11\x13")
escaped.append(msg[0])
for m in msg[1:]:
if m in reserved:
escaped.append(0x7D)
escaped.append(m ^ 0x20)
else:
escaped.append(m)
return escaped
def format(self, msg):
"""
Formats a byte or bytearray object into a more human readable string
where each bytes is represented by two ascii characters and a space
Input:
msg: A bytes or bytearray object
Output:
A string representation
"""
return " ".join("{:02x}".format(b) for b in msg)