Skip to content

Commit

Permalink
Add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
TimOrme committed Apr 19, 2023
1 parent 47e1c93 commit 5c30744
Show file tree
Hide file tree
Showing 4 changed files with 296 additions and 0 deletions.
Empty file added tests/read/__init__.py
Empty file.
Empty file added tests/read/sds011/__init__.py
Empty file.
155 changes: 155 additions & 0 deletions tests/read/sds011/mock_device_serial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
from serial import Serial
from typing import Optional
from aqimon.read.sds011.constants import (
HEAD,
TAIL,
Commands,
ReportingState,
ReportingMode,
ResponseTypes,
SleepMode,
SleepState,
WorkingPeriodMode,
)
from dataclasses import dataclass


@dataclass(frozen=True)
class WriteData:
"""Simple wrapper for parsed write data."""

raw_data: bytes
raw_body_data: bytes
command: Commands


class Sds011SerialEmulator(Serial):
"""Emulated SDS011 Serial Port.
Behaves like the device itself, except is a bit more predictable in ACTIVE mode.
"""

def __init__(self) -> None:
"""Create the emulator.
Initializes to factory defaults.
"""
super().__init__()
self.last_write: Optional[WriteData] = None
self.query_mode = ReportingState.ACTIVE
self.device_id = b"\x01\x01"
self.sleep_state = SleepState.WORK
self.working_period = bytes([0])
self.firmware_year = bytes([1])
self.firmware_month = bytes([2])
self.firmware_day = bytes([3])

def open(self):
"""No-op open."""
pass

def close(self):
"""No-op close."""
pass

def read(self, size: int = 1) -> bytes:
"""Read from the emulator."""
if self.last_write is not None:
if self.query_mode == ReportingState.ACTIVE:
# Admittedly, this command sometimes fails on the device as well. It's the only one that seems to fairly
# consistently return in ACTIVE mode though, so the emulator does the same.
if (
self.last_write.command == Commands.SET_REPORTING_MODE
and ReportingMode(self.last_write.raw_body_data[1:2]) == ReportingMode.SET_MODE
):
return self._generate_read(
ResponseTypes.GENERAL_RESPONSE,
Commands.SET_REPORTING_MODE.value
+ self.last_write.raw_body_data[1:2]
+ self.query_mode.value
+ b"\x00",
)

# If in active mode, almost always return query response.
return self._generate_read(ResponseTypes.QUERY_RESPONSE, b"\x19\x00\x64\x00")
else:
if self.last_write.command == Commands.QUERY:
return self._generate_read(ResponseTypes.QUERY_RESPONSE, b"\x19\x00\x64\x00")
elif self.last_write.command == Commands.SET_REPORTING_MODE:
return self._generate_read(
ResponseTypes.GENERAL_RESPONSE,
Commands.SET_REPORTING_MODE.value
+ self.last_write.raw_body_data[1:2]
+ self.query_mode.value
+ b"\x00",
)
elif self.last_write.command == Commands.SET_DEVICE_ID:
return self._generate_read(
ResponseTypes.GENERAL_RESPONSE, Commands.SET_DEVICE_ID.value + (b"\x00" * 3)
)
elif self.last_write.command == Commands.SET_SLEEP:
return self._generate_read(
ResponseTypes.GENERAL_RESPONSE,
Commands.SET_SLEEP.value
+ self.last_write.raw_body_data[1:2]
+ self.sleep_state.value
+ b"\x00",
)
elif self.last_write.command == Commands.SET_WORKING_PERIOD:
return self._generate_read(
ResponseTypes.GENERAL_RESPONSE,
Commands.SET_WORKING_PERIOD.value
+ self.last_write.raw_body_data[1:2]
+ self.working_period
+ b"\x00",
)
elif self.last_write.command == Commands.CHECK_FIRMWARE_VERSION:
return self._generate_read(
ResponseTypes.GENERAL_RESPONSE,
Commands.CHECK_FIRMWARE_VERSION.value
+ self.firmware_year
+ self.firmware_month
+ self.firmware_day,
)
return b""

def _generate_read(self, response_type: ResponseTypes, cmd: bytes):
"""Generate a read command, with wrapper and checksum."""
cmd_and_id = cmd + self.device_id
return HEAD + response_type.value + cmd_and_id + read_checksum(cmd_and_id) + TAIL

def write(self, data: bytes) -> int:
"""Write to the emulator."""
self.last_write = parse_write_data(data)
if (
self.last_write.command == Commands.SET_REPORTING_MODE
and ReportingMode(self.last_write.raw_body_data[1:2]) == ReportingMode.SET_MODE
):
self.query_mode = ReportingState(self.last_write.raw_body_data[2:3])
elif self.last_write.command == Commands.SET_DEVICE_ID:
self.device_id = self.last_write.raw_body_data[11:13]
elif (
self.last_write.command == Commands.SET_SLEEP
and SleepMode(self.last_write.raw_body_data[1:2]) == SleepMode.SET_MODE
):
self.sleep_state = SleepState(self.last_write.raw_body_data[2:3])
elif (
self.last_write.command == Commands.SET_WORKING_PERIOD
and WorkingPeriodMode(self.last_write.raw_body_data[1:2]) == WorkingPeriodMode.SET_MODE
):
self.working_period = self.last_write.raw_body_data[2:3]
return len(data)


