From 5c30744ea98937887b8edaec5d80d85f2499439b Mon Sep 17 00:00:00 2001 From: Tim Orme Date: Tue, 18 Apr 2023 23:20:11 -0700 Subject: [PATCH] Add tests --- tests/read/__init__.py | 0 tests/read/sds011/__init__.py | 0 tests/read/sds011/mock_device_serial.py | 155 ++++++++++++++++++++++++ tests/read/sds011/test_sds011.py | 141 +++++++++++++++++++++ 4 files changed, 296 insertions(+) create mode 100644 tests/read/__init__.py create mode 100644 tests/read/sds011/__init__.py create mode 100644 tests/read/sds011/mock_device_serial.py create mode 100644 tests/read/sds011/test_sds011.py diff --git a/tests/read/__init__.py b/tests/read/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/read/sds011/__init__.py b/tests/read/sds011/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/read/sds011/mock_device_serial.py b/tests/read/sds011/mock_device_serial.py new file mode 100644 index 0000000..382c9a3 --- /dev/null +++ b/tests/read/sds011/mock_device_serial.py @@ -0,0 +1,155 @@ +from serial import Serial +from typing import Optional +from aqimon.read.sds011.constants import ( + HEAD, + TAIL, + Commands, + ReportingState, + ReportingMode, + ResponseTypes, + SleepMode, + SleepState, + WorkingPeriodMode, +) +from dataclasses import dataclass + + +@dataclass(frozen=True) +class WriteData: + """Simple wrapper for parsed write data.""" + + raw_data: bytes + raw_body_data: bytes + command: Commands + + +class Sds011SerialEmulator(Serial): + """Emulated SDS011 Serial Port. + + Behaves like the device itself, except is a bit more predictable in ACTIVE mode. + """ + + def __init__(self) -> None: + """Create the emulator. + + Initializes to factory defaults. + """ + super().__init__() + self.last_write: Optional[WriteData] = None + self.query_mode = ReportingState.ACTIVE + self.device_id = b"\x01\x01" + self.sleep_state = SleepState.WORK + self.working_period = bytes([0]) + self.firmware_year = bytes([1]) + self.firmware_month = bytes([2]) + self.firmware_day = bytes([3]) + + def open(self): + """No-op open.""" + pass + + def close(self): + """No-op close.""" + pass + + def read(self, size: int = 1) -> bytes: + """Read from the emulator.""" + if self.last_write is not None: + if self.query_mode == ReportingState.ACTIVE: + # Admittedly, this command sometimes fails on the device as well. It's the only one that seems to fairly + # consistently return in ACTIVE mode though, so the emulator does the same. + if ( + self.last_write.command == Commands.SET_REPORTING_MODE + and ReportingMode(self.last_write.raw_body_data[1:2]) == ReportingMode.SET_MODE + ): + return self._generate_read( + ResponseTypes.GENERAL_RESPONSE, + Commands.SET_REPORTING_MODE.value + + self.last_write.raw_body_data[1:2] + + self.query_mode.value + + b"\x00", + ) + + # If in active mode, almost always return query response. + return self._generate_read(ResponseTypes.QUERY_RESPONSE, b"\x19\x00\x64\x00") + else: + if self.last_write.command == Commands.QUERY: + return self._generate_read(ResponseTypes.QUERY_RESPONSE, b"\x19\x00\x64\x00") + elif self.last_write.command == Commands.SET_REPORTING_MODE: + return self._generate_read( + ResponseTypes.GENERAL_RESPONSE, + Commands.SET_REPORTING_MODE.value + + self.last_write.raw_body_data[1:2] + + self.query_mode.value + + b"\x00", + ) + elif self.last_write.command == Commands.SET_DEVICE_ID: + return self._generate_read( + ResponseTypes.GENERAL_RESPONSE, Commands.SET_DEVICE_ID.value + (b"\x00" * 3) + ) + elif self.last_write.command == Commands.SET_SLEEP: + return self._generate_read( + ResponseTypes.GENERAL_RESPONSE, + Commands.SET_SLEEP.value + + self.last_write.raw_body_data[1:2] + + self.sleep_state.value + + b"\x00", + ) + elif self.last_write.command == Commands.SET_WORKING_PERIOD: + return self._generate_read( + ResponseTypes.GENERAL_RESPONSE, + Commands.SET_WORKING_PERIOD.value + + self.last_write.raw_body_data[1:2] + + self.working_period + + b"\x00", + ) + elif self.last_write.command == Commands.CHECK_FIRMWARE_VERSION: + return self._generate_read( + ResponseTypes.GENERAL_RESPONSE, + Commands.CHECK_FIRMWARE_VERSION.value + + self.firmware_year + + self.firmware_month + + self.firmware_day, + ) + return b"" + + def _generate_read(self, response_type: ResponseTypes, cmd: bytes): + """Generate a read command, with wrapper and checksum.""" + cmd_and_id = cmd + self.device_id + return HEAD + response_type.value + cmd_and_id + read_checksum(cmd_and_id) + TAIL + + def write(self, data: bytes) -> int: + """Write to the emulator.""" + self.last_write = parse_write_data(data) + if ( + self.last_write.command == Commands.SET_REPORTING_MODE + and ReportingMode(self.last_write.raw_body_data[1:2]) == ReportingMode.SET_MODE + ): + self.query_mode = ReportingState(self.last_write.raw_body_data[2:3]) + elif self.last_write.command == Commands.SET_DEVICE_ID: + self.device_id = self.last_write.raw_body_data[11:13] + elif ( + self.last_write.command == Commands.SET_SLEEP + and SleepMode(self.last_write.raw_body_data[1:2]) == SleepMode.SET_MODE + ): + self.sleep_state = SleepState(self.last_write.raw_body_data[2:3]) + elif ( + self.last_write.command == Commands.SET_WORKING_PERIOD + and WorkingPeriodMode(self.last_write.raw_body_data[1:2]) == WorkingPeriodMode.SET_MODE + ): + self.working_period = self.last_write.raw_body_data[2:3] + return len(data) + + +def read_checksum(data: bytes) -> bytes: + """Generate a checksum for the data bytes of a command.""" + if len(data) != 6: + raise AttributeError("Invalid checksum length.") + return bytes([sum(d for d in data) % 256]) + + +def parse_write_data(data: bytes) -> WriteData: + """Parse write data from the emulator into a neater wrapper.""" + if len(data) != 19: + raise AttributeError("Data is wrong size.") + return WriteData(raw_data=data, raw_body_data=data[2:15], command=Commands(data[2:3])) diff --git a/tests/read/sds011/test_sds011.py b/tests/read/sds011/test_sds011.py new file mode 100644 index 0000000..11e5456 --- /dev/null +++ b/tests/read/sds011/test_sds011.py @@ -0,0 +1,141 @@ +import pytest + +from aqimon.read.sds011 import NovaPmReader +from aqimon.read.sds011.constants import ReportingState, SleepState +from aqimon.read.sds011.exceptions import QueryInActiveModeException +from .mock_device_serial import Sds011SerialEmulator + + +@pytest.fixture +def reader(): + # In a theoretical world, you could replace the emulator here with a true serial device, and run these as + # integration tests. However, in practice, the behavior of ACTIVE mode is too inconsistent for many of these tests + # to behave reliably. + ser_dev = Sds011SerialEmulator() + reader = NovaPmReader(ser_dev=ser_dev) + try: + reader.wake() + except QueryInActiveModeException: + pass + reader.set_working_period(0) + reader.set_reporting_mode(ReportingState.ACTIVE) + yield reader + ser_dev.close() + + +def test_set_reporting_mode(reader: NovaPmReader): + result = reader.set_reporting_mode(ReportingState.QUERYING) + assert result.state == ReportingState.QUERYING + + +def test_get_reporting_mode(reader: NovaPmReader): + reader.set_reporting_mode(ReportingState.QUERYING) + result = reader.get_reporting_mode() + assert result.state == ReportingState.QUERYING + + +def test_get_reporting_mode_while_active_fails(reader: NovaPmReader): + reader.set_reporting_mode(ReportingState.ACTIVE) + with pytest.raises(QueryInActiveModeException): + reader.get_reporting_mode() + + +def test_query_active_mode(reader: NovaPmReader): + reader.set_reporting_mode(ReportingState.ACTIVE) + result = reader.query() + assert 999 > result.pm25 > 0 + assert 999 > result.pm10 > 0 + + +def test_query_query_mode(reader: NovaPmReader): + reader.set_reporting_mode(ReportingState.QUERYING) + result = reader.query() + assert 999 > result.pm25 > 0 + assert 999 > result.pm10 > 0 + + +def test_set_device_id(reader: NovaPmReader): + new_device_id = b"\xbb\xaa" + reader.set_reporting_mode(ReportingState.QUERYING) + result = reader.set_device_id(new_device_id) + assert result.device_id == new_device_id + + # Verify other commands also report correct ID + result2 = reader.get_reporting_mode() + assert result2.device_id == new_device_id + + +def test_sleep_query_mode(reader: NovaPmReader): + reader.set_reporting_mode(ReportingState.QUERYING) + result = reader.sleep() + assert result.state == SleepState.SLEEP + + +def test_sleep_active_mode(reader: NovaPmReader): + reader.set_reporting_mode(ReportingState.ACTIVE) + with pytest.raises(QueryInActiveModeException): + reader.sleep() + + +def test_wake_query_mode(reader: NovaPmReader): + reader.set_reporting_mode(ReportingState.QUERYING) + result = reader.wake() + assert result.state == SleepState.WORK + + +def test_wake_active_mode(reader: NovaPmReader): + reader.set_reporting_mode(ReportingState.ACTIVE) + with pytest.raises(QueryInActiveModeException): + reader.wake() + + +def test_get_sleep_state_query_mode(reader: NovaPmReader): + reader.set_reporting_mode(ReportingState.QUERYING) + reader.wake() + result = reader.get_sleep_state() + assert result.state == SleepState.WORK + + +def test_get_sleep_state_active_mode(reader: NovaPmReader): + reader.set_reporting_mode(ReportingState.ACTIVE) + with pytest.raises(QueryInActiveModeException): + reader.get_sleep_state() + + +def test_set_working_period_query_mode(reader: NovaPmReader): + reader.set_reporting_mode(ReportingState.QUERYING) + result = reader.set_working_period(10) + assert result.interval == 10 + + +def test_set_working_period_active_mode(reader: NovaPmReader): + reader.set_reporting_mode(ReportingState.ACTIVE) + with pytest.raises(QueryInActiveModeException): + reader.set_working_period(10) + + +def test_get_working_period_query_mode(reader: NovaPmReader): + reader.set_reporting_mode(ReportingState.QUERYING) + reader.set_working_period(10) + result = reader.get_working_period() + assert result.interval == 10 + + +def test_get_working_period_active_mode(reader: NovaPmReader): + reader.set_reporting_mode(ReportingState.ACTIVE) + with pytest.raises(QueryInActiveModeException): + reader.get_working_period() + + +def test_get_firmware_version_query_mode(reader: NovaPmReader): + reader.set_reporting_mode(ReportingState.QUERYING) + result = reader.get_firmware_version() + assert 99 >= result.year >= 0 + assert 12 >= result.month >= 1 + assert 31 >= result.day >= 1 + + +def test_get_firmware_version_active_mode(reader: NovaPmReader): + reader.set_reporting_mode(ReportingState.ACTIVE) + with pytest.raises(QueryInActiveModeException): + reader.get_firmware_version()