Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[16608] Fix total_unread_ consistent with reader's history upon get_first_untaken_info() (backport #3203) #3217

Merged
merged 7 commits into from
Jan 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 8 additions & 16 deletions src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -353,22 +353,14 @@ bool DataReaderHistory::get_first_untaken_info(
{
std::lock_guard<RecursiveTimedMutex> lock(*getMutex());

CacheChange_t* change = nullptr;
WriterProxy* wp = nullptr;
if (mp_reader->nextUntakenCache(&change, &wp))
{
auto it = data_available_instances_.find(change->instanceHandle);
assert(it != data_available_instances_.end());
auto& instance_changes = it->second->cache_changes;
auto item =
std::find_if(instance_changes.cbegin(), instance_changes.cend(),
[change](const DataReaderCacheChange& v)
{
return v == change;
});
ReadTakeCommand::generate_info(info, *(it->second), *item);
mp_reader->change_read_by_user(change, wp, false);
return true;
for (auto& it : data_available_instances_)
{
auto& instance_changes = it.second->cache_changes;
if (!instance_changes.empty())
{
ReadTakeCommand::generate_info(info, *(it.second), instance_changes.front());
return true;
}
}

return false;
Expand Down
33 changes: 29 additions & 4 deletions test/blackbox/api/dds-pim/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,8 @@ class PubSubReader
PubSubReader(
const std::string& topic_name,
bool take = true,
bool statistics = false)
bool statistics = false,
bool read = true)
: participant_listener_(*this)
, listener_(*this)
, participant_(nullptr)
Expand All @@ -293,10 +294,12 @@ class PubSubReader
, receiving_(false)
, current_processed_count_(0)
, number_samples_expected_(0)
, current_unread_count_(0)
, discovery_result_(false)
, onDiscovery_(nullptr)
, onEndpointDiscovery_(nullptr)
, take_(take)
, read_(read)
, statistics_(statistics)
#if HAVE_SECURITY
, authorized_(0)
Expand Down Expand Up @@ -548,6 +551,16 @@ class PubSubReader
return current_processed_count_;
}

size_t block_for_unread_count_of(
size_t n_unread)
{
block([this, n_unread]() -> bool
{
return current_unread_count_ >= n_unread;
});
return current_unread_count_;
}

void block(
std::function<bool()> checker)
{
Expand Down Expand Up @@ -1675,6 +1688,14 @@ class PubSubReader
type data;
eprosima::fastdds::dds::SampleInfo info;

if (!take_ && !read_)
{
current_unread_count_ = datareader->get_unread_count();
std::cout << "Total unread count " << current_unread_count_ << std::endl;
cv_.notify_one();
return;
}

ReturnCode_t success = take_ ?
datareader->take_next_sample((void*)&data, &info) :
datareader->read_next_sample((void*)&data, &info);
Expand Down Expand Up @@ -1841,8 +1862,9 @@ class PubSubReader
eprosima::fastdds::dds::TypeSupport type_;
using LastSeqInfo = std::pair<eprosima::fastrtps::rtps::InstanceHandle_t, eprosima::fastrtps::rtps::GUID_t>;
std::map<LastSeqInfo, eprosima::fastrtps::rtps::SequenceNumber_t> last_seq;
size_t current_processed_count_;
size_t number_samples_expected_;
std::atomic<size_t> current_processed_count_;
std::atomic<size_t> number_samples_expected_;
std::atomic<size_t> current_unread_count_;
bool discovery_result_;

std::string xml_file_ = "";
Expand All @@ -1852,9 +1874,12 @@ class PubSubReader
std::function<bool(const eprosima::fastrtps::rtps::ParticipantDiscoveryInfo& info)> onDiscovery_;
std::function<bool(const eprosima::fastrtps::rtps::WriterDiscoveryInfo& info)> onEndpointDiscovery_;

//! True to take data from history. False to read
//! True to take data from history. On False, read_ is checked.
bool take_;

//! True to read data from history. False, do nothing on data reception.
bool read_;

//! True if the class is called from the statistics blackbox (specific topic name and domain id).
bool statistics_;

Expand Down
1 change: 1 addition & 0 deletions test/blackbox/api/dds-pim/PubSubWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ class PubSubWriter
bool send_sample(
type& msg)
{
default_send_print(msg);
return datawriter_->write((void*)&msg);
}

Expand Down
69 changes: 69 additions & 0 deletions test/blackbox/common/DDSBlackboxTestsDataReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,75 @@ TEST_P(DDSDataReader, LivelinessChangedStatusGet)

}

// Regression test of Refs #16608, Github #3203. Checks that total_unread_ variable is consistent with
// unread changes in reader's history after performing a get_first_untaken_info() on a change with no writer matched.
TEST_P(DDSDataReader, ConsistentTotalUnreadAfterGetFirstUntakenInfo)
{
if (enable_datasharing)
{
//! TODO: Datasharing changes the behavior of this test. Changes are
//! instantly removed on removePublisher() call and on the PUBListener callback
GTEST_SKIP() << "Data-sharing removes the changes instantly changing the behavior of this test. Skipping";
}

//! Spawn a couple of participants writer/reader
PubSubWriter<HelloWorldPubSubType> pubsub_writer(TEST_TOPIC_NAME);
//! Create a reader that does nothing when new data is available. Neither take nor read it.
PubSubReader<HelloWorldPubSubType> pubsub_reader(TEST_TOPIC_NAME, false, false, false);

// Initialization of all the participants
std::cout << "Initializing PubSubs for topic " << TEST_TOPIC_NAME << std::endl;

//! Participant Writer configuration and qos
pubsub_writer.reliability(eprosima::fastdds::dds::ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS)
.durability_kind(eprosima::fastdds::dds::DurabilityQosPolicyKind::TRANSIENT_LOCAL_DURABILITY_QOS)
.history_kind(eprosima::fastdds::dds::HistoryQosPolicyKind::KEEP_ALL_HISTORY_QOS)
.init();
ASSERT_EQ(pubsub_writer.isInitialized(), true);

//! Participant Reader configuration and qos
pubsub_reader.reliability(eprosima::fastdds::dds::ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS)
.durability_kind(eprosima::fastdds::dds::DurabilityQosPolicyKind::TRANSIENT_LOCAL_DURABILITY_QOS)
.history_kind(eprosima::fastdds::dds::HistoryQosPolicyKind::KEEP_ALL_HISTORY_QOS)
.init();
ASSERT_EQ(pubsub_reader.isInitialized(), true);

eprosima::fastdds::dds::DataReader& reader = pubsub_reader.get_native_reader();
eprosima::fastdds::dds::SampleInfo info;

EXPECT_EQ(ReturnCode_t::RETCODE_NO_DATA, reader.get_first_untaken_info(&info));

// Wait for discovery.
pubsub_reader.wait_discovery();
pubsub_writer.wait_discovery();

auto data = default_helloworld_data_generator();

pubsub_reader.startReception(data);

pubsub_writer.send(data);
EXPECT_TRUE(data.empty());

pubsub_reader.block_for_unread_count_of(3);
pubsub_writer.removePublisher();
pubsub_reader.wait_writer_undiscovery();

//! Try reading the first untaken info.
//! Checks whether total_unread_ is consistent with
//! the number of unread changes in history
//! This API call should NOT modify the history
EXPECT_EQ(ReturnCode_t::RETCODE_OK, reader.get_first_untaken_info(&info));

HelloWorld msg;
eprosima::fastdds::dds::SampleInfo sinfo;

//! Try getting a sample
auto result = reader.take_next_sample((void*)&msg, &sinfo);

//! Assert last operation
ASSERT_EQ(result, ReturnCode_t::RETCODE_OK) << "Reader's unread count is: " << reader.get_unread_count();
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down