Skip to content

Commit

Permalink
Make outbound queue metrics more accurate
Browse files Browse the repository at this point in the history
  • Loading branch information
marta-lokhova committed Jul 15, 2024
1 parent d78f48e commit b8da941
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 27 deletions.
54 changes: 40 additions & 14 deletions src/overlay/FlowControl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,14 @@ FlowControl::maybeReleaseCapacity(StellarMessage const& msg)
}
}

std::vector<std::shared_ptr<StellarMessage const>>
std::vector<FlowControl::QueuedOutboundMessage>
FlowControl::getNextBatchToSend()
{
ZoneScoped;
releaseAssert(!threadIsMain() || !mUseBackgroundThread);

std::lock_guard<std::mutex> guard(mFlowControlMutex);
std::vector<std::shared_ptr<StellarMessage const>> batchToSend;
std::vector<QueuedOutboundMessage> batchToSend;

int sent = 0;
for (int i = 0; i < mOutboundQueues.size(); i++)
Expand All @@ -135,7 +135,7 @@ FlowControl::getNextBatchToSend()
break;
}

batchToSend.push_back(front.mMessage);
batchToSend.push_back(front);
++sent;
auto& om = mOverlayMetrics;

Expand All @@ -150,8 +150,6 @@ FlowControl::getNextBatchToSend()
{
case TRANSACTION:
{
om.mOutboundQueueDelayTxs.Update(diff);
mMetrics.mOutboundQueueDelayTxs.Update(diff);
if (mFlowControlBytesCapacity)
{
size_t s =
Expand All @@ -162,24 +160,16 @@ FlowControl::getNextBatchToSend()
}
break;
case SCP_MESSAGE:
{
om.mOutboundQueueDelaySCP.Update(diff);
mMetrics.mOutboundQueueDelaySCP.Update(diff);
}
break;
break;
case FLOOD_DEMAND:
{
om.mOutboundQueueDelayDemand.Update(diff);
mMetrics.mOutboundQueueDelayDemand.Update(diff);
size_t s = front.mMessage->floodDemand().txHashes.size();
releaseAssert(mDemandQueueTxHashCount >= s);
mDemandQueueTxHashCount -= s;
}
break;
case FLOOD_ADVERT:
{
om.mOutboundQueueDelayAdvert.Update(diff);
mMetrics.mOutboundQueueDelayAdvert.Update(diff);
size_t s = front.mMessage->floodAdvert().txHashes.size();
releaseAssert(mAdvertQueueTxHashCount >= s);
mAdvertQueueTxHashCount -= s;
Expand All @@ -199,6 +189,42 @@ FlowControl::getNextBatchToSend()
return batchToSend;
}

void
FlowControl::updateMsgMetrics(std::shared_ptr<StellarMessage const> msg,
VirtualClock::time_point const& timePlaced)
{
std::lock_guard<std::mutex> guard(mFlowControlMutex);
auto diff = mAppConnector.now() - timePlaced;

auto updateQueueDelay = [&](auto& queue, auto& metrics) {
queue.Update(diff);
metrics.Update(diff);
};

auto& om = mAppConnector.getOverlayMetrics();
switch (msg->type())
{
case TRANSACTION:
updateQueueDelay(om.mOutboundQueueDelayTxs,
mMetrics.mOutboundQueueDelayTxs);
break;
case SCP_MESSAGE:
updateQueueDelay(om.mOutboundQueueDelaySCP,
mMetrics.mOutboundQueueDelaySCP);
break;
case FLOOD_DEMAND:
updateQueueDelay(om.mOutboundQueueDelayDemand,
mMetrics.mOutboundQueueDelayDemand);
break;
case FLOOD_ADVERT:
updateQueueDelay(om.mOutboundQueueDelayAdvert,
mMetrics.mOutboundQueueDelayAdvert);
break;
default:
abort();
}
}

void
FlowControl::handleTxSizeIncrease(uint32_t increase)
{
Expand Down
4 changes: 3 additions & 1 deletion src/overlay/FlowControl.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ class FlowControl
void addMsgAndMaybeTrimQueue(std::shared_ptr<StellarMessage const> msg);
// Return next batch of messages to send
// NOTE: this methods _releases_ capacity and cleans up flow control queues
std::vector<std::shared_ptr<StellarMessage const>> getNextBatchToSend();
std::vector<QueuedOutboundMessage> getNextBatchToSend();
void updateMsgMetrics(std::shared_ptr<StellarMessage const> msg,
VirtualClock::time_point const& timePlaced);

#ifdef BUILD_TESTS
std::shared_ptr<FlowControlCapacity>
Expand Down
9 changes: 9 additions & 0 deletions src/overlay/OverlayAppConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "main/Application.h"
#include "overlay/BanManager.h"
#include "overlay/OverlayManager.h"
#include "overlay/OverlayMetrics.h"
#include "util/Timer.h"

namespace stellar
Expand Down Expand Up @@ -81,4 +82,12 @@ OverlayAppConnector::shouldYield() const
releaseAssert(threadIsMain());
return mApp.getClock().shouldYield();
}

OverlayMetrics&
OverlayAppConnector::getOverlayMetrics()
{
// OverlayMetrics class is thread-safe
return mApp.getOverlayManager().getOverlayMetrics();
}

}
2 changes: 2 additions & 0 deletions src/overlay/OverlayAppConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class OverlayManager;
class LedgerManager;
class Herder;
class BanManager;
struct OverlayMetrics;

// Helper class to isolate access to Application; all function helpers must
// either be called from main or be thread-sade
Expand Down Expand Up @@ -38,5 +39,6 @@ class OverlayAppConnector
VirtualClock::time_point now() const;
Config const& getConfig() const;
bool overlayShuttingDown() const;
OverlayMetrics& getOverlayMetrics();
};
}
1 change: 1 addition & 0 deletions src/overlay/OverlayMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace stellar

