From 866e6fa55f29dbbed5a9bd13b14665b4daa7d71b Mon Sep 17 00:00:00 2001 From: Tim Orme Date: Thu, 4 May 2023 20:10:06 -0700 Subject: [PATCH] Switch to use SDS011Lib instead --- aqimon/read/novapm.py | 4 +- aqimon/read/sds011/__init__.py | 316 --------------------- aqimon/read/sds011/constants.py | 67 ----- aqimon/read/sds011/exceptions.py | 55 ---- aqimon/read/sds011/responses.py | 122 --------- poetry.lock | 17 +- pyproject.toml | 1 + tests/read/sds011/__init__.py | 0 tests/read/sds011/mock_device_serial.py | 145 ---------- tests/read/sds011/test_sds011.py | 349 ------------------------ 10 files changed, 19 insertions(+), 1057 deletions(-) delete mode 100644 aqimon/read/sds011/__init__.py delete mode 100644 aqimon/read/sds011/constants.py delete mode 100644 aqimon/read/sds011/exceptions.py delete mode 100644 aqimon/read/sds011/responses.py delete mode 100644 tests/read/sds011/__init__.py delete mode 100644 tests/read/sds011/mock_device_serial.py delete mode 100644 tests/read/sds011/test_sds011.py diff --git a/aqimon/read/novapm.py b/aqimon/read/novapm.py index c79f296..2abd286 100644 --- a/aqimon/read/novapm.py +++ b/aqimon/read/novapm.py @@ -7,7 +7,7 @@ from . import AqiRead, ReaderState, ReaderStatus import serial from typing import Union -from .sds011 import QueryModeReader +from sds011lib import SDS011QueryReader from statistics import mean @@ -26,7 +26,7 @@ def __init__( if isinstance(ser_dev, str): ser_dev = serial.Serial(ser_dev, timeout=2) - self.reader = QueryModeReader(ser_dev=ser_dev, send_command_sleep=command_wait_time) + self.reader = SDS011QueryReader(ser_dev=ser_dev, send_command_sleep=command_wait_time) # Initial the reader to be in the mode we want. self.reader.wake() diff --git a/aqimon/read/sds011/__init__.py b/aqimon/read/sds011/__init__.py deleted file mode 100644 index 37314d9..0000000 --- a/aqimon/read/sds011/__init__.py +++ /dev/null @@ -1,316 +0,0 @@ -"""Nova PM SDS011 Reader module. - -Device: https://www.amazon.com/SDS011-Quality-Detection-Conditioning-Monitor/dp/B07FSDMRR5 - -Spec: https://cdn.sparkfun.com/assets/parts/1/2/2/7/5/Laser_Dust_Sensor_Control_Protocol_V1.3.pdf -Spec: https://cdn-reichelt.de/documents/datenblatt/X200/SDS011-DATASHEET.pdf -""" - -import serial -import time -from .responses import ( - QueryReadResponse, - ReportingModeReadResponse, - SleepWakeReadResponse, - DeviceIdResponse, - CheckFirmwareReadResponse, - WorkingPeriodReadResponse, -) -from . import constants as con -from .exceptions import IncompleteReadException, IncorrectCommandException, IncorrectCommandCodeException - - -class NovaPmReader: - """NOVA PM SDS011 Reader.""" - - def __init__(self, ser_dev: serial.Serial, send_command_sleep: int = 1): - """Create the device.""" - self.ser = ser_dev - self.send_command_sleep = send_command_sleep - - def request_data(self) -> None: - """Request device to return pollutant data.""" - cmd = con.Command.QUERY.value + (b"\x00" * 12) + con.ALL_SENSOR_ID - self._send_command(cmd) - - def query_data(self) -> QueryReadResponse: - """Query the device for pollutant data.""" - return QueryReadResponse(self._read_response()) - - def request_reporting_mode(self) -> None: - """Request device to return the current reporting mode.""" - cmd = ( - con.Command.SET_REPORTING_MODE.value - + con.OperationType.QUERY.value - + con.ReportingMode.ACTIVE.value - + (b"\x00" * 10) - + con.ALL_SENSOR_ID - ) - self._send_command(cmd) - - def query_reporting_mode(self) -> ReportingModeReadResponse: - """Get the current reporting mode of the device.""" - return ReportingModeReadResponse(self._read_response()) - - def set_active_mode(self) -> None: - """Set the reporting mode to active.""" - self._set_reporting_mode(con.ReportingMode.ACTIVE) - try: - self.query_reporting_mode() - except IncorrectCommandException: - pass - except IncompleteReadException: - pass - - def set_query_mode(self) -> None: - """Set the reporting mode to querying.""" - self._set_reporting_mode(con.ReportingMode.QUERYING) - try: - self.query_reporting_mode() - except IncorrectCommandException: - pass - except IncompleteReadException: - pass - except IncorrectCommandCodeException: - pass - - def _set_reporting_mode(self, reporting_mode: con.ReportingMode) -> None: - """Set the reporting mode, either ACTIVE or QUERYING. - - ACTIVE mode means the device will always return a Query command response when data is asked for, regardless of - what command was sent. - - QUERYING mode means the device will only return responses to submitted commands, even for Query commands. - - ACTIVE mode is the factory default, but generally, QUERYING mode is preferrable for the longevity of the device. - """ - cmd = ( - con.Command.SET_REPORTING_MODE.value - + con.OperationType.SET_MODE.value - + reporting_mode.value - + (b"\x00" * 10) - + con.ALL_SENSOR_ID - ) - self._send_command(cmd) - # Switching between reporting modes is finicky; resetting the serial connection seems to address issues. - self.ser.close() - self.ser.open() - - def request_sleep_state(self) -> None: - """Get the current sleep state.""" - cmd = con.Command.SET_SLEEP.value + con.OperationType.QUERY.value + b"\x00" + (b"\x00" * 10) + con.ALL_SENSOR_ID - self._send_command(cmd) - - def query_sleep_state(self) -> SleepWakeReadResponse: - """Get the current sleep state.""" - return SleepWakeReadResponse(self._read_response()) - - def set_sleep_state(self, sleep_state: con.SleepState) -> None: - """Set the sleep state, either wake or sleep.""" - cmd = ( - con.Command.SET_SLEEP.value - + con.OperationType.SET_MODE.value - + sleep_state.value - + (b"\x00" * 10) - + con.ALL_SENSOR_ID - ) - self._send_command(cmd) - - def sleep(self) -> None: - """Put the device to sleep, turning off fan and diode.""" - self.set_sleep_state(con.SleepState.SLEEP) - - def wake(self) -> None: - """Wake the device up to start reading.""" - self.set_sleep_state(con.SleepState.WORK) - - def safe_wake(self) -> None: - """Wake the device up, if you don't know what mode its in. - - This operates as a fire-and-forget, even in query mode. You shouldn't have to (and can't) query for a response - after this command. - """ - self.wake() - # If we were in query mode, this would flush out the response. If in active mode, this would be return read - # data, but we don't care. - self.ser.read(10) - - def set_device_id(self, device_id: bytes, target_device_id: bytes = con.ALL_SENSOR_ID) -> None: - """Set the device ID.""" - if len(device_id) != 2 or len(target_device_id) != 2: - raise AttributeError(f"Device ID must be 4 bytes, found {len(device_id)}, and {len(target_device_id)}") - cmd = con.Command.SET_DEVICE_ID.value + (b"\x00" * 10) + device_id + target_device_id - self._send_command(cmd) - - def query_device_id(self) -> DeviceIdResponse: - """Set the device ID.""" - return DeviceIdResponse(self._read_response()) - - def request_working_period(self) -> None: - """Retrieve the current working period for the device.""" - cmd = con.Command.SET_WORKING_PERIOD.value + con.OperationType.QUERY.value + (b"\x00" * 11) + con.ALL_SENSOR_ID - self._send_command(cmd) - - def query_working_period(self) -> WorkingPeriodReadResponse: - """Retrieve the current working period for the device.""" - return WorkingPeriodReadResponse(self._read_response()) - - def set_working_period(self, working_period: int) -> None: - """Set the working period for the device. - - Working period must be between 0 and 30. - - 0 means the device will read continuously. - Any value 1-30 means the device will wake and read for 30 seconds every n*60-30 seconds. - """ - if 0 >= working_period >= 30: - raise AttributeError("Working period must be between 0 and 30") - cmd = ( - con.Command.SET_WORKING_PERIOD.value - + con.OperationType.SET_MODE.value - + bytes([working_period]) - + (b"\x00" * 10) - + con.ALL_SENSOR_ID - ) - self._send_command(cmd) - - def request_firmware_version(self) -> None: - """Retrieve the firmware version from the device.""" - cmd = con.Command.CHECK_FIRMWARE_VERSION.value + (b"\x00" * 12) + con.ALL_SENSOR_ID - self._send_command(cmd) - - def query_firmware_version(self) -> CheckFirmwareReadResponse: - """Retrieve the firmware version from the device.""" - return CheckFirmwareReadResponse(self._read_response()) - - def _send_command(self, cmd: bytes): - """Send a command to the device as bytes.""" - head = con.HEAD + con.SUBMIT_TYPE - full_command = head + cmd + bytes([self._cmd_checksum(cmd)]) + con.TAIL - if len(full_command) != 19: - raise Exception(f"Command length must be 19, but was {len(full_command)}") - self.ser.write(full_command) - time.sleep(self.send_command_sleep) - - def _read_response(self) -> bytes: - """Read a response from the device.""" - result = self.ser.read(10) - if len(result) != 10: - raise IncompleteReadException(len(result)) - return result - - def _cmd_checksum(self, data: bytes) -> int: - """Generate a checksum for the data bytes of a command.""" - if len(data) != 15: - raise AttributeError("Invalid checksum length.") - return sum(d for d in data) % 256 - - -class QueryModeReader: - """Reader working in query mode.""" - - def __init__(self, ser_dev: serial.Serial, send_command_sleep: int = 1): - """Create the device.""" - self.base_reader = NovaPmReader(ser_dev=ser_dev, send_command_sleep=send_command_sleep) - self.base_reader.safe_wake() - self.base_reader.set_query_mode() - - def query(self) -> QueryReadResponse: - """Query the device for pollutant data.""" - self.base_reader.request_data() - return self.base_reader.query_data() - - def get_reporting_mode(self) -> ReportingModeReadResponse: - """Get the current reporting mode of the device.""" - self.base_reader.request_reporting_mode() - return self.base_reader.query_reporting_mode() - - def get_sleep_state(self) -> SleepWakeReadResponse: - """Get the current sleep state.""" - self.base_reader.request_sleep_state() - return self.base_reader.query_sleep_state() - - def sleep(self) -> SleepWakeReadResponse: - """Put the device to sleep, turning off fan and diode.""" - self.base_reader.sleep() - return self.base_reader.query_sleep_state() - - def wake(self) -> SleepWakeReadResponse: - """Wake the device up to start reading.""" - self.base_reader.wake() - return self.base_reader.query_sleep_state() - - def set_device_id(self, device_id: bytes, target_device_id: bytes = con.ALL_SENSOR_ID) -> DeviceIdResponse: - """Set the device ID.""" - self.base_reader.set_device_id(device_id, target_device_id) - return self.base_reader.query_device_id() - - def get_working_period(self) -> WorkingPeriodReadResponse: - """Retrieve the current working period for the device.""" - self.base_reader.request_working_period() - return self.base_reader.query_working_period() - - def set_working_period(self, working_period: int) -> WorkingPeriodReadResponse: - """Set the working period for the device. - - Working period must be between 0 and 30. - - 0 means the device will read continuously. - Any value 1-30 means the device will wake and read for 30 seconds every n*60-30 seconds. - """ - self.base_reader.set_working_period(working_period) - return self.base_reader.query_working_period() - - def get_firmware_version(self) -> CheckFirmwareReadResponse: - """Retrieve the firmware version from the device.""" - self.base_reader.request_firmware_version() - return self.base_reader.query_firmware_version() - - -class ActiveModeReader: - """Active Mode Reader. - - Use with caution! Active mode is unpredictable. Query mode is much preferred. - """ - - def __init__(self, ser_dev: serial.Serial, send_command_sleep: int = 2): - """Create the device.""" - self.base_reader = NovaPmReader(ser_dev=ser_dev, send_command_sleep=send_command_sleep) - self.ser_dev = ser_dev - self.base_reader.safe_wake() - self.base_reader.set_active_mode() - - def query(self) -> QueryReadResponse: - """Query the device for pollutant data.""" - return self.base_reader.query_data() - - def sleep(self) -> None: - """Put the device to sleep, turning off fan and diode.""" - self.base_reader.sleep() - - # Sleep seems to behave very strangely in active mode. It continually outputs data for old commands for quite - # a while before eventually having nothing to report. This forces it to "drain" whatever it was doing before - # returning, but also feels quite dangerous. - while len(self.ser_dev.read(10)) == 10: - pass - - def wake(self) -> None: - """Wake the device up to start reading.""" - self.base_reader.wake() - self.ser_dev.read(10) - - def set_device_id(self, device_id: bytes, target_device_id: bytes = con.ALL_SENSOR_ID) -> None: - """Set the device ID.""" - self.base_reader.set_device_id(device_id, target_device_id) - self.ser_dev.read(10) - - def set_working_period(self, working_period: int) -> None: - """Set the working period for the device. - - Working period must be between 0 and 30. - - 0 means the device will read continuously. - Any value 1-30 means the device will wake and read for 30 seconds every n*60-30 seconds. - """ - self.base_reader.set_working_period(working_period) - self.ser_dev.read(10) diff --git a/aqimon/read/sds011/constants.py b/aqimon/read/sds011/constants.py deleted file mode 100644 index 482f573..0000000 --- a/aqimon/read/sds011/constants.py +++ /dev/null @@ -1,67 +0,0 @@ -"""Byte constants for the SDS011 device.""" -from enum import Enum - - -# Message head constant -HEAD = b"\xaa" -# Message tail constant -TAIL = b"\xab" - -# ID to send if the command should be issued to all sensor IDs. -ALL_SENSOR_ID = b"\xff\xff" - -# The submit type -SUBMIT_TYPE = b"\xb4" - - -class ResponseType(Enum): - """Response types for commands. - - GENERAL_RESPONSE is for all commands except query. - QUERY_RESPONSE only applies to the query command. - """ - - GENERAL_RESPONSE = b"\xc5" - # Query command has its own response type. - QUERY_RESPONSE = b"\xc0" - - -class Command(Enum): - """Possible commands for the device.""" - - SET_REPORTING_MODE = b"\x02" - QUERY = b"\x04" - SET_DEVICE_ID = b"\x05" - SET_SLEEP = b"\x06" - SET_WORKING_PERIOD = b"\x08" - CHECK_FIRMWARE_VERSION = b"\x07" - - -class OperationType(Enum): - """Operation type for many commands. - - Many commands have two modes, one for setting a value, and another for retrieving. - """ - - QUERY = b"\x00" - SET_MODE = b"\x01" - - -class ReportingMode(Enum): - """Reporting mode for the device. - - ACTIVE mode means that the device is constantly returning read data from the device, and won't respond correctly - to other query requests. - - QUERYING mode means that the device won't return read data unless explicitly asked for it. - """ - - ACTIVE = b"\x00" - QUERYING = b"\x01" - - -class SleepState(Enum): - """State of the device, either working or sleeping.""" - - SLEEP = b"\x00" - WORK = b"\x01" diff --git a/aqimon/read/sds011/exceptions.py b/aqimon/read/sds011/exceptions.py deleted file mode 100644 index 51ef046..0000000 --- a/aqimon/read/sds011/exceptions.py +++ /dev/null @@ -1,55 +0,0 @@ -"""All exception classes for the SDS011.""" - - -class Sds011Exception(Exception): - """Base exception for SDS011 device.""" - - pass - - -class ChecksumFailedException(Sds011Exception): - """Thrown if the checksum value in a response is incorrect.""" - - def __init__(self, expected: int, actual: int): - """Create exception.""" - super().__init__() - self.expected = expected - self.actual = actual - - -class IncorrectCommandException(Sds011Exception): - """Thrown if the command ID in a response is incorrect.""" - - def __init__(self, expected: int, actual: int): - """Create exception.""" - super().__init__(f"Expected command {expected}, found {actual}") - self.expected = expected - self.actual = actual - - -class IncorrectCommandCodeException(Sds011Exception): - """Thrown if the command code in a response is incorrect.""" - - def __init__(self, expected: int, actual: int): - """Create exception.""" - super().__init__(f"Expected code {expected}, found {actual}") - self.expected = expected - self.actual = actual - - -class IncorrectWrapperException(Sds011Exception): - """Thrown if the wrapper of a response (either HEAD or TAIL) is incorrect.""" - - pass - - -class IncompleteReadException(Sds011Exception): - """Thrown if the device didn't return complete data when asking for a response.""" - - pass - - -class InvalidDeviceIdException(Sds011Exception): - """Thrown if the trying to set the device ID on an invalid device.""" - - pass diff --git a/aqimon/read/sds011/responses.py b/aqimon/read/sds011/responses.py deleted file mode 100644 index 4ae32a5..0000000 --- a/aqimon/read/sds011/responses.py +++ /dev/null @@ -1,122 +0,0 @@ -"""Response objects for SDS011. - -Creates and validates typed classes from binary responses from the device. -""" -from .constants import ( - HEAD, - TAIL, - Command, - ResponseType, - SleepState, - OperationType, - ReportingMode, -) -from .exceptions import ( - ChecksumFailedException, - IncorrectCommandException, - IncorrectCommandCodeException, - IncorrectWrapperException, - IncompleteReadException, -) - - -class ReadResponse: - """Generic read response object for responses from SDS011.""" - - def __init__(self, data: bytes, command_code: Command, response_type: ResponseType = ResponseType.GENERAL_RESPONSE): - """Create a read response.""" - if len(data) != 10: - raise IncompleteReadException() - - self.head = data[0:1] - self.cmd_id = data[1:2] - self.data = data[2:8] - self.device_id = data[6:8] - self.checksum: int = data[8] - self.tail = data[9:10] - self.expected_command_code = command_code - self.expected_response_type = response_type - # Check it! - self.verify() - - def verify(self): - """Verify the read data.""" - if self.head != HEAD: - raise IncorrectWrapperException() - if self.tail != TAIL: - raise IncorrectWrapperException() - if self.checksum != self.calc_checksum(): - raise ChecksumFailedException(expected=self.checksum, actual=self.calc_checksum()) - if self.cmd_id != self.expected_response_type.value: - raise IncorrectCommandException(expected=self.expected_response_type.value, actual=self.cmd_id) - - # Query responses don't validate the command code - if ( - self.expected_response_type != ResponseType.QUERY_RESPONSE - and bytes([self.data[0]]) != self.expected_command_code.value - ): - raise IncorrectCommandCodeException(expected=self.expected_command_code.value, actual=self.data[0]) - - def calc_checksum(self) -> int: - """Calculate the checksum for the read data.""" - return sum(d for d in self.data) % 256 - - -class QueryReadResponse(ReadResponse): - """Query read response.""" - - def __init__(self, data: bytes): - """Create a query read response.""" - super().__init__(data, command_code=Command.QUERY, response_type=ResponseType.QUERY_RESPONSE) - - self.pm25: float = int.from_bytes(data[2:4], byteorder="little") / 10 - self.pm10: float = int.from_bytes(data[4:6], byteorder="little") / 10 - - -class ReportingModeReadResponse(ReadResponse): - """Reporting mode response.""" - - def __init__(self, data: bytes): - """Create a reporting mode response.""" - super().__init__(data, command_code=Command.SET_REPORTING_MODE) - self.operation_type = OperationType(self.data[1:2]) - self.state = ReportingMode(self.data[2:3]) - - -class DeviceIdResponse(ReadResponse): - """Device ID response.""" - - def __init__(self, data: bytes): - """Create a device ID response.""" - super().__init__(data, command_code=Command.SET_DEVICE_ID) - - -class SleepWakeReadResponse(ReadResponse): - """Sleep/Wake Response.""" - - def __init__(self, data: bytes): - """Create a sleep/wake response.""" - super().__init__(data, command_code=Command.SET_SLEEP) - self.operation_type = OperationType(self.data[1:2]) - self.state = SleepState(self.data[2:3]) - - -class WorkingPeriodReadResponse(ReadResponse): - """Working period response.""" - - def __init__(self, data: bytes): - """Create a working period response.""" - super().__init__(data, command_code=Command.SET_WORKING_PERIOD) - self.operation_type = OperationType(self.data[1:2]) - self.interval: int = self.data[2] - - -class CheckFirmwareReadResponse(ReadResponse): - """Firmware response.""" - - def __init__(self, data: bytes): - """Create a firmware response.""" - super().__init__(data, command_code=Command.CHECK_FIRMWARE_VERSION) - self.year = self.data[1] - self.month = self.data[2] - self.day = self.data[3] diff --git a/poetry.lock b/poetry.lock index 7586865..44f766a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -552,6 +552,21 @@ files = [ {file = "ruff-0.0.256.tar.gz", hash = "sha256:f9a96b34a4870ee8cf2f3779cd7854620d1788a83b52374771266cf800541bb7"}, ] +[[package]] +name = "sds011lib" +version = "0.1.0" +description = "SDS011 Library" +category = "main" +optional = false +python-versions = ">=3.8,<4.0" +files = [ + {file = "sds011lib-0.1.0-py3-none-any.whl", hash = "sha256:090ab7bfcfe501cf9550078466c5d7677addbaeb404208052766ff8aac2565c4"}, + {file = "sds011lib-0.1.0.tar.gz", hash = "sha256:f9e6fb1aa6d03e93888ce1ac43400efce0af3d3680dd170abba8c7c2156f114b"}, +] + +[package.dependencies] +pyserial = ">=3.5,<4.0" + [[package]] name = "sniffio" version = "1.3.0" @@ -704,4 +719,4 @@ standard = ["colorama (>=0.4)", "httptools (>=0.5.0)", "python-dotenv (>=0.13)", [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "2c9ce04b15790c81f55f91f1c72b030030bdc58ffb2120f2fbd8b387162db80a" +content-hash = "1bb883a1439d87211f1c0715a990915dae3dc16a1988da9a3c74b765eb9ab525" diff --git a/pyproject.toml b/pyproject.toml index d13a5f0..0cfdf09 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,6 +13,7 @@ pyserial = "^3.5" uvicorn = "^0.21.0" databases = {extras = ["aiosqlite"], version = "^0.7.0"} fastapi-utils = "^0.2.1" +sds011lib = "^0.1.0" [tool.poetry.group.dev.dependencies] diff --git a/tests/read/sds011/__init__.py b/tests/read/sds011/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/read/sds011/mock_device_serial.py b/tests/read/sds011/mock_device_serial.py deleted file mode 100644 index 111e247..0000000 --- a/tests/read/sds011/mock_device_serial.py +++ /dev/null @@ -1,145 +0,0 @@ -from serial import Serial -from aqimon.read.sds011.constants import ( - HEAD, - TAIL, - Command, - ReportingMode, - OperationType, - ResponseType, - SleepState, -) -from dataclasses import dataclass - - -@dataclass(frozen=True) -class WriteData: - """Simple wrapper for parsed write data.""" - - raw_data: bytes - raw_body_data: bytes - command: Command - - -class Sds011SerialEmulator(Serial): - """Emulated SDS011 Serial Port. - - Behaves like the device itself, except is a bit more predictable in ACTIVE mode. - """ - - def __init__(self) -> None: - """Create the emulator. - - Initializes to factory defaults. - """ - super().__init__() - self.response_buffer = b"" - self.operation_type = b"" - self.query_mode = ReportingMode.ACTIVE - self.device_id = b"\x01\x01" - self.sleep_state = SleepState.WORK - self.working_period = bytes([0]) - self.firmware_year = bytes([1]) - self.firmware_month = bytes([2]) - self.firmware_day = bytes([3]) - - def open(self): - """No-op open.""" - pass - - def close(self): - """No-op close.""" - pass - - def read(self, size: int = 1) -> bytes: - """Read from the emulator.""" - if self.query_mode == ReportingMode.ACTIVE and self.sleep_state == SleepState.WORK: - # If in active mode and awake, always return query response. - return self._get_query_response() - else: - response = self.response_buffer - self.response_buffer = b"" - return response - - def _generate_read(self, response_type: ResponseType, cmd: bytes) -> bytes: - """Generate a read command, with wrapper and checksum.""" - cmd_and_id = cmd + self.device_id - return HEAD + response_type.value + cmd_and_id + read_checksum(cmd_and_id) + TAIL - - def write(self, data: bytes) -> int: - """Write to the emulator.""" - last_write = parse_write_data(data) - self.operation_type = last_write.raw_body_data[1:2] - - if self.sleep_state == SleepState.SLEEP and last_write.command != Command.SET_SLEEP: - # Device ignores commands in sleep mode, unless its a sleep command - return len(data) - - if last_write.command == Command.SET_REPORTING_MODE: - if OperationType(last_write.raw_body_data[1:2]) == OperationType.SET_MODE: - self.query_mode = ReportingMode(last_write.raw_body_data[2:3]) - self._set_response_buffer(self._set_reporting_mode_response()) - elif last_write.command == Command.QUERY: - self._set_response_buffer(self._get_query_response()) - elif last_write.command == Command.SET_DEVICE_ID: - self.device_id = last_write.raw_body_data[11:13] - self._set_response_buffer(self._set_device_id_response()) - elif last_write.command == Command.SET_SLEEP: - if OperationType(last_write.raw_body_data[1:2]) == OperationType.SET_MODE: - self.sleep_state = SleepState(last_write.raw_body_data[2:3]) - self._set_response_buffer(self._set_sleep_response()) - elif last_write.command == Command.SET_WORKING_PERIOD: - if OperationType(last_write.raw_body_data[1:2]) == OperationType.SET_MODE: - self.working_period = last_write.raw_body_data[2:3] - self._set_response_buffer(self._set_working_period_response()) - elif last_write.command == Command.CHECK_FIRMWARE_VERSION: - self._set_response_buffer(self._check_firmware_response()) - return len(data) - - def _get_query_response(self) -> bytes: - return self._generate_read(ResponseType.QUERY_RESPONSE, b"\x19\x00\x64\x00") - - def _set_response_buffer(self, data: bytes) -> None: - # Response buffer should only be written if there wasn't something already there. - if self.response_buffer == b"": - self.response_buffer = data - - def _set_reporting_mode_response(self) -> bytes: - return self._generate_read( - ResponseType.GENERAL_RESPONSE, - Command.SET_REPORTING_MODE.value + self.operation_type + self.query_mode.value + b"\x00", - ) - - def _set_device_id_response(self) -> bytes: - return self._generate_read(ResponseType.GENERAL_RESPONSE, Command.SET_DEVICE_ID.value + (b"\x00" * 3)) - - def _set_sleep_response(self) -> bytes: - return self._generate_read( - ResponseType.GENERAL_RESPONSE, - Command.SET_SLEEP.value + self.operation_type + self.sleep_state.value + b"\x00", - ) - - def _set_working_period_response(self) -> bytes: - return self._generate_read( - ResponseType.GENERAL_RESPONSE, - Command.SET_WORKING_PERIOD.value + self.operation_type + self.working_period + b"\x00", - ) - - def _check_firmware_response(self) -> bytes: - return self._generate_read( - ResponseType.GENERAL_RESPONSE, - Command.CHECK_FIRMWARE_VERSION.value + self.firmware_year + self.firmware_month + self.firmware_day, - ) - - -def read_checksum(data: bytes) -> bytes: - """Generate a checksum for the data bytes of a command.""" - if len(data) != 6: - raise AttributeError("Invalid checksum length.") - return bytes([sum(d for d in data) % 256]) - - -def parse_write_data(data: bytes) -> WriteData: - """Parse write data from the emulator into a neater wrapper.""" - if len(data) != 19: - raise AttributeError("Data is wrong size.") - return WriteData(raw_data=data, raw_body_data=data[2:15], command=Command(data[2:3])) diff --git a/tests/read/sds011/test_sds011.py b/tests/read/sds011/test_sds011.py deleted file mode 100644 index 41da71f..0000000 --- a/tests/read/sds011/test_sds011.py +++ /dev/null @@ -1,349 +0,0 @@ -import pytest - -from aqimon.read.sds011 import NovaPmReader, ActiveModeReader, QueryModeReader -from aqimon.read.sds011.constants import ReportingMode, SleepState -from aqimon.read.sds011.exceptions import IncorrectCommandException, IncompleteReadException -from .mock_device_serial import Sds011SerialEmulator - - -class TestBaseReader: - @pytest.fixture - def reader(self): - # If you want to run these tests an integration you can replace the emulator here with a real serial device. - # ser_dev = serial.Serial('/dev/ttyUSB0', timeout=2, baudrate=9600) - # reader = NovaPmReader(ser_dev=ser_dev) - - ser_dev = Sds011SerialEmulator() - reader = NovaPmReader(ser_dev=ser_dev, send_command_sleep=0) - - # flush out the reader in case theres leftovers in the buffer - ser_dev.read(10) - - reader.wake() - # We don't know if the device was in active or querying. We must flush out the buffer from the above `wake`, - # if it exists. - ser_dev.read(10) - - reader.set_active_mode() - reader.set_working_period(0) - - yield reader - # Sleep the reader at the end so its not left on. - reader.sleep() - ser_dev.close() - - def test_hammer_reporting_mode(self, reader: NovaPmReader): - # Switch the modes - reader.set_query_mode() - reader.set_active_mode() - - # Set again in active mode - reader.set_active_mode() - - # set it in query mode twice - reader.set_query_mode() - reader.set_query_mode() - - reader.request_reporting_mode() - assert reader.query_reporting_mode().state == ReportingMode.QUERYING - - def test_hammer_sleep_query_mode(self, reader: NovaPmReader): - reader.set_query_mode() - reader.sleep() - result = reader.query_sleep_state() - assert result.state == SleepState.SLEEP - - reader.wake() - reader.request_sleep_state() - assert reader.query_sleep_state().state == SleepState.WORK - reader.set_query_mode() - reader.request_sleep_state() - result = reader.query_sleep_state() - assert result.state == SleepState.WORK - - def test_hammer_sleep_active_mode(self, reader: NovaPmReader): - reader.set_active_mode() - reader.sleep() - - reader.wake() - reader.set_active_mode() - result = reader.query_data() - assert result.pm25 > 0.0 - - def test_queries_in_sleep_mode_are_incomplete(self, reader: NovaPmReader): - # Device can't be asked anything in sleep mode. - reader.set_query_mode() - reader.sleep() - assert reader.query_sleep_state().state == SleepState.SLEEP - - reader.request_reporting_mode() - with pytest.raises(IncompleteReadException): - reader.query_reporting_mode() - - reader.request_working_period() - with pytest.raises(IncompleteReadException): - reader.query_working_period() - - def test_values_changed_sleep_mode_arent_persisted(self, reader: NovaPmReader): - # Device can't set values while its asleep. - reader.set_query_mode() - reader.sleep() - assert reader.query_sleep_state().state == SleepState.SLEEP - - reader.set_working_period(20) - - reader.wake() - assert reader.query_sleep_state().state == SleepState.WORK - - reader.request_working_period() - assert reader.query_working_period().interval == 0 - - def test_buffer_is_first_command(self, reader: NovaPmReader): - reader.set_query_mode() - - # In query mode, once a command has been issued, subsequent commands should be effectively ignored, until the - # buffer is read. - reader.request_sleep_state() - reader.request_firmware_version() - reader.request_working_period() - - # We should still only get sleep state since its first - result = reader.query_sleep_state() - assert result.state == SleepState.WORK - - def test_get_reporting_mode(self, reader: NovaPmReader): - reader.set_query_mode() - reader.request_reporting_mode() - result = reader.query_reporting_mode() - assert result.state == ReportingMode.QUERYING - - def test_get_reporting_mode_while_active_fails(self, reader: NovaPmReader): - reader.set_active_mode() - with pytest.raises(IncorrectCommandException): - reader.query_reporting_mode() - - def test_query_active_mode(self, reader: NovaPmReader): - reader.set_active_mode() - result = reader.query_data() - assert 999 > result.pm25 > 0 - assert 999 > result.pm10 > 0 - - def test_query_query_mode(self, reader: NovaPmReader): - reader.set_query_mode() - reader.request_data() - result = reader.query_data() - assert 999 > result.pm25 > 0 - assert 999 > result.pm10 > 0 - - def test_set_device_id_query_mode(self, reader: NovaPmReader): - new_device_id = b"\xbb\xaa" - reader.set_query_mode() - reader.set_device_id(new_device_id) - result = reader.query_device_id() - assert result.device_id == new_device_id - - # Verify other commands also report correct ID - reader.request_reporting_mode() - result2 = reader.query_reporting_mode() - assert result2.device_id == new_device_id - - def test_sleep_query_mode(self, reader: NovaPmReader): - reader.set_query_mode() - reader.sleep() - result = reader.query_sleep_state() - assert result.state == SleepState.SLEEP - - def test_sleep_active_mode(self, reader: NovaPmReader): - reader.set_active_mode() - reader.sleep() - - def test_wake_query_mode(self, reader: NovaPmReader): - reader.set_query_mode() - reader.wake() - result = reader.query_sleep_state() - assert result.state == SleepState.WORK - - def test_wake_active_mode(self, reader: NovaPmReader): - reader.set_active_mode() - reader.wake() - - def test_get_sleep_state_query_mode(self, reader: NovaPmReader): - reader.set_query_mode() - reader.wake() - result = reader.query_sleep_state() - assert result.state == SleepState.WORK - - def test_get_sleep_state_active_mode(self, reader: NovaPmReader): - reader.set_active_mode() - with pytest.raises(IncorrectCommandException): - reader.query_sleep_state() - - def test_set_working_period_query_mode(self, reader: NovaPmReader): - reader.set_query_mode() - reader.set_working_period(10) - result = reader.query_working_period() - assert result.interval == 10 - - def test_set_working_period_active_mode(self, reader: NovaPmReader): - reader.set_active_mode() - reader.set_working_period(10) - - def test_get_working_period_query_mode(self, reader: NovaPmReader): - reader.set_query_mode() - reader.set_working_period(10) - result = reader.query_working_period() - assert result.interval == 10 - - def test_get_working_period_active_mode(self, reader: NovaPmReader): - reader.set_active_mode() - with pytest.raises(IncorrectCommandException): - reader.query_working_period() - - def test_get_firmware_version_query_mode(self, reader: NovaPmReader): - reader.set_query_mode() - reader.request_firmware_version() - result = reader.query_firmware_version() - assert 99 >= result.year >= 0 - assert 12 >= result.month >= 1 - assert 31 >= result.day >= 1 - - def test_get_firmware_version_active_mode(self, reader: NovaPmReader): - reader.set_active_mode() - with pytest.raises(IncorrectCommandException): - reader.query_firmware_version() - - -class TestActiveModeReader: - @pytest.fixture - def reader(self): - # If you want to run these tests an integration you can replace the emulator here with a real serial device. - # ser_dev = serial.Serial("/dev/ttyUSB0", timeout=2, baudrate=9600) - # reader = ActiveModeReader(ser_dev=ser_dev, send_command_sleep=5) - - ser_dev = Sds011SerialEmulator() - reader = ActiveModeReader(ser_dev=ser_dev, send_command_sleep=0) - reader.set_working_period(0) - ser_dev.read(10) - - yield reader - # Sleep the reader at the end so its not left on. - reader.base_reader.sleep() - ser_dev.close() - - def test_query(self, reader: ActiveModeReader): - result = reader.query() - assert 999 > result.pm25 > 0 - assert 999 > result.pm10 > 0 - - def test_query_sleep_mode(self, reader: ActiveModeReader): - reader.sleep() - - with pytest.raises(IncompleteReadException): - reader.query() - - def test_wake(self, reader: ActiveModeReader): - reader.sleep() - with pytest.raises(IncompleteReadException): - reader.query() - reader.wake() - - # Make sure we can read again. - result = reader.query() - assert 999 > result.pm25 > 0 - assert 999 > result.pm10 > 0 - - def test_set_working_period(self, reader: ActiveModeReader): - reader.set_working_period(20) - - # We can't really do much here to validate that this is working. Just ensure that we can still query after. - result = reader.query() - assert 999 > result.pm25 > 0 - assert 999 > result.pm10 > 0 - - def test_set_device_id(self, reader: ActiveModeReader): - reader.set_device_id(b"\x12\x23") - - # We can't really do much here to validate that this is working. Just ensure that we can still query after. - result = reader.query() - assert result.device_id == b"\x12\x23" - - -class TestQueryModeReader: - @pytest.fixture - def reader(self): - # If you want to run these tests an integration you can replace the emulator here with a real serial device. - # ser_dev = serial.Serial("/dev/ttyUSB0", timeout=2, baudrate=9600) - # reader = QueryModeReader(ser_dev=ser_dev) - - ser_dev = Sds011SerialEmulator() - reader = QueryModeReader(ser_dev=ser_dev, send_command_sleep=0) - reader.set_working_period(0) - - yield reader - # Sleep the reader at the end so its not left on. - reader.base_reader.sleep() - ser_dev.close() - - def test_query(self, reader: QueryModeReader): - result = reader.query() - assert 999 > result.pm25 > 0 - assert 999 > result.pm10 > 0 - - def test_query_sleep_mode(self, reader: QueryModeReader): - reader.sleep() - - with pytest.raises(IncompleteReadException): - reader.query() - - def test_wake(self, reader: QueryModeReader): - reader.sleep() - with pytest.raises(IncompleteReadException): - reader.query() - result = reader.wake() - assert result.state == SleepState.WORK - - result2 = reader.query() - assert 999 > result2.pm25 > 0 - assert 999 > result2.pm10 > 0 - - def test_get_sleep_state(self, reader: QueryModeReader): - result = reader.sleep() - assert result.state == SleepState.SLEEP - - result = reader.wake() - assert result.state == SleepState.WORK - - result = reader.get_sleep_state() - assert result.state == SleepState.WORK - - # Make sure we can read again. - result2 = reader.query() - assert 999 > result2.pm25 > 0 - assert 999 > result2.pm10 > 0 - - def test_get_reporting_mode(self, reader: QueryModeReader): - result = reader.get_reporting_mode() - assert result.state == ReportingMode.QUERYING - - def test_set_working_period(self, reader: QueryModeReader): - result = reader.set_working_period(20) - assert result.interval == 20 - - def test_get_working_period(self, reader: QueryModeReader): - reader.set_working_period(20) - result = reader.get_working_period() - assert result.interval == 20 - - def test_set_device_id(self, reader: QueryModeReader): - result = reader.set_device_id(b"\x12\x23") - assert result.device_id == b"\x12\x23" - - # We can't really do much here to validate that this is working. Just ensure that we can still query after. - result2 = reader.query() - assert result2.device_id == b"\x12\x23" - - def test_get_firmware_version(self, reader: QueryModeReader): - result = reader.get_firmware_version() - assert 99 >= result.year >= 0 - assert 12 >= result.month >= 1 - assert 31 >= result.day >= 1