Skip to content

Commit

Permalink
Rename some fields, simplify constants to be less repetitive
Browse files Browse the repository at this point in the history
  • Loading branch information
TimOrme committed Apr 21, 2023
1 parent d2b22d9 commit 8c14648
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 100 deletions.
39 changes: 17 additions & 22 deletions aqimon/read/sds011/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def __init__(self, ser_dev: serial.Serial, send_command_sleep: int = 1):

def request_data(self) -> None:
"""Request device to return pollutant data."""
cmd = con.Commands.QUERY.value + (b"\x00" * 12) + con.ALL_SENSOR_ID
cmd = con.Command.QUERY.value + (b"\x00" * 12) + con.ALL_SENSOR_ID
self._send_command(cmd)

def query_data(self) -> QueryReadResponse:
Expand All @@ -40,9 +40,9 @@ def query_data(self) -> QueryReadResponse:
def request_reporting_mode(self) -> None:
"""Request device to return the current reporting mode."""
cmd = (
con.Commands.SET_REPORTING_MODE.value
+ con.ReportingMode.QUERY.value
+ con.ReportingState.ACTIVE.value
con.Command.SET_REPORTING_MODE.value
+ con.OperationType.QUERY.value
+ con.ReportingMode.ACTIVE.value
+ (b"\x00" * 10)
+ con.ALL_SENSOR_ID
)
Expand All @@ -54,7 +54,7 @@ def query_reporting_mode(self) -> ReportingModeReadResponse:

def set_active_mode(self) -> None:
"""Set the reporting mode to active."""
self._set_reporting_mode(con.ReportingState.ACTIVE)
self._set_reporting_mode(con.ReportingMode.ACTIVE)
try:
self.query_reporting_mode()
except IncorrectCommandException:
Expand All @@ -64,15 +64,15 @@ def set_active_mode(self) -> None:

def set_query_mode(self) -> None:
"""Set the reporting mode to querying."""
self._set_reporting_mode(con.ReportingState.QUERYING)
self._set_reporting_mode(con.ReportingMode.QUERYING)
try:
self.query_reporting_mode()
except IncorrectCommandException:
pass
except IncompleteReadException:
pass

