Skip to content

Commit

Permalink
Allow overlay to work automatically when in loopback mode
Browse files Browse the repository at this point in the history
  • Loading branch information
marta-lokhova committed Feb 27, 2023
1 parent 040a29c commit 72909ef
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 47 deletions.
2 changes: 1 addition & 1 deletion src/overlay/OverlayManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ class OverlayManagerImpl : public OverlayManager
int const MAX_RETRY_COUNT = 15;
std::chrono::milliseconds retryDelayDemand(int numAttemptsMade) const;
size_t getMaxDemandSize() const;
int availableOutboundPendingSlots() const;

public:
OverlayManagerImpl(Application& app);
Expand Down Expand Up @@ -216,7 +217,6 @@ class OverlayManagerImpl : public OverlayManager

bool moveToAuthenticated(Peer::pointer peer);

int availableOutboundPendingSlots() const;
int availableOutboundAuthenticatedSlots() const;
int nonPreferredAuthenticatedCount() const;

Expand Down
92 changes: 60 additions & 32 deletions src/overlay/test/LoopbackPeer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,49 @@ LoopbackPeer::getIP() const
return "127.0.0.1";
}

std::pair<std::shared_ptr<LoopbackPeer>, std::shared_ptr<LoopbackPeer>>
LoopbackPeer::initiate(Application& app, Application& otherApp)
{
auto peer = make_shared<LoopbackPeer>(app, Peer::WE_CALLED_REMOTE);
auto otherPeer =
make_shared<LoopbackPeer>(otherApp, Peer::REMOTE_CALLED_US);

peer->mRemote = otherPeer;
peer->mState = Peer::CONNECTED;

otherPeer->mRemote = peer;
otherPeer->mState = Peer::CONNECTED;

peer->mAddress = PeerBareAddress(otherPeer->getIP(),
otherPeer->getApp().getConfig().PEER_PORT);
otherPeer->mAddress =
PeerBareAddress{peer->getIP(), peer->getApp().getConfig().PEER_PORT};

app.getOverlayManager().addOutboundConnection(peer);
otherApp.getOverlayManager().addInboundConnection(otherPeer);
// if connection was dropped during addPendingPeer, we don't want do call
// connectHandler
if (peer->mState != Peer::CONNECTED || otherPeer->mState != Peer::CONNECTED)
{
return std::pair(peer, otherPeer);
}

peer->startRecurrentTimer();
otherPeer->startRecurrentTimer();

std::weak_ptr<LoopbackPeer> init = peer;
peer->getApp().postOnMainThread(
[init]() {
auto inC = init.lock();
if (inC)
{
inC->connectHandler(asio::error_code());
}
},
"LoopbackPeer: connect");
return std::pair(peer, otherPeer);
}

AuthCert
LoopbackPeer::getAuthCert()
{
Expand Down Expand Up @@ -108,6 +151,20 @@ LoopbackPeer::drop(std::string const& reason, DropDirection direction, DropMode)
return;
}

if (mState != GOT_AUTH)
{
CLOG_DEBUG(Overlay, "TCPPeer::drop {} in state {} we called:{}",
toString(), mState, mRole);
}
else if (direction == Peer::DropDirection::WE_DROPPED_REMOTE)
{
CLOG_INFO(Overlay, "Dropping peer {}, reason {}", toString(), reason);
}
else
{
CLOG_INFO(Overlay, "peer {} dropped us, reason {}", toString(), reason);
}

mDropReason = reason;
mState = CLOSING;
Peer::shutdown();
Expand Down Expand Up @@ -437,39 +494,10 @@ LoopbackPeer::setReorderProbability(double d)

LoopbackPeerConnection::LoopbackPeerConnection(Application& initiator,
Application& acceptor)
: mInitiator(make_shared<LoopbackPeer>(initiator, Peer::WE_CALLED_REMOTE))
, mAcceptor(make_shared<LoopbackPeer>(acceptor, Peer::REMOTE_CALLED_US))
{
mInitiator->mRemote = mAcceptor;
mInitiator->mState = Peer::CONNECTED;

mAcceptor->mRemote = mInitiator;
mAcceptor->mState = Peer::CONNECTED;

initiator.getOverlayManager().addOutboundConnection(mInitiator);
acceptor.getOverlayManager().addInboundConnection(mAcceptor);

// if connection was dropped during addPendingPeer, we don't want do call
// connectHandler
if (mInitiator->mState != Peer::CONNECTED ||
mAcceptor->mState != Peer::CONNECTED)
{
return;
}

mInitiator->startRecurrentTimer();
mAcceptor->startRecurrentTimer();

std::weak_ptr<LoopbackPeer> init = mInitiator;
mInitiator->getApp().postOnMainThread(
[init]() {
auto inC = init.lock();
if (inC)
{
inC->connectHandler(asio::error_code());
}
},
"LoopbackPeer: connect");
auto res = LoopbackPeer::initiate(initiator, acceptor);
mInitiator = res.first;
mAcceptor = res.second;
}

