Skip to content

Commit

Permalink
pw_transfer: Shrink window size on retried data
Browse files Browse the repository at this point in the history
http://pwrev.dev/235100 updated transfers to send CONTINUE parameters
instead of RETRANSMIT when a previously-received chunk was retried.
However, as a retry likely indicates an issue with the underlying
network, it is prudent to shrink the window in these cases in an
attempt to reduce congestion.

Change-Id: Ic85fbf83c48bbaa29a729c00abbf956a4363b951
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/249532
Lint: Lint 🤖 <[email protected]>
Reviewed-by: Wyatt Hepler <[email protected]>
Commit-Queue: Auto-Submit <[email protected]>
Docs-Not-Needed: Alexei Frolov <[email protected]>
Presubmit-Verified: CQ Bot Account <[email protected]>
Pigweed-Auto-Submit: Alexei Frolov <[email protected]>
  • Loading branch information
frolv authored and CQ Bot Account committed Nov 18, 2024
1 parent aeecb42 commit 48712ad
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 91 deletions.
5 changes: 5 additions & 0 deletions pw_transfer/context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -889,11 +889,16 @@ void Context::HandleReceivedData(const Chunk& chunk) {
// recovery cycle to avoid shrinking the window size and potentially
// thrashing. The expected data may already be in-flight, so just allow
// the transmitter to keep going with a CONTINUE parameters chunk.
//
// However, as a retried chunk indicates a potential issue with the
// underlying connection, shrink the transfer window.
//
// Start ack confs do not come with an offset set, so it can get stuck
// here if we are doing an offset transfer.
PW_LOG_DEBUG("Transfer %u received duplicate chunk with offset %u",
id_for_log(),
static_cast<unsigned>(chunk.offset()));
UpdateTransferParameters(TransmitAction::kRetransmit);
SendTransferParameters(TransmitAction::kExtend);
} else {
// Bad offset; reset window size to send another parameters chunk.
Expand Down
196 changes: 105 additions & 91 deletions pw_transfer/transfer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2334,97 +2334,6 @@ TEST_F(WriteTransfer, Version2_ContinueParameters) {
EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
}

TEST_F(WriteTransfer, Version2_ResendPreviousData_ReceivesContinueParameters) {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
.set_desired_session_id(kArbitrarySessionId)
.set_resource_id(7)));

transfer_thread_.WaitUntilEventIsProcessed();

EXPECT_TRUE(handler_.prepare_write_called);
EXPECT_FALSE(handler_.finalize_write_called);

// First, the server responds with a START_ACK, accepting the session ID and
// confirming the protocol version.
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
EXPECT_EQ(chunk.resource_id(), 7u);

// Complete the handshake by confirming the server's ACK.
ctx_.SendClientStream(EncodeChunk(
Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
.set_session_id(kArbitrarySessionId)));
transfer_thread_.WaitUntilEventIsProcessed();

// Server should respond by sending its initial transfer parameters.
ASSERT_EQ(ctx_.total_responses(), 2u);

chunk = DecodeChunk(ctx_.responses()[1]);
EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
EXPECT_EQ(chunk.offset(), 0u);
EXPECT_EQ(chunk.window_end_offset(), 32u);
ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
EXPECT_EQ(chunk.max_chunk_size_bytes().value(), 37u);

ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
.set_session_id(kArbitrarySessionId)
.set_offset(0)
.set_payload(span(kData).first(16))));
transfer_thread_.WaitUntilEventIsProcessed();

ASSERT_EQ(ctx_.total_responses(), 3u);

// Resend data that has already been received.
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
.set_session_id(kArbitrarySessionId)
.set_offset(8)
.set_payload(span(kData).subspan(8, 8))
.set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();

// The server should respond with CONTINUE parameters rather than requesting
// retransmission and starting a recovery cycle.
ASSERT_EQ(ctx_.total_responses(), 4u);
chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
EXPECT_EQ(chunk.offset(), 16u);
EXPECT_EQ(chunk.window_end_offset(), 32u);

// Send the expected chunk.
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
.set_session_id(kArbitrarySessionId)
.set_offset(16)
.set_payload(span(kData).subspan(16))
.set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();

ASSERT_EQ(ctx_.total_responses(), 5u);
chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
ASSERT_TRUE(chunk.status().has_value());
EXPECT_EQ(chunk.status().value(), OkStatus());

EXPECT_TRUE(handler_.finalize_write_called);
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);

ctx_.SendClientStream<64>(EncodeChunk(
Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
.set_session_id(kArbitrarySessionId)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 5u);
}

