Skip to content

Commit

Permalink
Move unpackers.
Browse files Browse the repository at this point in the history
Move struct unpackers from utils to the data_types module.
  • Loading branch information
denpamusic committed Oct 20, 2023
1 parent 50828f5 commit 476160a
Show file tree
Hide file tree
Showing 16 changed files with 63 additions and 68 deletions.
37 changes: 27 additions & 10 deletions pyplumio/helpers/data_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,26 @@

from abc import ABC, abstractmethod
import socket
import struct
from typing import Any, Type

from pyplumio import util
# Data type unpackers.
unpack_float = struct.Struct("<f").unpack
unpack_char = struct.Struct("<b").unpack
unpack_short = struct.Struct("<h").unpack
unpack_ushort = struct.Struct("<H").unpack
unpack_int = struct.Struct("<i").unpack
unpack_uint = struct.Struct("<I").unpack
unpack_double = struct.Struct("<d").unpack
unpack_int64 = struct.Struct("<q").unpack
unpack_uint64 = struct.Struct("<Q").unpack


def unpack_string(data: bytearray, offset: int = 0) -> str:
"""Unpack a string."""
strlen = data[offset]
offset += 1
return data[offset : offset + strlen + 1].decode()


class DataType(ABC):
Expand Down Expand Up @@ -65,7 +82,7 @@ class SignedChar(DataType):

def unpack(self, data: bytes) -> None:
"""Unpack the data."""
self._value = util.unpack_char(self._cut_data(data))[0]
self._value = unpack_char(self._cut_data(data))[0]


class Short(DataType):
Expand All @@ -75,7 +92,7 @@ class Short(DataType):

def unpack(self, data: bytes) -> None:
"""Unpack the data."""
self._value = util.unpack_short(self._cut_data(data))[0]
self._value = unpack_short(self._cut_data(data))[0]


class Int(DataType):
Expand All @@ -85,7 +102,7 @@ class Int(DataType):

def unpack(self, data: bytes) -> None:
"""Unpack the data."""
self._value = util.unpack_int(self._cut_data(data))[0]
self._value = unpack_int(self._cut_data(data))[0]


class Byte(DataType):
Expand All @@ -105,7 +122,7 @@ class UnsignedShort(DataType):

def unpack(self, data: bytes) -> None:
"""Unpack the data."""
self._value = util.unpack_ushort(self._cut_data(data))[0]
self._value = unpack_ushort(self._cut_data(data))[0]


class UnsignedInt(DataType):
Expand All @@ -115,7 +132,7 @@ class UnsignedInt(DataType):

def unpack(self, data: bytes) -> None:
"""Unpack the data."""
self._value = util.unpack_uint(self._cut_data(data))[0]
self._value = unpack_uint(self._cut_data(data))[0]


class Float(DataType):
Expand All @@ -125,7 +142,7 @@ class Float(DataType):

def unpack(self, data: bytes) -> None:
"""Unpack the data."""
self._value = util.unpack_float(self._cut_data(data))[0]
self._value = unpack_float(self._cut_data(data))[0]


class Undefined8(DataType):
Expand All @@ -145,7 +162,7 @@ class Double(DataType):

def unpack(self, data: bytes) -> None:
"""Unpack the data."""
self._value = util.unpack_double(self._cut_data(data))[0]
self._value = unpack_double(self._cut_data(data))[0]


class Boolean(DataType):
Expand Down Expand Up @@ -187,7 +204,7 @@ class Int64(DataType):

def unpack(self, data: bytes) -> None:
"""Unpack the data."""
self._value = util.unpack_int64(self._cut_data(data))[0]
self._value = unpack_int64(self._cut_data(data))[0]


class UInt64(DataType):
Expand All @@ -197,7 +214,7 @@ class UInt64(DataType):

def unpack(self, data: bytes) -> None:
"""Unpack the data."""
self._value = util.unpack_uint64(self._cut_data(data))[0]
self._value = unpack_uint64(self._cut_data(data))[0]


