-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpmsx003.py
205 lines (186 loc) · 8.42 KB
/
pmsx003.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
import serial
import atexit
import time
import argparse
class PMSX003(object):
    """
    Serial driver for Plantower PMSX003 particulate-matter sensors.

    How the sensor works:
    - On boot, the sensor wakes up and is in active mode, fan is running and keeps reporting data.
    - When in sleep status, the fan stops running and the sensor doesn't report any data.
    - You need to wake up the sensor to get data, no matter whether in active or passive mode.
    - In active mode, it can be swapped to passive mode or vice versa.
    - In passive mode, the fan is running, but the sensor doesn't report any data.
    - In passive mode, a query_once_in_passive is required to generate one sensor data report.
    - When turning into active mode, a 42 4D 00 04 E1 01 01 75 should be returned from the sensor.
    - When turning into passive mode, a 42 4D 00 04 E1 00 01 74 should be returned from the sensor.
    - When turning into sleep status, a 42 4D 00 04 E4 00 01 77 should be returned from the sensor.
    - When turning into wake up status, nothing returns from the sensor, it only keeps reporting 32 bytes data.
    - [IMPORTANT] If there is buffered sensor data that hasn't been exhausted before swapping to passive
      from active, the buffer must be exhausted first (read multiple times) to reach the 8-byte response
      generated by the mode change. This only happens when the sensor is in active mode.
      Usually 3~5 reads exhaust the buffer (N x 32 bytes). go_passive(), go_active() and go_sleep()
      do this draining themselves.
    - I suggest using this sensor in passive mode.

    In general, wake up the sensor first, then you can keep reading data and don't worry about other things.
    But if you don't want the fan to rotate indefinitely, you should:
    1. Wake up.
    2. Go passive, get a success response (8 bytes).
    3. Wait 30 seconds, let the fan rotate, and wait for the PM data to stabilize.
    4. Query once.
    5. Read once -> this is the data we want.
    6. Go sleep, get a success response (8 bytes).
    """

    # Every frame, in both directions, starts with these two bytes.
    CMD_START = [0x42, 0x4d]
    # Command payloads: [command code, data high, data low].
    CMD_MODE_PASSIVE = [0xe1, 0x00, 0x00]
    CMD_MODE_ACTIVE = [0xe1, 0x00, 0x01]
    CMD_QUERY_ONCE = [0xe2, 0x00, 0x00]
    CMD_SLEEP = [0xe4, 0x00, 0x00]
    CMD_WAKEUP = [0xe4, 0x00, 0x01]
    CRC_BYTES = 2
    # Byte offsets of the PM readings inside a 32-byte data frame.
    PM1_H, PM1_L = 10, 11
    PM25_H, PM25_L = 12, 13
    PM10_H, PM10_L = 14, 15

    BAUDRATE = 9600
    READ_TIMEOUT = 3  # seconds a serial read may block before giving up
    ACK_FRAME_LEN = 8  # total size of a command acknowledgement frame
    ACK_CMD_INDEX = 4  # offset of the echoed command code inside an acknowledgement
    MAX_SYNC_BYTES = 64  # bytes scanned for a frame header before giving up
    MAX_STALE_FRAMES = 32  # frames discarded while waiting for the expected one

    def __init__(self, port="/dev/ttyAMA0"):
        """Open *port* and register the port to be closed at interpreter exit."""
        self.ser = serial.Serial(port, self.BAUDRATE, timeout=self.READ_TIMEOUT)
        atexit.register(self.port_close)
        time.sleep(0.1)

    def port_close(self):
        """Close the serial port."""
        self.ser.close()

    def nice_print(self, signal_str):
        """
        Print a hex string upper-cased, with a space after every second character.

        E.g.:
        self.nice_print("424d001c0009000b000f0009000b000f050401a6003b0006000400009700027d") =>
        42 4D 00 1C 00 09 00 0B 00 0F 00 09 00 0B 00 0F 05 04 01 A6 00 3B 00 06 00 04 00 00 97 00 02 7D
        """
        pieces = [
            char.upper() + " " if index % 2 else char.upper()
            for index, char in enumerate(signal_str)
        ]
        print("".join(pieces))

    def _read_exact(self, size):
        """Read exactly *size* bytes; raise TimeoutError if the port returns fewer."""
        chunk = self.ser.read(size)
        if len(chunk) != size:
            raise TimeoutError(
                "Timed out reading from sensor: expected {} byte(s), got {}".format(size, len(chunk))
            )
        return chunk

    def _sync_to_header(self):
        """Consume bytes until the two start bytes (0x42 0x4D) have just been read."""
        first, second = self.CMD_START
        previous = None
        for _ in range(self.MAX_SYNC_BYTES):
            current = self._read_exact(1)[0]
            if previous == first and current == second:
                return
            previous = current
        raise ChecksumError(
            "No frame header found within {} bytes".format(self.MAX_SYNC_BYTES)
        )

    def read_response(self):
        """
        Read one complete, checksum-verified frame from the sensor.

        The stream is first aligned on the 0x42 0x4D start bytes (any garbage in front is skipped),
        then come two length bytes, indicating how many more bytes (N) follow in this frame.
        N-2 of those are data, the last two are the checksum of everything before them.
        The frame might be 32 bytes (a PM result), or 8 bytes (a command response, e.g. when going to sleep).

        Returns the whole frame as bytes, start bytes and checksum included.
        Raises TimeoutError when the sensor delivers fewer bytes than expected before the read timeout,
        and ChecksumError when the frame is malformed or its checksum does not match.
        """
        self._sync_to_header()
        resp_start = bytes(self.CMD_START)
        frame_len = self._read_exact(2)  # e.g. [0x00, 0x1C] or [0x00, 0x04]
        remaining = frame_len[0] * 256 + frame_len[1]
        if remaining < self.CRC_BYTES:
            # A frame must at least carry its own checksum; anything shorter is corrupt.
            raise ChecksumError("Invalid frame length: {}".format(remaining))
        data = self._read_exact(remaining - self.CRC_BYTES)
        crc = self._read_exact(self.CRC_BYTES)  # [CRC_H, CRC_L]
        resp = resp_start + frame_len + data + crc
        if self.checksum(resp_start + frame_len + data) != list(crc):
            raise ChecksumError("Checksum not correct for: {}".format(resp))
        return resp

    def _read_ack(self, cmd):
        """
        Discard buffered frames until the 8-byte acknowledgement of *cmd* arrives, and return it.

        Data frames reported before the command took effect sit in front of the acknowledgement,
        so a single read is not enough when the sensor was in active mode.
        Raises RuntimeError if no acknowledgement shows up within MAX_STALE_FRAMES frames.
        """
        for _ in range(self.MAX_STALE_FRAMES):
            resp = self.read_response()
            if len(resp) == self.ACK_FRAME_LEN and resp[self.ACK_CMD_INDEX] == cmd[0]:
                return resp
        raise RuntimeError(
            "No acknowledgement for command 0x{:02X} within {} frames".format(cmd[0], self.MAX_STALE_FRAMES)
        )

    def _read_data_frame(self):
        """
        Return the next frame long enough to hold the PM fields, skipping shorter ones.

        A leftover command acknowledgement (8 bytes) would otherwise be parsed as PM data.
        Raises RuntimeError if no such frame shows up within MAX_STALE_FRAMES frames.
        """
        for _ in range(self.MAX_STALE_FRAMES):
            resp = self.read_response()
            if len(resp) > self.PM10_L:
                return resp
        raise RuntimeError(
            "No PM data frame received within {} frames".format(self.MAX_STALE_FRAMES)
        )

    def read_pm(self):
        """
        Request one report and return it as a (PM1, PM2.5, PM10) tuple.
        """
        self.query_once_in_passive()
        return self.parse_response(self._read_data_frame())

    def go_wakeup(self):
        """
        Wake the sensor up so the fan starts.
        No acknowledgement is read here: on wake up the sensor only resumes reporting data.
        The sensor needs roughly 3 seconds to swap to wake up mode.
        """
        self.ser.write(self.build_cmd(self.CMD_WAKEUP))
        time.sleep(2.5)

    def go_sleep(self):
        """
        Set the fan to stop.
        The sensor needs roughly 3 seconds to swap to sleep mode.
        The sensor should respond with: 42 4D 00 04 E4 00 01 77.
        Any data frames still buffered in front of that response are discarded.
        """
        self.ser.write(self.build_cmd(self.CMD_SLEEP))
        time.sleep(4)
        self._read_ack(self.CMD_SLEEP)

    def go_passive(self):
        """
        When in passive mode, the sensor stops reporting data.
        The sensor needs roughly 4 seconds to go from active mode to passive.
        Sensor reports once on every request, i.e. on every `query_once_in_passive()`.
        The sensor should respond with 42 4D 00 04 E1 00 01 74.
        Any data frames still buffered in front of that response are discarded.
        """
        self.ser.write(self.build_cmd(self.CMD_MODE_PASSIVE))
        time.sleep(5)  # give sensor some seconds to swap mode.
        self._read_ack(self.CMD_MODE_PASSIVE)

    def go_active(self):
        """
        Default mode when booted. Sensor continuously reports data.
        The sensor should respond with: 42 4D 00 04 E1 01 01 75.
        Any data frames still buffered in front of that response are discarded.
        """
        self.ser.write(self.build_cmd(self.CMD_MODE_ACTIVE))
        time.sleep(5)  # give sensor some seconds to swap mode.
        self._read_ack(self.CMD_MODE_ACTIVE)

    def query_once_in_passive(self):
        """
        Send the command that makes the sensor respond once in passive mode.
        After invoking this method, use read_response() to fetch the report;
        read_pm() already sends this query by itself.
        """
        self.ser.write(self.build_cmd(self.CMD_QUERY_ONCE))
        time.sleep(0.1)

    def build_cmd(self, cmd):
        """
        Build the full command frame: start bytes + *cmd* + checksum, as a bytearray.

        E.g.: We want to go passive mode, then:
        CMD_START = 0x42 0x4d
        CMD_MODE_PASSIVE = 0xe1 0x00 0x00
        CRC = sum(bytearray([0x42, 0x4d, 0xe1, 0x00, 0x00])) => 368
        CRC_High = 368 // 256 = 1 => 0x01
        CRC_Low = 368 % 256 = 112 => 0x70
        => [0x42, 0x4d, 0xe1, 0x00, 0x00, 0x01, 0x70]
        """
        raw_cmd = self.CMD_START + cmd
        return bytearray(raw_cmd + self.checksum(raw_cmd))

    def checksum(self, cmd):
        """
        Return [high, low] of the byte-wise sum of *cmd*.

        For each frame, the last two bytes are this checksum.
        When sending a command to the sensor, it completes the frame.
        When receiving a response, it tells whether the transmission is faulty.
        """
        crc_h, crc_l = divmod(sum(bytearray(cmd)), 0x100)
        return [crc_h, crc_l]

    def parse_response(self, resp):
        """
        Extract (PM1, PM2.5, PM10) from a 32-byte data frame, print them and return them.

        Two bytes form one value according to the datasheet, the high byte is multiplied by 0x100 (256).
        Raises IndexError when *resp* is too short to contain the PM fields.
        """
        pm1 = resp[self.PM1_H] * 256 + resp[self.PM1_L]
        pm25 = resp[self.PM25_H] * 256 + resp[self.PM25_L]
        pm10 = resp[self.PM10_H] * 256 + resp[self.PM10_L]
        print("PM1: {}, PM2.5: {}, PM10: {}.".format(pm1, pm25, pm10))
        return pm1, pm25, pm10