def read_checksum(data: bytes) -> bytes:
"""Generate a checksum for the data bytes of a command."""
if len(data) != 6:
raise AttributeError("Invalid checksum length.")
return bytes([sum(d for d in data) % 256])


def parse_write_data(data: bytes) -> WriteData:
"""Parse write data from the emulator into a neater wrapper."""
if len(data) != 19:
raise AttributeError("Data is wrong size.")
return WriteData(raw_data=data, raw_body_data=data[2:15], command=Commands(data[2:3]))
141 changes: 141 additions & 0 deletions tests/read/sds011/test_sds011.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import pytest

from aqimon.read.sds011 import NovaPmReader
from aqimon.read.sds011.constants import ReportingState, SleepState
from aqimon.read.sds011.exceptions import QueryInActiveModeException
from .mock_device_serial import Sds011SerialEmulator


@pytest.fixture
def reader():
# In a theoretical world, you could replace the emulator here with a true serial device, and run these as
# integration tests. However, in practice, the behavior of ACTIVE mode is too inconsistent for many of these tests
# to behave reliably.
ser_dev = Sds011SerialEmulator()
reader = NovaPmReader(ser_dev=ser_dev)
try:
reader.wake()
except QueryInActiveModeException:
pass
reader.set_working_period(0)
reader.set_reporting_mode(ReportingState.ACTIVE)
yield reader
ser_dev.close()


def test_set_reporting_mode(reader: NovaPmReader):
result = reader.set_reporting_mode(ReportingState.QUERYING)
assert result.state == ReportingState.QUERYING


def test_get_reporting_mode(reader: NovaPmReader):
reader.set_reporting_mode(ReportingState.QUERYING)
result = reader.get_reporting_mode()
assert result.state == ReportingState.QUERYING


def test_get_reporting_mode_while_active_fails(reader: NovaPmReader):
reader.set_reporting_mode(ReportingState.ACTIVE)
with pytest.raises(QueryInActiveModeException):
reader.get_reporting_mode()


def test_query_active_mode(reader: NovaPmReader):
reader.set_reporting_mode(ReportingState.ACTIVE)
result = reader.query()
assert 999 > result.pm25 > 0
assert 999 > result.pm10 > 0


def test_query_query_mode(reader: NovaPmReader):
reader.set_reporting_mode(ReportingState.QUERYING)
result = reader.query()
assert 999 > result.pm25 > 0
assert 999 > result.pm10 > 0


def test_set_device_id(reader: NovaPmReader):
new_device_id = b"\xbb\xaa"
reader.set_reporting_mode(ReportingState.QUERYING)
result = reader.set_device_id(new_device_id)
assert result.device_id == new_device_id

# Verify other commands also report correct ID
result2 = reader.get_reporting_mode()
assert result2.device_id == new_device_id


def test_sleep_query_mode(reader: NovaPmReader):
reader.set_reporting_mode(ReportingState.QUERYING)
result = reader.sleep()
assert result.state == SleepState.SLEEP


def test_sleep_active_mode(reader: NovaPmReader):
reader.set_reporting_mode(ReportingState.ACTIVE)
with pytest.raises(QueryInActiveModeException):
reader.sleep()


def test_wake_query_mode(reader: NovaPmReader):
reader.set_reporting_mode(ReportingState.QUERYING)
result = reader.wake()
assert result.state == SleepState.WORK


def test_wake_active_mode(reader: NovaPmReader):
reader.set_reporting_mode(ReportingState.ACTIVE)
with pytest.raises(QueryInActiveModeException):
reader.wake()


def test_get_sleep_state_query_mode(reader: NovaPmReader):
reader.set_reporting_mode(ReportingState.QUERYING)
reader.wake()
result = reader.get_sleep_state()
assert result.state == SleepState.WORK


def test_get_sleep_state_active_mode(reader: NovaPmReader):
reader.set_reporting_mode(ReportingState.ACTIVE)
with pytest.raises(QueryInActiveModeException):
reader.get_sleep_state()


def test_set_working_period_query_mode(reader: NovaPmReader):
reader.set_reporting_mode(ReportingState.QUERYING)
result = reader.set_working_period(10)
assert result.interval == 10


def test_set_working_period_active_mode(reader: NovaPmReader):
reader.set_reporting_mode(ReportingState.ACTIVE)
with pytest.raises(QueryInActiveModeException):
reader.set_working_period(10)


def test_get_working_period_query_mode(reader: NovaPmReader):
reader.set_reporting_mode(ReportingState.QUERYING)
reader.set_working_period(10)
result = reader.get_working_period()
assert result.interval == 10


def test_get_working_period_active_mode(reader: NovaPmReader):
reader.set_reporting_mode(ReportingState.ACTIVE)
with pytest.raises(QueryInActiveModeException):
reader.get_working_period()


def test_get_firmware_version_query_mode(reader: NovaPmReader):
reader.set_reporting_mode(ReportingState.QUERYING)
result = reader.get_firmware_version()
assert 99 >= result.year >= 0
assert 12 >= result.month >= 1
assert 31 >= result.day >= 1


def test_get_firmware_version_active_mode(reader: NovaPmReader):
reader.set_reporting_mode(ReportingState.ACTIVE)
with pytest.raises(QueryInActiveModeException):
reader.get_firmware_version()

0 comments on commit 5c30744

Please sign in to comment.