class IPv4(DataType):
Expand Down
10 changes: 5 additions & 5 deletions pyplumio/structures/alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
from datetime import datetime
from typing import Any, Final, Generator

from pyplumio import util
from pyplumio.const import AlertType
from pyplumio.helpers.data_types import unpack_uint
from pyplumio.helpers.typing import EventDataType
from pyplumio.structures import StructureDecoder, ensure_device_data

Expand Down Expand Up @@ -65,10 +65,10 @@ def _unpack_alert(self, message: bytearray) -> Alert:
except ValueError:
pass

from_seconds = util.unpack_uint(message[self._offset + 1 : self._offset + 5])[0]
to_seconds = util.unpack_uint(
message[self._offset + 5 : self._offset + ALERT_SIZE]
)[0]
from_seconds = unpack_uint(message[self._offset + 1 : self._offset + 5])[0]
to_seconds = unpack_uint(message[self._offset + 5 : self._offset + ALERT_SIZE])[
0
]

from_dt = _convert_to_datetime(from_seconds)
to_dt = _convert_to_datetime(to_seconds) if to_seconds > 0 else None
Expand Down
11 changes: 5 additions & 6 deletions pyplumio/structures/data_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@

from typing import Final

from pyplumio import util
from pyplumio.helpers.data_types import DATA_TYPES, DataType
from pyplumio.helpers.data_types import DATA_TYPES, DataType, unpack_ushort
from pyplumio.helpers.typing import EventDataType
from pyplumio.structures import StructureDecoder, ensure_device_data

Expand All @@ -21,9 +20,9 @@ class DataSchemaStructure(StructureDecoder):
def _unpack_block(self, message: bytearray) -> tuple[int, DataType]:
"""Unpack a block."""
param_type = message[self._offset]
param_id = util.unpack_ushort(
message[self._offset + 1 : self._offset + BLOCK_SIZE]
)[0]
param_id = unpack_ushort(message[self._offset + 1 : self._offset + BLOCK_SIZE])[
0
]

try:
return param_id, DATA_TYPES[param_type]()
Expand All @@ -34,7 +33,7 @@ def decode(
self, message: bytearray, offset: int = 0, data: EventDataType | None = None
) -> tuple[EventDataType, int]:
"""Decode bytes and return message data and offset."""
blocks = util.unpack_ushort(message[offset : offset + 2])[0]
blocks = unpack_ushort(message[offset : offset + 2])[0]
self._offset = offset + 2
if blocks == 0:
return ensure_device_data(data), self._offset
Expand Down
4 changes: 2 additions & 2 deletions pyplumio/structures/fan_power.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import math
from typing import Final

from pyplumio import util
from pyplumio.helpers.data_types import unpack_float
from pyplumio.helpers.typing import EventDataType
from pyplumio.structures import StructureDecoder, ensure_device_data

Expand All @@ -20,7 +20,7 @@ def decode(
self, message: bytearray, offset: int = 0, data: EventDataType | None = None
) -> tuple[EventDataType, int]:
"""Decode bytes and return message data and offset."""
fan_power = util.unpack_float(message[offset : offset + FAN_POWER_SIZE])[0]
fan_power = unpack_float(message[offset : offset + FAN_POWER_SIZE])[0]
if not math.isnan(fan_power):
data = ensure_device_data(data, {ATTR_FAN_POWER: fan_power})

Expand Down
4 changes: 2 additions & 2 deletions pyplumio/structures/frame_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

from typing import Final

from pyplumio import util
from pyplumio.const import FrameType
from pyplumio.helpers.data_types import unpack_ushort
from pyplumio.helpers.typing import EventDataType
from pyplumio.structures import StructureDecoder, ensure_device_data

