Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correct reporting of MatchedStatus last_*_handle [13786] (backport #2544) #2546

Merged
merged 1 commit into from
Mar 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/cpp/fastdds/publisher/DataWriterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1108,8 +1108,8 @@ void DataWriterImpl::update_publication_matched_status(
{
publication_matched_status_.total_count += count_change;
publication_matched_status_.total_count_change += count_change;
publication_matched_status_.last_subscription_handle = status.last_subscription_handle;
}
publication_matched_status_.last_subscription_handle = status.last_subscription_handle;

StatusMask notify_status = StatusMask::publication_matched();
DataWriterListener* listener = get_listener_for(notify_status);
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/fastdds/subscriber/DataReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -926,8 +926,8 @@ void DataReaderImpl::update_subscription_matched_status(
{
subscription_matched_status_.total_count += count_change;
subscription_matched_status_.total_count_change += count_change;
subscription_matched_status_.last_publication_handle = status.last_publication_handle;
}
subscription_matched_status_.last_publication_handle = status.last_publication_handle;

if (count_change < 0)
{
Expand Down
17 changes: 15 additions & 2 deletions test/blackbox/api/dds-pim/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -673,15 +673,16 @@ class PubSubReader
return ret_value;
}

void wait_writer_undiscovery()
void wait_writer_undiscovery(
unsigned int matched = 0)
{
std::unique_lock<std::mutex> lock(mutexDiscovery_);

std::cout << "Reader is waiting removal..." << std::endl;

cvDiscovery_.wait(lock, [&]()
{
return matched_ == 0;
return matched_ <= matched;
});

std::cout << "Reader removal finished..." << std::endl;
Expand Down Expand Up @@ -1563,11 +1564,23 @@ class PubSubReader
return datareader_guid_;
}

eprosima::fastrtps::rtps::InstanceHandle_t datareader_ihandle()
{
return eprosima::fastrtps::rtps::InstanceHandle_t(datareader_guid());
}

const eprosima::fastrtps::rtps::GUID_t& participant_guid() const
{
return participant_guid_;
}

eprosima::fastdds::dds::SubscriptionMatchedStatus get_subscription_matched_status() const
{
eprosima::fastdds::dds::SubscriptionMatchedStatus status;
datareader_->get_subscription_matched_status(status);
return status;
}

private:

void receive_one(
Expand Down
17 changes: 15 additions & 2 deletions test/blackbox/api/dds-pim/PubSubWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -599,15 +599,16 @@ class PubSubWriter
return ret_value;
}

void wait_reader_undiscovery()
void wait_reader_undiscovery(
unsigned int matched = 0)
{
std::unique_lock<std::mutex> lock(mutexDiscovery_);

std::cout << "Writer is waiting removal..." << std::endl;

cv_.wait(lock, [&]()
{
return matched_ == 0;
return matched_ <= matched;
});

std::cout << "Writer removal finished..." << std::endl;
Expand Down Expand Up @@ -1339,6 +1340,11 @@ class PubSubWriter
return datawriter_guid_;
}

eprosima::fastrtps::rtps::InstanceHandle_t datawriter_ihandle()
{
return eprosima::fastrtps::rtps::InstanceHandle_t(datawriter_guid());
}

bool update_partition(
const std::string& partition)
{
Expand Down Expand Up @@ -1390,6 +1396,13 @@ class PubSubWriter
return status;
}

eprosima::fastdds::dds::PublicationMatchedStatus get_publication_matched_status() const
{
eprosima::fastdds::dds::PublicationMatchedStatus status;
datawriter_->get_publication_matched_status(status);
return status;
}

void set_xml_filename(
const std::string& name)
{
Expand Down
125 changes: 125 additions & 0 deletions test/blackbox/common/DDSBlackboxTestsDiscovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,3 +236,128 @@ TEST(DDSDiscovery, DDSNetworkInterfaceChangesAtRunTime)
datareader.destroy();
datawriter.destroy();
}

/*
* This tests checks that DataReader::get_subscription_matched_status() and
* DataWriter::get_publication_matched_status() return the correct last_publication_handle
* and last_subscription_handle respectively; that is, the handle to the last DataWriter/DataReader which
* discovery/un-discovery triggered a change in the DataReader/DataWriter MatchedStatus.
*
* It does so by creating two pairs of DataReader-DataWriter at different times, waiting for matching/unmatching
* and check the last_publication_handle and last_subscription_handle respectively:
*
* 1. Create a DR and DW in the same partition and wait for discovery
* 2. Check that the last_*_handle is the correct one
* 3. Create another DR and DW in the same partition and wait for discovery
* 4. Check that old DR and DW report the new DW and DR as last_*_handle
* 5. Change old DW to a different partition and wait for undiscovery
* 6. Check that both DR report the old DW as last_publication_handle
* 7. Change old DR to a partition different than the other two and wait for undiscovery
* 8. Check that old DR and new DW report each other as last_*_handle
*/
TEST(DDSDiscovery, UpdateMatchedStatus)
{
/* Create DataReaders and DataWriters */
PubSubWriter<HelloWorldPubSubType> datawriter_1(TEST_TOPIC_NAME);
PubSubReader<HelloWorldPubSubType> datareader_1(TEST_TOPIC_NAME);
PubSubWriter<HelloWorldPubSubType> datawriter_2(TEST_TOPIC_NAME);
PubSubReader<HelloWorldPubSubType> datareader_2(TEST_TOPIC_NAME);

/* Init first pair of DataReader-DataWriter */
datareader_1
.partition("A")
.init();
ASSERT_TRUE(datareader_1.isInitialized());

datawriter_1
.partition("A")
.init();
ASSERT_TRUE(datawriter_1.isInitialized());

/* Wait for discovery */
datareader_1.wait_discovery(std::chrono::seconds(3), 1);
datawriter_1.wait_discovery(1, std::chrono::seconds(3));
ASSERT_EQ(datareader_1.get_matched(), 1u);
ASSERT_EQ(datawriter_1.get_matched(), 1u);

/* Check that the reported last_*_handle are correct */
ASSERT_EQ(datareader_1.get_subscription_matched_status().last_publication_handle,
datawriter_1.datawriter_ihandle());
ASSERT_EQ(datawriter_1.get_publication_matched_status().last_subscription_handle,
datareader_1.datareader_ihandle());

/* Init second pair of DataReader-DataWriter */
datareader_2
.partition("A")
.init();
ASSERT_TRUE(datareader_2.isInitialized());

datawriter_2
.partition("A")
.init();
ASSERT_TRUE(datawriter_2.isInitialized());

/*
* Wait for discovery:
* - DR_1: DW_1, DW_2
* - DR_2: DW_1, DW_2
* - DW_1: DR_1, DR_2
* - DW_2: DR_1, DR_2
*/
datareader_1.wait_discovery(std::chrono::seconds(3), 2);
datawriter_1.wait_discovery(2, std::chrono::seconds(3));
datareader_2.wait_discovery(std::chrono::seconds(3), 2);
datawriter_2.wait_discovery(2, std::chrono::seconds(3));

ASSERT_EQ(datareader_1.get_matched(), 2u);
ASSERT_EQ(datawriter_1.get_matched(), 2u);
ASSERT_EQ(datareader_2.get_matched(), 2u);
ASSERT_EQ(datawriter_2.get_matched(), 2u);

/* Check that the reported last_*_handle are correct */
ASSERT_EQ(datareader_1.get_subscription_matched_status().last_publication_handle,
datawriter_2.datawriter_ihandle());
ASSERT_EQ(datawriter_1.get_publication_matched_status().last_subscription_handle,
datareader_2.datareader_ihandle());

/*
* Change DW_1's partition and wait for undiscovery:
* - DR_1: DW_2
* - DR_2: DW_2
* - DW_1:
* - DW_2: DR_1, DR_2
*/
datawriter_1.update_partition("B");
datawriter_1.wait_reader_undiscovery(0);
datareader_1.wait_writer_undiscovery(1);
datareader_2.wait_writer_undiscovery(1);

/* Check that the reported last_*_handle are correct */
ASSERT_EQ(datareader_1.get_subscription_matched_status().last_publication_handle,
datawriter_1.datawriter_ihandle());
ASSERT_EQ(datareader_2.get_subscription_matched_status().last_publication_handle,
datawriter_1.datawriter_ihandle());

/*
* Change DR_1 partition and wait for undiscovery:
* - DR_1:
* - DR_2: DW_2
* - DW_1:
* - DW_2: DR_2
*/
datareader_1.update_partition("C");
datareader_1.wait_writer_undiscovery(0);
datawriter_2.wait_reader_undiscovery(1);

/* Check that the reported last_*_handle are correct */
ASSERT_EQ(datareader_1.get_subscription_matched_status().last_publication_handle,
datawriter_2.datawriter_ihandle());
ASSERT_EQ(datawriter_2.get_publication_matched_status().last_subscription_handle,
datareader_1.datareader_ihandle());

/* Clean up entities */
datareader_1.destroy();
datareader_2.destroy();
datawriter_1.destroy();
datawriter_2.destroy();
}