From 377b9bf106bcd3fa2672ca8eaf6a9e1b3ee4764b Mon Sep 17 00:00:00 2001 From: Joakim Plate Date: Thu, 18 May 2023 02:06:34 +0200 Subject: [PATCH] Add asyncio protocol --- RFXtrx/__init__.py | 177 +++++++++++++++++++++++++++++++++--------- RFXtrx/lowlevel.py | 30 ++++++- requirements_test.txt | 3 +- setup.cfg | 1 + tests/test_async.py | 55 +++++++++++++ 5 files changed, 227 insertions(+), 39 deletions(-) create mode 100644 tests/test_async.py diff --git a/RFXtrx/__init__.py b/RFXtrx/__init__.py index 95e4afa..c8bfb33 100644 --- a/RFXtrx/__init__.py +++ b/RFXtrx/__init__.py @@ -28,6 +28,9 @@ import threading import time import logging +import asyncio +from typing import Callable, Optional + from time import sleep @@ -38,6 +41,45 @@ _LOGGER = logging.getLogger(__name__) +def parse(data): + """ Parse the given data and return an RFXtrxEvent """ + if data is None: + return None + pkt = lowlevel.parse(data) + if pkt is not None: + if isinstance(pkt, lowlevel.SensorPacket): + obj = SensorEvent(pkt) + elif isinstance(pkt, lowlevel.Status): + obj = StatusEvent(pkt) + else: + obj = ControlEvent(pkt) + return obj + return None + + +class ParseError(Exception): + """ Error occurred during parsing of packet """ + + +def parse_protected(data: bytes): + """ Parse the given data and always return an event object """ + try: + try: + obj = parse(data) + except Exception as exception: # pylint: disable=broad-except + raise ParseError( + "Parse error for data: {0}".format(data.hex(" ")) + ) from exception + + if obj is None: + raise ParseError("No packet for data: {0}".format(data.hex(" "))) + + return obj + except Exception as exception: # pylint: disable=broad-except + _LOGGER.debug("Unexpected parse error", exc_info=True) + return ExceptionEvent(exception) + + ############################################################################### # RFXtrxDevice class ############################################################################### @@ -674,6 +716,18 @@ def __str__(self): return "{0} device=[{1}]".format( type(self), self.device) + +class ExceptionEvent(RFXtrxEvent): + """ Error from parsing occured """ + def __init__(self, exception): + super().__init__(None) + self.exception = exception + + def __str__(self): + return "{0} {1}".format( + type(self), repr(self.exception)) + + ############################################################################### # DummySerial class ############################################################################### @@ -741,21 +795,7 @@ class RFXtrxTransport: @staticmethod def parse(data): """ Parse the given data and return an RFXtrxEvent """ - if data is None: - return None - pkt = lowlevel.parse(data) - if pkt is not None: - if isinstance(pkt, lowlevel.SensorPacket): - obj = SensorEvent(pkt) - elif isinstance(pkt, lowlevel.Status): - obj = StatusEvent(pkt) - else: - obj = ControlEvent(pkt) - - # Store the latest RF signal data - obj.data = data - return obj - return None + return parse(data) def reset(self): """ reset the rfxtrx device """ @@ -812,6 +852,7 @@ def receive_blocking(self): " ".join("0x{0:02x}".format(x) for x in pkt) ) return self.parse(pkt) + return None def send(self, data): """ Send the given packet """ @@ -829,7 +870,7 @@ def send(self, data): def reset(self): """ Reset the RFXtrx """ - self.send(b'\x0D\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00') + self.send(lowlevel.COMMAND_RESET) sleep(0.3) # Should work with 0.05, but not for me self.serial.flushInput() @@ -887,6 +928,7 @@ def receive_blocking(self): " ".join("0x{0:02x}".format(x) for x in pkt) ) return self.parse(pkt) + return None def send(self, data): """ Send the given packet """ @@ -904,7 +946,7 @@ def send(self, data): def reset(self): """ Reset the RFXtrx """ - self.send(b'\x0D\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00') + self.send(lowlevel.COMMAND_RESET) sleep(0.3) self.sock.sendall(b'') @@ -1021,37 +1063,100 @@ def close_connection(self): def set_recmodes(self, modenames): """ Sets the device modes (which protocols to decode) """ - data = bytearray([0x0D, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) - - # Keep the values read during init. - data[5] = self._status.device.tranceiver_type - data[6] = self._status.device.output_power - - # Build the mode data bytes from the mode names - for mode in modenames: - byteno, bitno = lowlevel.get_recmode_tuple(mode) - if byteno is None: - raise ValueError('Unknown mode name '+mode) - - data[7 + byteno] |= 1 << bitno - + data = lowlevel.set_mode_packet(modenames, self._status) self.transport.send(data) self._modes = modenames return self.transport.receive_blocking() def send_start(self): """ Sends the Start RFXtrx transceiver command """ - self.transport.send(b'\x0D\x00\x00\x03\x07\x00\x00' - b'\x00\x00\x00\x00\x00\x00\x00') + self.transport.send(lowlevel.COMMAND_START) return self.transport.receive_blocking() def send_get_status(self): """ Sends the Get Status command """ - self.transport.send(b'\x0D\x00\x00\x01\x02\x00\x00' - b'\x00\x00\x00\x00\x00\x00\x00') + self.transport.send(lowlevel.COMMAND_GET_STATUS) return self.transport.receive_blocking() class Core(Connect): """ The main class for rfxcom-py. Has changed name to Connect """ + + +class AsyncClient(asyncio.protocols.BufferedProtocol, + asyncio.protocols.Protocol): + """ Async protocol for parsing data from rfxtrx device """ + + def __init__(self, callback: Callable[[RFXtrxEvent], None]) -> None: + super().__init__() + self._buffer = bytearray(256) + self._view = memoryview(self._buffer) + self._pos = 0 + self._len = 1 + self._callback = callback + self._transport: Optional[asyncio.BaseTransport] = None + + def data_received(self, data: bytes) -> None: + """Wrapper for non buffer protocol transports.""" + idx = 0 + while idx < len(data): + cnt = len(data)-idx + buffer = self.get_buffer(cnt) + cnt = min(len(buffer), cnt) + assert cnt + buffer[0:cnt] = bytes(data[idx:idx+cnt]) + self.buffer_updated(cnt) + idx += cnt + + def get_buffer(self, _: int) -> memoryview: + """Return view of buffer for next needed data.""" + return self._view[self._pos:self._len] + + def _packet_received(self, pkt: bytes): + """ Packet received. """ + _LOGGER.debug( + "Recv: %s", + " ".join("0x{0:02x}".format(x) for x in pkt) + ) + + obj = parse_protected(pkt) + try: + self._callback(obj) + except Exception: # pylint: disable=broad-except + _LOGGER.exception("Unexpected callback error: %s", pkt) + + def buffer_updated(self, nbytes): + """ New data received. """ + self._pos += nbytes + assert self._pos <= self._len + + if self._len == 1: + self._len = self._buffer[0] + 1 + + if self._len == self._pos: + try: + self._packet_received(self._buffer[:self._pos]) + finally: + self._pos = 0 + self._len = 1 + + def connection_made(self, transport: asyncio.BaseTransport) -> None: + """ Connection to server was made. """ + self._transport = transport + self._pos = 0 + self._len = 1 + + def connection_lost(self, exc: Optional[Exception]) -> None: + """ Connection to server was lost. """ + if exc: + _LOGGER.error("Unexpected disconnection: %s", exc) + self._transport = None + + def send(self, data: bytes): + """ Send a set of data to server. """ + assert isinstance(self._transport, asyncio.WriteTransport) + self._transport.write(data) + + def close(self): + """ Close down protocol. """ + self._transport.close() diff --git a/RFXtrx/lowlevel.py b/RFXtrx/lowlevel.py index 0e91d45..795222a 100644 --- a/RFXtrx/lowlevel.py +++ b/RFXtrx/lowlevel.py @@ -24,6 +24,13 @@ # pylint: disable=C0302,R0902,R0903,R0911,R0913 # pylint: disable= too-many-lines, too-many-statements +COMMAND_RESET = b'\x0D\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' +COMMAND_START = b'\x0D\x00\x00\x03\x07\x00\x00\x00\x00\x00\x00\x00\x00\x00' +COMMAND_GET_STATUS = ( + b'\x0D\x00\x00\x01\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00' +) + + ############################################################################### # Packet class ############################################################################### @@ -2278,7 +2285,7 @@ def load_receive(self, data): (data[10] << 8) + data[11]) self.prodwatthours = ((data[12] * pow(2, 24)) + (data[13] << 16) + (data[14] << 8) + data[15]) - self.tarif_num = (data[16] & 0x0f) + self.tarif_num = data[16] & 0x0f self.voltage = data[17] + 200 self.currentwatt = (data[18] << 8) + data[19] self.state_byte = data[20] @@ -2378,7 +2385,7 @@ def set_transmit(self, subtype, seqnbr, id1, id2, sound): self.id2 = id2 self.sound = sound self.rssi = 0 - self.rssi_byte = (self.rssi << 4) + self.rssi_byte = self.rssi << 4 self.data = bytearray([self.packetlength, self.packettype, self.subtype, self.seqnbr, self.id1, self.id2, self.sound, @@ -3086,3 +3093,22 @@ def parse(data): return None return pkt + + +def set_mode_packet(modenames, status): + """Construct a mode packet.""" + data = bytearray([0x0D, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) + + # Keep the values read during init. + data[5] = status.device.tranceiver_type + data[6] = status.device.output_power + + # Build the mode data bytes from the mode names + for mode in modenames: + byteno, bitno = get_recmode_tuple(mode) + if byteno is None: + raise ValueError('Unknown mode name '+mode) + + data[7 + byteno] |= 1 << bitno + return data diff --git a/requirements_test.txt b/requirements_test.txt index 2db348c..91c61cf 100644 --- a/requirements_test.txt +++ b/requirements_test.txt @@ -3,4 +3,5 @@ pylint>=1.5.3 coveralls>=1.1 pytest>=2.8.0 pytest-cov>=2.2.0 -pytest-timeout>=1.0.0 \ No newline at end of file +pytest-timeout>=1.0.0 +pytest-asyncio>=0.20.3 \ No newline at end of file diff --git a/setup.cfg b/setup.cfg index 25c6497..e059776 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,2 +1,3 @@ [tool:pytest] testpaths = tests +asyncio_mode=auto diff --git a/tests/test_async.py b/tests/test_async.py new file mode 100644 index 0000000..822e5b8 --- /dev/null +++ b/tests/test_async.py @@ -0,0 +1,55 @@ +"""Test for async connections.""" +import asyncio +import logging +from typing import List + +import pytest + +import RFXtrx + +logging.basicConfig(level=logging.DEBUG) + + +EVENT_1_DATA = bytes([0x07, 0x10, 0x00, 0x2A, 0x45, 0x05, 0x01, 0x70]) +EVENT_1_STR = " device=[ type='X10 lighting' id='E5'] values=[('Command', 'On'), ('Rssi numeric', 7)]" + +EVENT_2_DATA = bytes( + [0x0B, 0x55, 0x02, 0x03, 0x12, 0x34, 0x02, 0x50, 0x01, 0x23, 0x45, 0x57] +) +EVENT_2_STR = " device=[ type='PCR800' id='12:34'] values=[('Battery numeric', 7), ('Rain rate', 5.92), ('Rain total', 7456.5), ('Rssi numeric', 5)]" + +# Cut an event short +EVENT_SHORT_DATA = bytes([EVENT_2_DATA[0] - 1, *EVENT_2_DATA[1:-1]]) +EVENT_SHORT_STR = " ParseError('No packet for data: 0a 55 02 03 12 34 02 50 01 23 45')" + + +@pytest.mark.parametrize( + ["data", "events"], + [ + pytest.param([EVENT_1_DATA], [EVENT_1_STR], id="one_packet"), + pytest.param( + [EVENT_1_DATA[:1], EVENT_1_DATA[1:]], [EVENT_1_STR], id="split_packet" + ), + pytest.param( + [EVENT_1_DATA[:1], EVENT_1_DATA[1:4], EVENT_1_DATA[4:], EVENT_2_DATA], + [EVENT_1_STR, EVENT_2_STR], + id="combined_packet", + ), + pytest.param([EVENT_SHORT_DATA], [EVENT_SHORT_STR], id="invalid_len"), + ], +) +async def test_parse_segmented(data: List[bytes], events: List[str]): + """Verify that split packets end up parsing correctly""" + received_events = [] + + def callback(event: RFXtrx.RFXtrxEvent): + received_events.append(str(event)) + + transport = asyncio.BaseTransport() + protocol: asyncio.Protocol = RFXtrx.AsyncClient(callback) + protocol.connection_made(transport) + for x in data: + protocol.data_received(x) + protocol.connection_lost(None) + + assert received_events == events