Skip to content

Commit

Permalink
Amend the transfer reassembly state machine and prepare v3.1 release (#…
Browse files Browse the repository at this point in the history
…213)

Fixes #212
  • Loading branch information
pavel-kirienko authored Apr 22, 2023
1 parent 19c26e6 commit 69ed329
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 42 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,13 @@ If you find the examples to be unclear or incorrect, please, open a ticket.
## Revisions
### v3.1
- Remove the Dockerfile; use [toolshed](https://github.com/OpenCyphal/docker_toolchains/pkgs/container/toolshed)
instead if necessary.
- Simplify the transfer reassembly state machine to address [#212](https://github.com/OpenCyphal/libcanard/issues/212).
See also <https://forum.opencyphal.org/t/amendment-to-the-transfer-reception-state-machine-implementations/1870>.
### v3.0
- Update branding as [UAVCAN v1 is renamed to Cyphal](https://forum.opencyphal.org/t/uavcan-v1-is-now-cyphal/1622).
Expand Down
61 changes: 34 additions & 27 deletions libcanard/canard.c
Original file line number Diff line number Diff line change
Expand Up @@ -824,16 +824,28 @@ CANARD_PRIVATE int8_t rxSessionUpdate(CanardInstance* const ins,
CANARD_ASSERT(rxs->transfer_id <= CANARD_TRANSFER_ID_MAX);
CANARD_ASSERT(frame->transfer_id <= CANARD_TRANSFER_ID_MAX);

// The transfer ID timeout is measured relative to the timestamp of the last start-of-transfer frame.
// Triggering a TID timeout when the TID is the same is undesirable because it may cause the reassembler to
// switch to another interface if the start-of-transfer frame of the current transfer is duplicated
// on the other interface more than (transfer-ID timeout) units of time after the start of
// the transfer while the reassembly of this transfer is still in progress.
// While this behavior is not visible to the application because the transfer will still be reassembled,
// it may delay the delivery of the transfer.
const bool tid_timed_out = (frame->timestamp_usec > rxs->transfer_timestamp_usec) &&
(frame->transfer_id != rxs->transfer_id) &&
((frame->timestamp_usec - rxs->transfer_timestamp_usec) > transfer_id_timeout_usec);

// Examples: rxComputeTransferIDDifference(2, 3)==31
// rxComputeTransferIDDifference(2, 2)==0
// rxComputeTransferIDDifference(2, 1)==1
const bool not_previous_tid = rxComputeTransferIDDifference(rxs->transfer_id, frame->transfer_id) > 1;

const bool need_restart = tid_timed_out || ((rxs->redundant_transport_index == redundant_transport_index) &&
frame->start_of_transfer && not_previous_tid);

// Restarting the transfer reassembly only makes sense if the new frame is a start of transfer.
// Otherwise, the new transfer would be impossible to reassemble anyway since the first frame is lost.
const bool need_restart =
frame->start_of_transfer &&
(tid_timed_out || ((rxs->redundant_transport_index == redundant_transport_index) && not_previous_tid));
if (need_restart)
{
CANARD_ASSERT(frame->start_of_transfer);
rxs->total_payload_size = 0U;
rxs->payload_size = 0U;
rxs->calculated_crc = CRC_INITIAL;
Expand All @@ -843,29 +855,24 @@ CANARD_PRIVATE int8_t rxSessionUpdate(CanardInstance* const ins,
}

int8_t out = 0;
if (need_restart && (!frame->start_of_transfer))
{
rxSessionRestart(ins, rxs); // SOT-miss, no point going further.
}
else
// The purpose of the correct_start check is to reduce the possibility of accepting a malformed multi-frame
// transfer in the event of a CRC collision. The scenario where this failure mode would manifest is as follows:
// 1. A valid transfer (whether single- or multi-frame) is accepted with TID=X.
// 2. All frames of the subsequent multi-frame transfer with TID=X+1 are lost except for the last one.
// 3. The CRC of said multi-frame transfer happens to yield the correct residue when applied to the fragment
// of the payload contained in the last frame of the transfer (a CRC collision is in effect).
// 4. The last frame of the multi-frame transfer is erroneously accepted even though it is malformed.
// The correct_start check eliminates this failure mode by ensuring that the first frame is observed.
// See https://github.com/OpenCyphal/libcanard/issues/189.
const bool correct_transport = (rxs->redundant_transport_index == redundant_transport_index);
const bool correct_toggle = (frame->toggle == rxs->toggle);
const bool correct_tid = (frame->transfer_id == rxs->transfer_id);
const bool correct_start = frame->start_of_transfer //
? (0 == rxs->total_payload_size)
: (rxs->total_payload_size > 0);
if (correct_transport && correct_toggle && correct_tid && correct_start)
{
// The purpose of the correct_start check is to reduce the possibility of accepting a malformed multi-frame
// transfer in the event of a CRC collision. The scenario where this failure mode would manifest is as follows:
// 1. A valid transfer (whether single- or multi-frame) is accepted with TID=X.
// 2. All frames of the subsequent multi-frame transfer with TID=X+1 are lost except for the last one.
// 3. The CRC of said multi-frame transfer happens to yield the correct residue when applied to the fragment
// of the payload contained in the last frame of the transfer (a CRC collision is in effect).
// 4. The last frame of the multi-frame transfer is erroneously accepted even though it is malformed.
// The correct_start check eliminates this failure mode by ensuring that the first frame is observed.
// See https://github.com/OpenCyphal/libcanard/issues/189.
const bool correct_transport = (rxs->redundant_transport_index == redundant_transport_index);
const bool correct_toggle = (frame->toggle == rxs->toggle);
const bool correct_tid = (frame->transfer_id == rxs->transfer_id);
const bool correct_start = frame->start_of_transfer || (rxs->total_payload_size > 0);
if (correct_transport && correct_toggle && correct_tid && correct_start)
{
out = rxSessionAcceptFrame(ins, rxs, frame, extent, out_transfer);
}
out = rxSessionAcceptFrame(ins, rxs, frame, extent, out_transfer);
}
return out;
}
Expand Down
2 changes: 1 addition & 1 deletion libcanard/canard.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ extern "C" {
/// Semantic version of this library (not the Cyphal specification).
/// API will be backward compatible within the same major version.
#define CANARD_VERSION_MAJOR 3
#define CANARD_VERSION_MINOR 0
#define CANARD_VERSION_MINOR 1

/// The version number of the Cyphal specification implemented by this library.
#define CANARD_CYPHAL_SPECIFICATION_VERSION_MAJOR 1
Expand Down
44 changes: 30 additions & 14 deletions tests/test_private_rx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ TEST_CASE("rxSessionUpdate")
REQUIRE(ins.getAllocator().getTotalAllocatedAmount() == 16);
ins.getAllocator().deallocate(transfer.payload);

// Restart by TID timeout, not the first frame.
// TID timeout does not occur until SOT; see https://github.com/OpenCyphal/libcanard/issues/212.
frame.timestamp_usec = 30'000'000;
frame.transfer_id = 12; // Goes back.
frame.start_of_transfer = false;
Expand All @@ -507,18 +507,34 @@ TEST_CASE("rxSessionUpdate")
frame.payload = reinterpret_cast<const uint8_t*>("\x0A\x0A\x0A\x0A\x0A\x0A\x0A");
REQUIRE(0 == update(2, 1'000'000, 16));
REQUIRE(rxs.transfer_timestamp_usec == 20'000'100); // No change.
REQUIRE(rxs.payload_size == 0);
REQUIRE(rxs.payload == nullptr);
REQUIRE(rxs.calculated_crc == 0xFFFF);
REQUIRE(rxs.transfer_id == 13U);
REQUIRE(rxs.toggle);
REQUIRE(rxs.redundant_transport_index == 2);
REQUIRE(rxs.transfer_id == 14U); // No change.
REQUIRE(rxs.toggle); // No change.
REQUIRE(rxs.redundant_transport_index == 0); // No change.
REQUIRE(ins.getAllocator().getNumAllocatedFragments() == 0);
REQUIRE(ins.getAllocator().getTotalAllocatedAmount() == 0);

// Restart by TID timeout. This may only occur when SOT is set.
frame.timestamp_usec = 30'000'000;
frame.transfer_id = 12; // Goes back.
frame.start_of_transfer = true;
frame.end_of_transfer = false;
frame.toggle = true;
frame.payload_size = 7;
frame.payload = reinterpret_cast<const uint8_t*>("\x0A\x0A\x0A\x0A\x0A\x0A\x0A");
REQUIRE(0 == update(2, 1'000'000, 16));
REQUIRE(rxs.transfer_timestamp_usec == 30'000'000); // Updated from the frame.
REQUIRE(rxs.payload_size == 7); // From the frame.
REQUIRE(rxs.payload != nullptr);
REQUIRE(rxs.calculated_crc == 0x23C7);
REQUIRE(rxs.transfer_id == 12U); // Updated from the frame.
REQUIRE(!rxs.toggle); // In anticipation of the next frame.
REQUIRE(rxs.redundant_transport_index == 2); // Updated from the update.
REQUIRE(ins.getAllocator().getNumAllocatedFragments() == 1);
REQUIRE(ins.getAllocator().getTotalAllocatedAmount() == 16);

// Restart by TID mismatch.
frame.timestamp_usec = 20'000'200; // Goes back.
frame.transfer_id = 11; // Goes back.
frame.transfer_id = 10; // Goes back.
frame.start_of_transfer = true;
frame.end_of_transfer = false;
frame.toggle = true;
Expand All @@ -529,15 +545,15 @@ TEST_CASE("rxSessionUpdate")
REQUIRE(rxs.payload_size == 7);
REQUIRE(0 == std::memcmp(rxs.payload, "\x0B\x0B\x0B\x0B\x0B\x0B\x0B", 7));
REQUIRE(rxs.calculated_crc == crc("\x0B\x0B\x0B\x0B\x0B\x0B\x0B"));
REQUIRE(rxs.transfer_id == 11U);
REQUIRE(rxs.transfer_id == 10U);
REQUIRE(!rxs.toggle);
REQUIRE(rxs.redundant_transport_index == 2);
REQUIRE(ins.getAllocator().getNumAllocatedFragments() == 1);
REQUIRE(ins.getAllocator().getTotalAllocatedAmount() == 16);

// Duplicate start rejected (toggle mismatch).
frame.timestamp_usec = 20'000'300;
frame.transfer_id = 11;
frame.transfer_id = 10;
frame.start_of_transfer = true;
frame.end_of_transfer = true;
frame.toggle = true;
Expand All @@ -548,15 +564,15 @@ TEST_CASE("rxSessionUpdate")
REQUIRE(rxs.payload_size == 7);
REQUIRE(0 == std::memcmp(rxs.payload, "\x0B\x0B\x0B\x0B\x0B\x0B\x0B", 7));
REQUIRE(rxs.calculated_crc == crc("\x0B\x0B\x0B\x0B\x0B\x0B\x0B"));
REQUIRE(rxs.transfer_id == 11U);
REQUIRE(rxs.transfer_id == 10U);
REQUIRE(!rxs.toggle);
REQUIRE(rxs.redundant_transport_index == 2);
REQUIRE(ins.getAllocator().getNumAllocatedFragments() == 1);
REQUIRE(ins.getAllocator().getTotalAllocatedAmount() == 16);

// Continue & finalize.
frame.timestamp_usec = 20'000'400;
frame.transfer_id = 11;
frame.transfer_id = 10;
frame.start_of_transfer = false;
frame.end_of_transfer = true;
frame.toggle = false;
Expand All @@ -567,15 +583,15 @@ TEST_CASE("rxSessionUpdate")
REQUIRE(rxs.payload_size == 0);
REQUIRE(rxs.payload == nullptr);
REQUIRE(rxs.calculated_crc == 0xFFFF);
REQUIRE(rxs.transfer_id == 12U);
REQUIRE(rxs.transfer_id == 11U);
REQUIRE(rxs.toggle);
REQUIRE(rxs.redundant_transport_index == 2);
REQUIRE(transfer.timestamp_usec == 20'000'200);
REQUIRE(transfer.metadata.priority == CanardPrioritySlow);
REQUIRE(transfer.metadata.transfer_kind == CanardTransferKindMessage);
REQUIRE(transfer.metadata.port_id == 2'222);
REQUIRE(transfer.metadata.remote_node_id == 55);
REQUIRE(transfer.metadata.transfer_id == 11);
REQUIRE(transfer.metadata.transfer_id == 10);
REQUIRE(transfer.payload_size == 10);
REQUIRE(0 == std::memcmp(transfer.payload, "\x0B\x0B\x0B\x0B\x0B\x0B\x0B\x0D\x0D\x0D", 10));
REQUIRE(ins.getAllocator().getNumAllocatedFragments() == 1);
Expand Down
105 changes: 105 additions & 0 deletions tests/test_public_rx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,3 +438,108 @@ TEST_CASE("Issue189") // https://github.com/OpenCyphal/libcanard/issues/189
REQUIRE(ins.getAllocator().getNumAllocatedFragments() == 1); // The payload buffer is gone.
REQUIRE(ins.getAllocator().getTotalAllocatedAmount() == sizeof(RxSession));
}

TEST_CASE("Issue212")
{
using helpers::Instance;
using exposed::RxSession;

Instance ins;
CanardRxTransfer transfer{};
CanardRxSubscription* subscription = nullptr;

const auto accept = [&](const CanardMicrosecond timestamp_usec,
const std::uint8_t redundant_transport_index,
const std::uint32_t extended_can_id,
const std::vector<std::uint8_t>& payload) {
static std::vector<std::uint8_t> payload_storage;
payload_storage = payload;
CanardFrame frame{};
frame.extended_can_id = extended_can_id;
frame.payload_size = std::size(payload);
frame.payload = payload_storage.data();
return ins.rxAccept(timestamp_usec, frame, redundant_transport_index, transfer, &subscription);
};

ins.getAllocator().setAllocationCeiling(sizeof(RxSession) + 50); // A session and the payload buffer.

// Create a message subscription with the transfer-ID timeout of just one microsecond.
CanardRxSubscription sub_msg{};
REQUIRE(1 == ins.rxSubscribe(CanardTransferKindMessage, 0b0110011001100, 50, 1, sub_msg));
REQUIRE(ins.getMessageSubs().at(0) == &sub_msg);
REQUIRE(ins.getMessageSubs().at(0)->port_id == 0b0110011001100);
REQUIRE(ins.getMessageSubs().at(0)->extent == 50);
REQUIRE(ins.getMessageSubs().at(0)->transfer_id_timeout_usec == 1);
REQUIRE(ensureAllNullptr(ins.getMessageSubs().at(0)->sessions));
REQUIRE(ins.getResponseSubs().empty());
REQUIRE(ins.getRequestSubs().empty());

// Feed a multi-frame transfer with a time delta between its frames larger than the transfer-ID timeout.
// Here's how we compute the reference value of the transfer CRC:
// >>> from pycyphal.transport.commons.crc import CRC16CCITT
// >>> CRC16CCITT.new(bytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14])).value_as_bytes
// b'2\xf8'
subscription = nullptr;
REQUIRE(0 == accept(100'000'001, // first frame
1,
0b001'00'0'11'0110011001100'0'0100111,
{1, 2, 3, 4, 5, 6, 7, 0b101'00010}));
REQUIRE(0 == accept(101'000'001, // second frame
1,
0b001'00'0'11'0110011001100'0'0100111,
{8, 9, 10, 11, 12, 13, 14, 0b000'00010}));
REQUIRE(1 == accept(102'000'002, // third and last frame
1,
0b001'00'0'11'0110011001100'0'0100111,
{0x32, 0xF8, 0b011'00010}));
REQUIRE(subscription != nullptr); // Subscription exists.
REQUIRE(subscription->port_id == 0b0110011001100);
REQUIRE(transfer.timestamp_usec == 100'000'001);
REQUIRE(transfer.metadata.priority == CanardPriorityImmediate);
REQUIRE(transfer.metadata.transfer_kind == CanardTransferKindMessage);
REQUIRE(transfer.metadata.port_id == 0b0110011001100);
REQUIRE(transfer.metadata.remote_node_id == 0b0100111);
REQUIRE(transfer.metadata.transfer_id == 2);
REQUIRE(transfer.payload_size == 14);
REQUIRE(0 == std::memcmp(transfer.payload, "\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C\x0D\x0E", 14));
REQUIRE(ins.getAllocator().getNumAllocatedFragments() == 2); // The SESSION and the PAYLOAD BUFFER.
REQUIRE(ins.getAllocator().getTotalAllocatedAmount() == (sizeof(RxSession) + 50));
REQUIRE(ins.getMessageSubs().at(0)->sessions[0b0100111] != nullptr);
ins.getAllocator().deallocate(transfer.payload);
REQUIRE(ins.getAllocator().getNumAllocatedFragments() == 1); // The payload buffer is gone.
REQUIRE(ins.getAllocator().getTotalAllocatedAmount() == sizeof(RxSession));

// Similar but the reassembler should switch to the other transport.
REQUIRE(0 == accept(110'000'001, // first frame, transport #1
1,
0b001'00'0'11'0110011001100'0'0100111,
{1, 2, 3, 4, 5, 6, 7, 0b101'00011}));
REQUIRE(0 == accept(111'000'001, // first frame, transport #0
0,
0b001'00'0'11'0110011001100'0'0100111,
{1, 2, 3, 4, 5, 6, 7, 0b101'00011}));
REQUIRE(0 == accept(112'000'001, // second frame, transport #1
1,
0b001'00'0'11'0110011001100'0'0100111,
{8, 9, 10, 11, 12, 13, 14, 0b000'00011}));
REQUIRE(1 == accept(113'000'002, // third and last frame, transport #1
1,
0b001'00'0'11'0110011001100'0'0100111,
{0x32, 0xF8, 0b011'00011}));
REQUIRE(subscription != nullptr); // Subscription exists.
REQUIRE(subscription->port_id == 0b0110011001100);
REQUIRE(transfer.timestamp_usec == 110'000'001);
REQUIRE(transfer.metadata.priority == CanardPriorityImmediate);
REQUIRE(transfer.metadata.transfer_kind == CanardTransferKindMessage);
REQUIRE(transfer.metadata.port_id == 0b0110011001100);
REQUIRE(transfer.metadata.remote_node_id == 0b0100111);
REQUIRE(transfer.metadata.transfer_id == 3);
REQUIRE(transfer.payload_size == 14);
REQUIRE(0 == std::memcmp(transfer.payload, "\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C\x0D\x0E", 14));
REQUIRE(ins.getAllocator().getNumAllocatedFragments() == 2); // The SESSION and the PAYLOAD BUFFER.
REQUIRE(ins.getAllocator().getTotalAllocatedAmount() == (sizeof(RxSession) + 50));
REQUIRE(ins.getMessageSubs().at(0)->sessions[0b0100111] != nullptr);
ins.getAllocator().deallocate(transfer.payload);
REQUIRE(ins.getAllocator().getNumAllocatedFragments() == 1); // The payload buffer is gone.
REQUIRE(ins.getAllocator().getTotalAllocatedAmount() == sizeof(RxSession));
}

0 comments on commit 69ed329

Please sign in to comment.