Skip to content

Commit

Permalink
Refs #20815: Refactor test
Browse files Browse the repository at this point in the history
Signed-off-by: eduponz <[email protected]>
  • Loading branch information
EduPonz committed May 29, 2024
1 parent aa5246f commit 96841b0
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 164 deletions.
123 changes: 99 additions & 24 deletions test/blackbox/api/dds-pim/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
#ifndef _TEST_BLACKBOX_PUBSUBREADER_HPP_
#define _TEST_BLACKBOX_PUBSUBREADER_HPP_

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <functional>
#include <list>
#include <mutex>
#include <string>
#include <vector>

#include <asio.hpp>
#include <gtest/gtest.h>
Expand All @@ -36,6 +39,7 @@
#include <fastdds/dds/core/condition/WaitSet.hpp>
#include <fastdds/dds/core/policy/QosPolicies.hpp>
#include <fastdds/dds/core/ReturnCode.hpp>
#include <fastdds/dds/core/status/BaseStatus.hpp>
#include <fastdds/dds/core/UserAllocatedSequence.hpp>
#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
Expand All @@ -46,7 +50,9 @@
#include <fastdds/dds/subscriber/qos/DataReaderQos.hpp>
#include <fastdds/dds/subscriber/SampleInfo.hpp>
#include <fastdds/dds/subscriber/Subscriber.hpp>
#include <fastdds/dds/topic/ContentFilteredTopic.hpp>
#include <fastdds/dds/topic/Topic.hpp>
#include <fastdds/dds/topic/TopicDescription.hpp>
#include <fastdds/rtps/transport/TCPv4TransportDescriptor.h>
#include <fastdds/rtps/transport/TCPv6TransportDescriptor.h>
#include <fastdds/rtps/transport/UDPTransportDescriptor.h>
Expand Down Expand Up @@ -292,6 +298,7 @@ class PubSubReader
, listener_(*this)
, participant_(nullptr)
, topic_(nullptr)
, cf_topic_(nullptr)
, subscriber_(nullptr)
, datareader_(nullptr)
, status_mask_(eprosima::fastdds::dds::StatusMask::all())
Expand Down Expand Up @@ -320,6 +327,8 @@ class PubSubReader
, times_incompatible_qos_(0)
, last_incompatible_qos_(eprosima::fastdds::dds::INVALID_QOS_POLICY_ID)
, message_receive_count_(0)
, filter_expression_("")
, expression_parameters_({})
{
// Load default QoS to permit testing with external XML profile files.
DomainParticipantFactory::get_instance()->load_profiles();
Expand Down Expand Up @@ -354,6 +363,19 @@ class PubSubReader
loan_sample_validation(false);
}

PubSubReader(
const std::string& topic_name,
const std::string& filter_expression,
const std::vector<std::string>& expression_parameters,
bool take = true,
bool statistics = false,
bool read = true)
: PubSubReader(topic_name, take, statistics, read)
{
filter_expression_ = filter_expression;
expression_parameters_ = expression_parameters;
}

virtual ~PubSubReader()
{
destroy();
Expand Down Expand Up @@ -413,6 +435,17 @@ class PubSubReader
ASSERT_NE(topic_, nullptr);
ASSERT_TRUE(topic_->is_enabled());

// Create CFT if needed
if (!filter_expression_.empty())
{
cf_topic_ = participant_->create_contentfilteredtopic(
topic_name_ + "_cft",
topic_,
filter_expression_,
expression_parameters_);
ASSERT_NE(cf_topic_, nullptr);
}

// Create publisher
createSubscriber();
}
Expand All @@ -426,19 +459,26 @@ class PubSubReader
ASSERT_NE(subscriber_, nullptr);
ASSERT_TRUE(subscriber_->is_enabled());

using TopicDescriptionPtr = eprosima::fastdds::dds::TopicDescription*;
TopicDescriptionPtr topic_desc {(nullptr !=
cf_topic_) ? static_cast<TopicDescriptionPtr>(cf_topic_) : static_cast<
TopicDescriptionPtr>(
topic_)};

if (!xml_file_.empty())
{
if (!datareader_profile_.empty())
{
datareader_ = subscriber_->create_datareader_with_profile(topic_, datareader_profile_, &listener_,
datareader_ = subscriber_->create_datareader_with_profile(topic_desc, datareader_profile_,
&listener_,
status_mask_);
ASSERT_NE(datareader_, nullptr);
ASSERT_TRUE(datareader_->is_enabled());
}
}
if (datareader_ == nullptr)
{
datareader_ = subscriber_->create_datareader(topic_, datareader_qos_, &listener_, status_mask_);
datareader_ = subscriber_->create_datareader(topic_desc, datareader_qos_, &listener_, status_mask_);
}

if (datareader_ != nullptr)
Expand Down Expand Up @@ -474,22 +514,14 @@ class PubSubReader
{
if (participant_ != nullptr)
{
if (datareader_)
{
subscriber_->delete_datareader(datareader_);
datareader_ = nullptr;
}
if (subscriber_)
{
participant_->delete_subscriber(subscriber_);
subscriber_ = nullptr;
}
if (topic_)
{
participant_->delete_topic(topic_);
topic_ = nullptr;
}
eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->delete_participant(participant_);
ASSERT_EQ(eprosima::fastdds::dds::RETCODE_OK, participant_->delete_contained_entities());
datareader_ = nullptr;
subscriber_ = nullptr;
cf_topic_ = nullptr;
topic_ = nullptr;

ASSERT_EQ(eprosima::fastdds::dds::RETCODE_OK,
eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->delete_participant(participant_));
participant_ = nullptr;
}

Expand Down Expand Up @@ -524,6 +556,18 @@ class PubSubReader
return get_last_sequence_received();
}

void startReception(
size_t expected_samples)
{
{
std::unique_lock<std::mutex> lock(mutex_);
current_processed_count_ = 0;
number_samples_expected_ = expected_samples;
last_seq.clear();
}
receiving_.store(true);
}

void stopReception()
{
receiving_.store(false);
Expand Down Expand Up @@ -1746,6 +1790,13 @@ class PubSubReader
return status;
}

eprosima::fastdds::dds::SampleLostStatus get_sample_lost_status() const
{
eprosima::fastdds::dds::SampleLostStatus status;
datareader_->get_sample_lost_status(status);
return status;
}

bool is_matched() const
{
return matched_ > 0;
Expand Down Expand Up @@ -1878,13 +1929,18 @@ class PubSubReader
if (info.valid_data
&& info.instance_state == eprosima::fastdds::dds::ALIVE_INSTANCE_STATE)
{
auto it = std::find(total_msgs_.begin(), total_msgs_.end(), data);
ASSERT_NE(it, total_msgs_.end());
total_msgs_.erase(it);
if (!total_msgs_.empty())
{
auto it = std::find(total_msgs_.begin(), total_msgs_.end(), data);
ASSERT_NE(it, total_msgs_.end());
total_msgs_.erase(it);
}
++current_processed_count_;
default_receive_print<type>(data);
cv_.notify_one();
}

postprocess_sample(data, info);
}
}

