Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[17464] Improve auto gaps in data sharing #3343

Merged
merged 6 commits into from
Mar 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/cpp/rtps/DataSharing/DataSharingListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,16 @@ void DataSharingListener::process_new_data ()
pool->get_next_unread_payload(ch, last_sequence, last_payload);
has_new_payload = ch.sequenceNumber != c_SequenceNumber_Unknown;

if (has_new_payload)
if (has_new_payload && ch.sequenceNumber > SequenceNumber_t(0, 0))
{
if (last_sequence != c_SequenceNumber_Unknown && ch.sequenceNumber != last_sequence + 1)
if (last_sequence != c_SequenceNumber_Unknown && ch.sequenceNumber > last_sequence + 1)
{
EPROSIMA_LOG_WARNING(RTPS_READER, "GAP (" << last_sequence + 1 << " - " << ch.sequenceNumber - 1 << ")"
<< " detected on datasharing writer " << pool->writer());
reader_->processGapMsg(pool->writer(), last_sequence + 1, SequenceNumberSet_t(ch.sequenceNumber));
}

if (last_sequence == c_SequenceNumber_Unknown && ch.sequenceNumber != SequenceNumber_t(0, 1))
if (last_sequence == c_SequenceNumber_Unknown && ch.sequenceNumber > SequenceNumber_t(0, 1))
{
EPROSIMA_LOG_INFO(RTPS_READER, "First change with SN " << ch.sequenceNumber
<< " detected on datasharing writer " <<
Expand Down
6 changes: 6 additions & 0 deletions src/cpp/rtps/DataSharing/ReaderPool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,12 @@ class ReaderPool : public DataSharingPayloadPool
continue;
}

if (last_sn_ != c_SequenceNumber_Unknown && last_sn_ >= cache_change.sequenceNumber)
{
// Sequence number went backwards, it was most probably overriden.
continue;
}

if (!ensure_reading_reference_is_in_bounds())
{
// We may have been taken over and read a payload that is too far forward. Discard and continue
Expand Down
6 changes: 3 additions & 3 deletions src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,7 @@ bool StatefulReader::processGapMsg(
WriterProxy* pWP = nullptr;

std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
if (!is_alive_)
if (!is_alive_ || gapStart < SequenceNumber_t(0, 1) || gapList.base() <= gapStart)
{
return false;
}
Expand All @@ -860,9 +860,9 @@ bool StatefulReader::processGapMsg(
{
// TODO (Miguel C): Refactor this inside WriterProxy
SequenceNumber_t auxSN;
SequenceNumber_t finalSN = gapList.base() - 1;
SequenceNumber_t finalSN = gapList.base();
History::const_iterator history_iterator = mp_history->changesBegin();
for (auxSN = gapStart; auxSN <= finalSN; auxSN++)
for (auxSN = gapStart; auxSN < finalSN; auxSN++)
{
if (pWP->irrelevant_change_set(auxSN))
{
Expand Down
29 changes: 29 additions & 0 deletions test/blackbox/common/RTPSBlackboxTestsBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,35 @@ TEST(RTPS, MultithreadedWriterCreation)
RTPSDomain::stopAll();
}

/* Regression Test for improving gaps processing
* https://github.com/eProsima/Fast-DDS/pull/3343
*/
TEST(RTPS, RTPSCorrectGAPProcessing)
{
RTPSWithRegistrationReader<HelloWorldPubSubType> reader(TEST_TOPIC_NAME);
RTPSWithRegistrationWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved

reader.durability(eprosima::fastrtps::rtps::DurabilityKind_t::TRANSIENT_LOCAL).
reliability(eprosima::fastrtps::rtps::ReliabilityKind_t::RELIABLE).init();

ASSERT_TRUE(reader.isInitialized());

writer.durability(eprosima::fastrtps::rtps::DurabilityKind_t::TRANSIENT_LOCAL).
reliability(eprosima::fastrtps::rtps::ReliabilityKind_t::RELIABLE).init();

ASSERT_TRUE(writer.isInitialized());

reader.wait_discovery();
writer.wait_discovery();

SequenceNumberSet_t seq_set(SequenceNumber_t(0, 0));

//! GAP Message check
RTPSReader& native_reader = reader.get_native_reader();
ASSERT_NO_FATAL_FAILURE(native_reader.processGapMsg(writer.guid(), {0, 0}, seq_set));
}


class CustomReaderDataFilter : public eprosima::fastdds::rtps::IReaderDataFilter
{
public:
Expand Down
5 changes: 5 additions & 0 deletions test/blackbox/common/RTPSWithRegistrationReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,11 @@ class RTPSWithRegistrationReader
return reader_->getGuid();
}

eprosima::fastrtps::rtps::RTPSReader& get_native_reader() const
{
return *reader_;
}

private:

void receive_one(
Expand Down