TEST_F(WriteTransfer, Version2_ClientTerminatesDuringHandshake) {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
Expand Down Expand Up @@ -3067,6 +2976,111 @@ TEST_F(WriteTransferLargeData, Version2_AdaptiveWindow_CongestionAvoidance) {
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
}

TEST_F(WriteTransferLargeData,
Version2_ResendPreviousData_ReceivesContinueParameters) {
ctx_.SendClientStream(
EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStart)
.set_desired_session_id(kArbitrarySessionId)
.set_resource_id(7)));

transfer_thread_.WaitUntilEventIsProcessed();

EXPECT_TRUE(handler_.prepare_write_called);
EXPECT_FALSE(handler_.finalize_write_called);

// First, the server responds with a START_ACK, accepting the session ID and
// confirming the protocol version.
ASSERT_EQ(ctx_.total_responses(), 1u);
Chunk chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
EXPECT_EQ(chunk.type(), Chunk::Type::kStartAck);
EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
EXPECT_EQ(chunk.resource_id(), 7u);

// Complete the handshake by confirming the server's ACK.
ctx_.SendClientStream(EncodeChunk(
Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kStartAckConfirmation)
.set_session_id(kArbitrarySessionId)));
transfer_thread_.WaitUntilEventIsProcessed();

// Server should respond by sending its initial transfer parameters.
ASSERT_EQ(ctx_.total_responses(), 2u);

constexpr size_t kExpectedMaxChunkSizeBytes = 21u;

chunk = DecodeChunk(ctx_.responses()[1]);
EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
EXPECT_EQ(chunk.type(), Chunk::Type::kParametersRetransmit);
EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
EXPECT_EQ(chunk.offset(), 0u);
EXPECT_EQ(chunk.window_end_offset(), kExpectedMaxChunkSizeBytes);
ASSERT_TRUE(chunk.max_chunk_size_bytes().has_value());
EXPECT_EQ(chunk.max_chunk_size_bytes().value(), kExpectedMaxChunkSizeBytes);

ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
.set_session_id(kArbitrarySessionId)
.set_offset(0)
.set_payload(span(kData).first(16))));
transfer_thread_.WaitUntilEventIsProcessed();

ASSERT_EQ(ctx_.total_responses(), 3u);

// Window size grows to 2 chunks on successful receipt.
chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.protocol_version(), ProtocolVersion::kVersionTwo);
EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
EXPECT_EQ(chunk.offset(), 16u);
EXPECT_EQ(chunk.window_end_offset(),
chunk.offset() + 2 * kExpectedMaxChunkSizeBytes);

// Resend data that has already been received.
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
.set_session_id(kArbitrarySessionId)
.set_offset(8)
.set_payload(span(kData).subspan(8, 8))
.set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();

// The server should respond with CONTINUE parameters rather than requesting
// retransmission and starting a recovery cycle. However, the window size
// should shrink in response to the retried data.
ASSERT_EQ(ctx_.total_responses(), 4u);
chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.type(), Chunk::Type::kParametersContinue);
EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
EXPECT_EQ(chunk.offset(), 16u);
EXPECT_EQ(chunk.window_end_offset(),
chunk.offset() + kExpectedMaxChunkSizeBytes);

// Send the expected chunk.
ctx_.SendClientStream<64>(
EncodeChunk(Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kData)
.set_session_id(kArbitrarySessionId)
.set_offset(16)
.set_payload(span(kData).subspan(16))
.set_remaining_bytes(0)));
transfer_thread_.WaitUntilEventIsProcessed();

ASSERT_EQ(ctx_.total_responses(), 5u);
chunk = DecodeChunk(ctx_.responses().back());
EXPECT_EQ(chunk.session_id(), kArbitrarySessionId);
ASSERT_TRUE(chunk.status().has_value());
EXPECT_EQ(chunk.status().value(), OkStatus());

EXPECT_TRUE(handler_.finalize_write_called);
EXPECT_EQ(handler_.finalize_write_status, OkStatus());
EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);

ctx_.SendClientStream<64>(EncodeChunk(
Chunk(ProtocolVersion::kVersionTwo, Chunk::Type::kCompletionAck)
.set_session_id(kArbitrarySessionId)));
transfer_thread_.WaitUntilEventIsProcessed();
ASSERT_EQ(ctx_.total_responses(), 5u);
}

class WriteTransferMultibyteVarintChunkSize : public ::testing::Test {
protected:
WriteTransferMultibyteVarintChunkSize()
Expand Down

0 comments on commit 48712ad

Please sign in to comment.