LoopbackPeerConnection::~LoopbackPeerConnection()
Expand Down
5 changes: 5 additions & 0 deletions src/overlay/test/LoopbackPeer.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ class LoopbackPeer : public Peer
{
}
LoopbackPeer(Application& app, PeerRole role);

static std::pair<std::shared_ptr<LoopbackPeer>,
std::shared_ptr<LoopbackPeer>>
initiate(Application& app, Application& otherApp);

void drop(std::string const& reason, DropDirection dropDirection,
DropMode dropMode) override;

Expand Down
16 changes: 8 additions & 8 deletions src/overlay/test/OverlayTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1887,14 +1887,14 @@ TEST_CASE("disconnected topology recovery")
auto nodeIDs = simulation->getNodeIDs();

// Disconnected graph 0-1-2-3 and 4-5-6
simulation->addConnection(nodeIDs[0], nodeIDs[1]);
simulation->addConnection(nodeIDs[1], nodeIDs[2]);
simulation->addConnection(nodeIDs[2], nodeIDs[3]);
simulation->addConnection(nodeIDs[3], nodeIDs[0]);

simulation->addConnection(nodeIDs[6], nodeIDs[4]);
simulation->addConnection(nodeIDs[4], nodeIDs[5]);
simulation->addConnection(nodeIDs[5], nodeIDs[6]);
simulation->addPendingConnection(nodeIDs[0], nodeIDs[1]);
simulation->addPendingConnection(nodeIDs[1], nodeIDs[2]);
simulation->addPendingConnection(nodeIDs[2], nodeIDs[3]);
simulation->addPendingConnection(nodeIDs[3], nodeIDs[0]);

simulation->addPendingConnection(nodeIDs[6], nodeIDs[4]);
simulation->addPendingConnection(nodeIDs[4], nodeIDs[5]);
simulation->addPendingConnection(nodeIDs[5], nodeIDs[6]);

simulation->startAllNodes();

Expand Down
79 changes: 73 additions & 6 deletions src/simulation/Simulation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ Simulation::Simulation(Mode mode, Hash const& networkID, ConfigGen confGen,
, mQuorumSetAdjuster(qSetAdjust)
{
mIdleApp = Application::create(mClock, newConfig());
mPeerMap.emplace(mIdleApp->getConfig().PEER_PORT, mIdleApp);
}

Simulation::~Simulation()
Expand Down Expand Up @@ -116,14 +117,26 @@ Simulation::addNode(SecretKey nodeKey, SCPQuorumSet qSet, Config const* cfg2,
Application::pointer app;
if (newDB)
{
app = Application::create(*clock, *cfg, newDB);
if (mMode == OVER_LOOPBACK)
{
app =
createTestApplication<ApplicationLoopbackOverlay, Simulation&>(
*clock, *cfg, *this, newDB, false);
}
else
{
app = createTestApplication(*clock, *cfg, newDB, false);
}
}
else
{
app = setupApp(*cfg, *clock, startAtLedger, startAtHash);
}
mNodes.emplace(nodeKey.getPublicKey(), Node{clock, app});

mPeerMap.emplace(app->getConfig().PEER_PORT,
std::weak_ptr<Application>(app));

return app;
}

Expand Down Expand Up @@ -156,6 +169,7 @@ Simulation::removeNode(NodeID const& id)
if (it != mNodes.end())
{
auto node = it->second;
mPeerMap.erase(node.mApp->getConfig().PEER_PORT);
mNodes.erase(it);
node.mApp->gracefulStop();
while (node.mClock->crank(false) > 0)
Expand All @@ -167,6 +181,25 @@ Simulation::removeNode(NodeID const& id)
}
}

Application::pointer
Simulation::getAppFromPeerMap(unsigned short peerPort)
{
releaseAssert(mMode == OVER_LOOPBACK);
auto it = mPeerMap.find(peerPort);
if (it == mPeerMap.end())
{
return nullptr;
}

auto app = it->second.lock();
if (app)
{
return app;
}

return nullptr;
}