def _set_reporting_mode(self, reporting_mode: con.ReportingState) -> None:
def _set_reporting_mode(self, reporting_mode: con.ReportingMode) -> None:
"""Set the reporting mode, either ACTIVE or QUERYING.
ACTIVE mode means the device will always return a Query command response when data is asked for, regardless of
Expand All @@ -83,8 +83,8 @@ def _set_reporting_mode(self, reporting_mode: con.ReportingState) -> None:
ACTIVE mode is the factory default, but generally, QUERYING mode is preferrable for the longevity of the device.
"""
cmd = (
con.Commands.SET_REPORTING_MODE.value
+ con.ReportingMode.SET_MODE.value
con.Command.SET_REPORTING_MODE.value
+ con.OperationType.SET_MODE.value
+ reporting_mode.value
+ (b"\x00" * 10)
+ con.ALL_SENSOR_ID
Expand All @@ -96,7 +96,7 @@ def _set_reporting_mode(self, reporting_mode: con.ReportingState) -> None:

def request_sleep_state(self) -> None:
"""Get the current sleep state."""
cmd = con.Commands.SET_SLEEP.value + con.SleepMode.QUERY.value + b"\x00" + (b"\x00" * 10) + con.ALL_SENSOR_ID
cmd = con.Command.SET_SLEEP.value + con.OperationType.QUERY.value + b"\x00" + (b"\x00" * 10) + con.ALL_SENSOR_ID
self._send_command(cmd)

def query_sleep_state(self) -> SleepWakeReadResponse:
Expand All @@ -106,8 +106,8 @@ def query_sleep_state(self) -> SleepWakeReadResponse:
def set_sleep_state(self, sleep_state: con.SleepState) -> None:
"""Set the sleep state, either wake or sleep."""
cmd = (
con.Commands.SET_SLEEP.value
+ con.SleepMode.SET_MODE.value
con.Command.SET_SLEEP.value
+ con.OperationType.SET_MODE.value
+ sleep_state.value
+ (b"\x00" * 10)
+ con.ALL_SENSOR_ID
Expand All @@ -126,7 +126,7 @@ def set_device_id(self, device_id: bytes, target_device_id: bytes = con.ALL_SENS
"""Set the device ID."""
if len(device_id) != 2 or len(target_device_id) != 2:
raise AttributeError(f"Device ID must be 4 bytes, found {len(device_id)}, and {len(target_device_id)}")
cmd = con.Commands.SET_DEVICE_ID.value + (b"\x00" * 10) + device_id + target_device_id
cmd = con.Command.SET_DEVICE_ID.value + (b"\x00" * 10) + device_id + target_device_id
self._send_command(cmd)

def query_device_id(self) -> DeviceIdResponse:
Expand All @@ -135,12 +135,7 @@ def query_device_id(self) -> DeviceIdResponse:

def request_working_period(self) -> None:
"""Retrieve the current working period for the device."""
cmd = (
con.Commands.SET_WORKING_PERIOD.value
+ con.WorkingPeriodMode.QUERY.value
+ (b"\x00" * 11)
+ con.ALL_SENSOR_ID
)
cmd = con.Command.SET_WORKING_PERIOD.value + con.OperationType.QUERY.value + (b"\x00" * 11) + con.ALL_SENSOR_ID
self._send_command(cmd)

def query_working_period(self) -> WorkingPeriodReadResponse:
Expand All @@ -158,8 +153,8 @@ def set_working_period(self, working_period: int) -> None:
if 0 >= working_period >= 30:
raise AttributeError("Working period must be between 0 and 30")
cmd = (
con.Commands.SET_WORKING_PERIOD.value
+ con.WorkingPeriodMode.SET_MODE.value
con.Command.SET_WORKING_PERIOD.value
+ con.OperationType.SET_MODE.value
+ bytes([working_period])
+ (b"\x00" * 10)
+ con.ALL_SENSOR_ID
Expand All @@ -168,7 +163,7 @@ def set_working_period(self, working_period: int) -> None:

def request_firmware_version(self) -> None:
"""Retrieve the firmware version from the device."""
cmd = con.Commands.CHECK_FIRMWARE_VERSION.value + (b"\x00" * 12) + con.ALL_SENSOR_ID
cmd = con.Command.CHECK_FIRMWARE_VERSION.value + (b"\x00" * 12) + con.ALL_SENSOR_ID
self._send_command(cmd)

def query_firmware_version(self) -> CheckFirmwareReadResponse:
Expand Down
32 changes: 6 additions & 26 deletions aqimon/read/sds011/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
SUBMIT_TYPE = b"\xb4"


class ResponseTypes(Enum):
class ResponseType(Enum):
"""Response types for commands.
GENERAL_RESPONSE is for all commands except query.
Expand All @@ -26,7 +26,7 @@ class ResponseTypes(Enum):
QUERY_RESPONSE = b"\xc0"


class Commands(Enum):
class Command(Enum):
"""Possible commands for the device."""

SET_REPORTING_MODE = b"\x02"
Expand All @@ -37,17 +37,17 @@ class Commands(Enum):
CHECK_FIRMWARE_VERSION = b"\x07"


class ReportingMode(Enum):
"""Sub command for reporting mode state.
class OperationType(Enum):
"""Operation type for many commands.
Can either query or set the value for the state.
Many commands have two modes, one for setting a value, and another for retrieving.
"""

QUERY = b"\x00"
SET_MODE = b"\x01"


class ReportingState(Enum):
class ReportingMode(Enum):
"""Reporting mode for the device.
ACTIVE mode means that the device is constantly returning read data from the device, and won't respond correctly
Expand All @@ -60,26 +60,6 @@ class ReportingState(Enum):
QUERYING = b"\x01"


class WorkingPeriodMode(Enum):
"""Sub command for working period state.
Can either query or set the value for the state.
"""

QUERY = b"\x00"
SET_MODE = b"\x01"


class SleepMode(Enum):
"""Sub command for sleep state.
Can either query or set the value for the state.
"""

QUERY = b"\x00"
SET_MODE = b"\x01"


class SleepState(Enum):
"""State of the device, either working or sleeping."""

Expand Down
34 changes: 15 additions & 19 deletions aqimon/read/sds011/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@
from .constants import (
HEAD,
TAIL,
Commands,
ResponseTypes,
SleepMode,
Command,
ResponseType,
SleepState,
OperationType,
ReportingMode,
ReportingState,
WorkingPeriodMode,
)
from .exceptions import (
ChecksumFailedException,
Expand All @@ -25,9 +23,7 @@
class ReadResponse:
"""Generic read response object for responses from SDS011."""

def __init__(
self, data: bytes, command_code: Commands, response_type: ResponseTypes = ResponseTypes.GENERAL_RESPONSE
):
def __init__(self, data: bytes, command_code: Command, response_type: ResponseType = ResponseType.GENERAL_RESPONSE):
"""Create a read response."""
if len(data) != 10:
raise IncompleteReadException()
Expand Down Expand Up @@ -56,7 +52,7 @@ def verify(self):

# Query responses don't validate the command code
if (
self.expected_response_type != ResponseTypes.QUERY_RESPONSE
self.expected_response_type != ResponseType.QUERY_RESPONSE
and bytes([self.data[0]]) != self.expected_command_code.value
):
raise IncorrectCommandCodeException(expected=self.expected_command_code.value, actual=self.data[0])
Expand All @@ -71,7 +67,7 @@ class QueryReadResponse(ReadResponse):

def __init__(self, data: bytes):
"""Create a query read response."""
super().__init__(data, command_code=Commands.QUERY, response_type=ResponseTypes.QUERY_RESPONSE)
super().__init__(data, command_code=Command.QUERY, response_type=ResponseType.QUERY_RESPONSE)

self.pm25: float = int.from_bytes(data[2:4], byteorder="little") / 10
self.pm10: float = int.from_bytes(data[4:6], byteorder="little") / 10
Expand All @@ -82,26 +78,26 @@ class ReportingModeReadResponse(ReadResponse):

def __init__(self, data: bytes):
"""Create a reporting mode response."""
super().__init__(data, command_code=Commands.SET_REPORTING_MODE)
self.mode_type = ReportingMode(self.data[1:2])
self.state = ReportingState(self.data[2:3])
super().__init__(data, command_code=Command.SET_REPORTING_MODE)
self.operation_type = OperationType(self.data[1:2])
self.state = ReportingMode(self.data[2:3])


class DeviceIdResponse(ReadResponse):
"""Device ID response."""

def __init__(self, data: bytes):
"""Create a device ID response."""
super().__init__(data, command_code=Commands.SET_DEVICE_ID)
super().__init__(data, command_code=Command.SET_DEVICE_ID)


class SleepWakeReadResponse(ReadResponse):
"""Sleep/Wake Response."""

def __init__(self, data: bytes):
"""Create a sleep/wake response."""
super().__init__(data, command_code=Commands.SET_SLEEP)
self.mode_type = SleepMode(self.data[1:2])
super().__init__(data, command_code=Command.SET_SLEEP)
self.operation_type = OperationType(self.data[1:2])
self.state = SleepState(self.data[2:3])


Expand All @@ -110,8 +106,8 @@ class WorkingPeriodReadResponse(ReadResponse):

def __init__(self, data: bytes):
"""Create a working period response."""
super().__init__(data, command_code=Commands.SET_WORKING_PERIOD)
self.mode_type = WorkingPeriodMode(self.data[1:2])
super().__init__(data, command_code=Command.SET_WORKING_PERIOD)
self.operation_type = OperationType(self.data[1:2])
self.interval: int = self.data[2]


Expand All @@ -120,7 +116,7 @@ class CheckFirmwareReadResponse(ReadResponse):

def __init__(self, data: bytes):
"""Create a firmware response."""
super().__init__(data, command_code=Commands.CHECK_FIRMWARE_VERSION)
super().__init__(data, command_code=Command.CHECK_FIRMWARE_VERSION)
self.year = self.data[1]
self.month = self.data[2]
self.day = self.data[3]
Loading

0 comments on commit 8c14648

Please sign in to comment.