diff --git a/custom_components/bms_ble/plugins/basebms.py b/custom_components/bms_ble/plugins/basebms.py index 177c86c..fa7a282 100644 --- a/custom_components/bms_ble/plugins/basebms.py +++ b/custom_components/bms_ble/plugins/basebms.py @@ -74,8 +74,7 @@ def calc_values(cls, data: BMSsample, values: set[str]): """ def can_calc(value: str, using: frozenset[str]) -> bool: - """Check that value to add does not exists, is requested and - the necessary parameters are available.""" + """Check that value to add does not exists, is requested and the necessary parameters are available.""" return (value in values) and (value not in data) and using.issubset(data) # calculate cycle capacity from voltage and cycle charge diff --git a/custom_components/bms_ble/plugins/daly_bms.py b/custom_components/bms_ble/plugins/daly_bms.py index d8b4d5d..7907c1b 100644 --- a/custom_components/bms_ble/plugins/daly_bms.py +++ b/custom_components/bms_ble/plugins/daly_bms.py @@ -55,7 +55,6 @@ def __init__(self, ble_device: BLEDevice, reconnect: bool = False) -> None: self._client: BleakClient | None = None self._data: bytearray | None = None self._data_event = asyncio.Event() - self._connected = False # flag to indicate active BLE connection self._FIELDS: list[tuple[str, int, Callable[[int], int | float]]] = [ (ATTR_VOLTAGE, 80 + self.HEAD_LEN, lambda x: float(x / 10)), (ATTR_CURRENT, 82 + self.HEAD_LEN, lambda x: float((x - 30000) / 10)), @@ -95,13 +94,12 @@ async def _wait_event(self) -> None: await self._data_event.wait() self._data_event.clear() - def _on_disconnect(self, client: BleakClient) -> None: + def _on_disconnect(self, _client: BleakClient) -> None: """Disconnect callback function.""" - LOGGER.debug("Disconnected from BMS (%s)", client.address) - self._connected = False + LOGGER.debug("Disconnected from BMS (%s)", self._ble_device.name) - def _notification_handler(self, sender, data: bytearray) -> None: + def _notification_handler(self, _sender, data: bytearray) -> None: LOGGER.debug("Received BLE data: %s", data) if ( @@ -124,7 +122,7 @@ def _notification_handler(self, sender, data: bytearray) -> None: async def _connect(self) -> None: """Connect to the BMS and setup notification if not connected.""" - if not self._connected: + if self._client is None or not self._client.is_connected: LOGGER.debug("Connecting BMS (%s)", self._ble_device.name) self._client = BleakClient( self._ble_device, @@ -133,14 +131,13 @@ async def _connect(self) -> None: ) await self._client.connect() await self._client.start_notify(UUID_RX, self._notification_handler) - self._connected = True else: LOGGER.debug("BMS %s already connected", self._ble_device.name) async def disconnect(self) -> None: """Disconnect the BMS and includes stoping notifications.""" - if self._client and self._connected: + if self._client and self._client.is_connected: LOGGER.debug("Disconnecting BMS (%s)", self._ble_device.name) try: self._data_event.clear() @@ -148,8 +145,6 @@ async def disconnect(self) -> None: except BleakError: LOGGER.warning("Disconnect failed!") - self._client = None - async def async_update(self) -> BMSsample: """Update battery status information.""" await self._connect() diff --git a/custom_components/bms_ble/plugins/jbd_bms.py b/custom_components/bms_ble/plugins/jbd_bms.py index 4b3c692..985aa37 100644 --- a/custom_components/bms_ble/plugins/jbd_bms.py +++ b/custom_components/bms_ble/plugins/jbd_bms.py @@ -54,7 +54,6 @@ def __init__(self, ble_device: BLEDevice, reconnect: bool = False) -> None: self._data: bytearray | None = None self._data_final: bytearray | None = None self._data_event = asyncio.Event() - self._connected = False # flag to indicate active BLE connection self._FIELDS: list[tuple[str, int, int, bool, Callable[[int], int | float]]] = [ (KEY_TEMP_SENS, 26, 1, False, lambda x: x), (ATTR_VOLTAGE, 4, 2, False, lambda x: float(x / 100)), @@ -83,13 +82,12 @@ async def _wait_event(self) -> None: await self._data_event.wait() self._data_event.clear() - def _on_disconnect(self, client: BleakClient) -> None: + def _on_disconnect(self, _client: BleakClient) -> None: """Disconnect callback function.""" LOGGER.debug("Disconnected from BMS (%s)", self._ble_device.name) - self._connected = False - def _notification_handler(self, sender, data: bytearray) -> None: + def _notification_handler(self, _sender, data: bytearray) -> None: if self._data_event.is_set(): return @@ -137,7 +135,7 @@ def _notification_handler(self, sender, data: bytearray) -> None: async def _connect(self) -> None: """Connect to the BMS and setup notification if not connected.""" - if not self._connected: + if self._client is None or not self._client.is_connected: LOGGER.debug("Connecting BMS (%s)", self._ble_device.name) self._client = BleakClient( self._ble_device, @@ -146,14 +144,13 @@ async def _connect(self) -> None: ) await self._client.connect() await self._client.start_notify(UUID_RX, self._notification_handler) - self._connected = True else: LOGGER.debug("BMS %s already connected", self._ble_device.name) async def disconnect(self) -> None: """Disconnect the BMS and includes stoping notifications.""" - if self._client and self._connected: + if self._client and self._client.is_connected: LOGGER.debug("Disconnecting BMS (%s)", self._ble_device.name) try: self._data_event.clear() @@ -161,8 +158,6 @@ async def disconnect(self) -> None: except BleakError: LOGGER.warning("Disconnect failed!") - self._client = None - def _crc(self, frame: bytes) -> int: """Calculate JBD frame CRC.""" return 0x10000 - sum(frame) diff --git a/custom_components/bms_ble/plugins/jikong_bms.py b/custom_components/bms_ble/plugins/jikong_bms.py index 073fe96..aabf2a9 100644 --- a/custom_components/bms_ble/plugins/jikong_bms.py +++ b/custom_components/bms_ble/plugins/jikong_bms.py @@ -53,7 +53,6 @@ def __init__(self, ble_device: BLEDevice, reconnect: bool = False) -> None: self._data: bytearray | None = None self._data_final: bytearray | None = None self._data_event = asyncio.Event() - self._connected = False # flag to indicate active BLE connection self._char_write_handle: int | None = None self._FIELDS: list[tuple[str, int, int, bool, Callable[[int], int | float]]] = [ (KEY_CELL_COUNT, 70, 4, False, lambda x: x.bit_count()), @@ -87,13 +86,12 @@ async def _wait_event(self) -> None: await self._data_event.wait() self._data_event.clear() - def _on_disconnect(self, client: BleakClient) -> None: + def _on_disconnect(self, _client: BleakClient) -> None: """Disconnect callback function.""" LOGGER.debug("Disconnected from BMS (%s)", self._ble_device.name) - self._connected = False - def _notification_handler(self, sender, data: bytearray) -> None: + def _notification_handler(self, _sender, data: bytearray) -> None: """Retrieve BMS data update.""" if data[0 : len(self.BT_MODULE_MSG)] == self.BT_MODULE_MSG: @@ -139,7 +137,7 @@ def _notification_handler(self, sender, data: bytearray) -> None: async def _connect(self) -> None: """Connect to the BMS and setup notification if not connected.""" - if not self._connected: + if self._client is None or not self._client.is_connected: LOGGER.debug("Connecting BMS (%s)", self._ble_device.name) self._client = BleakClient( self._ble_device, @@ -171,7 +169,7 @@ async def _connect(self) -> None: "(%s) Failed to detect characteristics", self._ble_device.name ) await self._client.disconnect() - return + raise ConnectionError(f"Unable to connect to {self._ble_device.name}.") LOGGER.debug( "(%s) Using characteristics handle #%i (notify), #%i (write)", self._ble_device.name, @@ -186,15 +184,13 @@ async def _connect(self) -> None: await self._client.write_gatt_char( self._char_write_handle or 0, data=self._cmd(b"\x97") ) - - self._connected = True else: LOGGER.debug("BMS %s already connected", self._ble_device.name) async def disconnect(self) -> None: """Disconnect the BMS and includes stoping notifications.""" - if self._client and self._connected: + if self._client and self._client.is_connected: LOGGER.debug("Disconnecting BMS (%s)", self._ble_device.name) try: self._data_event.clear() @@ -202,8 +198,6 @@ async def disconnect(self) -> None: except BleakError: LOGGER.warning("Disconnect failed!") - self._client = None - def _crc(self, frame: bytes) -> int: """Calculate Jikong frame CRC.""" return sum(frame) & 0xFF @@ -244,13 +238,9 @@ def _decode_data(self, data: bytearray) -> BMSsample: async def async_update(self) -> BMSsample: """Update battery status information.""" + await self._connect() assert self._client is not None - if not self._connected: - LOGGER.debug( - "Update request, but device (%s) not connected", self._ble_device.name - ) - return {} if not self._data_event.is_set(): # request cell info (only if data is not constantly published) diff --git a/custom_components/bms_ble/plugins/ogt_bms.py b/custom_components/bms_ble/plugins/ogt_bms.py index 9d7b1c6..ca0119b 100644 --- a/custom_components/bms_ble/plugins/ogt_bms.py +++ b/custom_components/bms_ble/plugins/ogt_bms.py @@ -2,7 +2,6 @@ import asyncio from collections.abc import Callable -from enum import IntEnum import logging from typing import Any @@ -38,25 +37,21 @@ UUID_SERVICE = normalize_uuid_str("fff0") -class REG(IntEnum): - """Field list for _REGISTER constant.""" - - NAME = 0 - LEN = 1 - FCT = 2 - - class BMS(BaseBMS): """Offgridtec LiFePO4 Smart Pro type A and type B battery class implementation.""" + IDX_NAME = 0 + IDX_LEN = 1 + IDX_FCT = 2 + def __init__(self, ble_device: BLEDevice, reconnect: bool = False) -> None: """Intialize private BMS members.""" self._reconnect = reconnect self._ble_device = ble_device - assert self._ble_device.name is not None self._client: BleakClient | None = None self._data_event = asyncio.Event() - self._connected = False # flag to indicate active BLE connection + + assert self._ble_device.name is not None self._type = self._ble_device.name[9] self._key = sum( CRYPT_SEQ[int(c, 16)] for c in (f"{int(self._ble_device.name[10:]):0>4X}") @@ -97,7 +92,7 @@ def __init__(self, ble_device: BLEDevice, reconnect: bool = False) -> None: 18: (ATTR_RUNTIME, 2, lambda x: int(x * 60)), 23: (ATTR_CYCLES, 2, None), } - # add cell voltage registers, note need to be last! + # add cell voltage registers, note: need to be last! self._REGISTERS.update( { 63 @@ -139,6 +134,7 @@ async def async_update(self) -> BMSsample: """Update battery status information.""" await self._connect() + assert self._client is not None self._values = {} for key in list(self._REGISTERS): @@ -146,7 +142,9 @@ async def async_update(self) -> BMSsample: try: await asyncio.wait_for(self._wait_event(), timeout=BAT_TIMEOUT) except TimeoutError: - LOGGER.debug("Reading %s timed out", self._REGISTERS[key][REG.NAME]) + LOGGER.debug( + "Reading %s timed out", self._REGISTERS[key][self.IDX_NAME] + ) if key > 48 and f"{KEY_CELL_VOLTAGE}{64-key}" not in self._values: break @@ -160,11 +158,10 @@ async def async_update(self) -> BMSsample: await self.disconnect() return self._values - def _on_disconnect(self, client: BleakClient) -> None: - """Disconnect callback for Bleak.""" + def _on_disconnect(self, _client: BleakClient) -> None: + """Disconnect callback function.""" - LOGGER.debug("Disconnected from BMS (%s)", client.address) - self._connected = False + LOGGER.debug("Disconnected from BMS (%s)", self._ble_device.name) def _notification_handler(self, sender, data: bytearray) -> None: LOGGER.debug("Received BLE data: %s", data) @@ -173,7 +170,7 @@ def _notification_handler(self, sender, data: bytearray) -> None: # check that descrambled message is valid and from the right characteristic if valid and sender.uuid == UUID_RX: - name, length, func = self._REGISTERS[reg] + name, _length, func = self._REGISTERS[reg] value = func(nat_value) if func else nat_value LOGGER.debug( "Decoded data: reg: %s (#%i), raw: %i, value: %f", @@ -190,7 +187,7 @@ def _notification_handler(self, sender, data: bytearray) -> None: async def _connect(self) -> None: """Connect to the BMS and setup notification if not connected.""" - if not self._connected: + if self._client is None or not self._client.is_connected: LOGGER.debug("Connecting BMS (%s)", self._ble_device.name) self._client = BleakClient( self._ble_device, @@ -199,14 +196,13 @@ async def _connect(self) -> None: ) await self._client.connect() await self._client.start_notify(UUID_RX, self._notification_handler) - self._connected = True else: LOGGER.debug("BMS %s already connected", self._ble_device.name) async def disconnect(self) -> None: """Disconnect the BMS, includes stoping notifications.""" - if self._client and self._connected: + if self._client and self._client.is_connected: LOGGER.debug("Disconnecting BMS (%s)", self._ble_device.name) try: self._data_event.clear() @@ -214,8 +210,6 @@ async def disconnect(self) -> None: except BleakError: LOGGER.warning("Disconnect failed!") - self._client = None - def _ogt_response(self, resp: bytearray) -> tuple[bool, int, int]: """Descramble a response from the BMS.""" @@ -236,7 +230,9 @@ def _ogt_response(self, resp: bytearray) -> tuple[bool, int, int]: def _ogt_command(self, command: int) -> bytes: """Put together an scambled query to the BMS.""" - cmd = f"{self._HEADER}{command:0>2X}{self._REGISTERS[command][REG.LEN]:0>2X}" + cmd = ( + f"{self._HEADER}{command:0>2X}{self._REGISTERS[command][self.IDX_LEN]:0>2X}" + ) LOGGER.debug("command: %s", cmd) return bytearray(ord(cmd[i]) ^ self._key for i in range(len(cmd))) diff --git a/custom_components/bms_ble/plugins/seplos_bms.py b/custom_components/bms_ble/plugins/seplos_bms.py index 30a5a27..69cf041 100644 --- a/custom_components/bms_ble/plugins/seplos_bms.py +++ b/custom_components/bms_ble/plugins/seplos_bms.py @@ -65,7 +65,6 @@ def __init__(self, ble_device: BLEDevice, reconnect: bool = False) -> None: self._data_final: dict[int, bytearray] = {} self._data_event = asyncio.Event() self._pack_count = 0 - self._connected = False # flag to indicate active BLE connection self._char_write_handle: int | None = None self._FIELDS: list[ tuple[str, int, int, int, bool, Callable[[int], int | float]] @@ -122,13 +121,12 @@ async def _wait_event(self) -> None: self._data_event.clear() - def _on_disconnect(self, client: BleakClient) -> None: + def _on_disconnect(self, _client: BleakClient) -> None: """Disconnect callback function.""" LOGGER.debug("Disconnected from BMS (%s)", self._ble_device.name) - self._connected = False - def _notification_handler(self, sender, data: bytearray) -> None: + def _notification_handler(self, _sender, data: bytearray) -> None: """Retrieve BMS data update.""" if ( @@ -205,7 +203,7 @@ def _notification_handler(self, sender, data: bytearray) -> None: async def _connect(self) -> None: """Connect to the BMS and setup notification if not connected.""" - if not self._connected: + if self._client is None or not self._client.is_connected: LOGGER.debug("Connecting BMS (%s)", self._ble_device.name) self._client = BleakClient( self._ble_device, @@ -214,15 +212,13 @@ async def _connect(self) -> None: ) await self._client.connect() await self._client.start_notify(UUID_RX, self._notification_handler) - - self._connected = True else: LOGGER.debug("BMS %s already connected", self._ble_device.name) async def disconnect(self) -> None: """Disconnect the BMS and includes stoping notifications.""" - if self._client and self._connected: + if self._client and self._client.is_connected: LOGGER.debug("Disconnecting BMS (%s)", self._ble_device.name) try: self._data_event.clear() @@ -230,8 +226,6 @@ async def disconnect(self) -> None: except BleakError: LOGGER.warning("Disconnect failed!") - self._client = None - def _swap32(self, value: int, signed: bool = False) -> int: """Swap high and low 16bit in 32bit integer.""" diff --git a/tests/conftest.py b/tests/conftest.py index d16ef7c..c01e914 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -13,6 +13,7 @@ from bleak.exc import BleakError from bleak.uuids import normalize_uuid_str, uuidstr_to_str from home_assistant_bluetooth import BluetoothServiceInfoBleak + import pytest from pytest_homeassistant_custom_component.common import MockConfigEntry from custom_components.bms_ble.const import ( @@ -25,6 +26,7 @@ ) from custom_components.bms_ble.plugins.basebms import BaseBMS, BMSsample + from .bluetooth import generate_advertisement_data, generate_ble_device LOGGER = logging.getLogger(__name__) diff --git a/tests/test_config_flow.py b/tests/test_config_flow.py index 8f3eda6..f7a25d0 100644 --- a/tests/test_config_flow.py +++ b/tests/test_config_flow.py @@ -1,14 +1,14 @@ """Test the BLE Battery Management System integration config flow.""" +from custom_components.bms_ble.const import DOMAIN +from custom_components.bms_ble.plugins.basebms import BaseBMS + from homeassistant.config_entries import SOURCE_BLUETOOTH, SOURCE_USER, ConfigEntryState from homeassistant.const import CONF_ADDRESS from homeassistant.core import HomeAssistant from homeassistant.data_entry_flow import FlowResultType from homeassistant.helpers import entity_registry as er -from custom_components.bms_ble.const import DOMAIN -from custom_components.bms_ble.plugins.basebms import BaseBMS - from .bluetooth import inject_bluetooth_service_info_bleak from .conftest import mock_config diff --git a/tests/test_daly_bms.py b/tests/test_daly_bms.py index cbd8755..414b1cf 100644 --- a/tests/test_daly_bms.py +++ b/tests/test_daly_bms.py @@ -96,7 +96,7 @@ async def test_update(monkeypatch, reconnect_fixture) -> None: # query again to check already connected state result = await bms.async_update() - assert bms._connected is not reconnect_fixture # noqa: SLF001 + assert bms._client and bms._client.is_connected is not reconnect_fixture # noqa: SLF001 await bms.disconnect() diff --git a/tests/test_jbd_bms.py b/tests/test_jbd_bms.py index 88415f4..da68922 100644 --- a/tests/test_jbd_bms.py +++ b/tests/test_jbd_bms.py @@ -1,7 +1,6 @@ """Test the JBD BMS implementation.""" from collections.abc import Buffer -import logging from uuid import UUID from bleak.backends.characteristic import BleakGATTCharacteristic @@ -12,7 +11,6 @@ from .bluetooth import generate_ble_device from .conftest import MockBleakClient -LOGGER = logging.getLogger(__name__) BT_FRAME_SIZE = 20 @@ -31,15 +29,12 @@ def _response( char_specifier == normalize_uuid_str("ff02") and bytearray(data)[0] == self.HEAD_CMD ): - LOGGER.debug("response command") if bytearray(data)[1:3] == self.CMD_INFO: - LOGGER.debug("info") return bytearray( b"\xdd\x03\x00\x1D\x06\x18\xFE\xE1\x01\xF2\x01\xF4\x00\x2A\x2C\x7C\x00\x00\x00" b"\x00\x00\x00\x80\x64\x03\x04\x03\x0B\x8B\x0B\x8A\x0B\x84\xf8\x84\x77" ) # {'voltage': 15.6, 'current': -2.87, 'battery_level': 100, 'cycle_charge': 4.98, 'cycles': 42, 'temperature': 22.133333333333347} if bytearray(data)[1:3] == self.CMD_CELL: - LOGGER.debug("cell") return bytearray( b"\xdd\x04\x00\x08\x0d\x66\x0d\x61\x0d\x68\x0d\x59\xfe\x3c\x77" ) # {'cell#0': 3.43, 'cell#1': 3.425, 'cell#2': 3.432, 'cell#3': 3.417} @@ -98,7 +93,6 @@ def _response( b"\00\00\00\00\00\00" # oversized response ) # {'voltage': 15.6, 'current': -2.87, 'battery_level': 100, 'cycle_charge': 4.98, 'cycles': 42, 'temperature': 22.133333333333347} if bytearray(data)[1:3] == self.CMD_CELL: - LOGGER.debug("cell") return bytearray( b"\xdd\x04\x00\x08\x0d\x66\x0d\x61\x0d\x68\x0d\x59\xfe\x3c\x77" b"\00\00\00\00\00\00\00\00\00\00\00\00" # oversized response @@ -147,7 +141,7 @@ async def test_update(monkeypatch, reconnect_fixture) -> None: # query again to check already connected state result = await bms.async_update() - assert bms._connected is not reconnect_fixture # noqa: SLF001 + assert bms._client and bms._client.is_connected is not reconnect_fixture # noqa: SLF001 await bms.disconnect() diff --git a/tests/test_jikong_bms.py b/tests/test_jikong_bms.py index a9a9f81..11389e5 100644 --- a/tests/test_jikong_bms.py +++ b/tests/test_jikong_bms.py @@ -9,6 +9,7 @@ from bleak.exc import BleakError from bleak.uuids import normalize_uuid_str, uuidstr_to_str from custom_components.bms_ble.plugins.jikong_bms import BMS +import pytest from .bluetooth import generate_ble_device from .conftest import MockBleakClient @@ -282,7 +283,9 @@ async def test_update(monkeypatch, reconnect_fixture) -> None: # query again to check already connected state result = await bms.async_update() - assert bms._connected is not reconnect_fixture # noqa: SLF001 + assert ( + bms._client and bms._client.is_connected is not reconnect_fixture + ) # noqa: SLF001 await bms.disconnect() @@ -360,7 +363,10 @@ async def test_invalid_device(monkeypatch) -> None: bms = BMS(generate_ble_device("cc:cc:cc:cc:cc:cc", "MockBLEdevice", None, -73)) - result = await bms.async_update() + result = {} + + with pytest.raises(ConnectionError, match=r"^Unable to connect to.*"): + result = await bms.async_update() assert result == {} diff --git a/tests/test_ogt_bms.py b/tests/test_ogt_bms.py index ad3dd83..9ef650e 100644 --- a/tests/test_ogt_bms.py +++ b/tests/test_ogt_bms.py @@ -144,7 +144,7 @@ async def test_update(monkeypatch, ogt_bms_fixture, reconnect_fixture) -> None: # query again to check already connected state result = await bms.async_update() - assert bms._connected is not reconnect_fixture # noqa: SLF001 + assert bms._client and bms._client.is_connected is not reconnect_fixture await bms.disconnect() @@ -161,7 +161,7 @@ async def test_invalid_response(monkeypatch) -> None: result = await bms.async_update() assert result == {} - assert bms._connected # noqa: SLF001 + assert bms._client and bms._client.is_connected # noqa: SLF001 await bms.disconnect() diff --git a/tests/test_seplos_bms.py b/tests/test_seplos_bms.py index 8d86cad..ad14663 100644 --- a/tests/test_seplos_bms.py +++ b/tests/test_seplos_bms.py @@ -7,16 +7,18 @@ from bleak.exc import BleakError from bleak.uuids import normalize_uuid_str from custom_components.bms_ble.plugins.seplos_bms import BMS -from pytest import raises, fixture +import pytest + from .bluetooth import generate_ble_device from .conftest import MockBleakClient BT_FRAME_SIZE = 50 CHAR_UUID = "fff1" -@fixture + +@pytest.fixture def ref_value() -> dict: - """Return reference value for mock Seplos BMS""" + """Return reference value for mock Seplos BMS.""" return { "voltage": 52.34, "current": -6.7, @@ -64,6 +66,7 @@ def ref_value() -> dict: "delta_voltage": 0.003, } + class MockSeplosBleakClient(MockBleakClient): """Emulate a Seplos BMS BleakClient.""" @@ -135,7 +138,7 @@ def _response(self, data: Buffer) -> bytearray: async def send_frag_response( self, data: Buffer, - response: bool = None, # type: ignore[implicit-optional] # same as upstream + _response: bool = None, # type: ignore[implicit-optional] # same as upstream ) -> None: """Send fragmented response.""" @@ -174,7 +177,7 @@ class MockWrongCRCBleakClient(MockSeplosBleakClient): async def send_frag_response( self, data: Buffer, - response: bool = None, # type: ignore[implicit-optional] # same as upstream + _response: bool = None, # type: ignore[implicit-optional] # same as upstream ) -> None: """Send fragmented response.""" @@ -252,7 +255,9 @@ async def test_update(monkeypatch, ref_value, reconnect_fixture) -> None: # query again to check already connected state result = await bms.async_update() - assert bms._connected is not reconnect_fixture # noqa: SLF001 + assert ( + bms._client and bms._client.is_connected is not reconnect_fixture + ) # noqa: SLF001 await bms.disconnect() @@ -317,7 +322,7 @@ async def test_invalid_message(monkeypatch) -> None: bms = BMS(generate_ble_device("cc:cc:cc:cc:cc:cc", "MockBLEdevice", None, -73)) result = {} - with raises(TimeoutError): + with pytest.raises(TimeoutError): result = await bms.async_update() assert result == {}