From 44a57f1aae6e91189afef42d4c77a418cedb0b34 Mon Sep 17 00:00:00 2001 From: Alexei Frolov Date: Mon, 26 Feb 2024 21:46:18 +0000 Subject: [PATCH] pw_transfer: Fix WindowPacketDropper proxy filter The WindowPacketDropper was tracking windows solely based on packet count, ignoring packet offsets. This caused it to treat additional in-flight packets from an earlier window as a new window, incorrectly dropping them. This updates the filter to consider the offset the receiver requests as the start of each new window, and only start counting window packets from there. Bug: 322497823 Change-Id: Ide06e4267ec319d932fd1d4eeb1bd3bcb1ab5d11 Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/194030 Presubmit-Verified: CQ Bot Account Commit-Queue: Alexei Frolov Reviewed-by: Wyatt Hepler --- .../integration_test/expected_errors_test.py | 5 +- pw_transfer/integration_test/proxy.py | 63 +++++--- pw_transfer/integration_test/proxy_test.py | 141 ++++++++++++++++-- 3 files changed, 176 insertions(+), 33 deletions(-) diff --git a/pw_transfer/integration_test/expected_errors_test.py b/pw_transfer/integration_test/expected_errors_test.py index fdd3896f3e..1d8d642890 100644 --- a/pw_transfer/integration_test/expected_errors_test.py +++ b/pw_transfer/integration_test/expected_errors_test.py @@ -294,12 +294,11 @@ def test_server_read_timeout(self, client_type): expected_status=status_pb2.StatusCode.DEADLINE_EXCEEDED, ) - # TODO(b/322497823): Re-enable java and python tests when they are fixed. @parameterized.expand( [ ("cpp"), - # ("java"), - # ("python"), + ("java"), + ("python"), ] ) def test_data_drop_client_lifetime_timeout(self, client_type): diff --git a/pw_transfer/integration_test/proxy.py b/pw_transfer/integration_test/proxy.py index d6bf0118d6..2bda7d7e5e 100644 --- a/pw_transfer/integration_test/proxy.py +++ b/pw_transfer/integration_test/proxy.py @@ -28,7 +28,7 @@ import socket import sys import time -from typing import Any, Awaitable, Callable, Iterable, List, Optional +from typing import Awaitable, Callable, Iterable, List, NamedTuple, Optional from google.protobuf import text_format @@ -56,13 +56,18 @@ _RECEIVE_BUFFER_SIZE = 2048 -class Event(Enum): +class EventType(Enum): TRANSFER_START = 1 PARAMETERS_RETRANSMIT = 2 PARAMETERS_CONTINUE = 3 START_ACK_CONFIRMATION = 4 +class Event(NamedTuple): + type: EventType + chunk: Chunk + + class Filter(abc.ABC): """An abstract interface for manipulating a stream of data. @@ -322,15 +327,15 @@ async def process(self, data: bytes) -> None: await self.send_data(data) def handle_event(self, event: Event) -> None: - if event is Event.TRANSFER_START: + if event.type is EventType.TRANSFER_START: self.advance_packets_before_failure() class WindowPacketDropper(Filter): - """A filter to allow the same packet in each window to be dropped + """A filter to allow the same packet in each window to be dropped. WindowPacketDropper with drop the nth packet in each window as - specified by window_packet_to_drop. This process will happend + specified by window_packet_to_drop. This process will happen indefinitely for each window. This filter should be instantiated in the same filter stack as an @@ -347,19 +352,29 @@ def __init__( self._name = name self._relay_packets = True self._window_packet_to_drop = window_packet_to_drop + self._next_window_start_offset: Optional[int] = 0 self._window_packet = 0 async def process(self, data: bytes) -> None: + data_chunk = None try: - is_data_chunk = ( - _extract_transfer_chunk(data).type is Chunk.Type.DATA - ) + chunk = _extract_transfer_chunk(data) + if chunk.type is Chunk.Type.DATA: + data_chunk = chunk except Exception: # Invalid / non-chunk data (e.g. text logs); ignore. - is_data_chunk = False + pass # Only count transfer data chunks as part of a window. - if is_data_chunk: + if data_chunk is not None: + if data_chunk.offset == self._next_window_start_offset: + # If a new window has been requested, wait until the first + # chunk matching its requested offset to begin counting window + # chunks. Any in-flight chunks from the previous window are + # allowed through. + self._window_packet = 0 + self._next_window_start_offset = None + if self._window_packet != self._window_packet_to_drop: await self.send_data(data) @@ -368,12 +383,16 @@ async def process(self, data: bytes) -> None: await self.send_data(data) def handle_event(self, event: Event) -> None: - if event in ( - Event.PARAMETERS_RETRANSMIT, - Event.PARAMETERS_CONTINUE, - Event.START_ACK_CONFIRMATION, + if event.type in ( + EventType.PARAMETERS_RETRANSMIT, + EventType.PARAMETERS_CONTINUE, + EventType.START_ACK_CONFIRMATION, ): - self._window_packet = 0 + # A new transmission window has been requested, starting at the + # offset specified in the chunk. The receiver may already have data + # from the previous window in-flight, so don't immediately reset + # the window packet counter. + self._next_window_start_offset = event.chunk.offset class EventFilter(Filter): @@ -397,13 +416,19 @@ async def process(self, data: bytes) -> None: try: chunk = _extract_transfer_chunk(data) if chunk.type is Chunk.Type.START: - await self._queue.put(Event.TRANSFER_START) + await self._queue.put(Event(EventType.TRANSFER_START, chunk)) if chunk.type is Chunk.Type.START_ACK_CONFIRMATION: - await self._queue.put(Event.START_ACK_CONFIRMATION) + await self._queue.put( + Event(EventType.START_ACK_CONFIRMATION, chunk) + ) elif chunk.type is Chunk.Type.PARAMETERS_RETRANSMIT: - await self._queue.put(Event.PARAMETERS_RETRANSMIT) + await self._queue.put( + Event(EventType.PARAMETERS_RETRANSMIT, chunk) + ) elif chunk.type is Chunk.Type.PARAMETERS_CONTINUE: - await self._queue.put(Event.PARAMETERS_CONTINUE) + await self._queue.put( + Event(EventType.PARAMETERS_CONTINUE, chunk) + ) except: # Silently ignore invalid packets pass diff --git a/pw_transfer/integration_test/proxy_test.py b/pw_transfer/integration_test/proxy_test.py index 982a7e0357..f995bbc5de 100644 --- a/pw_transfer/integration_test/proxy_test.py +++ b/pw_transfer/integration_test/proxy_test.py @@ -137,7 +137,12 @@ async def append(list: List[bytes], data: bytes): for packet in packets: await server_failure.process(packet) self.assertEqual(len(sent_packets), num_packets) - server_failure.handle_event(proxy.Event.TRANSFER_START) + server_failure.handle_event( + proxy.Event( + proxy.EventType.TRANSFER_START, + Chunk(ProtocolVersion.VERSION_TWO, Chunk.Type.START), + ) + ) async def test_keep_drop_queue_loop(self): sent_packets: List[bytes] = [] @@ -246,10 +251,32 @@ async def append(list: List[bytes], data: bytes): # Test each even twice to assure the filter does not have issues # on new window bondaries. events = [ - proxy.Event.PARAMETERS_RETRANSMIT, - proxy.Event.PARAMETERS_CONTINUE, - proxy.Event.PARAMETERS_RETRANSMIT, - proxy.Event.PARAMETERS_CONTINUE, + proxy.Event( + proxy.EventType.PARAMETERS_RETRANSMIT, + Chunk( + ProtocolVersion.VERSION_TWO, + Chunk.Type.PARAMETERS_RETRANSMIT, + ), + ), + proxy.Event( + proxy.EventType.PARAMETERS_CONTINUE, + Chunk( + ProtocolVersion.VERSION_TWO, Chunk.Type.PARAMETERS_CONTINUE + ), + ), + proxy.Event( + proxy.EventType.PARAMETERS_RETRANSMIT, + Chunk( + ProtocolVersion.VERSION_TWO, + Chunk.Type.PARAMETERS_RETRANSMIT, + ), + ), + proxy.Event( + proxy.EventType.PARAMETERS_CONTINUE, + Chunk( + ProtocolVersion.VERSION_TWO, Chunk.Type.PARAMETERS_CONTINUE + ), + ), ] for event in events: @@ -259,6 +286,98 @@ async def append(list: List[bytes], data: bytes): self.assertEqual(sent_packets, expected_packets) window_packet_dropper.handle_event(event) + async def test_window_packet_dropper_extra_in_flight_packets(self): + sent_packets: List[bytes] = [] + + # Async helper so DataTransposer can await on it. + async def append(list: List[bytes], data: bytes): + list.append(data) + + window_packet_dropper = proxy.WindowPacketDropper( + lambda data: append(sent_packets, data), + name="test", + window_packet_to_drop=1, + ) + + packets = [ + _encode_rpc_frame( + Chunk( + ProtocolVersion.VERSION_TWO, + Chunk.Type.DATA, + data=b'1', + offset=0, + ) + ), + _encode_rpc_frame( + Chunk( + ProtocolVersion.VERSION_TWO, + Chunk.Type.DATA, + data=b'2', + offset=1, + ) + ), + _encode_rpc_frame( + Chunk( + ProtocolVersion.VERSION_TWO, + Chunk.Type.DATA, + data=b'3', + offset=2, + ) + ), + _encode_rpc_frame( + Chunk( + ProtocolVersion.VERSION_TWO, + Chunk.Type.DATA, + data=b'2', + offset=1, + ) + ), + _encode_rpc_frame( + Chunk( + ProtocolVersion.VERSION_TWO, + Chunk.Type.DATA, + data=b'3', + offset=2, + ) + ), + _encode_rpc_frame( + Chunk( + ProtocolVersion.VERSION_TWO, + Chunk.Type.DATA, + data=b'4', + offset=3, + ) + ), + ] + + expected_packets = packets[1:] + + # Test each even twice to assure the filter does not have issues + # on new window bondaries. + events = [ + None, + proxy.Event( + proxy.EventType.PARAMETERS_RETRANSMIT, + Chunk( + ProtocolVersion.VERSION_TWO, + Chunk.Type.PARAMETERS_RETRANSMIT, + offset=1, + ), + ), + None, + None, + None, + None, + ] + + for packet, event in zip(packets, events): + await window_packet_dropper.process(packet) + if event is not None: + window_packet_dropper.handle_event(event) + + expected_packets = [packets[0], packets[2], packets[3], packets[5]] + self.assertEqual(sent_packets, expected_packets) + async def test_event_filter(self): sent_packets: List[bytes] = [] @@ -330,22 +449,22 @@ async def append(list: List[bytes], data: bytes): expected_events = [ None, # request - proxy.Event.TRANSFER_START, # start chunk + proxy.EventType.TRANSFER_START, None, # data chunk None, # data chunk None, # request - proxy.Event.TRANSFER_START, # start chunk + proxy.EventType.TRANSFER_START, None, # data chunk None, # data chunk ] - for packet, expected_event in zip(packets, expected_events): + for packet, expected_event_type in zip(packets, expected_events): await event_filter.process(packet) try: - event = queue.get_nowait() + event_type = queue.get_nowait().type except asyncio.QueueEmpty: - event = None - self.assertEqual(event, expected_event) + event_type = None + self.assertEqual(event_type, expected_event_type) def _encode_rpc_frame(chunk: Chunk) -> bytes: