Skip to content

Commit

Permalink
instantsend|sigs: Sleep when there is no more work (#3988)
Browse files Browse the repository at this point in the history
* instantsend|sigs: Sleep when there is no more work

Instead of sleeping only when no work has been done.
Avoids useless cycles, improves batching.

* llmq: Add and use nMaxBatchSize

* llmq: Compare to what we got in return, not what we verified at the end

It might happen that we get 32 pending but do only verify less than 32 and in this case we would assume there is no more work but it could still be more in the pipeline from my understanding.

* llmq: Rename more_work -> fMoreWork

* llmq: Be consistent with the other fMoreWork initialization

Co-authored-by: xdustinface <[email protected]>
  • Loading branch information
UdjinM6 and xdustinface authored Feb 11, 2021
1 parent 317353d commit 899c124
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 31 deletions.
24 changes: 10 additions & 14 deletions src/llmq/quorums_instantsend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,7 @@ bool CInstantSendManager::PreVerifyInstantSendLock(const llmq::CInstantSendLock&
bool CInstantSendManager::ProcessPendingInstantSendLocks()
{
decltype(pendingInstantSendLocks) pend;
bool fMoreWork{false};

{
LOCK(cs);
Expand All @@ -745,6 +746,7 @@ bool CInstantSendManager::ProcessPendingInstantSendLocks()
pend.emplace(it->first, std::move(it->second));
pendingInstantSendLocks.erase(it);
}
fMoreWork = true;
}
}

Expand Down Expand Up @@ -776,7 +778,7 @@ bool CInstantSendManager::ProcessPendingInstantSendLocks()
ProcessPendingInstantSendLocks(dkgInterval, pend, true);
}

return true;
return fMoreWork;
}

std::unordered_set<uint256> CInstantSendManager::ProcessPendingInstantSendLocks(int signOffset, const std::unordered_map<uint256, std::pair<NodeId, CInstantSendLockPtr>, StaticSaltedHasher>& pend, bool ban)
Expand Down Expand Up @@ -1365,7 +1367,7 @@ void CInstantSendManager::AskNodesForLockedTx(const uint256& txid)
}
}

bool CInstantSendManager::ProcessPendingRetryLockTxs()
void CInstantSendManager::ProcessPendingRetryLockTxs()
{
decltype(pendingRetryTxs) retryTxs;
{
Expand All @@ -1374,11 +1376,11 @@ bool CInstantSendManager::ProcessPendingRetryLockTxs()
}

if (retryTxs.empty()) {
return false;
return;
}

if (!IsInstantSendEnabled()) {
return false;
return;
}

int retryCount = 0;
Expand Down Expand Up @@ -1428,8 +1430,6 @@ bool CInstantSendManager::ProcessPendingRetryLockTxs()
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- retried %d TXs. nonLockedTxs.size=%d\n", __func__,
retryCount, nonLockedTxs.size());
}

return retryCount != 0;
}

bool CInstantSendManager::AlreadyHave(const CInv& inv)
Expand Down Expand Up @@ -1521,15 +1521,11 @@ size_t CInstantSendManager::GetInstantSendLockCount()
void CInstantSendManager::WorkThreadMain()
{
while (!workInterrupt) {
bool didWork = false;

didWork |= ProcessPendingInstantSendLocks();
didWork |= ProcessPendingRetryLockTxs();
bool fMoreWork = ProcessPendingInstantSendLocks();
ProcessPendingRetryLockTxs();

if (!didWork) {
if (!workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
return;
}
if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
return;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/llmq/quorums_instantsend.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class CInstantSendManager : public CRecoveredSigsListener
void ResolveBlockConflicts(const uint256& islockHash, const CInstantSendLock& islock);
void RemoveChainLockConflictingLock(const uint256& islockHash, const CInstantSendLock& islock);
static void AskNodesForLockedTx(const uint256& txid);
bool ProcessPendingRetryLockTxs();
void ProcessPendingRetryLockTxs();

bool AlreadyHave(const CInv& inv);
bool GetInstantSendLockByHash(const uint256& hash, CInstantSendLock& ret);
Expand Down
5 changes: 3 additions & 2 deletions src/llmq/quorums_signing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,8 @@ bool CSigningManager::ProcessPendingRecoveredSigs()

ProcessPendingReconstructedRecoveredSigs();

CollectPendingRecoveredSigsToVerify(32, recSigsByNode, quorums);
const size_t nMaxBatchSize{32};
CollectPendingRecoveredSigsToVerify(nMaxBatchSize, recSigsByNode, quorums);
if (recSigsByNode.empty()) {
return false;
}
Expand Down Expand Up @@ -675,7 +676,7 @@ bool CSigningManager::ProcessPendingRecoveredSigs()
}
}

return true;
return recSigsByNode.size() >= nMaxBatchSize;
}