class ChecksumError(RuntimeError):
    """Raised when a sensor response fails checksum verification."""
if __name__ == "__main__":
    # Command-line entry point: wake the sensor, take one passive-mode reading,
    # then put the sensor back to sleep so the fan does not keep running.

    # Time the fan must run before the PM readings are stable.
    settle_seconds = 30

    parser = argparse.ArgumentParser(description='''
Read PM1, PM2.5 and PM10 for Plantower PMSX003 sensors.
E.g.:
python3 pmsx003.py /dev/ttyAMA0
''', formatter_class=argparse.RawDescriptionHelpFormatter,)
    parser.add_argument('port', help='''[Required] Your serial port,
usually /dev/ttyUSB0 or /dev/ttyAMA0 or /dev/ttyS4, etc.''')
    args = parser.parse_args()

    sensor = PMSX003(port=args.port)
    print("Waking up, going passive...")
    sensor.go_wakeup()
    try:
        sensor.go_passive()
        print("Sleeping for {} seconds to measure...".format(settle_seconds))
        # Wait for the announced duration; sleeping only 3 seconds reads unsettled data.
        time.sleep(settle_seconds)
        sensor.read_pm()
    finally:
        # Always stop the fan, even when switching mode or reading failed.
        print("Going to sleep.")
        sensor.go_sleep()