-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
12 changed files
with
507 additions
and
117 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,170 @@ | ||
"""Nova PM SDS011 Reader module. | ||
Device: https://www.amazon.com/SDS011-Quality-Detection-Conditioning-Monitor/dp/B07FSDMRR5 | ||
Spec: https://cdn.sparkfun.com/assets/parts/1/2/2/7/5/Laser_Dust_Sensor_Control_Protocol_V1.3.pdf | ||
Spec: https://cdn-reichelt.de/documents/datenblatt/X200/SDS011-DATASHEET.pdf | ||
""" | ||
|
||
import serial | ||
from .responses import ( | ||
QueryReadResponse, | ||
ReportingModeReadResponse, | ||
SleepWakeReadResponse, | ||
DeviceIdResponse, | ||
CheckFirmwareReadResponse, | ||
WorkingPeriodReadResponse, | ||
) | ||
from . import constants as con | ||
from .exceptions import IncompleteReadException, InvalidDeviceIdException | ||
|
||
|
||
class NovaPmReader: | ||
"""NOVA PM SDS011 Reader.""" | ||
|
||
def __init__(self, ser_dev: serial.Serial): | ||
"""Create the device.""" | ||
self.ser = ser_dev | ||
|
||
def query(self, query_mode: bool = True) -> QueryReadResponse: | ||
"""Query the device for pollutant data.""" | ||
if query_mode: | ||
cmd = con.Commands.QUERY.value + (b"\x00" * 12) + con.ALL_SENSOR_ID | ||
self._send_command(cmd) | ||
return QueryReadResponse(self._read_response()) | ||
|
||
def get_reporting_mode(self) -> ReportingModeReadResponse: | ||
"""Get the current reporting mode of the device.""" | ||
cmd = ( | ||
con.Commands.SET_REPORTING_MODE.value | ||
+ con.ReportingMode.QUERY.value | ||
+ con.ReportingState.ACTIVE.value | ||
+ (b"\x00" * 10) | ||
+ con.ALL_SENSOR_ID | ||
) | ||
self._send_command(cmd) | ||
return ReportingModeReadResponse(self._read_response()) | ||
|
||
def set_reporting_mode(self, reporting_mode: con.ReportingState) -> ReportingModeReadResponse: | ||
"""Set the reporting mode, either ACTIVE or QUERYING. | ||
ACTIVE mode means the device will always return a Query command response when data is asked for, regardless of | ||
what command was sent. | ||
QUERYING mode means the device will only return responses to submitted commands, even for Query commands. | ||
ACTIVE mode is the factory default, but generally, QUERYING mode is preferrable for the longevity of the device. | ||
""" | ||
cmd = ( | ||
con.Commands.SET_REPORTING_MODE.value | ||
+ con.ReportingMode.SET_MODE.value | ||
+ reporting_mode.value | ||
+ (b"\x00" * 10) | ||
+ con.ALL_SENSOR_ID | ||
) | ||
self._send_command(cmd) | ||
# Switching between reporting modes is finicky; resetting the serial connection seems to address issues. | ||
self.ser.close() | ||
self.ser.open() | ||
return ReportingModeReadResponse(self._read_response()) | ||
|
||
def get_sleep_state(self) -> SleepWakeReadResponse: | ||
"""Get the current sleep state.""" | ||
cmd = ( | ||
con.Commands.SET_SLEEP.value | ||
+ con.SleepMode.QUERY.value | ||
+ con.SleepState.WORK.value | ||
+ (b"\x00" * 10) | ||
+ con.ALL_SENSOR_ID | ||
) | ||
self._send_command(cmd) | ||
return SleepWakeReadResponse(self._read_response()) | ||
|
||
def set_sleep_state(self, sleep_state: con.SleepState) -> SleepWakeReadResponse: | ||
"""Set the sleep state, either wake or sleep.""" | ||
cmd = ( | ||
con.Commands.SET_SLEEP.value | ||
+ con.SleepMode.SET_MODE.value | ||
+ sleep_state.value | ||
+ (b"\x00" * 10) | ||
+ con.ALL_SENSOR_ID | ||
) | ||
self._send_command(cmd) | ||
return SleepWakeReadResponse(self._read_response()) | ||
|
||
def sleep(self) -> SleepWakeReadResponse: | ||
"""Put the device to sleep, turning off fan and diode.""" | ||
return self.set_sleep_state(con.SleepState.SLEEP) | ||
|
||
def wake(self) -> SleepWakeReadResponse: | ||
"""Wake the device up to start reading.""" | ||
return self.set_sleep_state(con.SleepState.WORK) | ||
|
||
def set_device_id(self, device_id: bytes, target_device_id: bytes = con.ALL_SENSOR_ID) -> DeviceIdResponse: | ||
"""Set the device ID.""" | ||
if len(device_id) != 2 or len(target_device_id) != 2: | ||
raise AttributeError(f"Device ID must be 4 bytes, found {len(device_id)}, and {len(target_device_id)}") | ||
cmd = con.Commands.SET_DEVICE_ID.value + (b"\x00" * 10) + device_id + target_device_id | ||
self._send_command(cmd) | ||
try: | ||
return DeviceIdResponse(self._read_response()) | ||
except IncompleteReadException: | ||
raise InvalidDeviceIdException(f"Unable to find device ID of {target_device_id!s}") | ||
|
||
def get_working_period(self) -> WorkingPeriodReadResponse: | ||
"""Retrieve the current working period for the device.""" | ||
cmd = ( | ||
con.Commands.SET_WORKING_PERIOD.value | ||
+ con.WorkingPeriodMode.QUERY.value | ||
+ (b"\x00" * 11) | ||
+ con.ALL_SENSOR_ID | ||
) | ||
self._send_command(cmd) | ||
return WorkingPeriodReadResponse(self._read_response()) | ||
|
||
def set_working_period(self, working_period: int) -> WorkingPeriodReadResponse: | ||
"""Set the working period for the device. | ||
Working period must be between 0 and 30. | ||
0 means the device will read continuously. | ||
Any value 1-30 means the device will wake and read for 30 seconds every n*60-30 seconds. | ||
""" | ||
if 0 >= working_period >= 30: | ||
raise AttributeError("Working period must be between 0 and 30") | ||
cmd = ( | ||
con.Commands.SET_WORKING_PERIOD.value | ||
+ con.WorkingPeriodMode.SET_MODE.value | ||
+ bytes([working_period]) | ||
+ (b"\x00" * 10) | ||
+ con.ALL_SENSOR_ID | ||
) | ||
self._send_command(cmd) | ||
return WorkingPeriodReadResponse(self._read_response()) | ||
|
||
def get_firmware_version(self) -> CheckFirmwareReadResponse: | ||
"""Retrieve the firmware version from the device.""" | ||
cmd = con.Commands.CHECK_FIRMWARE_VERSION.value + (b"\x00" * 12) + con.ALL_SENSOR_ID | ||
self._send_command(cmd) | ||
return CheckFirmwareReadResponse(self._read_response()) | ||
|
||
def _send_command(self, cmd: bytes): | ||
"""Send a command to the device as bytes.""" | ||
head = con.HEAD + con.SUBMIT_TYPE | ||
full_command = head + cmd + bytes([self._cmd_checksum(cmd)]) + con.TAIL | ||
if len(full_command) != 19: | ||
raise Exception(f"Command length must be 19, but was {len(full_command)}") | ||
self.ser.write(full_command) | ||
|
||
def _read_response(self) -> bytes: | ||
"""Read a response from the device.""" | ||
result = self.ser.read(10) | ||
if len(result) != 10: | ||
raise IncompleteReadException(len(result)) | ||
return result | ||
|
||
def _cmd_checksum(self, data: bytes) -> int: | ||
"""Generate a checksum for the data bytes of a command.""" | ||
if len(data) != 15: | ||
raise AttributeError("Invalid checksum length.") | ||
return sum(d for d in data) % 256 |
Oops, something went wrong.