Skip to content

Commit

Permalink
Fix/code improvements (#104)
Browse files Browse the repository at this point in the history
* clean/unify log messages
* rename advertisement data
* static methods and class variables
* stricter typing
  • Loading branch information
patman15 authored Nov 30, 2024
1 parent a44fa31 commit 7da0dfe
Show file tree
Hide file tree
Showing 14 changed files with 338 additions and 379 deletions.
13 changes: 5 additions & 8 deletions custom_components/bms_ble/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@

from bleak.backends.device import BLEDevice
from bleak.exc import BleakError

from homeassistant.components.bluetooth import (
DOMAIN as BLUETOOTH_DOMAIN,
async_last_service_info,
)
from homeassistant.components.bluetooth import async_last_service_info
from homeassistant.components.bluetooth.const import DOMAIN as BLUETOOTH_DOMAIN
from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import CONNECTION_BLUETOOTH, DeviceInfo
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
Expand Down Expand Up @@ -49,7 +46,7 @@ def __init__(
if service_info := async_last_service_info(
self.hass, address=self._mac, connectable=True
):
LOGGER.debug("device data: %s", service_info.as_dict())
LOGGER.debug("%s: advertisement: %s", self.name, service_info.as_dict())

# retrieve BMS class and initialize it
self._device: BaseBMS = bms_device
Expand Down Expand Up @@ -84,7 +81,7 @@ def link_quality(self) -> int:

async def async_shutdown(self) -> None:
"""Shutdown coordinator and any connection."""
LOGGER.debug("%s: shuting down BMS device", self.name)
LOGGER.debug("Shutting down BMS device (%s)", self.name)
await super().async_shutdown()
await self._device.disconnect()

Expand Down Expand Up @@ -118,5 +115,5 @@ async def _async_update_data(self) -> BMSsample:
)

self._link_q[-1] = True # set success
LOGGER.debug("BMS data sample %s", battery_info)
LOGGER.debug("%s: BMS data sample %s", self.name, battery_info)
return battery_info
25 changes: 14 additions & 11 deletions custom_components/bms_ble/plugins/basebms.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""Base class defintion for battery management systems (BMS)."""

from abc import ABCMeta, abstractmethod
import asyncio.events
from collections.abc import Awaitable, Callable
import logging
from abc import ABCMeta, abstractmethod
from collections.abc import Awaitable, Callable
from statistics import fmean
from typing import Any, Final

Expand All @@ -12,6 +12,10 @@
from bleak.backends.device import BLEDevice
from bleak.exc import BleakError
from bleak_retry_connector import establish_connection
from homeassistant.components.bluetooth import BluetoothServiceInfoBleak
from homeassistant.components.bluetooth.match import ble_device_matches
from homeassistant.loader import BluetoothMatcherOptional
from homeassistant.util.unit_conversion import _HRS_TO_SECS

from custom_components.bms_ble.const import (
ATTR_BATTERY_CHARGING,
Expand All @@ -26,12 +30,6 @@
KEY_CELL_VOLTAGE,
KEY_TEMP_VALUE,
)
from homeassistant.components.bluetooth import BluetoothServiceInfoBleak
from homeassistant.components.bluetooth.match import (
BluetoothMatcherOptional,
ble_device_matches,
)
from homeassistant.util.unit_conversion import _HRS_TO_SECS

type BMSsample = dict[str, int | float | bool]

Expand Down Expand Up @@ -118,8 +116,8 @@ def _calc_values() -> set[str]:
"""
return set()

@classmethod
def _add_missing_values(cls, data: BMSsample, values: set[str]):
@staticmethod
def _add_missing_values(data: BMSsample, values: set[str]):
"""Calculate missing BMS values from existing ones.
data: data dictionary from BMS
Expand Down Expand Up @@ -214,7 +212,7 @@ async def _wait_event(self) -> None:

@abstractmethod
async def _async_update(self) -> BMSsample:
"""Return a dictionary of BMS values, where the keys need to match the keys in the SENSOR_TYPES list."""
"""Return a dictionary of BMS values (keys need to come from the SENSOR_TYPES list)."""

async def async_update(self) -> BMSsample:
"""Retrieve updated values from the BMS using method of the subclass."""
Expand All @@ -239,3 +237,8 @@ def crc_xmodem(data: bytearray) -> int:
for _ in range(8):
crc = (crc >> 1) ^ 0xA001 if crc % 2 else (crc >> 1)
return ((0xFF00 & crc) >> 8) | ((crc & 0xFF) << 8)


def crc_sum(frame: bytes) -> int:
"""Calculate frame CRC."""
return sum(frame) & 0xFF
92 changes: 43 additions & 49 deletions custom_components/bms_ble/plugins/cbtpwr_bms.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
"""Module to support CBT Power Smart BMS."""

import asyncio
from collections.abc import Callable
import logging
from collections.abc import Callable
from typing import Any, Final

from bleak.backends.device import BLEDevice
from bleak.uuids import normalize_uuid_str
from homeassistant.util.unit_conversion import _HRS_TO_SECS

from custom_components.bms_ble.const import (
ATTR_BATTERY_CHARGING,
Expand All @@ -23,9 +24,8 @@
KEY_CELL_VOLTAGE,
KEY_DESIGN_CAP,
)
from homeassistant.util.unit_conversion import _HRS_TO_SECS

from .basebms import BaseBMS, BMSsample
from .basebms import BaseBMS, BMSsample, crc_sum

BAT_TIMEOUT: Final = 1
LOGGER: Final = logging.getLogger(__name__)
Expand All @@ -42,23 +42,23 @@ class BMS(BaseBMS):
LEN_POS: Final[int] = 3
CMD_POS: Final[int] = 2
CELL_VOLTAGE_CMDS: Final[list[int]] = [0x5, 0x6, 0x7, 0x8]
_FIELDS: Final[
list[tuple[str, int, int, int, bool, Callable[[int], int | float]]]
] = [
(ATTR_VOLTAGE, 0x0B, 4, 4, False, lambda x: float(x / 1000)),
(ATTR_CURRENT, 0x0B, 8, 4, True, lambda x: float(x / 1000)),
(ATTR_TEMPERATURE, 0x09, 4, 2, False, lambda x: x),
(ATTR_BATTERY_LEVEL, 0x0A, 4, 1, False, lambda x: x),
(KEY_DESIGN_CAP, 0x15, 4, 2, False, lambda x: x),
(ATTR_CYCLES, 0x15, 6, 2, False, lambda x: x),
(ATTR_RUNTIME, 0x0C, 14, 2, False, lambda x: float(x * _HRS_TO_SECS / 100)),
]
_CMDS: Final[list[int]] = list({field[1] for field in _FIELDS})

def __init__(self, ble_device: BLEDevice, reconnect: bool = False) -> None:
"""Intialize private BMS members."""
super().__init__(LOGGER, self._notification_handler, ble_device, reconnect)
self._data: bytearray = bytearray()
self._FIELDS: Final[
list[tuple[str, int, int, int, bool, Callable[[int], int | float]]]
] = [
(ATTR_VOLTAGE, 0x0B, 4, 4, False, lambda x: float(x / 1000)),
(ATTR_CURRENT, 0x0B, 8, 4, True, lambda x: float(x / 1000)),
(ATTR_TEMPERATURE, 0x09, 4, 2, False, lambda x: x),
(ATTR_BATTERY_LEVEL, 0x0A, 4, 1, False, lambda x: x),
(KEY_DESIGN_CAP, 0x15, 4, 2, False, lambda x: x),
(ATTR_CYCLES, 0x15, 6, 2, False, lambda x: x),
(ATTR_RUNTIME, 0x0C, 14, 2, False, lambda x: float(x * _HRS_TO_SECS / 100)),
]
self._CMDS: Final[list[int]] = list({field[1] for field in self._FIELDS})

@staticmethod
def matcher_dict_list() -> list[dict[str, Any]]:
Expand Down Expand Up @@ -102,54 +102,48 @@ def _calc_values() -> set[str]:

def _notification_handler(self, _sender, data: bytearray) -> None:
"""Retrieve BMS data update."""

LOGGER.debug("(%s) Rx BLE data: %s", self._ble_device.name, data)
LOGGER.debug("%s: Received BLE data: %s", self.name, data)

# verify that data long enough
if (
len(data) < self.MIN_FRAME
or len(data) != self.MIN_FRAME + data[self.LEN_POS]
):
if len(data) < BMS.MIN_FRAME or len(data) != BMS.MIN_FRAME + data[BMS.LEN_POS]:
LOGGER.debug(
"(%s) incorrect frame length (%i): %s", self.name, len(data), data
"%s: incorrect frame length (%i): %s", self.name, len(data), data
)
return

if not data.startswith(self.HEAD) or not data.endswith(self.TAIL_RX):
LOGGER.debug("(%s) Incorrect frame start/end: %s", self.name, data)
if not data.startswith(BMS.HEAD) or not data.endswith(BMS.TAIL_RX):
LOGGER.debug("%s: incorrect frame start/end: %s", self.name, data)
return

crc = self._crc(data[len(self.HEAD) : len(data) + self.CRC_POS])
if data[self.CRC_POS] != crc:
crc = crc_sum(data[len(BMS.HEAD) : len(data) + BMS.CRC_POS])
if data[BMS.CRC_POS] != crc:
LOGGER.debug(
"(%s) Rx data CRC is invalid: 0x%x != 0x%x",
"%s: RX data CRC is invalid: 0x%X != 0x%X",
self.name,
data[len(data) + self.CRC_POS],
data[len(data) + BMS.CRC_POS],
crc,
)
return

self._data = data
self._data_event.set()

def _crc(self, frame: bytes) -> int:
"""Calculate CBT Power frame CRC."""
return sum(frame) & 0xFF

def _gen_frame(self, cmd: bytes, value: list[int] | None = None) -> bytes:
@staticmethod
def _gen_frame(cmd: bytes, value: list[int] | None = None) -> bytes:
"""Assemble a CBT Power BMS command."""
value = [] if value is None else value
assert len(value) <= 255
frame = bytes([*self.HEAD, cmd[0]])
frame = bytes([*BMS.HEAD, cmd[0]])
frame += bytes([len(value), *value])
frame += bytes([self._crc(frame[len(self.HEAD) :])])
frame += bytes([*self.TAIL_TX])
frame += bytes([crc_sum(frame[len(BMS.HEAD) :])])
frame += bytes([*BMS.TAIL_TX])
return frame

def _cell_voltages(self, data: bytearray) -> dict[str, float]:
@staticmethod
def _cell_voltages(data: bytearray) -> dict[str, float]:
"""Return cell voltages from status message."""
return {
f"{KEY_CELL_VOLTAGE}{idx+(data[self.CMD_POS]-5)*5}": int.from_bytes(
f"{KEY_CELL_VOLTAGE}{idx+(data[BMS.CMD_POS]-5)*5}": int.from_bytes(
data[4 + 2 * idx : 6 + 2 * idx],
byteorder="little",
signed=True,
Expand All @@ -162,25 +156,25 @@ async def _async_update(self) -> BMSsample:
"""Update battery status information."""
data = {}
resp_cache = {} # variable to avoid multiple queries with same command
for cmd in self._CMDS:
LOGGER.debug("(%s) request command 0x%X.", self.name, cmd)
for cmd in BMS._CMDS:
LOGGER.debug("%s: request command 0x%X.", self.name, cmd)
await self._client.write_gatt_char(
BMS.uuid_tx(), data=self._gen_frame(cmd.to_bytes(1))
BMS.uuid_tx(), data=BMS._gen_frame(cmd.to_bytes(1))
)
try:
await asyncio.wait_for(self._wait_event(), timeout=BAT_TIMEOUT)
except TimeoutError:
continue
if cmd != self._data[self.CMD_POS]:
if cmd != self._data[BMS.CMD_POS]:
LOGGER.debug(
"(%s): incorrect response 0x%x to command 0x%x",
"%s:: incorrect response 0x%X to command 0x%X",
self.name,
self._data[self.CMD_POS],
self._data[BMS.CMD_POS],
cmd,
)
resp_cache[self._data[self.CMD_POS]] = self._data.copy()
resp_cache[self._data[BMS.CMD_POS]] = self._data.copy()

for field, cmd, pos, size, sign, fct in self._FIELDS:
for field, cmd, pos, size, sign, fct in BMS._FIELDS:
if resp_cache.get(cmd):
data |= {
field: fct(
Expand All @@ -191,15 +185,15 @@ async def _async_update(self) -> BMSsample:
}

voltages = {}
for cmd in self.CELL_VOLTAGE_CMDS:
for cmd in BMS.CELL_VOLTAGE_CMDS:
await self._client.write_gatt_char(
BMS.uuid_tx(), data=self._gen_frame(cmd.to_bytes(1))
BMS.uuid_tx(), data=BMS._gen_frame(cmd.to_bytes(1))
)
try:
await asyncio.wait_for(self._wait_event(), timeout=BAT_TIMEOUT)
except TimeoutError:
break
voltages |= self._cell_voltages(self._data)
voltages |= BMS._cell_voltages(self._data)
if invalid := [k for k, v in voltages.items() if v == 0]:
for k in invalid:
voltages.pop(k)
Expand Down
Loading

0 comments on commit 7da0dfe

Please sign in to comment.