Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[20849] New max_message_size property to limit output datagrams size (backport #4777) (backport #4807) #4899

Merged
merged 3 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions include/fastdds/rtps/writer/RTPSWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <chrono>
#include <functional>
#include <limits>
#include <memory>
#include <mutex>
#include <vector>
Expand Down Expand Up @@ -472,6 +473,8 @@ class RTPSWriter

//! Flow controller.
fastdds::rtps::FlowController* flow_controller_;
//! Maximum number of bytes allowed for an RTPS datagram generated by this writer.
uint32_t max_output_message_size_ = std::numeric_limits<uint32_t>::max();

//!WriterHistory
WriterHistory* mp_history = nullptr;
Expand Down
23 changes: 21 additions & 2 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,22 @@ RTPSParticipantImpl::RTPSParticipantImpl(
" It may is opened by another application. Discovery may fail.");
}

{
const std::string* max_size_property =
PropertyPolicyHelper::find_property(m_att.properties, "fastdds.max_message_size");
if (max_size_property != nullptr)
{
try
{
max_output_message_size_ = std::stoul(*max_size_property);
}
catch (const std::exception& e)
{
logError(RTPS_WRITER, "Error parsing max_message_size property: " << e.what());
}
}
}

bool allow_growing_buffers = m_att.allocation.send_buffers.dynamic;
size_t num_send_buffers = m_att.allocation.send_buffers.preallocated_number;
if (num_send_buffers == 0)
Expand Down Expand Up @@ -2109,8 +2125,11 @@ uint32_t RTPSParticipantImpl::getMaxMessageSize() const
#endif // if HAVE_SECURITY

return (std::min)(
m_network_Factory.get_max_message_size_between_transports(),
max_receiver_buffer_size);
{
max_output_message_size_,
m_network_Factory.get_max_message_size_between_transports(),
max_receiver_buffer_size
});
}

uint32_t RTPSParticipantImpl::getMaxDataSize()
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <limits>
#include <list>
#include <mutex>
#include <sys/types.h>
Expand Down Expand Up @@ -574,6 +575,8 @@ class RTPSParticipantImpl
std::function<bool(const std::string&)> type_check_fn_;
//!Pool of send buffers
std::unique_ptr<SendBuffersManager> send_buffers_;
//! Maximum number of bytes allowed for an RTPS datagram generated by this writer.
uint32_t max_output_message_size_ = std::numeric_limits<uint32_t>::max();

/**
* Client override flag: SIMPLE participant that has been overriden with the environment variable and transformed
Expand Down
40 changes: 29 additions & 11 deletions src/cpp/rtps/writer/RTPSWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,22 @@
*
*/

#include <mutex>

#include <rtps/history/BasicPayloadPool.hpp>
#include <rtps/history/CacheChangePool.h>

#include <rtps/DataSharing/DataSharingNotifier.hpp>
#include <rtps/DataSharing/WriterPool.hpp>
#include <fastdds/rtps/writer/RTPSWriter.h>

#include <rtps/participant/RTPSParticipantImpl.h>
#include <mutex>

#include <fastdds/dds/log/Log.hpp>

#include <fastdds/rtps/writer/RTPSWriter.h>

#include <fastdds/rtps/attributes/PropertyPolicy.h>
#include <fastdds/rtps/history/WriterHistory.h>

#include <fastdds/rtps/messages/RTPSMessageCreator.h>

#include <rtps/DataSharing/DataSharingNotifier.hpp>
#include <rtps/DataSharing/WriterPool.hpp>
#include <rtps/history/BasicPayloadPool.hpp>
#include <rtps/history/CacheChangePool.h>
#include <rtps/participant/RTPSParticipantImpl.h>

#include <statistics/rtps/StatisticsBase.hpp>
#include <statistics/rtps/messages/RTPSStatisticsMessages.hpp>

Expand Down Expand Up @@ -109,6 +107,22 @@ void RTPSWriter::init(
const std::shared_ptr<IChangePool>& change_pool,
const WriterAttributes& att)
{
{
const std::string* max_size_property =
PropertyPolicyHelper::find_property(att.endpoint.properties, "fastdds.max_message_size");
if (max_size_property != nullptr)
{
try
{
max_output_message_size_ = std::stoul(*max_size_property);
}
catch (const std::exception& e)
{
logError(RTPS_WRITER, "Error parsing max_message_size property: " << e.what());
}
}
}

payload_pool_ = payload_pool;
change_pool_ = change_pool;
fixed_payload_size_ = 0;
Expand Down Expand Up @@ -306,6 +320,10 @@ uint32_t RTPSWriter::getMaxDataSize()
uint32_t flow_max = flow_controller_->get_max_payload();
uint32_t part_max = mp_RTPSParticipant->getMaxMessageSize();
uint32_t max_size = flow_max > part_max ? part_max : flow_max;
if (max_output_message_size_ < max_size)
{
max_size = max_output_message_size_;
}

max_size = calculateMaxDataSize(max_size);
return max_size &= ~3;
Expand Down
95 changes: 95 additions & 0 deletions test/blackbox/common/DDSBlackboxTestsBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,12 @@
#include <fastdds/dds/topic/TypeSupport.hpp>
#include <fastrtps/types/TypesBase.h>

#include <rtps/transport/test_UDPv4Transport.h>

#include "BlackboxTests.hpp"
#include "../types/HelloWorldPubSubTypes.h"
#include "PubSubReader.hpp"
#include "PubSubWriter.hpp"

namespace eprosima {
namespace fastdds {
Expand Down Expand Up @@ -271,6 +275,97 @@ TEST(DDSBasic, MultithreadedReaderCreationDoesNotDeadlock)
ASSERT_EQ(ReturnCode_t::RETCODE_OK, factory->delete_participant(participant));
}

/**
* @test Set the maximum number of bytes allowed for a datagram generated by a DomainParticipant.
*/
TEST(DDSBasic, max_output_message_size_participant)
{
PubSubReader<Data1mbPubSubType> reader(TEST_TOPIC_NAME);
reader.init();
EXPECT_TRUE(reader.isInitialized());

auto testTransport = std::make_shared<eprosima::fastdds::rtps::test_UDPv4TransportDescriptor>();
const uint32_t segment_size = 1470;
std::string segment_size_str = std::to_string(segment_size);
testTransport->messages_filter_ = [segment_size](eprosima::fastrtps::rtps::CDRMessage_t& datagram)
{
EXPECT_LE(datagram.length, segment_size);
// Never drop samples
return false;
};

// Create the DomainParticipants with the appropriate value for the property
eprosima::fastrtps::rtps::PropertyPolicy property_policy;
property_policy.properties().emplace_back("fastdds.max_message_size", segment_size_str);
PubSubWriter<Data1mbPubSubType> writer(TEST_TOPIC_NAME);
writer.property_policy(property_policy).disable_builtin_transport()
.add_user_transport_to_pparams(testTransport).init();
EXPECT_TRUE(writer.isInitialized());

// Wait for discovery
writer.wait_discovery(std::chrono::seconds(2));
reader.wait_discovery(std::chrono::seconds(2));
EXPECT_EQ(writer.get_matched(), 1u);
EXPECT_EQ(reader.get_matched(), 1u);

// Send samples
auto samples = default_data16kb_data_generator(1);
reader.startReception(samples);
writer.send(samples);
EXPECT_TRUE(samples.empty());

// Wait for reception
reader.block_for_all(std::chrono::seconds(1));
EXPECT_EQ(reader.getReceivedCount(), 1u);
}

/**
* @test Set the maximum number of bytes allowed for a datagram generated by a DataWriter.
*/
TEST(DDSBasic, max_output_message_size_writer)
{
const uint32_t segment_size = 1470;
std::string segment_size_str = std::to_string(segment_size);

auto testTransport = std::make_shared<eprosima::fastdds::rtps::test_UDPv4TransportDescriptor>();
testTransport->messages_filter_ = [segment_size](eprosima::fastrtps::rtps::CDRMessage_t& datagram)
{
EXPECT_LE(datagram.length, segment_size);
// Never drop samples
return false;
};

// Create the DataWriter with the appropriate value for the property
eprosima::fastrtps::rtps::PropertyPolicy property_policy;
property_policy.properties().emplace_back("fastdds.max_message_size", segment_size_str);
PubSubWriter<Data1mbPubSubType> writer(TEST_TOPIC_NAME);
writer.entity_property_policy(property_policy).disable_builtin_transport()
.add_user_transport_to_pparams(testTransport).init();
ASSERT_TRUE(writer.isInitialized());

PubSubReader<Data1mbPubSubType> reader(TEST_TOPIC_NAME);
reader.init();
EXPECT_TRUE(reader.isInitialized());

// Wait for discovery
writer.wait_discovery(std::chrono::seconds(2));
reader.wait_discovery(std::chrono::seconds(2));

EXPECT_EQ(writer.get_matched(), 1u);
EXPECT_EQ(reader.get_matched(), 1u);

// Send samples
auto samples = default_data16kb_data_generator(1);
reader.startReception(samples);
writer.send(samples);
EXPECT_TRUE(samples.empty());

// Wait for reception
reader.block_for_all(std::chrono::seconds(1));
EXPECT_EQ(reader.getReceivedCount(), 1u);

}

} // namespace dds
} // namespace fastdds
} // namespace eprosima
86 changes: 86 additions & 0 deletions test/blackbox/common/RTPSBlackboxTestsBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,92 @@ TEST(RTPS, RTPSCorrectGAPProcessing)
}


/* Maximum number of bytes allowed for an RTPS datagram generated by this participant. */
TEST(RTPS, max_output_message_size_participant)
{
/* Set up */
// Create the RTPSReader
RTPSWithRegistrationReader<Data1mbPubSubType> reader(TEST_TOPIC_NAME);
reader.init();
EXPECT_TRUE(reader.isInitialized());

// Create the RTPSWriter
RTPSWithRegistrationWriter<Data1mbPubSubType> writer(TEST_TOPIC_NAME);
// Create the RTPSParticipants with the appropriate value for the property
auto testTransport = std::make_shared<eprosima::fastdds::rtps::test_UDPv4TransportDescriptor>();
const uint32_t segment_size = 1470;
std::string segment_size_str = std::to_string(segment_size);
testTransport->messages_filter_ = [segment_size](eprosima::fastrtps::rtps::CDRMessage_t& datagram)
{
EXPECT_LE(datagram.length, segment_size);
// Never drop samples
return false;
};
writer.add_user_transport_to_pparams(testTransport).
disable_builtin_transport().
add_property("fastdds.max_message_size", segment_size_str).
init();
EXPECT_TRUE(writer.isInitialized());

// Wait for discovery
writer.wait_discovery(std::chrono::seconds(2));
reader.wait_discovery(std::chrono::seconds(2));
EXPECT_EQ(writer.get_matched(), 1u);
EXPECT_EQ(reader.get_matched(), 1u);

// Send samples
auto samples = default_data16kb_data_generator(1);
reader.expected_data(samples);
reader.startReception();
writer.send(samples);
EXPECT_TRUE(samples.empty());

// Wait for reception
reader.block_for_all(std::chrono::seconds(1));
EXPECT_EQ(reader.getReceivedCount(), 1u);
}

/* Maximum number of bytes allowed for an RTPS datagram generated by this writer. */
TEST(RTPS, max_output_message_size_writer)
{
const uint32_t segment_size = 1470;
std::string segment_size_str = std::to_string(segment_size);

auto testTransport = std::make_shared<eprosima::fastdds::rtps::test_UDPv4TransportDescriptor>();
testTransport->messages_filter_ = [segment_size](eprosima::fastrtps::rtps::CDRMessage_t& datagram)
{
EXPECT_LE(datagram.length, segment_size);
// Never drop samples
return false;
};
RTPSWithRegistrationWriter<Data1mbPubSubType> writer(TEST_TOPIC_NAME);
writer.add_property("fastdds.max_message_size", segment_size_str).
disable_builtin_transport().add_user_transport_to_pparams(testTransport).init();
ASSERT_TRUE(writer.isInitialized());

RTPSWithRegistrationReader<Data1mbPubSubType> reader(TEST_TOPIC_NAME);
reader.init();
EXPECT_TRUE(reader.isInitialized());

writer.wait_discovery();
reader.wait_discovery();

EXPECT_EQ(writer.get_matched(), 1u);
EXPECT_EQ(reader.get_matched(), 1u);

// Send samples
auto samples = default_data16kb_data_generator(1);
reader.expected_data(samples);
reader.startReception();
writer.send(samples);
EXPECT_TRUE(samples.empty());

// Wait for reception
reader.block_for_all(std::chrono::seconds(1));
EXPECT_EQ(reader.getReceivedCount(), 1u);

}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down
Loading