Expand All @@ -26,7 +26,7 @@ def _unpack_frame_versions(self, message: bytearray) -> tuple[FrameType | int, i
except ValueError:
pass

version = util.unpack_ushort(
version = unpack_ushort(
message[self._offset + 1 : self._offset + FRAME_VERSION_SIZE]
)[0]

Expand Down
4 changes: 2 additions & 2 deletions pyplumio/structures/fuel_consumption.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import math
from typing import Final

from pyplumio import util
from pyplumio.helpers.data_types import unpack_float
from pyplumio.helpers.typing import EventDataType
from pyplumio.structures import StructureDecoder, ensure_device_data

Expand All @@ -20,7 +20,7 @@ def decode(
self, message: bytearray, offset: int = 0, data: EventDataType | None = None
) -> tuple[EventDataType, int]:
"""Decode bytes and return message data and offset."""
fuel_consumption = util.unpack_float(message[offset : offset + 4])[0]
fuel_consumption = unpack_float(message[offset : offset + 4])[0]
if not math.isnan(fuel_consumption):
return (
ensure_device_data(data, {ATTR_FUEL_CONSUMPTION: fuel_consumption}),
Expand Down
4 changes: 2 additions & 2 deletions pyplumio/structures/lambda_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import math
from typing import Final

from pyplumio import util
from pyplumio.const import BYTE_UNDEFINED
from pyplumio.helpers.data_types import unpack_ushort
from pyplumio.helpers.typing import EventDataType
from pyplumio.structures import StructureDecoder, ensure_device_data

Expand All @@ -26,7 +26,7 @@ def decode(
if message[offset] == BYTE_UNDEFINED:
return ensure_device_data(data), offset + 1

level = util.unpack_ushort(message[offset + 2 : offset + LAMBDA_LEVEL_SIZE])[0]
level = unpack_ushort(message[offset + 2 : offset + LAMBDA_LEVEL_SIZE])[0]
return (
ensure_device_data(
data,
Expand Down
4 changes: 2 additions & 2 deletions pyplumio/structures/mixer_sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import math
from typing import Final, Generator

from pyplumio import util
from pyplumio.const import ATTR_CURRENT_TEMP, ATTR_TARGET_TEMP
from pyplumio.helpers.data_types import unpack_float
from pyplumio.helpers.typing import EventDataType
from pyplumio.structures import StructureDecoder, ensure_device_data

Expand All @@ -23,7 +23,7 @@ class MixerSensorsStructure(StructureDecoder):

def _unpack_mixer_sensors(self, message: bytearray) -> EventDataType | None:
"""Unpack sensors for a mixer."""
current_temp = util.unpack_float(message[self._offset : self._offset + 4])[0]
current_temp = unpack_float(message[self._offset : self._offset + 4])[0]
try:
if not math.isnan(current_temp):
return {
Expand Down
4 changes: 2 additions & 2 deletions pyplumio/structures/network_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import socket
from typing import Final

from pyplumio import util
from pyplumio.const import EncryptionType
from pyplumio.helpers.data_types import unpack_string
from pyplumio.helpers.typing import EventDataType
from pyplumio.structures import Structure, ensure_device_data

Expand Down Expand Up @@ -95,7 +95,7 @@ def decode(
encryption=EncryptionType(int(message[offset + 26])),
signal_quality=int(message[offset + 27]),
status=bool(message[offset + 28]),
ssid=util.unpack_string(message, offset + 33),
ssid=unpack_string(message, offset + 33),
),
server_status=bool(message[offset + 25]),
)
Expand Down
4 changes: 2 additions & 2 deletions pyplumio/structures/output_flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from typing import Final

from pyplumio import util
from pyplumio.helpers.data_types import unpack_uint
from pyplumio.helpers.typing import EventDataType
from pyplumio.structures import StructureDecoder, ensure_device_data

Expand All @@ -22,7 +22,7 @@ def decode(
self, message: bytearray, offset: int = 0, data: EventDataType | None = None
) -> tuple[EventDataType, int]:
"""Decode bytes and return message data and offset."""
output_flags = util.unpack_uint(message[offset : offset + OUTPUT_FLAGS_SIZE])[0]
output_flags = unpack_uint(message[offset : offset + OUTPUT_FLAGS_SIZE])[0]
return (
ensure_device_data(
data,
Expand Down
4 changes: 2 additions & 2 deletions pyplumio/structures/outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import math
from typing import Final

from pyplumio import util
from pyplumio.helpers.data_types import unpack_uint
from pyplumio.helpers.typing import EventDataType
from pyplumio.structures import StructureDecoder, ensure_device_data

Expand Down Expand Up @@ -54,7 +54,7 @@ def decode(
self, message: bytearray, offset: int = 0, data: EventDataType | None = None
) -> tuple[EventDataType, int]:
"""Decode bytes and return message data and offset."""
outputs = util.unpack_uint(message[offset : offset + OUTPUTS_SIZE])[0]
outputs = unpack_uint(message[offset : offset + OUTPUTS_SIZE])[0]
return (
ensure_device_data(
data,
Expand Down
4 changes: 2 additions & 2 deletions pyplumio/structures/power.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import math
from typing import Final

from pyplumio import util
from pyplumio.helpers.data_types import unpack_float
from pyplumio.helpers.typing import EventDataType
from pyplumio.structures import StructureDecoder, ensure_device_data

Expand All @@ -20,7 +20,7 @@ def decode(
self, message: bytearray, offset: int = 0, data: EventDataType | None = None
) -> tuple[EventDataType, int]:
"""Decode bytes and return message data and offset."""
power = util.unpack_float(message[offset : offset + 4])[0]
power = unpack_float(message[offset : offset + 4])[0]
if not math.isnan(power):
return ensure_device_data(data, {ATTR_POWER: power}), offset + POWER_SIZE

Expand Down
4 changes: 2 additions & 2 deletions pyplumio/structures/product_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import struct
from typing import Final

from pyplumio import util
from pyplumio.const import ProductType
from pyplumio.helpers.data_types import unpack_string
from pyplumio.helpers.typing import EventDataType
from pyplumio.helpers.uid import unpack_uid
from pyplumio.structures import StructureDecoder, ensure_device_data
Expand Down Expand Up @@ -45,7 +45,7 @@ def decode(
uid=unpack_uid(message, offset),
logo=logo,
image=image,
model=util.unpack_string(message, offset + 16),
model=unpack_string(message, offset + 16),
)
},
),
Expand Down
4 changes: 2 additions & 2 deletions pyplumio/structures/temperatures.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import math
from typing import Final

from pyplumio import util
from pyplumio.helpers.data_types import unpack_float
from pyplumio.helpers.typing import EventDataType
from pyplumio.structures import StructureDecoder, ensure_device_data

Expand Down Expand Up @@ -61,7 +61,7 @@ def decode(
offset += 1
for _ in range(temperatures):
index = message[offset]
temp = util.unpack_float(message[offset + 1 : offset + 5])[0]
temp = unpack_float(message[offset + 1 : offset + 5])[0]

if (not math.isnan(temp)) and 0 <= index < len(TEMPERATURES):
# Temperature exists and index is in the correct range.
Expand Down
8 changes: 3 additions & 5 deletions pyplumio/structures/thermostat_sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
import math
from typing import Final, Generator

from pyplumio import util
from pyplumio.const import (
ATTR_CURRENT_TEMP,
ATTR_SCHEDULE,
ATTR_STATE,
ATTR_TARGET_TEMP,
BYTE_UNDEFINED,
)
from pyplumio.helpers.data_types import unpack_float
from pyplumio.helpers.typing import EventDataType
from pyplumio.structures import StructureDecoder, ensure_device_data

Expand All @@ -33,10 +33,8 @@ def _unpack_thermostat_sensors(
self, message: bytearray, contacts: int
) -> EventDataType | None:
"""Unpack sensors for a thermostat."""
current_temp = util.unpack_float(message[self._offset + 1 : self._offset + 5])[
0
]
target_temp = util.unpack_float(
current_temp = unpack_float(message[self._offset + 1 : self._offset + 5])[0]
target_temp = unpack_float(
message[self._offset + 5 : self._offset + THERMOSTAT_SENSORS_SIZE]
)[0]

Expand Down
Loading

0 comments on commit 476160a

Please sign in to comment.