Skip to content

Commit

Permalink
refactor: move PeerManager out of CSigningManager ctor
Browse files Browse the repository at this point in the history
  • Loading branch information
kwvg committed Dec 5, 2024
1 parent 7ebc61e commit 7498a38
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 34 deletions.
5 changes: 2 additions & 3 deletions src/llmq/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ LLMQContext::LLMQContext(ChainstateManager& chainman, CConnman& connman, CDeterm
qman{std::make_unique<llmq::CQuorumManager>(*bls_worker, chainman.ActiveChainstate(), dmnman, *qdkgsman, evo_db,
*quorum_block_processor, mn_activeman, mn_sync, sporkman, unit_tests,
wipe)},
sigman{std::make_unique<llmq::CSigningManager>(mn_activeman, chainman.ActiveChainstate(), *qman, peerman,
unit_tests, wipe)},
sigman{std::make_unique<llmq::CSigningManager>(mn_activeman, chainman.ActiveChainstate(), *qman, unit_tests, wipe)},
shareman{std::make_unique<llmq::CSigSharesManager>(connman, *sigman, mn_activeman, *qman, sporkman, peerman)},
clhandler{[&]() -> llmq::CChainLocksHandler* const {
assert(llmq::chainLocksHandler == nullptr);
Expand Down Expand Up @@ -85,7 +84,7 @@ void LLMQContext::Start(CConnman& connman, PeerManager& peerman)
qman->Start();
shareman->RegisterAsRecoveredSigsListener();
shareman->StartWorkerThread();
sigman->StartWorkerThread();
sigman->StartWorkerThread(peerman);

llmq::chainLocksHandler->Start();
llmq::quorumInstantSendManager->Start();
Expand Down
45 changes: 25 additions & 20 deletions src/llmq/signing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,11 @@ void CRecoveredSigsDb::CleanupOldVotes(int64_t maxAge)
//////////////////

CSigningManager::CSigningManager(const CActiveMasternodeManager* const mn_activeman, const CChainState& chainstate,
const CQuorumManager& _qman, const std::unique_ptr<PeerManager>& peerman, bool fMemory, bool fWipe) :
db(fMemory, fWipe), m_mn_activeman(mn_activeman), m_chainstate(chainstate), qman(_qman), m_peerman(peerman)
const CQuorumManager& _qman, bool fMemory, bool fWipe) :
db(fMemory, fWipe),
m_mn_activeman(mn_activeman),
m_chainstate(chainstate),
qman(_qman)
{
}

Expand Down Expand Up @@ -381,13 +384,14 @@ bool CSigningManager::GetRecoveredSigForGetData(const uint256& hash, CRecoveredS
return true;
}

PeerMsgRet CSigningManager::ProcessMessage(const CNode& pfrom, const std::string& msg_type, CDataStream& vRecv)
PeerMsgRet CSigningManager::ProcessMessage(const CNode& pfrom, PeerManager& peerman, const std::string& msg_type,
CDataStream& vRecv)
{
if (msg_type == NetMsgType::QSIGREC) {
auto recoveredSig = std::make_shared<CRecoveredSig>();
vRecv >> *recoveredSig;

return ProcessMessageRecoveredSig(pfrom, recoveredSig);
return ProcessMessageRecoveredSig(pfrom, peerman, recoveredSig);
}
return {};
}
Expand Down Expand Up @@ -416,10 +420,11 @@ static bool PreVerifyRecoveredSig(const CQuorumManager& quorum_manager, const CR
return true;
}

PeerMsgRet CSigningManager::ProcessMessageRecoveredSig(const CNode& pfrom, const std::shared_ptr<const CRecoveredSig>& recoveredSig)
PeerMsgRet CSigningManager::ProcessMessageRecoveredSig(const CNode& pfrom, PeerManager& peerman,
const std::shared_ptr<const CRecoveredSig>& recoveredSig)
{
WITH_LOCK(::cs_main, Assert(m_peerman)->EraseObjectRequest(pfrom.GetId(),
CInv(MSG_QUORUM_RECOVERED_SIG, recoveredSig->GetHash())));
WITH_LOCK(::cs_main,
peerman.EraseObjectRequest(pfrom.GetId(), CInv(MSG_QUORUM_RECOVERED_SIG, recoveredSig->GetHash())));

bool ban = false;
if (!PreVerifyRecoveredSig(qman, *recoveredSig, ban)) {
Expand Down Expand Up @@ -517,22 +522,22 @@ void CSigningManager::CollectPendingRecoveredSigsToVerify(
}
}

void CSigningManager::ProcessPendingReconstructedRecoveredSigs()
void CSigningManager::ProcessPendingReconstructedRecoveredSigs(PeerManager& peerman)
{
decltype(pendingReconstructedRecoveredSigs) m;
WITH_LOCK(cs_pending, swap(m, pendingReconstructedRecoveredSigs));

for (const auto& p : m) {
ProcessRecoveredSig(p.second);
ProcessRecoveredSig(p.second, peerman);
}
}

bool CSigningManager::ProcessPendingRecoveredSigs()
bool CSigningManager::ProcessPendingRecoveredSigs(PeerManager& peerman)
{
std::unordered_map<NodeId, std::list<std::shared_ptr<const CRecoveredSig>>> recSigsByNode;
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher> quorums;

ProcessPendingReconstructedRecoveredSigs();
ProcessPendingReconstructedRecoveredSigs(peerman);

const size_t nMaxBatchSize{32};
CollectPendingRecoveredSigsToVerify(nMaxBatchSize, recSigsByNode, quorums);
Expand Down Expand Up @@ -575,7 +580,7 @@ bool CSigningManager::ProcessPendingRecoveredSigs()

if (batchVerifier.badSources.count(nodeId)) {
LogPrint(BCLog::LLMQ, "CSigningManager::%s -- invalid recSig from other node, banning peer=%d\n", __func__, nodeId);
Assert(m_peerman)->Misbehaving(nodeId, 100);
peerman.Misbehaving(nodeId, 100);
continue;
}

Expand All @@ -584,15 +589,15 @@ bool CSigningManager::ProcessPendingRecoveredSigs()
continue;
}

ProcessRecoveredSig(recSig);
ProcessRecoveredSig(recSig, peerman);
}
}

return recSigsByNode.size() >= nMaxBatchSize;
}