void
Simulation::dropAllConnections(NodeID const& id)
{
Expand Down Expand Up @@ -655,18 +688,20 @@ Simulation::crankUntil(VirtualClock::system_time_point timePoint,
Config
Simulation::newConfig()
{
Config cfg;
if (mConfigGen)
{
return mConfigGen(mConfigCount++);
cfg = mConfigGen(mConfigCount++);
}
else
{
Config res = getTestConfig(mConfigCount++);
res.ARTIFICIALLY_ACCELERATE_TIME_FOR_TESTING = true;
res.TESTING_UPGRADE_LEDGER_PROTOCOL_VERSION =
cfg = getTestConfig(mConfigCount++);
cfg.ARTIFICIALLY_ACCELERATE_TIME_FOR_TESTING = true;
cfg.TESTING_UPGRADE_LEDGER_PROTOCOL_VERSION =
Config::CURRENT_LEDGER_PROTOCOL_VERSION;
return res;
}

return cfg;
}

class ConsoleReporterWithSum : public medida::reporting::ConsoleReporter
Expand Down Expand Up @@ -715,4 +750,36 @@ Simulation::metricsSummary(string domain)
}
return out.str();
}

bool
LoopbackOverlayManager::connectToImpl(PeerBareAddress const& address,
bool forceoutbound)
{
CLOG_INFO(Overlay, "Connect to {}", address.toString());
auto currentConnection = getConnectedPeer(address);
if (!currentConnection || (forceoutbound && currentConnection->getRole() ==
Peer::REMOTE_CALLED_US))
{
if (availableOutboundPendingSlots() <= 0)
{
CLOG_DEBUG(Overlay,
"Peer rejected - all outbound pending connections "
"taken: {}",
address.toString());
return false;
}
getPeerManager().update(address, PeerManager::BackOffUpdate::INCREASE);
auto& app = static_cast<ApplicationLoopbackOverlay&>(mApp);
auto otherApp = app.getSim().getAppFromPeerMap(address.getPort());
auto res = LoopbackPeer::initiate(mApp, *otherApp);
return res.first->getState() == Peer::CONNECTED;
}
else
{
CLOG_ERROR(Overlay,
"trying to connect to a node we're already connected to {}",
address.toString());
return false;
}
}
}
49 changes: 49 additions & 0 deletions src/simulation/Simulation.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
#include "main/Application.h"
#include "main/Config.h"
#include "medida/medida.h"
#include "overlay/OverlayManagerImpl.h"
#include "overlay/StellarXDR.h"
#include "overlay/test/LoopbackPeer.h"
#include "simulation/LoadGenerator.h"
#include "test/TestUtils.h"
#include "test/TxTests.h"
#include "util/Timer.h"
#include "util/XDROperators.h"
Expand Down Expand Up @@ -61,6 +63,8 @@ class Simulation
void stopAllNodes();
void removeNode(NodeID const& id);

Application::pointer getAppFromPeerMap(unsigned short peerPort);

// returns true if all nodes have externalized
// triggers and exception if a node externalized higher than num+maxSpread
bool haveAllExternalized(uint32 num, uint32 maxSpread);
Expand Down Expand Up @@ -111,5 +115,50 @@ class Simulation
QuorumSetAdjuster mQuorumSetAdjuster;

std::chrono::milliseconds const quantum = std::chrono::milliseconds(100);

// Map PEER_PORT to Application
std::unordered_map<unsigned short, std::weak_ptr<Application>> mPeerMap;
};

class LoopbackOverlayManager : public OverlayManagerImpl
{
public:
LoopbackOverlayManager(Application& app) : OverlayManagerImpl(app)
{
}
virtual bool connectToImpl(PeerBareAddress const& address,
bool forceoutbound) override;
};

class ApplicationLoopbackOverlay : public TestApplication
{
Simulation& mSim;

public:
ApplicationLoopbackOverlay(VirtualClock& clock, Config const& cfg,
Simulation& sim)
: TestApplication(clock, cfg), mSim(sim)
{
}

virtual LoopbackOverlayManager&
getOverlayManager() override
{
auto& overlay = ApplicationImpl::getOverlayManager();
return static_cast<LoopbackOverlayManager&>(overlay);
}

Simulation&
getSim()
{
return mSim;
}

private:
virtual std::unique_ptr<OverlayManager>
createOverlayManager() override
{
return std::make_unique<LoopbackOverlayManager>(*this);
}
};
}

0 comments on commit 72909ef

Please sign in to comment.