Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pick smallest available participant ID for new paricipants #3437

Merged
merged 7 commits into from
Apr 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 57 additions & 26 deletions src/cpp/rtps/RTPSDomain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,26 +153,9 @@ RTPSParticipant* RTPSDomainImpl::createParticipant(
}

uint32_t ID;
if (!instance->prepare_participant_id(PParam.participantID, ID))
{
std::lock_guard<std::mutex> guard(instance->m_mutex);

if (PParam.participantID < 0)
{
ID = instance->getNewId();
while (instance->m_RTPSParticipantIDs.insert(ID).second == false)
{
ID = instance->getNewId();
}
}
else
{
ID = PParam.participantID;
if (instance->m_RTPSParticipantIDs.insert(ID).second == false)
{
EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, "RTPSParticipant with the same ID already exists");
return nullptr;
}
}
return nullptr;
}

if (!PParam.defaultUnicastLocatorList.isValid())
Expand All @@ -190,7 +173,7 @@ RTPSParticipant* RTPSDomainImpl::createParticipant(

// Generate a new GuidPrefix_t
GuidPrefix_t guidP;
guid_prefix_create(ID, guidP);
guid_prefix_create(instance->get_id_for_prefix(ID), guidP);
if (!PParam.builtin.metatraffic_external_unicast_locators.empty())
{
fastdds::rtps::LocatorList locators;
Expand Down Expand Up @@ -259,6 +242,8 @@ RTPSParticipant* RTPSDomainImpl::createParticipant(
{
std::lock_guard<std::mutex> guard(instance->m_mutex);
instance->m_RTPSParticipants.push_back(t_p_RTPSParticipant(p, pimpl));
instance->m_RTPSParticipantIDs[ID].used = true;
instance->m_RTPSParticipantIDs[ID].reserved = true;
}

// Check the environment file in case it was modified during participant creation leading to a missed callback.
Expand Down Expand Up @@ -291,7 +276,9 @@ bool RTPSDomainImpl::removeRTPSParticipant(
{
RTPSDomainImpl::t_p_RTPSParticipant participant = *it;
instance->m_RTPSParticipants.erase(it);
instance->m_RTPSParticipantIDs.erase(participant.second->getRTPSParticipantID());
uint32_t participant_id = participant.second->getRTPSParticipantID();
instance->m_RTPSParticipantIDs[participant_id].used = false;
instance->m_RTPSParticipantIDs[participant_id].reserved = false;
lock.unlock();
instance->removeRTPSParticipant_nts(participant);
return true;
Expand Down Expand Up @@ -599,7 +586,54 @@ RTPSParticipant* RTPSDomainImpl::clientServerEnvironmentCreationOverride(

uint32_t RTPSDomainImpl::getNewId()
{
return m_maxRTPSParticipantID++;
// Get the smallest available participant ID.
// Settings like maxInitialPeersRange control how many participants a peer
// will look for on this host.
// Choosing the smallest value ensures peers using unicast discovery will
// find this participant as long as the total number of participants has
// not exceeded the number of peers they will look for.
uint32_t i = 0;
while (m_RTPSParticipantIDs[i].reserved || m_RTPSParticipantIDs[i].used)
{
++i;
}
m_RTPSParticipantIDs[i].reserved = true;
return i;
}

bool RTPSDomainImpl::prepare_participant_id(
int32_t input_id,
uint32_t& participant_id)
{
std::lock_guard<std::mutex> guard(m_mutex);
if (input_id < 0)
{
participant_id = getNewId();
}
else
{
participant_id = input_id;
if (m_RTPSParticipantIDs[participant_id].used == true)
{
EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, "RTPSParticipant with the same ID already exists");
return false;
}
}
return true;
}

uint32_t RTPSDomainImpl::get_id_for_prefix(
uint32_t participant_id)
{
uint32_t ret = participant_id;
if (ret < 0x10000)
{
std::lock_guard<std::mutex> guard(m_mutex);
ret |= m_RTPSParticipantIDs[participant_id].counter;
m_RTPSParticipantIDs[participant_id].counter += 0x10000;
}

return ret;
}

void RTPSDomainImpl::create_participant_guid(
Expand All @@ -610,10 +644,7 @@ void RTPSDomainImpl::create_participant_guid(
{
auto instance = get_instance();
std::lock_guard<std::mutex> guard(instance->m_mutex);
do
{
participant_id = instance->getNewId();
} while (instance->m_RTPSParticipantIDs.find(participant_id) != instance->m_RTPSParticipantIDs.end());
participant_id = instance->getNewId();
}

guid_prefix_create(participant_id, guid.guidPrefix);
Expand Down
32 changes: 19 additions & 13 deletions src/cpp/rtps/RTPSDomainImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <chrono>
#include <memory>
#include <thread>
#include <unordered_map>

#if defined(_WIN32) || defined(__unix__)
#include <FileWatch.hpp>
Expand Down Expand Up @@ -104,16 +105,6 @@ class RTPSDomainImpl
static bool removeRTPSParticipant(
RTPSParticipant* p);

/**
* Set the maximum RTPSParticipantID.
* @param maxRTPSParticipantId ID.
*/
static inline void setMaxRTPSParticipantId(
uint32_t maxRTPSParticipantId)
{
get_instance()->m_maxRTPSParticipantID = maxRTPSParticipantId;
}

/**
* Creates a RTPSParticipant as default server or client if ROS_MASTER_URI environment variable is set.
* @param domain_id DDS domain associated
Expand Down Expand Up @@ -214,10 +205,20 @@ class RTPSDomainImpl

/**
* @brief Get Id to create a RTPSParticipant.
*
* This function assumes m_mutex is already locked by the caller.
*
* @return Different ID for each call.
*/
uint32_t getNewId();

bool prepare_participant_id(
int32_t input_id,
uint32_t& participant_id);

uint32_t get_id_for_prefix(
uint32_t participant_id);

void removeRTPSParticipant_nts(
t_p_RTPSParticipant&);

Expand All @@ -227,11 +228,16 @@ class RTPSDomainImpl

std::mutex m_mutex;

std::atomic<uint32_t> m_maxRTPSParticipantID;

std::vector<t_p_RTPSParticipant> m_RTPSParticipants;

std::set<uint32_t> m_RTPSParticipantIDs;
struct ParticipantIDState
{
uint32_t counter = 0;
bool reserved = false;
bool used = false;
};

std::unordered_map<uint32_t, ParticipantIDState> m_RTPSParticipantIDs;

FileWatchHandle file_watch_handle_;
};
Expand Down