// signature must be verified already
void CSigningManager::ProcessRecoveredSig(const std::shared_ptr<const CRecoveredSig>& recoveredSig)
void CSigningManager::ProcessRecoveredSig(const std::shared_ptr<const CRecoveredSig>& recoveredSig, PeerManager& peerman)
{
auto llmqType = recoveredSig->getLlmqType();

Expand Down Expand Up @@ -631,12 +636,12 @@ void CSigningManager::ProcessRecoveredSig(const std::shared_ptr<const CRecovered
WITH_LOCK(cs_pending, pendingReconstructedRecoveredSigs.erase(recoveredSig->GetHash()));

if (m_mn_activeman != nullptr) {
Assert(m_peerman)->RelayRecoveredSig(recoveredSig->GetHash());
peerman.RelayRecoveredSig(recoveredSig->GetHash());
}

auto listeners = WITH_LOCK(cs_listeners, return recoveredSigsListeners);
for (auto& l : listeners) {
Assert(m_peerman)->PostProcessMessage(l->HandleNewRecoveredSig(*recoveredSig));
peerman.PostProcessMessage(l->HandleNewRecoveredSig(*recoveredSig));
}

GetMainSignals().NotifyRecoveredSig(recoveredSig);
Expand Down Expand Up @@ -799,14 +804,14 @@ bool CSigningManager::GetVoteForId(Consensus::LLMQType llmqType, const uint256&
return db.GetVoteForId(llmqType, id, msgHashRet);
}

void CSigningManager::StartWorkerThread()
void CSigningManager::StartWorkerThread(PeerManager& peerman)
{
// can't start new thread if we have one running already
if (workThread.joinable()) {
assert(false);
}

workThread = std::thread(&util::TraceThread, "sigshares", [this] { WorkThreadMain(); });
workThread = std::thread(&util::TraceThread, "sigshares", [this, &peerman] { WorkThreadMain(peerman); });
}

void CSigningManager::StopWorkerThread()
Expand All @@ -826,10 +831,10 @@ void CSigningManager::InterruptWorkerThread()
workInterrupt();
}

void CSigningManager::WorkThreadMain()
void CSigningManager::WorkThreadMain(PeerManager& peerman)
{
while (!workInterrupt) {
bool fMoreWork = ProcessPendingRecoveredSigs();
bool fMoreWork = ProcessPendingRecoveredSigs(peerman);

Cleanup();

Expand Down
19 changes: 10 additions & 9 deletions src/llmq/signing.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ class CSigningManager
const CActiveMasternodeManager* const m_mn_activeman;
const CChainState& m_chainstate;
const CQuorumManager& qman;
const std::unique_ptr<PeerManager>& m_peerman;

mutable Mutex cs_pending;
// Incoming and not verified yet
Expand All @@ -178,12 +177,12 @@ class CSigningManager

public:
CSigningManager(const CActiveMasternodeManager* const mn_activeman, const CChainState& chainstate,
const CQuorumManager& _qman, const std::unique_ptr<PeerManager>& peerman, bool fMemory, bool fWipe);
const CQuorumManager& _qman, bool fMemory, bool fWipe);

bool AlreadyHave(const CInv& inv) const;
bool GetRecoveredSigForGetData(const uint256& hash, CRecoveredSig& ret) const;

PeerMsgRet ProcessMessage(const CNode& pnode, const std::string& msg_type, CDataStream& vRecv);
PeerMsgRet ProcessMessage(const CNode& pnode, PeerManager& peerman, const std::string& msg_type, CDataStream& vRecv);

// This is called when a recovered signature was was reconstructed from another P2P message and is known to be valid
// This is the case for example when a signature appears as part of InstantSend or ChainLocks
Expand All @@ -196,16 +195,18 @@ class CSigningManager
void TruncateRecoveredSig(Consensus::LLMQType llmqType, const uint256& id);

private:
PeerMsgRet ProcessMessageRecoveredSig(const CNode& pfrom, const std::shared_ptr<const CRecoveredSig>& recoveredSig);
PeerMsgRet ProcessMessageRecoveredSig(const CNode& pfrom, PeerManager& peerman,
const std::shared_ptr<const CRecoveredSig>& recoveredSig);

void CollectPendingRecoveredSigsToVerify(size_t maxUniqueSessions,
std::unordered_map<NodeId, std::list<std::shared_ptr<const CRecoveredSig>>>& retSigShares,
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& retQuorums);
void ProcessPendingReconstructedRecoveredSigs();
bool ProcessPendingRecoveredSigs(); // called from the worker thread of CSigSharesManager
void ProcessPendingReconstructedRecoveredSigs(PeerManager& peerman);
bool ProcessPendingRecoveredSigs(PeerManager& peerman); // called from the worker thread of CSigSharesManager
public:
// TODO - should not be public!
void ProcessRecoveredSig(const std::shared_ptr<const CRecoveredSig>& recoveredSig);
void ProcessRecoveredSig(const std::shared_ptr<const CRecoveredSig>& recoveredSig, PeerManager& peerman);

private:
void Cleanup(); // called from the worker thread of CSigSharesManager

Expand All @@ -228,10 +229,10 @@ class CSigningManager
private:
std::thread workThread;
CThreadInterrupt workInterrupt;
void WorkThreadMain();
void WorkThreadMain(PeerManager& peerman);

public:
void StartWorkerThread();
void StartWorkerThread(PeerManager& peerman);
void StopWorkerThread();
void InterruptWorkerThread();
};
Expand Down
2 changes: 1 addition & 1 deletion src/llmq/signing_shares.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,7 @@ void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256&
}
}

