Skip to content

Commit

Permalink
pw_transfer: Fix WindowPacketDropper proxy filter
Browse files Browse the repository at this point in the history
The WindowPacketDropper was tracking windows solely based on packet
count, ignoring packet offsets. This caused it to treat additional
in-flight packets from an earlier window as a new window, incorrectly
dropping them. This updates the filter to consider the offset the
receiver requests as the start of each new window, and only start
counting window packets from there.

Bug: 322497823
Change-Id: Ide06e4267ec319d932fd1d4eeb1bd3bcb1ab5d11
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/194030
Presubmit-Verified: CQ Bot Account <[email protected]>
Commit-Queue: Alexei Frolov <[email protected]>
Reviewed-by: Wyatt Hepler <[email protected]>
  • Loading branch information
frolv authored and CQ Bot Account committed Feb 26, 2024
1 parent 614d94a commit 44a57f1
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 33 deletions.
5 changes: 2 additions & 3 deletions pw_transfer/integration_test/expected_errors_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,12 +294,11 @@ def test_server_read_timeout(self, client_type):
expected_status=status_pb2.StatusCode.DEADLINE_EXCEEDED,
)

# TODO(b/322497823): Re-enable java and python tests when they are fixed.
@parameterized.expand(
[
("cpp"),
# ("java"),
# ("python"),
("java"),
("python"),
]
)
def test_data_drop_client_lifetime_timeout(self, client_type):
Expand Down
63 changes: 44 additions & 19 deletions pw_transfer/integration_test/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import socket
import sys
import time
from typing import Any, Awaitable, Callable, Iterable, List, Optional
from typing import Awaitable, Callable, Iterable, List, NamedTuple, Optional

from google.protobuf import text_format

Expand Down Expand Up @@ -56,13 +56,18 @@
_RECEIVE_BUFFER_SIZE = 2048


class Event(Enum):
class EventType(Enum):
    """Transfer protocol events surfaced by EventFilter to other filters.

    EventFilter decodes chunks from the stream and publishes one of these
    for the chunk types below; filters react in their handle_event() hooks.
    """

    TRANSFER_START = 1
    PARAMETERS_RETRANSMIT = 2
    PARAMETERS_CONTINUE = 3
    START_ACK_CONFIRMATION = 4


class Event(NamedTuple):
    """An observed transfer event paired with the chunk that triggered it."""

    # The kind of event that occurred.
    type: EventType
    # The decoded transfer chunk that produced the event. Filters may read
    # its fields (e.g. .offset is used to track new window starts).
    chunk: Chunk


