From 3167333fd04224d913180962822c428db223c050 Mon Sep 17 00:00:00 2001 From: denpamusic Date: Wed, 23 Oct 2024 04:05:44 +0300 Subject: [PATCH] Add numeric data type. This data type implements NaN (not a number) check. --- pyplumio/helpers/data_types.py | 51 +++++++++++---- pyplumio/structures/boiler_power.py | 3 +- pyplumio/structures/fan_power.py | 3 +- pyplumio/structures/fuel_consumption.py | 3 +- pyplumio/structures/mixer_sensors.py | 3 +- pyplumio/structures/regulator_data.py | 13 +++- pyplumio/structures/temperatures.py | 3 +- pyplumio/structures/thermostat_sensors.py | 3 +- tests/helpers/test_data_types.py | 80 +++++++++++++++++++++++ 9 files changed, 136 insertions(+), 26 deletions(-) diff --git a/pyplumio/helpers/data_types.py b/pyplumio/helpers/data_types.py index e54973c8..c2f225ec 100644 --- a/pyplumio/helpers/data_types.py +++ b/pyplumio/helpers/data_types.py @@ -3,9 +3,12 @@ from __future__ import annotations from abc import ABC, abstractmethod +import math import socket import struct -from typing import ClassVar, Final, Generic, TypeVar +from typing import ClassVar, Final, Generic, SupportsFloat, TypeVar + +from pyplumio.const import BYTE_UNDEFINED T = TypeVar("T") DataTypeT = TypeVar("DataTypeT", bound="DataType") @@ -278,7 +281,33 @@ def size(self) -> int: return self._size -class SignedChar(BuiltInDataType[int]): +FloatT = TypeVar("FloatT", bound=SupportsFloat) + + +class NumericDataType(BuiltInDataType[FloatT], ABC): + """Represents a checkable data type.""" + + __slots__ = ("_nan",) + + _nan: bool + + def __init__(self, value: FloatT | None = None) -> None: + """Initialize a new checkable data type.""" + self._nan = False + super().__init__(value) + + def isnan(self) -> bool: + """Return True if value is not a number, False otherwise.""" + return self._nan + + def unpack(self, data: bytes) -> None: + """Unpack the data.""" + undefined = bytearray([BYTE_UNDEFINED] * self.size) + self._value = self._struct.unpack_from(data)[0] + self._nan = data[0 : self.size] == undefined or math.isnan(self._value) + + +class SignedChar(NumericDataType[int]): """Represents a signed char.""" __slots__ = () @@ -286,7 +315,7 @@ class SignedChar(BuiltInDataType[int]): _struct = struct.Struct(" dict[str, Any] | None: ATTR_TARGET_TEMP: message[offset + 4], ATTR_PUMP: bool(message[offset + 6] & 0x01), } - if not math.isnan(current_temp.value) + if not current_temp.isnan() else None ) finally: diff --git a/pyplumio/structures/regulator_data.py b/pyplumio/structures/regulator_data.py index 194468ce..82bf5eb5 100644 --- a/pyplumio/structures/regulator_data.py +++ b/pyplumio/structures/regulator_data.py @@ -4,7 +4,7 @@ from typing import Any, Final -from pyplumio.helpers.data_types import BitArray, DataType +from pyplumio.helpers.data_types import BitArray, DataType, NumericDataType from pyplumio.structures import StructureDecoder from pyplumio.structures.frame_versions import FrameVersionsStructure from pyplumio.structures.regulator_data_schema import ATTR_REGDATA_SCHEMA @@ -37,6 +37,9 @@ def _unpack_regulator_data(self, message: bytearray, data_type: DataType) -> Any self._bitarray_index = data_type.next(self._bitarray_index) self._offset += data_type.size + if isinstance(data_type, NumericDataType) and data_type.isnan(): + return None + return data_type.value def decode( @@ -58,8 +61,12 @@ def decode( ): self._bitarray_index = 0 data[ATTR_REGDATA] = { - param_id: self._unpack_regulator_data(message, data_type) - for param_id, data_type in schema + param_id: value + for param_id, value in { + param_id: self._unpack_regulator_data(message, data_type) + for param_id, data_type in schema + }.items() + if value is not None } return data, self._offset diff --git a/pyplumio/structures/temperatures.py b/pyplumio/structures/temperatures.py index 2bf88095..d4838c02 100644 --- a/pyplumio/structures/temperatures.py +++ b/pyplumio/structures/temperatures.py @@ -2,7 +2,6 @@ from __future__ import annotations -import math from typing import Any, Final from pyplumio.helpers.data_types import Float @@ -65,7 +64,7 @@ def decode( offset += 1 temp = Float.from_bytes(message, offset) offset += temp.size - if (not math.isnan(temp.value)) and 0 <= index < len(TEMPERATURES): + if not temp.isnan() and 0 <= index < len(TEMPERATURES): # Temperature exists and index is in the correct range. data[TEMPERATURES[index]] = temp.value diff --git a/pyplumio/structures/thermostat_sensors.py b/pyplumio/structures/thermostat_sensors.py index 0e462712..c617dfdf 100644 --- a/pyplumio/structures/thermostat_sensors.py +++ b/pyplumio/structures/thermostat_sensors.py @@ -3,7 +3,6 @@ from __future__ import annotations from collections.abc import Generator -import math from typing import Any, Final from pyplumio.const import ( @@ -53,7 +52,7 @@ def _unpack_thermostat_sensors( ATTR_CONTACTS: bool(contacts & self._contact_mask), ATTR_SCHEDULE: bool(contacts & self._schedule_mask), } - if not math.isnan(current_temp.value) and target_temp.value > 0 + if not current_temp.isnan() and target_temp.value > 0 else None ) finally: diff --git a/tests/helpers/test_data_types.py b/tests/helpers/test_data_types.py index 95cc7df8..b5eac40c 100644 --- a/tests/helpers/test_data_types.py +++ b/tests/helpers/test_data_types.py @@ -2,6 +2,7 @@ import pytest +from pyplumio.const import BYTE_UNDEFINED from pyplumio.helpers import data_types @@ -57,6 +58,10 @@ def test_signed_char() -> None: assert repr(data_types.SignedChar()) == "SignedChar()" assert data_type == data_types.SignedChar.from_bytes(buffer) assert data_type == 22 + assert not data_type.isnan() + undefined = bytearray([BYTE_UNDEFINED]) + data_type = data_types.SignedChar.from_bytes(undefined) + assert data_type.isnan() def test_short() -> None: @@ -70,6 +75,10 @@ def test_short() -> None: assert repr(data_types.Short()) == "Short()" assert data_type == data_types.Short.from_bytes(buffer) assert data_type == -20 + assert not data_type.isnan() + undefined = bytearray([BYTE_UNDEFINED, BYTE_UNDEFINED]) + data_type = data_types.Short.from_bytes(undefined) + assert data_type.isnan() def test_int() -> None: @@ -83,6 +92,12 @@ def test_int() -> None: assert repr(data_types.Int()) == "Int()" assert data_type == data_types.Int.from_bytes(buffer) assert data_type == -26111 + assert not data_type.isnan() + undefined = bytearray( + [BYTE_UNDEFINED, BYTE_UNDEFINED, BYTE_UNDEFINED, BYTE_UNDEFINED] + ) + data_type = data_types.Int.from_bytes(undefined) + assert data_type.isnan() def test_unsigned_char() -> None: @@ -96,6 +111,10 @@ def test_unsigned_char() -> None: assert repr(data_types.UnsignedChar()) == "UnsignedChar()" assert data_type == data_types.UnsignedChar.from_bytes(buffer) assert data_type == 3 + assert not data_type.isnan() + undefined = bytearray([BYTE_UNDEFINED]) + data_type = data_types.UnsignedChar.from_bytes(undefined) + assert data_type.isnan() def test_ushort() -> None: @@ -109,6 +128,10 @@ def test_ushort() -> None: assert repr(data_types.UnsignedShort()) == "UnsignedShort()" assert data_type == data_types.UnsignedShort.from_bytes(buffer) assert data_type == 298 + assert not data_type.isnan() + undefined = bytearray([BYTE_UNDEFINED, BYTE_UNDEFINED]) + data_type = data_types.UnsignedShort.from_bytes(undefined) + assert data_type.isnan() def test_uint() -> None: @@ -122,6 +145,12 @@ def test_uint() -> None: assert repr(data_types.UnsignedInt()) == "UnsignedInt()" assert data_type == data_types.UnsignedInt.from_bytes(buffer) assert data_type == 16282 + assert not data_type.isnan() + undefined = bytearray( + [BYTE_UNDEFINED, BYTE_UNDEFINED, BYTE_UNDEFINED, BYTE_UNDEFINED] + ) + data_type = data_types.UnsignedInt.from_bytes(undefined) + assert data_type.isnan() def test_float() -> None: @@ -135,6 +164,12 @@ def test_float() -> None: assert repr(data_types.Float()) == "Float()" assert data_type == data_types.Float.from_bytes(buffer) assert data_type == 12.0 + assert not data_type.isnan() + undefined = bytearray( + [BYTE_UNDEFINED, BYTE_UNDEFINED, BYTE_UNDEFINED, BYTE_UNDEFINED] + ) + data_type = data_types.Float.from_bytes(undefined) + assert data_type.isnan() def test_double() -> None: @@ -148,6 +183,21 @@ def test_double() -> None: assert repr(data_types.Double()) == "Double()" assert data_type == data_types.Double.from_bytes(buffer) assert data_type == 12.12 + assert not data_type.isnan() + undefined = bytearray( + [ + BYTE_UNDEFINED, + BYTE_UNDEFINED, + BYTE_UNDEFINED, + BYTE_UNDEFINED, + BYTE_UNDEFINED, + BYTE_UNDEFINED, + BYTE_UNDEFINED, + BYTE_UNDEFINED, + ] + ) + data_type = data_types.Double.from_bytes(undefined) + assert data_type.isnan() def test_bitarray() -> None: @@ -190,6 +240,21 @@ def test_int64() -> None: assert repr(data_types.Int64()) == "Int64()" assert data_type == data_types.Int64.from_bytes(buffer) assert data_type == -1498954336607141889 + assert not data_type.isnan() + undefined = bytearray( + [ + BYTE_UNDEFINED, + BYTE_UNDEFINED, + BYTE_UNDEFINED, + BYTE_UNDEFINED, + BYTE_UNDEFINED, + BYTE_UNDEFINED, + BYTE_UNDEFINED, + BYTE_UNDEFINED, + ] + ) + data_type = data_types.Int64.from_bytes(undefined) + assert data_type.isnan() def test_uint64() -> None: @@ -203,6 +268,21 @@ def test_uint64() -> None: assert repr(data_types.UInt64()) == "UInt64()" assert data_type == data_types.UInt64.from_bytes(buffer) assert data_type == 6148631004284209477 + assert not data_type.isnan() + undefined = bytearray( + [ + BYTE_UNDEFINED, + BYTE_UNDEFINED, + BYTE_UNDEFINED, + BYTE_UNDEFINED, + BYTE_UNDEFINED, + BYTE_UNDEFINED, + BYTE_UNDEFINED, + BYTE_UNDEFINED, + ] + ) + data_type = data_types.UInt64.from_bytes(undefined) + assert data_type.isnan() def test_ipv4() -> None: