Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[22519] Decouple transport receivers creation using unique network flows (backport #5583) #5591

Merged
merged 1 commit into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 45 additions & 17 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <algorithm>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <sstream>
Expand Down Expand Up @@ -1837,42 +1838,69 @@ bool RTPSParticipantImpl::createAndAssociateReceiverswithEndpoint(
if (unique_flows)
{
attributes.multicastLocatorList.clear();
attributes.unicastLocatorList = m_att.defaultUnicastLocatorList;
attributes.unicastLocatorList.clear();
attributes.external_unicast_locators.clear();

uint16_t port = initial_unique_port;
while (port < final_unique_port)
// Register created resources to distinguish the case where a receiver was created in this same function call
// (and can be reused for other locators of the same kind in this reader), and that in which it was already
// created before for other reader in this same participant.
std::map<int32_t, int16_t> created_resources;

// Create unique flows for unicast locators
LocatorList_t input_locator_list = m_att.defaultUnicastLocatorList;
for (Locator_t& loc : input_locator_list)
{
// Set port on unicast locators
for (Locator_t& loc : attributes.unicastLocatorList)
uint16_t port = created_resources.count(loc.kind) ? created_resources[loc.kind] : initial_unique_port;
while (port < final_unique_port)
{
// Set logical port only TCP locators
if (LOCATOR_KIND_TCPv4 == loc.kind || LOCATOR_KIND_TCPv6 == loc.kind)
{
// Due to current implementation limitations only one physical port (actual socket receiver)
// is allowed when using TCP tranport. All we can do for now is to create a unique "logical" flow.
// TODO(juanlofer): create a unique dedicated TCP communication channel once this limitation is removed.
IPLocator::setLogicalPort(loc, port);
}
else
{
loc.port = port;
}

// Try creating receiver for this locator
LocatorList_t aux_locator_list;
aux_locator_list.push_back(loc);
if (createReceiverResources(aux_locator_list, false, true, false))
{
created_resources[loc.kind] = port;
}

// Locator will be present in the list if receiver was created, or was already created
// Continue if receiver not created for this reader (might exist but created for other reader in this same participant)
if (!aux_locator_list.empty() &&
created_resources.count(loc.kind) && (created_resources[loc.kind] == port))
{
break;
}

// Try with next port
++port;
}

// Try creating receiver resources
LocatorList_t aux_locator_list = attributes.unicastLocatorList;
if (createReceiverResources(aux_locator_list, false, true, false))
// Fail when unique ports are exhausted
if (port >= final_unique_port)
{
break;
EPROSIMA_LOG_WARNING(RTPS_PARTICIPANT, "Unique flows requested but exhausted. Port range: "
<< initial_unique_port << "-" << final_unique_port << ". Discarding locator: " << loc);
}
else
{
attributes.unicastLocatorList.push_back(loc);
}

// Try with next port
++port;
}

// Fail when unique ports are exhausted
if (port >= final_unique_port)
if (attributes.unicastLocatorList.empty())
{
EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, "Unique flows requested but exhausted. Port range: "
<< initial_unique_port << "-" << final_unique_port);
EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, "No unicast locators to create unique flows");
return false;
}
}
Expand Down Expand Up @@ -1983,7 +2011,7 @@ bool RTPSParticipantImpl::createReceiverResources(
bool ret_val = input_list.empty();

#if HAVE_SECURITY
// An auxilary buffer is needed in the ReceiverResource to to decrypt the message,
// An auxilary buffer is needed in the ReceiverResource to decrypt the message,
// that imposes a limit in the received messages size even if the transport allows (uint32_t) messages size.
uint32_t max_receiver_buffer_size =
is_secure() ? std::numeric_limits<uint16_t>::max() : (std::numeric_limits<uint32_t>::max)();
Expand Down
1 change: 1 addition & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,7 @@ class RTPSParticipantImpl
* @param ApplyMutation - True if we want to create a Resource with a "similar" locator if the one we provide is unavailable
* @param RegisterReceiver - True if we want the receiver to be registered. Useful for receivers created after participant is enabled.
* @param log_when_creation_fails - True if a log warning shall be issued for each locator when a receiver resource cannot be created.
* @return True if a receiver resource was created for at least a locator in the list, false otherwise.
*/
bool createReceiverResources(
LocatorList_t& Locator_list,
Expand Down
102 changes: 102 additions & 0 deletions test/blackbox/common/BlackboxTestsNetworkConf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "PubSubParticipant.hpp"

#include <fastrtps/rtps/common/Locator.h>
#include <fastdds/rtps/transport/shared_mem/SharedMemTransportDescriptor.h>
#include <fastrtps/utils/IPFinder.h>

using namespace eprosima::fastrtps;
Expand Down Expand Up @@ -165,6 +166,107 @@ TEST_P(NetworkConfig, sub_unique_network_flows)
}
}

