From 8c14648956a4f61185b761f7f2460e5fcd1b3e19 Mon Sep 17 00:00:00 2001 From: Tim Orme <TimothyOrme@gmail.com> Date: Thu, 20 Apr 2023 23:13:56 -0700 Subject: [PATCH] Rename some fields, simplify constants to be less repetitive --- aqimon/read/sds011/__init__.py | 39 ++++++++--------- aqimon/read/sds011/constants.py | 32 +++----------- aqimon/read/sds011/responses.py | 34 +++++++-------- tests/read/sds011/mock_device_serial.py | 58 ++++++++++++------------- tests/read/sds011/test_sds011.py | 6 +-- 5 files changed, 69 insertions(+), 100 deletions(-) diff --git a/aqimon/read/sds011/__init__.py b/aqimon/read/sds011/__init__.py index cebf36b..d0e9be8 100644 --- a/aqimon/read/sds011/__init__.py +++ b/aqimon/read/sds011/__init__.py @@ -30,7 +30,7 @@ def __init__(self, ser_dev: serial.Serial, send_command_sleep: int = 1): def request_data(self) -> None: """Request device to return pollutant data.""" - cmd = con.Commands.QUERY.value + (b"\x00" * 12) + con.ALL_SENSOR_ID + cmd = con.Command.QUERY.value + (b"\x00" * 12) + con.ALL_SENSOR_ID self._send_command(cmd) def query_data(self) -> QueryReadResponse: @@ -40,9 +40,9 @@ def query_data(self) -> QueryReadResponse: def request_reporting_mode(self) -> None: """Request device to return the current reporting mode.""" cmd = ( - con.Commands.SET_REPORTING_MODE.value - + con.ReportingMode.QUERY.value - + con.ReportingState.ACTIVE.value + con.Command.SET_REPORTING_MODE.value + + con.OperationType.QUERY.value + + con.ReportingMode.ACTIVE.value + (b"\x00" * 10) + con.ALL_SENSOR_ID ) @@ -54,7 +54,7 @@ def query_reporting_mode(self) -> ReportingModeReadResponse: def set_active_mode(self) -> None: """Set the reporting mode to active.""" - self._set_reporting_mode(con.ReportingState.ACTIVE) + self._set_reporting_mode(con.ReportingMode.ACTIVE) try: self.query_reporting_mode() except IncorrectCommandException: @@ -64,7 +64,7 @@ def set_active_mode(self) -> None: def set_query_mode(self) -> None: """Set the reporting mode to querying.""" - self._set_reporting_mode(con.ReportingState.QUERYING) + self._set_reporting_mode(con.ReportingMode.QUERYING) try: self.query_reporting_mode() except IncorrectCommandException: @@ -72,7 +72,7 @@ def set_query_mode(self) -> None: except IncompleteReadException: pass - def _set_reporting_mode(self, reporting_mode: con.ReportingState) -> None: + def _set_reporting_mode(self, reporting_mode: con.ReportingMode) -> None: """Set the reporting mode, either ACTIVE or QUERYING. ACTIVE mode means the device will always return a Query command response when data is asked for, regardless of @@ -83,8 +83,8 @@ def _set_reporting_mode(self, reporting_mode: con.ReportingState) -> None: ACTIVE mode is the factory default, but generally, QUERYING mode is preferrable for the longevity of the device. """ cmd = ( - con.Commands.SET_REPORTING_MODE.value - + con.ReportingMode.SET_MODE.value + con.Command.SET_REPORTING_MODE.value + + con.OperationType.SET_MODE.value + reporting_mode.value + (b"\x00" * 10) + con.ALL_SENSOR_ID @@ -96,7 +96,7 @@ def _set_reporting_mode(self, reporting_mode: con.ReportingState) -> None: def request_sleep_state(self) -> None: """Get the current sleep state.""" - cmd = con.Commands.SET_SLEEP.value + con.SleepMode.QUERY.value + b"\x00" + (b"\x00" * 10) + con.ALL_SENSOR_ID + cmd = con.Command.SET_SLEEP.value + con.OperationType.QUERY.value + b"\x00" + (b"\x00" * 10) + con.ALL_SENSOR_ID self._send_command(cmd) def query_sleep_state(self) -> SleepWakeReadResponse: @@ -106,8 +106,8 @@ def query_sleep_state(self) -> SleepWakeReadResponse: def set_sleep_state(self, sleep_state: con.SleepState) -> None: """Set the sleep state, either wake or sleep.""" cmd = ( - con.Commands.SET_SLEEP.value - + con.SleepMode.SET_MODE.value + con.Command.SET_SLEEP.value + + con.OperationType.SET_MODE.value + sleep_state.value + (b"\x00" * 10) + con.ALL_SENSOR_ID @@ -126,7 +126,7 @@ def set_device_id(self, device_id: bytes, target_device_id: bytes = con.ALL_SENS """Set the device ID.""" if len(device_id) != 2 or len(target_device_id) != 2: raise AttributeError(f"Device ID must be 4 bytes, found {len(device_id)}, and {len(target_device_id)}") - cmd = con.Commands.SET_DEVICE_ID.value + (b"\x00" * 10) + device_id + target_device_id + cmd = con.Command.SET_DEVICE_ID.value + (b"\x00" * 10) + device_id + target_device_id self._send_command(cmd) def query_device_id(self) -> DeviceIdResponse: @@ -135,12 +135,7 @@ def query_device_id(self) -> DeviceIdResponse: def request_working_period(self) -> None: """Retrieve the current working period for the device.""" - cmd = ( - con.Commands.SET_WORKING_PERIOD.value - + con.WorkingPeriodMode.QUERY.value - + (b"\x00" * 11) - + con.ALL_SENSOR_ID - ) + cmd = con.Command.SET_WORKING_PERIOD.value + con.OperationType.QUERY.value + (b"\x00" * 11) + con.ALL_SENSOR_ID self._send_command(cmd) def query_working_period(self) -> WorkingPeriodReadResponse: @@ -158,8 +153,8 @@ def set_working_period(self, working_period: int) -> None: if 0 >= working_period >= 30: raise AttributeError("Working period must be between 0 and 30") cmd = ( - con.Commands.SET_WORKING_PERIOD.value - + con.WorkingPeriodMode.SET_MODE.value + con.Command.SET_WORKING_PERIOD.value + + con.OperationType.SET_MODE.value + bytes([working_period]) + (b"\x00" * 10) + con.ALL_SENSOR_ID @@ -168,7 +163,7 @@ def set_working_period(self, working_period: int) -> None: def request_firmware_version(self) -> None: """Retrieve the firmware version from the device.""" - cmd = con.Commands.CHECK_FIRMWARE_VERSION.value + (b"\x00" * 12) + con.ALL_SENSOR_ID + cmd = con.Command.CHECK_FIRMWARE_VERSION.value + (b"\x00" * 12) + con.ALL_SENSOR_ID self._send_command(cmd) def query_firmware_version(self) -> CheckFirmwareReadResponse: diff --git a/aqimon/read/sds011/constants.py b/aqimon/read/sds011/constants.py index eaa6e4a..482f573 100644 --- a/aqimon/read/sds011/constants.py +++ b/aqimon/read/sds011/constants.py @@ -14,7 +14,7 @@ SUBMIT_TYPE = b"\xb4" -class ResponseTypes(Enum): +class ResponseType(Enum): """Response types for commands. GENERAL_RESPONSE is for all commands except query. @@ -26,7 +26,7 @@ class ResponseTypes(Enum): QUERY_RESPONSE = b"\xc0" -class Commands(Enum): +class Command(Enum): """Possible commands for the device.""" SET_REPORTING_MODE = b"\x02" @@ -37,17 +37,17 @@ class Commands(Enum): CHECK_FIRMWARE_VERSION = b"\x07" -class ReportingMode(Enum): - """Sub command for reporting mode state. +class OperationType(Enum): + """Operation type for many commands. - Can either query or set the value for the state. + Many commands have two modes, one for setting a value, and another for retrieving. """ QUERY = b"\x00" SET_MODE = b"\x01" -class ReportingState(Enum): +class ReportingMode(Enum): """Reporting mode for the device. ACTIVE mode means that the device is constantly returning read data from the device, and won't respond correctly @@ -60,26 +60,6 @@ class ReportingState(Enum): QUERYING = b"\x01" -class WorkingPeriodMode(Enum): - """Sub command for working period state. - - Can either query or set the value for the state. - """ - - QUERY = b"\x00" - SET_MODE = b"\x01" - - -class SleepMode(Enum): - """Sub command for sleep state. - - Can either query or set the value for the state. - """ - - QUERY = b"\x00" - SET_MODE = b"\x01" - - class SleepState(Enum): """State of the device, either working or sleeping.""" diff --git a/aqimon/read/sds011/responses.py b/aqimon/read/sds011/responses.py index b0de8ea..4ae32a5 100644 --- a/aqimon/read/sds011/responses.py +++ b/aqimon/read/sds011/responses.py @@ -5,13 +5,11 @@ from .constants import ( HEAD, TAIL, - Commands, - ResponseTypes, - SleepMode, + Command, + ResponseType, SleepState, + OperationType, ReportingMode, - ReportingState, - WorkingPeriodMode, ) from .exceptions import ( ChecksumFailedException, @@ -25,9 +23,7 @@ class ReadResponse: """Generic read response object for responses from SDS011.""" - def __init__( - self, data: bytes, command_code: Commands, response_type: ResponseTypes = ResponseTypes.GENERAL_RESPONSE - ): + def __init__(self, data: bytes, command_code: Command, response_type: ResponseType = ResponseType.GENERAL_RESPONSE): """Create a read response.""" if len(data) != 10: raise IncompleteReadException() @@ -56,7 +52,7 @@ def verify(self): # Query responses don't validate the command code if ( - self.expected_response_type != ResponseTypes.QUERY_RESPONSE + self.expected_response_type != ResponseType.QUERY_RESPONSE and bytes([self.data[0]]) != self.expected_command_code.value ): raise IncorrectCommandCodeException(expected=self.expected_command_code.value, actual=self.data[0]) @@ -71,7 +67,7 @@ class QueryReadResponse(ReadResponse): def __init__(self, data: bytes): """Create a query read response.""" - super().__init__(data, command_code=Commands.QUERY, response_type=ResponseTypes.QUERY_RESPONSE) + super().__init__(data, command_code=Command.QUERY, response_type=ResponseType.QUERY_RESPONSE) self.pm25: float = int.from_bytes(data[2:4], byteorder="little") / 10 self.pm10: float = int.from_bytes(data[4:6], byteorder="little") / 10 @@ -82,9 +78,9 @@ class ReportingModeReadResponse(ReadResponse): def __init__(self, data: bytes): """Create a reporting mode response.""" - super().__init__(data, command_code=Commands.SET_REPORTING_MODE) - self.mode_type = ReportingMode(self.data[1:2]) - self.state = ReportingState(self.data[2:3]) + super().__init__(data, command_code=Command.SET_REPORTING_MODE) + self.operation_type = OperationType(self.data[1:2]) + self.state = ReportingMode(self.data[2:3]) class DeviceIdResponse(ReadResponse): @@ -92,7 +88,7 @@ class DeviceIdResponse(ReadResponse): def __init__(self, data: bytes): """Create a device ID response.""" - super().__init__(data, command_code=Commands.SET_DEVICE_ID) + super().__init__(data, command_code=Command.SET_DEVICE_ID) class SleepWakeReadResponse(ReadResponse): @@ -100,8 +96,8 @@ class SleepWakeReadResponse(ReadResponse): def __init__(self, data: bytes): """Create a sleep/wake response.""" - super().__init__(data, command_code=Commands.SET_SLEEP) - self.mode_type = SleepMode(self.data[1:2]) + super().__init__(data, command_code=Command.SET_SLEEP) + self.operation_type = OperationType(self.data[1:2]) self.state = SleepState(self.data[2:3]) @@ -110,8 +106,8 @@ class WorkingPeriodReadResponse(ReadResponse): def __init__(self, data: bytes): """Create a working period response.""" - super().__init__(data, command_code=Commands.SET_WORKING_PERIOD) - self.mode_type = WorkingPeriodMode(self.data[1:2]) + super().__init__(data, command_code=Command.SET_WORKING_PERIOD) + self.operation_type = OperationType(self.data[1:2]) self.interval: int = self.data[2] @@ -120,7 +116,7 @@ class CheckFirmwareReadResponse(ReadResponse): def __init__(self, data: bytes): """Create a firmware response.""" - super().__init__(data, command_code=Commands.CHECK_FIRMWARE_VERSION) + super().__init__(data, command_code=Command.CHECK_FIRMWARE_VERSION) self.year = self.data[1] self.month = self.data[2] self.day = self.data[3] diff --git a/tests/read/sds011/mock_device_serial.py b/tests/read/sds011/mock_device_serial.py index 155a829..ddfd5a6 100644 --- a/tests/read/sds011/mock_device_serial.py +++ b/tests/read/sds011/mock_device_serial.py @@ -2,13 +2,11 @@ from aqimon.read.sds011.constants import ( HEAD, TAIL, - Commands, - ReportingState, + Command, ReportingMode, - ResponseTypes, - SleepMode, + OperationType, + ResponseType, SleepState, - WorkingPeriodMode, ) from dataclasses import dataclass @@ -19,7 +17,7 @@ class WriteData: raw_data: bytes raw_body_data: bytes - command: Commands + command: Command class Sds011SerialEmulator(Serial): @@ -36,7 +34,7 @@ def __init__(self) -> None: super().__init__() self.response_buffer = b"" self.response_type = b"" - self.query_mode = ReportingState.ACTIVE + self.query_mode = ReportingMode.ACTIVE self.device_id = b"\x01\x01" self.sleep_state = SleepState.WORK self.working_period = bytes([0]) @@ -54,7 +52,7 @@ def close(self): def read(self, size: int = 1) -> bytes: """Read from the emulator.""" - if self.query_mode == ReportingState.ACTIVE: + if self.query_mode == ReportingMode.ACTIVE: # If in active mode, always return query response. return self._get_query_response() else: @@ -62,7 +60,7 @@ def read(self, size: int = 1) -> bytes: self.response_buffer = b"" return response - def _generate_read(self, response_type: ResponseTypes, cmd: bytes) -> bytes: + def _generate_read(self, response_type: ResponseType, cmd: bytes) -> bytes: """Generate a read command, with wrapper and checksum.""" cmd_and_id = cmd + self.device_id return HEAD + response_type.value + cmd_and_id + read_checksum(cmd_and_id) + TAIL @@ -71,55 +69,55 @@ def write(self, data: bytes) -> int: """Write to the emulator.""" last_write = parse_write_data(data) self.response_type = last_write.raw_body_data[1:2] - if last_write.command == Commands.SET_REPORTING_MODE: - if ReportingMode(last_write.raw_body_data[1:2]) == ReportingMode.SET_MODE: - self.query_mode = ReportingState(last_write.raw_body_data[2:3]) + if last_write.command == Command.SET_REPORTING_MODE: + if OperationType(last_write.raw_body_data[1:2]) == OperationType.SET_MODE: + self.query_mode = ReportingMode(last_write.raw_body_data[2:3]) self.response_buffer = self._set_reporting_mode_response() - elif last_write.command == Commands.QUERY: + elif last_write.command == Command.QUERY: self.response_buffer = self._get_query_response() - elif last_write.command == Commands.SET_DEVICE_ID: + elif last_write.command == Command.SET_DEVICE_ID: self.device_id = last_write.raw_body_data[11:13] self.response_buffer = self._set_device_id_response() - elif last_write.command == Commands.SET_SLEEP: - if SleepMode(last_write.raw_body_data[1:2]) == SleepMode.SET_MODE: + elif last_write.command == Command.SET_SLEEP: + if OperationType(last_write.raw_body_data[1:2]) == OperationType.SET_MODE: self.sleep_state = SleepState(last_write.raw_body_data[2:3]) self.response_buffer = self._set_sleep_response() - elif last_write.command == Commands.SET_WORKING_PERIOD: - if WorkingPeriodMode(last_write.raw_body_data[1:2]) == WorkingPeriodMode.SET_MODE: + elif last_write.command == Command.SET_WORKING_PERIOD: + if OperationType(last_write.raw_body_data[1:2]) == OperationType.SET_MODE: self.working_period = last_write.raw_body_data[2:3] self.response_buffer = self._set_working_period_response() - elif last_write.command == Commands.CHECK_FIRMWARE_VERSION: + elif last_write.command == Command.CHECK_FIRMWARE_VERSION: self.response_buffer = self._check_firmware_response() return len(data) def _get_query_response(self) -> bytes: - return self._generate_read(ResponseTypes.QUERY_RESPONSE, b"\x19\x00\x64\x00") + return self._generate_read(ResponseType.QUERY_RESPONSE, b"\x19\x00\x64\x00") def _set_reporting_mode_response(self) -> bytes: return self._generate_read( - ResponseTypes.GENERAL_RESPONSE, - Commands.SET_REPORTING_MODE.value + self.response_type + self.query_mode.value + b"\x00", + ResponseType.GENERAL_RESPONSE, + Command.SET_REPORTING_MODE.value + self.response_type + self.query_mode.value + b"\x00", ) def _set_device_id_response(self) -> bytes: - return self._generate_read(ResponseTypes.GENERAL_RESPONSE, Commands.SET_DEVICE_ID.value + (b"\x00" * 3)) + return self._generate_read(ResponseType.GENERAL_RESPONSE, Command.SET_DEVICE_ID.value + (b"\x00" * 3)) def _set_sleep_response(self) -> bytes: return self._generate_read( - ResponseTypes.GENERAL_RESPONSE, - Commands.SET_SLEEP.value + self.response_type + self.sleep_state.value + b"\x00", + ResponseType.GENERAL_RESPONSE, + Command.SET_SLEEP.value + self.response_type + self.sleep_state.value + b"\x00", ) def _set_working_period_response(self) -> bytes: return self._generate_read( - ResponseTypes.GENERAL_RESPONSE, - Commands.SET_WORKING_PERIOD.value + self.response_type + self.working_period + b"\x00", + ResponseType.GENERAL_RESPONSE, + Command.SET_WORKING_PERIOD.value + self.response_type + self.working_period + b"\x00", ) def _check_firmware_response(self) -> bytes: return self._generate_read( - ResponseTypes.GENERAL_RESPONSE, - Commands.CHECK_FIRMWARE_VERSION.value + self.firmware_year + self.firmware_month + self.firmware_day, + ResponseType.GENERAL_RESPONSE, + Command.CHECK_FIRMWARE_VERSION.value + self.firmware_year + self.firmware_month + self.firmware_day, ) @@ -134,4 +132,4 @@ def parse_write_data(data: bytes) -> WriteData: """Parse write data from the emulator into a neater wrapper.""" if len(data) != 19: raise AttributeError("Data is wrong size.") - return WriteData(raw_data=data, raw_body_data=data[2:15], command=Commands(data[2:3])) + return WriteData(raw_data=data, raw_body_data=data[2:15], command=Command(data[2:3])) diff --git a/tests/read/sds011/test_sds011.py b/tests/read/sds011/test_sds011.py index 3b92dd9..47acb3a 100644 --- a/tests/read/sds011/test_sds011.py +++ b/tests/read/sds011/test_sds011.py @@ -1,7 +1,7 @@ import pytest from aqimon.read.sds011 import NovaPmReader -from aqimon.read.sds011.constants import ReportingState, SleepState +from aqimon.read.sds011.constants import ReportingMode, SleepState from aqimon.read.sds011.exceptions import ( IncorrectCommandException, ) @@ -46,7 +46,7 @@ def test_hammer_reporting_mode(reader: NovaPmReader): reader.set_query_mode() reader.request_reporting_mode() - assert reader.query_reporting_mode().state == ReportingState.QUERYING + assert reader.query_reporting_mode().state == ReportingMode.QUERYING def test_hammer_sleep_query_mode(reader: NovaPmReader): @@ -78,7 +78,7 @@ def test_get_reporting_mode(reader: NovaPmReader): reader.set_query_mode() reader.request_reporting_mode() result = reader.query_reporting_mode() - assert result.state == ReportingState.QUERYING + assert result.state == ReportingMode.QUERYING def test_get_reporting_mode_while_active_fails(reader: NovaPmReader):