// signature must be verified already
Expand Down
23 changes: 10 additions & 13 deletions src/llmq/quorums_signing_shares.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,8 @@ bool CSigSharesManager::ProcessPendingSigShares(CConnman& connman)
std::unordered_map<NodeId, std::vector<CSigShare>> sigSharesByNodes;
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher> quorums;

CollectPendingSigSharesToVerify(32, sigSharesByNodes, quorums);
const size_t nMaxBatchSize{32};
CollectPendingSigSharesToVerify(nMaxBatchSize, sigSharesByNodes, quorums);
if (sigSharesByNodes.empty()) {
return false;
}
Expand Down Expand Up @@ -704,7 +705,7 @@ bool CSigSharesManager::ProcessPendingSigShares(CConnman& connman)
ProcessPendingSigShares(v, quorums, connman);
}

return true;
return sigSharesByNodes.size() >= nMaxBatchSize;
}

// It's ensured that no duplicates are passed to this method
Expand Down Expand Up @@ -1501,12 +1502,12 @@ void CSigSharesManager::WorkThreadMain()
continue;
}

bool didWork = false;
bool fMoreWork{false};

RemoveBannedNodeStates();
didWork |= quorumSigningManager->ProcessPendingRecoveredSigs();
didWork |= ProcessPendingSigShares(*g_connman);
didWork |= SignPendingSigShares();
fMoreWork |= quorumSigningManager->ProcessPendingRecoveredSigs();
fMoreWork |= ProcessPendingSigShares(*g_connman);
SignPendingSigShares();

if (GetTimeMillis() - lastSendTime > 100) {
SendMessages();
Expand All @@ -1517,10 +1518,8 @@ void CSigSharesManager::WorkThreadMain()
quorumSigningManager->Cleanup();

// TODO Wakeup when pending signing is needed?
if (!didWork) {
if (!workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
return;
}
if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
return;
}
}
}
Expand All @@ -1531,7 +1530,7 @@ void CSigSharesManager::AsyncSign(const CQuorumCPtr& quorum, const uint256& id,
pendingSigns.emplace_back(quorum, id, msgHash);
}

bool CSigSharesManager::SignPendingSigShares()
void CSigSharesManager::SignPendingSigShares()
{
std::vector<std::tuple<const CQuorumCPtr, uint256, uint256>> v;
{
Expand All @@ -1557,8 +1556,6 @@ bool CSigSharesManager::SignPendingSigShares()
}
}
}

return !v.empty();
}

CSigShare CSigSharesManager::CreateSigShare(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash)
Expand Down
2 changes: 1 addition & 1 deletion src/llmq/quorums_signing_shares.h
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ class CSigSharesManager : public CRecoveredSigsListener
void CollectSigSharesToSend(std::unordered_map<NodeId, std::unordered_map<uint256, CBatchedSigShares, StaticSaltedHasher>>& sigSharesToSend);
void CollectSigSharesToSendConcentrated(std::unordered_map<NodeId, std::vector<CSigShare>>& sigSharesToSend, const std::vector<CNode*>& vNodes);
void CollectSigSharesToAnnounce(std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToAnnounce);
bool SignPendingSigShares();
void SignPendingSigShares();
void WorkThreadMain();
};

Expand Down

0 comments on commit 899c124

Please sign in to comment.