// Regression test for redmine issue #22519 to check that readers using unique network flows cannot share locators
// with other readers. The mentioned issue referred to the case where TCP + builtin transports are present.
// In that concrete scenario, the problem was that while the TCP (and UDP) transports rightly were able
// to create a receiver in the dedicated "unique flow" port, shared memory failed for that same port as the other
// process (or participant) is already listening on it. However this was not being handled properly, so once matched,
// the publisher attempts to send data to the wrongfully announced shared memory locator.
// Note that the underlying problem is that, when creating unique network flows, all transports are requested to
// create a receiver for a specific port all together. This is, the creation of unique flow receivers is only
// considered to fail when it fails for all transports, instead of decoupling them and keep trying for alternative
// ports when the creation of a specific transport receiver fails.
// In this test a similar scenario is presented, but using instead UDP and shared memory transports. In the first
// participant, only shared memory is used (which should create a SHM receiver in the first "unique" port attempted).
// In the second participant both UDP and shared memory are used (which should create a UDP receiver in the first
// "unique" port attempted, and a shared memory receiver in the second "unique" port attempted, as the first one is
// already being used by the first participant). As a result, the listening shared memory locators of each data
// reader should be different. Finally, a third data reader is created in the second participant, and it is verified
// that its listening locators are different from those of the other reader created in the same participant, as well as
// from the (SHM) one of the reader created in the first participant.
TEST_P(NetworkConfig, sub_unique_network_flows_multiple_locators)
{
// Enable unique network flows feature
PropertyPolicy properties;
properties.properties().emplace_back("fastdds.unique_network_flows", "");

// First participant
PubSubParticipant<HelloWorldPubSubType> participant(0, 1, 0, 0);

participant.sub_topic_name(TEST_TOPIC_NAME).sub_property_policy(properties);

std::shared_ptr<eprosima::fastdds::rtps::SharedMemTransportDescriptor> shm_descriptor =
std::make_shared<eprosima::fastdds::rtps::SharedMemTransportDescriptor>();
// Use only SHM transport in the first participant
participant.disable_builtin_transport().add_user_transport_to_pparams(shm_descriptor);

ASSERT_TRUE(participant.init_participant());
ASSERT_TRUE(participant.init_subscriber(0));

LocatorList_t locators;

participant.get_native_reader(0).get_listening_locators(locators);
ASSERT_EQ(locators.size(), 1u);
ASSERT_EQ((*locators.begin()).kind, LOCATOR_KIND_SHM);

// Second participant
PubSubParticipant<HelloWorldPubSubType> participant2(0, 2, 0, 0);

participant2.sub_topic_name(TEST_TOPIC_NAME).sub_property_policy(properties);

// Use both UDP and SHM in the second participant
if (!use_udpv4)
{
participant2.disable_builtin_transport().add_user_transport_to_pparams(descriptor_).
add_user_transport_to_pparams(shm_descriptor);
}

ASSERT_TRUE(participant2.init_participant());
ASSERT_TRUE(participant2.init_subscriber(0));

LocatorList_t locators2_1;

participant2.get_native_reader(0).get_listening_locators(locators2_1);
ASSERT_TRUE(locators2_1.size() >= 2u); // There should be at least two locators, one for SHM and N(#interfaces) for UDP

// Check SHM locator is different from the one in the first participant
for (const Locator_t& loc : locators2_1)
{
if (LOCATOR_KIND_SHM == loc.kind)
{
// Ports should be different (expected second and first values of the unique network flows port range)
ASSERT_FALSE(loc == *locators.begin());
}
}

// Now create a second reader in the second participant
ASSERT_TRUE(participant2.init_subscriber(1));

LocatorList_t locators2_2;

participant2.get_native_reader(1).get_listening_locators(locators2_2);
ASSERT_TRUE(locators2_2.size() >= 2u); // There should be at least two locators, one for SHM and N(#interfaces) for UDP

// Check SHM locator is different from the one in the first participant
for (const Locator_t& loc : locators2_2)
{
if (LOCATOR_KIND_SHM == loc.kind)
{
// Ports should be different (expected third and first values of the unique network flows port range)
ASSERT_FALSE(loc == *locators.begin());
}
}

// Now check no locators are shared between the two readers in the second participant
for (const Locator_t& loc_1 : locators2_1)
{
for (const Locator_t& loc_2 : locators2_2)
{
ASSERT_FALSE(loc_1 == loc_2);
}
}
}

//Verify that outLocatorList is used to select the desired output channel
TEST_P(NetworkConfig, PubSubOutLocatorSelection)
{
Expand Down
Loading