class Application;

// OverlayMetrics is a thread-safe struct
struct OverlayMetrics
{
OverlayMetrics(Application& app);
Expand Down
28 changes: 17 additions & 11 deletions src/overlay/Peer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,7 @@ Peer::sendMessage(std::shared_ptr<StellarMessage const> msg, bool log)
[](std::shared_ptr<Peer> self) {
for (auto const& m : self->mFlowControl->getNextBatchToSend())
{
self->sendAuthenticatedMessage(m);
self->sendAuthenticatedMessage(m.mMessage, m.mTimeEmplaced);
}
});
}
Expand All @@ -799,7 +799,9 @@ Peer::sendMessage(std::shared_ptr<StellarMessage const> msg, bool log)
}

void
Peer::sendAuthenticatedMessage(std::shared_ptr<StellarMessage const> msg)
Peer::sendAuthenticatedMessage(
std::shared_ptr<StellarMessage const> msg,
std::optional<VirtualClock::time_point> timePlaced)
{
{
// No need to hold the lock for the duration of this function:
Expand All @@ -814,7 +816,7 @@ Peer::sendAuthenticatedMessage(std::shared_ptr<StellarMessage const> msg)
}
}

auto cb = [msg](std::shared_ptr<Peer> self) {
auto cb = [msg, timePlaced](std::shared_ptr<Peer> self) {
// Construct an authenticated message and place it in the queue
// _synchronously_ This is important because we assign auth sequence to
// each message, which must be ordered
Expand All @@ -826,6 +828,10 @@ Peer::sendAuthenticatedMessage(std::shared_ptr<StellarMessage const> msg)
xdrBytes = xdr::xdr_to_msg(amsg);
}
self->sendMessage(std::move(xdrBytes));
if (timePlaced)
{
self->mFlowControl->updateMsgMetrics(msg, *timePlaced);
}
};

// If we're already on the background thread (i.e. via flow control), move
Expand Down Expand Up @@ -1050,14 +1056,14 @@ Peer::recvSendMore(StellarMessage const& msg)
releaseAssert(threadIsMain());
releaseAssert(mFlowControl);
mFlowControl->maybeReleaseCapacity(msg);
maybeExecuteInBackground("Peer::recvSendMore maybeSendNextBatch",
[](std::shared_ptr<Peer> self) {
for (auto const& m :
self->mFlowControl->getNextBatchToSend())
{
self->sendAuthenticatedMessage(m);
}
});
maybeExecuteInBackground(
"Peer::recvSendMore maybeSendNextBatch",
[](std::shared_ptr<Peer> self) {
for (auto const& m : self->mFlowControl->getNextBatchToSend())
{
self->sendAuthenticatedMessage(m.mMessage, m.mTimeEmplaced);
}
});
}

void
Expand Down
4 changes: 3 additions & 1 deletion src/overlay/Peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,9 @@ class Peer : public std::enable_shared_from_this<Peer>,
void recurrentTimerExpired(asio::error_code const& error);
std::chrono::seconds getIOTimeout() const;

void sendAuthenticatedMessage(std::shared_ptr<StellarMessage const> msg);
void sendAuthenticatedMessage(
std::shared_ptr<StellarMessage const> msg,
std::optional<VirtualClock::time_point> timePlaced = std::nullopt);
void beginMessageProcessing(StellarMessage const& msg);
void endMessageProcessing(StellarMessage const& msg);

Expand Down

0 comments on commit b8da941

Please sign in to comment.