From 97fadad4022cd311e19c134c08a621fd71c4585a Mon Sep 17 00:00:00 2001 From: juanlofer-eprosima <88179026+juanlofer-eprosima@users.noreply.github.com> Date: Wed, 22 Jan 2025 09:24:14 +0100 Subject: [PATCH] Decouple transport receivers creation using unique network flows (#5583) * Refs #22519. Add regression test Signed-off-by: Juan Lopez Fernandez * Refs #22519. Decouple transport receivers creation using unique network flows Signed-off-by: Juan Lopez Fernandez * Refs #22519. Add comment for future developers Signed-off-by: Juan Lopez Fernandez * Refs #22519. Apply suggestion Signed-off-by: Juan Lopez Fernandez * Refs #22519. Reuse unique ports for locators of same kind in a reader Signed-off-by: Juan Lopez Fernandez --------- Signed-off-by: Juan Lopez Fernandez (cherry picked from commit e6e918fe6c041bf328e55370650524cbe54f5d02) --- .../rtps/participant/RTPSParticipantImpl.cpp | 62 ++++++++--- .../rtps/participant/RTPSParticipantImpl.hpp | 1 + .../common/BlackboxTestsNetworkConf.cpp | 101 ++++++++++++++++++ 3 files changed, 147 insertions(+), 17 deletions(-) diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index c29f99a13d1..c18fe2ed421 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -1728,42 +1729,69 @@ bool RTPSParticipantImpl::createAndAssociateReceiverswithEndpoint( if (unique_flows) { attributes.multicastLocatorList.clear(); - attributes.unicastLocatorList = m_att.defaultUnicastLocatorList; + attributes.unicastLocatorList.clear(); attributes.external_unicast_locators.clear(); - uint16_t port = initial_unique_port; - while (port < final_unique_port) + // Register created resources to distinguish the case where a receiver was created in this same function call + // (and can be reused for other locators of the same kind in this reader), and that in which it was already + // created before for other reader in this same participant. + std::map created_resources; + + // Create unique flows for unicast locators + LocatorList_t input_locator_list = m_att.defaultUnicastLocatorList; + for (Locator_t& loc : input_locator_list) { - // Set port on unicast locators - for (Locator_t& loc : attributes.unicastLocatorList) + uint16_t port = created_resources.count(loc.kind) ? created_resources[loc.kind] : initial_unique_port; + while (port < final_unique_port) { // Set logical port only TCP locators if (LOCATOR_KIND_TCPv4 == loc.kind || LOCATOR_KIND_TCPv6 == loc.kind) { + // Due to current implementation limitations only one physical port (actual socket receiver) + // is allowed when using TCP tranport. All we can do for now is to create a unique "logical" flow. + // TODO(juanlofer): create a unique dedicated TCP communication channel once this limitation is removed. IPLocator::setLogicalPort(loc, port); } else { loc.port = port; } + + // Try creating receiver for this locator + LocatorList_t aux_locator_list; + aux_locator_list.push_back(loc); + if (createReceiverResources(aux_locator_list, false, true, false)) + { + created_resources[loc.kind] = port; + } + + // Locator will be present in the list if receiver was created, or was already created + // Continue if receiver not created for this reader (might exist but created for other reader in this same participant) + if (!aux_locator_list.empty() && + created_resources.count(loc.kind) && (created_resources[loc.kind] == port)) + { + break; + } + + // Try with next port + ++port; } - // Try creating receiver resources - LocatorList_t aux_locator_list = attributes.unicastLocatorList; - if (createReceiverResources(aux_locator_list, false, true, false)) + // Fail when unique ports are exhausted + if (port >= final_unique_port) { - break; + EPROSIMA_LOG_WARNING(RTPS_PARTICIPANT, "Unique flows requested but exhausted. Port range: " + << initial_unique_port << "-" << final_unique_port << ". Discarding locator: " << loc); + } + else + { + attributes.unicastLocatorList.push_back(loc); } - - // Try with next port - ++port; } - // Fail when unique ports are exhausted - if (port >= final_unique_port) + if (attributes.unicastLocatorList.empty()) { - EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, "Unique flows requested but exhausted. Port range: " - << initial_unique_port << "-" << final_unique_port); + EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, "No unicast locators to create unique flows"); return false; } } @@ -1874,7 +1902,7 @@ bool RTPSParticipantImpl::createReceiverResources( bool ret_val = input_list.empty(); #if HAVE_SECURITY - // An auxilary buffer is needed in the ReceiverResource to to decrypt the message, + // An auxilary buffer is needed in the ReceiverResource to decrypt the message, // that imposes a limit in the received messages size even if the transport allows (uint32_t) messages size. uint32_t max_receiver_buffer_size = is_secure() ? std::numeric_limits::max() : (std::numeric_limits::max)(); diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.hpp b/src/cpp/rtps/participant/RTPSParticipantImpl.hpp index 5cc7293c655..0b46731d903 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.hpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.hpp @@ -1031,6 +1031,7 @@ class RTPSParticipantImpl * @param ApplyMutation - True if we want to create a Resource with a "similar" locator if the one we provide is unavailable * @param RegisterReceiver - True if we want the receiver to be registered. Useful for receivers created after participant is enabled. * @param log_when_creation_fails - True if a log warning shall be issued for each locator when a receiver resource cannot be created. + * @return True if a receiver resource was created for at least a locator in the list, false otherwise. */ bool createReceiverResources( LocatorList_t& Locator_list, diff --git a/test/blackbox/common/BlackboxTestsNetworkConf.cpp b/test/blackbox/common/BlackboxTestsNetworkConf.cpp index 16d2ae17b56..19fd45c9efd 100644 --- a/test/blackbox/common/BlackboxTestsNetworkConf.cpp +++ b/test/blackbox/common/BlackboxTestsNetworkConf.cpp @@ -17,6 +17,7 @@ #include #include +#include #include #include "BlackboxTests.hpp" @@ -171,6 +172,106 @@ TEST_P(NetworkConfig, sub_unique_network_flows) } } +// Regression test for redmine issue #22519 to check that readers using unique network flows cannot share locators +// with other readers. The mentioned issue referred to the case where TCP + builtin transports are present. +// In that concrete scenario, the problem was that while the TCP (and UDP) transports rightly were able +// to create a receiver in the dedicated "unique flow" port, shared memory failed for that same port as the other +// process (or participant) is already listening on it. However this was not being handled properly, so once matched, +// the publisher attempts to send data to the wrongfully announced shared memory locator. +// Note that the underlying problem is that, when creating unique network flows, all transports are requested to +// create a receiver for a specific port all together. This is, the creation of unique flow receivers is only +// considered to fail when it fails for all transports, instead of decoupling them and keep trying for alternative +// ports when the creation of a specific transport receiver fails. +// In this test a similar scenario is presented, but using instead UDP and shared memory transports. In the first +// participant, only shared memory is used (which should create a SHM receiver in the first "unique" port attempted). +// In the second participant both UDP and shared memory are used (which should create a UDP receiver in the first +// "unique" port attempted, and a shared memory receiver in the second "unique" port attempted, as the first one is +// already being used by the first participant). As a result, the listening shared memory locators of each data +// reader should be different. Finally, a third data reader is created in the second participant, and it is verified +// that its listening locators are different from those of the other reader created in the same participant, as well as +// from the (SHM) one of the reader created in the first participant. +TEST_P(NetworkConfig, sub_unique_network_flows_multiple_locators) +{ + // Enable unique network flows feature + PropertyPolicy properties; + properties.properties().emplace_back("fastdds.unique_network_flows", ""); + + // First participant + PubSubParticipant participant(0, 1, 0, 0); + + participant.sub_topic_name(TEST_TOPIC_NAME).sub_property_policy(properties); + + std::shared_ptr shm_descriptor = std::make_shared(); + // Use only SHM transport in the first participant + participant.disable_builtin_transport().add_user_transport_to_pparams(shm_descriptor); + + ASSERT_TRUE(participant.init_participant()); + ASSERT_TRUE(participant.init_subscriber(0)); + + LocatorList_t locators; + + participant.get_native_reader(0).get_listening_locators(locators); + ASSERT_EQ(locators.size(), 1u); + ASSERT_EQ((*locators.begin()).kind, LOCATOR_KIND_SHM); + + // Second participant + PubSubParticipant participant2(0, 2, 0, 0); + + participant2.sub_topic_name(TEST_TOPIC_NAME).sub_property_policy(properties); + + // Use both UDP and SHM in the second participant + if (!use_udpv4) + { + participant2.disable_builtin_transport().add_user_transport_to_pparams(descriptor_). + add_user_transport_to_pparams(shm_descriptor); + } + + ASSERT_TRUE(participant2.init_participant()); + ASSERT_TRUE(participant2.init_subscriber(0)); + + LocatorList_t locators2_1; + + participant2.get_native_reader(0).get_listening_locators(locators2_1); + ASSERT_TRUE(locators2_1.size() >= 2u); // There should be at least two locators, one for SHM and N(#interfaces) for UDP + + // Check SHM locator is different from the one in the first participant + for (const Locator_t& loc : locators2_1) + { + if (LOCATOR_KIND_SHM == loc.kind) + { + // Ports should be different (expected second and first values of the unique network flows port range) + ASSERT_FALSE(loc == *locators.begin()); + } + } + + // Now create a second reader in the second participant + ASSERT_TRUE(participant2.init_subscriber(1)); + + LocatorList_t locators2_2; + + participant2.get_native_reader(1).get_listening_locators(locators2_2); + ASSERT_TRUE(locators2_2.size() >= 2u); // There should be at least two locators, one for SHM and N(#interfaces) for UDP + + // Check SHM locator is different from the one in the first participant + for (const Locator_t& loc : locators2_2) + { + if (LOCATOR_KIND_SHM == loc.kind) + { + // Ports should be different (expected third and first values of the unique network flows port range) + ASSERT_FALSE(loc == *locators.begin()); + } + } + + // Now check no locators are shared between the two readers in the second participant + for (const Locator_t& loc_1 : locators2_1) + { + for (const Locator_t& loc_2 : locators2_2) + { + ASSERT_FALSE(loc_1 == loc_2); + } + } +} + //Verify that outLocatorList is used to select the desired output channel TEST_P(NetworkConfig, PubSubOutLocatorSelection) {