class Filter(abc.ABC):
"""An abstract interface for manipulating a stream of data.
Expand Down Expand Up @@ -322,15 +327,15 @@ async def process(self, data: bytes) -> None:
await self.send_data(data)

def handle_event(self, event: Event) -> None:
    """Reacts to transfer events from the EventFilter.

    When a new transfer starts, re-arms the packets-before-failure
    countdown so each transfer is subjected to the same failure pattern.
    (The scraped diff interleaved the old `event is Event.TRANSFER_START`
    comparison with the fixed one; only the corrected check is kept.)
    """
    if event.type is EventType.TRANSFER_START:
        self.advance_packets_before_failure()


class WindowPacketDropper(Filter):
"""A filter to allow the same packet in each window to be dropped
"""A filter to allow the same packet in each window to be dropped.
WindowPacketDropper will drop the nth packet in each window as
specified by window_packet_to_drop. This process will happend
specified by window_packet_to_drop. This process will happen
indefinitely for each window.
This filter should be instantiated in the same filter stack as an
Expand All @@ -347,19 +352,29 @@ def __init__(
self._name = name
self._relay_packets = True
self._window_packet_to_drop = window_packet_to_drop
self._next_window_start_offset: Optional[int] = 0
self._window_packet = 0

async def process(self, data: bytes) -> None:
data_chunk = None
try:
is_data_chunk = (
_extract_transfer_chunk(data).type is Chunk.Type.DATA
)
chunk = _extract_transfer_chunk(data)
if chunk.type is Chunk.Type.DATA:
data_chunk = chunk
except Exception:
# Invalid / non-chunk data (e.g. text logs); ignore.
is_data_chunk = False
pass

# Only count transfer data chunks as part of a window.
if is_data_chunk:
if data_chunk is not None:
if data_chunk.offset == self._next_window_start_offset:
# If a new window has been requested, wait until the first
# chunk matching its requested offset to begin counting window
# chunks. Any in-flight chunks from the previous window are
# allowed through.
self._window_packet = 0
self._next_window_start_offset = None

if self._window_packet != self._window_packet_to_drop:
await self.send_data(data)

Expand All @@ -368,12 +383,16 @@ async def process(self, data: bytes) -> None:
await self.send_data(data)

def handle_event(self, event: Event) -> None:
    """Tracks window boundaries requested by the receiver.

    On any parameters chunk (retransmit, continue, or start-ack
    confirmation), records the offset at which the receiver wants the
    next window to begin. The packet counter is NOT reset here: data
    from the previous window may still be in flight, so counting for
    the new window only begins once a data chunk with the recorded
    offset is seen in process(). (The scraped diff interleaved the old
    `event in (Event....)` version with the fixed one; only the
    corrected code is kept.)
    """
    if event.type in (
        EventType.PARAMETERS_RETRANSMIT,
        EventType.PARAMETERS_CONTINUE,
        EventType.START_ACK_CONFIRMATION,
    ):
        # A new transmission window has been requested, starting at the
        # offset specified in the chunk. The receiver may already have
        # data from the previous window in-flight, so don't immediately
        # reset the window packet counter.
        self._next_window_start_offset = event.chunk.offset


class EventFilter(Filter):
Expand All @@ -397,13 +416,19 @@ async def process(self, data: bytes) -> None:
try:
chunk = _extract_transfer_chunk(data)
if chunk.type is Chunk.Type.START:
await self._queue.put(Event.TRANSFER_START)
await self._queue.put(Event(EventType.TRANSFER_START, chunk))
if chunk.type is Chunk.Type.START_ACK_CONFIRMATION:
await self._queue.put(Event.START_ACK_CONFIRMATION)
await self._queue.put(
Event(EventType.START_ACK_CONFIRMATION, chunk)
)
elif chunk.type is Chunk.Type.PARAMETERS_RETRANSMIT:
await self._queue.put(Event.PARAMETERS_RETRANSMIT)
await self._queue.put(
Event(EventType.PARAMETERS_RETRANSMIT, chunk)
)
elif chunk.type is Chunk.Type.PARAMETERS_CONTINUE:
await self._queue.put(Event.PARAMETERS_CONTINUE)
await self._queue.put(
Event(EventType.PARAMETERS_CONTINUE, chunk)
)
except:
# Silently ignore invalid packets
pass
Expand Down
141 changes: 130 additions & 11 deletions pw_transfer/integration_test/proxy_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,12 @@ async def append(list: List[bytes], data: bytes):
for packet in packets:
await server_failure.process(packet)
self.assertEqual(len(sent_packets), num_packets)
server_failure.handle_event(proxy.Event.TRANSFER_START)
server_failure.handle_event(
proxy.Event(
proxy.EventType.TRANSFER_START,
Chunk(ProtocolVersion.VERSION_TWO, Chunk.Type.START),
)
)

async def test_keep_drop_queue_loop(self):
sent_packets: List[bytes] = []
Expand Down Expand Up @@ -246,10 +251,32 @@ async def append(list: List[bytes], data: bytes):
# Test each event twice to ensure the filter does not have issues
# on new window boundaries.
events = [
proxy.Event.PARAMETERS_RETRANSMIT,
proxy.Event.PARAMETERS_CONTINUE,
proxy.Event.PARAMETERS_RETRANSMIT,
proxy.Event.PARAMETERS_CONTINUE,
proxy.Event(
proxy.EventType.PARAMETERS_RETRANSMIT,
Chunk(
ProtocolVersion.VERSION_TWO,
Chunk.Type.PARAMETERS_RETRANSMIT,
),
),
proxy.Event(
proxy.EventType.PARAMETERS_CONTINUE,
Chunk(
ProtocolVersion.VERSION_TWO, Chunk.Type.PARAMETERS_CONTINUE
),
),
proxy.Event(
proxy.EventType.PARAMETERS_RETRANSMIT,
Chunk(
ProtocolVersion.VERSION_TWO,
Chunk.Type.PARAMETERS_RETRANSMIT,
),
),
proxy.Event(
proxy.EventType.PARAMETERS_CONTINUE,
Chunk(
ProtocolVersion.VERSION_TWO, Chunk.Type.PARAMETERS_CONTINUE
),
),
]

for event in events:
Expand All @@ -259,6 +286,98 @@ async def append(list: List[bytes], data: bytes):
self.assertEqual(sent_packets, expected_packets)
window_packet_dropper.handle_event(event)

async def test_window_packet_dropper_extra_in_flight_packets(self):
    """In-flight packets from an old window must not be miscounted.

    The dropper is configured to drop the second (index 1) data packet
    of each window. Offsets 1 and 2 appear twice: the receiver requests
    a new window starting at offset 1 while the first window's packets
    are still arriving. Only the second packet of each *window* — not
    every second packet seen — should be dropped.
    """
    sent_packets: List[bytes] = []

    # Async helper so DataTransposer can await on it.
    # (Parameter renamed from `list` to avoid shadowing the builtin.)
    async def append(packet_list: List[bytes], data: bytes):
        packet_list.append(data)

    window_packet_dropper = proxy.WindowPacketDropper(
        lambda data: append(sent_packets, data),
        name="test",
        window_packet_to_drop=1,
    )

    def encoded_data_chunk(data: bytes, offset: int) -> bytes:
        # Keeps the packet table below readable.
        return _encode_rpc_frame(
            Chunk(
                ProtocolVersion.VERSION_TWO,
                Chunk.Type.DATA,
                data=data,
                offset=offset,
            )
        )

    packets = [
        encoded_data_chunk(b'1', offset=0),
        encoded_data_chunk(b'2', offset=1),
        encoded_data_chunk(b'3', offset=2),
        # Re-sends of offsets 1 and 2 from the first window, arriving
        # after the receiver requested a new window at offset 1.
        encoded_data_chunk(b'2', offset=1),
        encoded_data_chunk(b'3', offset=2),
        encoded_data_chunk(b'4', offset=3),
    ]

    # After the second packet is processed, the receiver requests a
    # retransmission window starting at offset 1; no other events occur.
    events = [
        None,
        proxy.Event(
            proxy.EventType.PARAMETERS_RETRANSMIT,
            Chunk(
                ProtocolVersion.VERSION_TWO,
                Chunk.Type.PARAMETERS_RETRANSMIT,
                offset=1,
            ),
        ),
        None,
        None,
        None,
        None,
    ]

    for packet, event in zip(packets, events):
        await window_packet_dropper.process(packet)
        if event is not None:
            window_packet_dropper.handle_event(event)

    # Index 1 (second packet of window one) and index 4 (second packet
    # of window two) are dropped; everything else passes through.
    expected_packets = [packets[0], packets[2], packets[3], packets[5]]
    self.assertEqual(sent_packets, expected_packets)

async def test_event_filter(self):
sent_packets: List[bytes] = []

Expand Down Expand Up @@ -330,22 +449,22 @@ async def append(list: List[bytes], data: bytes):

expected_events = [
None, # request
proxy.Event.TRANSFER_START, # start chunk
proxy.EventType.TRANSFER_START,
None, # data chunk
None, # data chunk
None, # request
proxy.Event.TRANSFER_START, # start chunk
proxy.EventType.TRANSFER_START,
None, # data chunk
None, # data chunk
]

for packet, expected_event in zip(packets, expected_events):
for packet, expected_event_type in zip(packets, expected_events):
await event_filter.process(packet)
try:
event = queue.get_nowait()
event_type = queue.get_nowait().type
except asyncio.QueueEmpty:
event = None
self.assertEqual(event, expected_event)
event_type = None
self.assertEqual(event_type, expected_event_type)


def _encode_rpc_frame(chunk: Chunk) -> bytes:
Expand Down

0 comments on commit 44a57f1

Please sign in to comment.