Skip to content

Commit

Permalink
Add numeric data type.
Browse files Browse the repository at this point in the history
This data type implements NaN (not a number) check.
  • Loading branch information
denpamusic committed Oct 23, 2024
1 parent e975c7e commit 3167333
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 26 deletions.
51 changes: 40 additions & 11 deletions pyplumio/helpers/data_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
from __future__ import annotations

from abc import ABC, abstractmethod
import math
import socket
import struct
from typing import ClassVar, Final, Generic, TypeVar
from typing import ClassVar, Final, Generic, SupportsFloat, TypeVar

from pyplumio.const import BYTE_UNDEFINED

T = TypeVar("T")
DataTypeT = TypeVar("DataTypeT", bound="DataType")
Expand Down Expand Up @@ -278,79 +281,105 @@ def size(self) -> int:
return self._size


class SignedChar(BuiltInDataType[int]):
FloatT = TypeVar("FloatT", bound=SupportsFloat)


class NumericDataType(BuiltInDataType[FloatT], ABC):
"""Represents a checkable data type."""

__slots__ = ("_nan",)

_nan: bool

def __init__(self, value: FloatT | None = None) -> None:
"""Initialize a new checkable data type."""
self._nan = False
super().__init__(value)

def isnan(self) -> bool:
"""Return True if value is not a number, False otherwise."""
return self._nan

def unpack(self, data: bytes) -> None:
"""Unpack the data."""
undefined = bytearray([BYTE_UNDEFINED] * self.size)
self._value = self._struct.unpack_from(data)[0]
self._nan = data[0 : self.size] == undefined or math.isnan(self._value)


class SignedChar(NumericDataType[int]):
"""Represents a signed char."""

__slots__ = ()

_struct = struct.Struct("<b")


class UnsignedChar(BuiltInDataType[int]):
class UnsignedChar(NumericDataType[int]):
"""Represents an unsigned char."""

__slots__ = ()

_struct = struct.Struct("<B")


class Short(BuiltInDataType[int]):
class Short(NumericDataType[int]):
"""Represents a 16-bit integer."""

__slots__ = ()

_struct = struct.Struct("<h")


class UnsignedShort(BuiltInDataType[int]):
class UnsignedShort(NumericDataType[int]):
"""Represents an unsigned 16-bit integer."""

__slots__ = ()

_struct = struct.Struct("<H")


class Int(BuiltInDataType[int]):
class Int(NumericDataType[int]):
"""Represents a 32-bit integer."""

__slots__ = ()

_struct = struct.Struct("<i")


class UnsignedInt(BuiltInDataType[int]):
class UnsignedInt(NumericDataType[int]):
"""Represents a unsigned 32-bit integer."""

__slots__ = ()

_struct = struct.Struct("<I")


class Float(BuiltInDataType[float]):
class Float(NumericDataType[float]):
"""Represents a float."""

__slots__ = ()

_struct = struct.Struct("<f")


class Double(BuiltInDataType[float]):
class Double(NumericDataType[float]):
"""Represents a double."""

__slots__ = ()

_struct = struct.Struct("<d")


class Int64(BuiltInDataType[int]):
class Int64(NumericDataType[int]):
"""Represents a 64-bit signed integer."""

__slots__ = ()

_struct = struct.Struct("<q")


class UInt64(BuiltInDataType[int]):
class UInt64(NumericDataType[int]):
"""Represents a 64-bit unsigned integer."""

__slots__ = ()
Expand Down
3 changes: 1 addition & 2 deletions pyplumio/structures/boiler_power.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from __future__ import annotations

import math
from typing import Any, Final

from pyplumio.helpers.data_types import Float
Expand All @@ -24,7 +23,7 @@ def decode(
boiler_power = Float.from_bytes(message, offset)
offset += boiler_power.size

if math.isnan(boiler_power.value):
if boiler_power.isnan():
return ensure_dict(data), offset

return ensure_dict(data, {ATTR_BOILER_POWER: boiler_power.value}), offset
3 changes: 1 addition & 2 deletions pyplumio/structures/fan_power.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from __future__ import annotations

import math
from typing import Any, Final

from pyplumio.helpers.data_types import Float
Expand All @@ -24,7 +23,7 @@ def decode(
fan_power = Float.from_bytes(message, offset)
offset += fan_power.size

if math.isnan(fan_power.value):
if fan_power.isnan():
return ensure_dict(data), offset

return ensure_dict(data, {ATTR_FAN_POWER: fan_power.value}), offset
3 changes: 1 addition & 2 deletions pyplumio/structures/fuel_consumption.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from __future__ import annotations

import math
from typing import Any, Final

from pyplumio.helpers.data_types import Float
Expand All @@ -24,7 +23,7 @@ def decode(
fuel_consumption = Float.from_bytes(message, offset)
offset += fuel_consumption.size

if math.isnan(fuel_consumption.value):
if fuel_consumption.isnan():
return ensure_dict(data), offset

return (
Expand Down
3 changes: 1 addition & 2 deletions pyplumio/structures/mixer_sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from __future__ import annotations

from collections.abc import Generator
import math
from typing import Any, Final

from pyplumio.const import ATTR_CURRENT_TEMP, ATTR_TARGET_TEMP
Expand Down Expand Up @@ -37,7 +36,7 @@ def _unpack_mixer_sensors(self, message: bytearray) -> dict[str, Any] | None:
ATTR_TARGET_TEMP: message[offset + 4],
ATTR_PUMP: bool(message[offset + 6] & 0x01),
}
if not math.isnan(current_temp.value)
if not current_temp.isnan()
else None
)
finally:
Expand Down
13 changes: 10 additions & 3 deletions pyplumio/structures/regulator_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from typing import Any, Final

from pyplumio.helpers.data_types import BitArray, DataType
from pyplumio.helpers.data_types import BitArray, DataType, NumericDataType
from pyplumio.structures import StructureDecoder
from pyplumio.structures.frame_versions import FrameVersionsStructure
from pyplumio.structures.regulator_data_schema import ATTR_REGDATA_SCHEMA
Expand Down Expand Up @@ -37,6 +37,9 @@ def _unpack_regulator_data(self, message: bytearray, data_type: DataType) -> Any
self._bitarray_index = data_type.next(self._bitarray_index)

self._offset += data_type.size
if isinstance(data_type, NumericDataType) and data_type.isnan():
return None

return data_type.value

def decode(
Expand All @@ -58,8 +61,12 @@ def decode(
):
self._bitarray_index = 0
data[ATTR_REGDATA] = {
param_id: self._unpack_regulator_data(message, data_type)
for param_id, data_type in schema
param_id: value
for param_id, value in {
param_id: self._unpack_regulator_data(message, data_type)
for param_id, data_type in schema
}.items()
if value is not None
}

return data, self._offset
3 changes: 1 addition & 2 deletions pyplumio/structures/temperatures.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from __future__ import annotations

import math
from typing import Any, Final

from pyplumio.helpers.data_types import Float
Expand Down Expand Up @@ -65,7 +64,7 @@ def decode(
offset += 1
temp = Float.from_bytes(message, offset)
offset += temp.size
if (not math.isnan(temp.value)) and 0 <= index < len(TEMPERATURES):
if not temp.isnan() and 0 <= index < len(TEMPERATURES):
# Temperature exists and index is in the correct range.
data[TEMPERATURES[index]] = temp.value

Expand Down
3 changes: 1 addition & 2 deletions pyplumio/structures/thermostat_sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from __future__ import annotations

from collections.abc import Generator
import math
from typing import Any, Final

from pyplumio.const import (
Expand Down Expand Up @@ -53,7 +52,7 @@ def _unpack_thermostat_sensors(
ATTR_CONTACTS: bool(contacts & self._contact_mask),
ATTR_SCHEDULE: bool(contacts & self._schedule_mask),
}
if not math.isnan(current_temp.value) and target_temp.value > 0
if not current_temp.isnan() and target_temp.value > 0
else None
)
finally:
Expand Down
Loading

0 comments on commit 3167333

Please sign in to comment.