sigman.ProcessRecoveredSig(rs);
sigman.ProcessRecoveredSig(rs, *m_peerman);
}

CDeterministicMNCPtr CSigSharesManager::SelectMemberForRecovery(const CQuorumCPtr& quorum, const uint256 &id, int attempt)
Expand Down
2 changes: 1 addition & 1 deletion src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5265,7 +5265,7 @@ void PeerManagerImpl::ProcessMessage(
ProcessPeerMsgRet(m_llmq_ctx->qdkgsman->ProcessMessage(pfrom, *this, is_masternode, msg_type, vRecv), pfrom);
ProcessPeerMsgRet(m_llmq_ctx->qman->ProcessMessage(pfrom, m_connman, msg_type, vRecv), pfrom);
m_llmq_ctx->shareman->ProcessMessage(pfrom, m_sporkman, msg_type, vRecv);
ProcessPeerMsgRet(m_llmq_ctx->sigman->ProcessMessage(pfrom, msg_type, vRecv), pfrom);
ProcessPeerMsgRet(m_llmq_ctx->sigman->ProcessMessage(pfrom, *this, msg_type, vRecv), pfrom);

if (msg_type == NetMsgType::CLSIG) {
if (llmq::AreChainLocksEnabled(m_sporkman)) {
Expand Down

0 comments on commit 7498a38

Please sign in to comment.