Expand Down Expand Up @@ -1929,14 +1985,19 @@ class PubSubReader

if (valid_sample)
{
auto it = std::find(total_msgs_.begin(), total_msgs_.end(), data);
ASSERT_NE(it, total_msgs_.end());
total_msgs_.erase(it);
if (!total_msgs_.empty())
{
auto it = std::find(total_msgs_.begin(), total_msgs_.end(), data);
ASSERT_NE(it, total_msgs_.end());
total_msgs_.erase(it);
}
++current_processed_count_;
default_receive_print<type>(data);
cv_.notify_one();
}
}

postprocess_sample(data, info);
}

datareader->return_loan(datas, infos);
Expand All @@ -1952,6 +2013,14 @@ class PubSubReader
receive_(datareader, std::ref(returnedValue));
}

virtual void postprocess_sample(
const type& data,
const eprosima::fastdds::dds::SampleInfo& info)
{
static_cast<void>(data);
static_cast<void>(info);
}

void participant_matched()
{
std::unique_lock<std::mutex> lock(mutexDiscovery_);
Expand Down Expand Up @@ -2005,6 +2074,7 @@ class PubSubReader
eprosima::fastdds::dds::DomainParticipant* participant_;
eprosima::fastdds::dds::DomainParticipantQos participant_qos_;
eprosima::fastdds::dds::Topic* topic_;
eprosima::fastdds::dds::ContentFilteredTopic* cf_topic_;
eprosima::fastdds::dds::Subscriber* subscriber_;
eprosima::fastdds::dds::SubscriberQos subscriber_qos_;
eprosima::fastdds::dds::DataReader* datareader_;
Expand Down Expand Up @@ -2084,6 +2154,11 @@ class PubSubReader
SampleLostStatusFunctor sample_lost_status_functor_;
//! Functor called when called SampleRejectedStatus listener.
SampleRejectedStatusFunctor sample_rejected_status_functor_;

//! Expression for CFT
std::string filter_expression_;
//! Parameters for CFT expression
std::vector<std::string> expression_parameters_;
};

template<class TypeSupport>
Expand Down
14 changes: 12 additions & 2 deletions test/blackbox/api/dds-pim/PubSubWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@
#if _MSC_VER
#include <Windows.h>
#endif // _MSC_VER
#include <fastdds/dds/common/InstanceHandle.hpp>
#include <fastdds/dds/core/condition/GuardCondition.hpp>
#include <fastdds/dds/core/condition/StatusCondition.hpp>
#include <fastdds/dds/core/condition/WaitSet.hpp>
#include <fastdds/dds/core/policy/QosPolicies.hpp>
#include <fastdds/dds/core/ReturnCode.hpp>
#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/domain/DomainParticipantListener.hpp>
Expand All @@ -46,11 +48,11 @@
#include <fastdds/dds/publisher/qos/DataWriterQos.hpp>
#include <fastdds/dds/topic/Topic.hpp>
#include <fastdds/rtps/flowcontrol/FlowControllerSchedulerPolicy.hpp>
#include <fastdds/rtps/transport/TCPv4TransportDescriptor.h>
#include <fastdds/rtps/transport/TCPv6TransportDescriptor.h>
#include <fastdds/rtps/transport/UDPTransportDescriptor.h>
#include <fastdds/rtps/transport/UDPv4TransportDescriptor.h>
#include <fastdds/rtps/transport/UDPv6TransportDescriptor.h>
#include <fastdds/rtps/transport/TCPv4TransportDescriptor.h>
#include <fastdds/rtps/transport/TCPv6TransportDescriptor.h>
#include <fastdds/utils/IPLocator.h>

using eprosima::fastdds::dds::DomainParticipantFactory;
Expand Down Expand Up @@ -542,6 +544,14 @@ class PubSubWriter
return datawriter_->write((void*)&msg);
}

eprosima::fastdds::dds::ReturnCode_t send_sample(
type& msg,
const eprosima::fastdds::dds::InstanceHandle_t& instance_handle)
{
default_send_print(msg);
return datawriter_->write((void*)&msg, instance_handle);
}

void assert_liveliness()
{
datawriter_->assert_liveliness();
Expand Down
Loading

0 comments on commit 96841b0

Please sign in to comment.