Skip to content

Commit

Permalink
Merge #19911: net: guard vRecvGetData with cs_vRecv and orphan_work_set with g_cs_orphans
Browse files Browse the repository at this point in the history

da0988d scripted-diff: rename vRecvGetData (Neha Narula)
ba95181 Guard vRecvGetData (now in net processing) with its own mutex (Neha Narula)
2d9f2fc Move vRecvGetData to net processing (Neha Narula)
673247b Lock before checking if orphan_work_set is empty; indicate it is guarded (Neha Narula)
8803aee Move m_orphan_work_set to net_processing (Neha Narula)
9c47cb2 [Rename only] Rename orphan_work_set to m_orphan_work_set. (Neha Narula)

Pull request description:

  Add annotations to guard `vRecvGetData` and `orphan_work_set` and fix up places where they were accessed without a lock. There is no current data race because they happen to be accessed by only one thread, but this might not always be the case.

  Original discussion: bitcoin/bitcoin#18861 (comment)

ACKs for top commit:
  MarcoFalke:
    review ACK da0988d 🐬
  jnewbery:
    Code review ACK da0988d
  hebasto:
    ACK da0988d, I have reviewed the code and it looks correct, I agree it can be merged.

Tree-SHA512: 31cadd319ddc9273a87e77afc4db7339fd636e816b5e742eba5cb32927ac5cc07a672b2268d2d38a75a0f1b17d93836adab9acf7e52f26ea9a43f54efa57257e
  • Loading branch information
fanquake committed Oct 19, 2020
2 parents 80c8a02 + da0988d commit c92aa83
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 49 deletions.
3 changes: 0 additions & 3 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,6 @@ class CNode

RecursiveMutex cs_sendProcessing;

std::deque<CInv> vRecvGetData;
uint64_t nRecvBytes GUARDED_BY(cs_vRecv){0};

std::atomic<int64_t> nLastSend{0};
Expand Down Expand Up @@ -1051,8 +1050,6 @@ class CNode
// Whether a ping is requested.
std::atomic<bool> fPingQueued{false};

std::set<uint256> orphan_work_set;

CNode(NodeId id, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn, SOCKET hSocketIn, const CAddress &addrIn, uint64_t nKeyedNetGroupIn, uint64_t nLocalHostNonceIn, const CAddress &addrBindIn, const std::string &addrNameIn, ConnectionType conn_type_in, bool inbound_onion = false);
~CNode();
CNode(const CNode&) = delete;
Expand Down
125 changes: 79 additions & 46 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,14 @@ struct Peer {
/** Whether this peer should be disconnected and marked as discouraged (unless it has the noban permission). */
bool m_should_discourage GUARDED_BY(m_misbehavior_mutex){false};

/** Set of txids to reconsider once their parent transactions have been accepted **/
std::set<uint256> m_orphan_work_set GUARDED_BY(g_cs_orphans);

/** Protects m_getdata_requests **/
Mutex m_getdata_requests_mutex;
/** Work queue of items requested by this peer **/
std::deque<CInv> m_getdata_requests GUARDED_BY(m_getdata_requests_mutex);

Peer(NodeId id) : m_id(id) {}
};

Expand Down Expand Up @@ -1654,11 +1662,11 @@ static CTransactionRef FindTxForGetData(const CTxMemPool& mempool, const CNode&
return {};
}

void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnman& connman, CTxMemPool& mempool, const std::atomic<bool>& interruptMsgProc) LOCKS_EXCLUDED(cs_main)
void static ProcessGetData(CNode& pfrom, Peer& peer, const CChainParams& chainparams, CConnman& connman, CTxMemPool& mempool, const std::atomic<bool>& interruptMsgProc) EXCLUSIVE_LOCKS_REQUIRED(!cs_main, peer.m_getdata_requests_mutex)
{
AssertLockNotHeld(cs_main);

std::deque<CInv>::iterator it = pfrom.vRecvGetData.begin();
std::deque<CInv>::iterator it = peer.m_getdata_requests.begin();
std::vector<CInv> vNotFound;
const CNetMsgMaker msgMaker(pfrom.GetCommonVersion());

Expand All @@ -1670,7 +1678,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
// Process as many TX items from the front of the getdata queue as
// possible, since they're common and it's efficient to batch process
// them.
while (it != pfrom.vRecvGetData.end() && it->IsGenTxMsg()) {
while (it != peer.m_getdata_requests.end() && it->IsGenTxMsg()) {
if (interruptMsgProc) return;
// The send buffer provides backpressure. If there's no space in
// the buffer, pause processing until the next call.
Expand Down Expand Up @@ -1718,7 +1726,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm

// Only process one BLOCK item per call, since they're uncommon and can be
// expensive to process.
if (it != pfrom.vRecvGetData.end() && !pfrom.fPauseSend) {
if (it != peer.m_getdata_requests.end() && !pfrom.fPauseSend) {
const CInv &inv = *it++;
if (inv.IsGenBlkMsg()) {
ProcessGetBlockData(pfrom, chainparams, inv, connman);
Expand All @@ -1727,7 +1735,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm
// and continue processing the queue on the next call.
}

pfrom.vRecvGetData.erase(pfrom.vRecvGetData.begin(), it);
peer.m_getdata_requests.erase(peer.m_getdata_requests.begin(), it);

if (!vNotFound.empty()) {
// Let the peer know that we didn't find what it asked for, so it doesn't
Expand Down Expand Up @@ -2270,6 +2278,8 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
return;
}

PeerRef peer = GetPeerRef(pfrom.GetId());
if (peer == nullptr) return;

if (msg_type == NetMsgType::VERSION) {
// Each connection can only send one version message
Expand Down Expand Up @@ -2708,8 +2718,12 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
LogPrint(BCLog::NET, "received getdata for: %s peer=%d\n", vInv[0].ToString(), pfrom.GetId());
}

pfrom.vRecvGetData.insert(pfrom.vRecvGetData.end(), vInv.begin(), vInv.end());
ProcessGetData(pfrom, m_chainparams, m_connman, m_mempool, interruptMsgProc);
{
LOCK(peer->m_getdata_requests_mutex);
peer->m_getdata_requests.insert(peer->m_getdata_requests.end(), vInv.begin(), vInv.end());
ProcessGetData(pfrom, *peer, m_chainparams, m_connman, m_mempool, interruptMsgProc);
}

return;
}

Expand Down Expand Up @@ -2797,36 +2811,38 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
return;
}

LOCK(cs_main);
{
LOCK(cs_main);

const CBlockIndex* pindex = LookupBlockIndex(req.blockhash);
if (!pindex || !(pindex->nStatus & BLOCK_HAVE_DATA)) {
LogPrint(BCLog::NET, "Peer %d sent us a getblocktxn for a block we don't have\n", pfrom.GetId());
return;
}
const CBlockIndex* pindex = LookupBlockIndex(req.blockhash);
if (!pindex || !(pindex->nStatus & BLOCK_HAVE_DATA)) {
LogPrint(BCLog::NET, "Peer %d sent us a getblocktxn for a block we don't have\n", pfrom.GetId());
return;
}

if (pindex->nHeight < ::ChainActive().Height() - MAX_BLOCKTXN_DEPTH) {
// If an older block is requested (should never happen in practice,
// but can happen in tests) send a block response instead of a
// blocktxn response. Sending a full block response instead of a
// small blocktxn response is preferable in the case where a peer
// might maliciously send lots of getblocktxn requests to trigger
// expensive disk reads, because it will require the peer to
// actually receive all the data read from disk over the network.
LogPrint(BCLog::NET, "Peer %d sent us a getblocktxn for a block > %i deep\n", pfrom.GetId(), MAX_BLOCKTXN_DEPTH);
CInv inv;
inv.type = State(pfrom.GetId())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK;
inv.hash = req.blockhash;
pfrom.vRecvGetData.push_back(inv);
// The message processing loop will go around again (without pausing) and we'll respond then (without cs_main)
return;
}
if (pindex->nHeight >= ::ChainActive().Height() - MAX_BLOCKTXN_DEPTH) {
CBlock block;
bool ret = ReadBlockFromDisk(block, pindex, m_chainparams.GetConsensus());
assert(ret);

CBlock block;
bool ret = ReadBlockFromDisk(block, pindex, m_chainparams.GetConsensus());
assert(ret);
SendBlockTransactions(pfrom, block, req);
return;
}
}

SendBlockTransactions(pfrom, block, req);
// If an older block is requested (should never happen in practice,
// but can happen in tests) send a block response instead of a
// blocktxn response. Sending a full block response instead of a
// small blocktxn response is preferable in the case where a peer
// might maliciously send lots of getblocktxn requests to trigger
// expensive disk reads, because it will require the peer to
// actually receive all the data read from disk over the network.
LogPrint(BCLog::NET, "Peer %d sent us a getblocktxn for a block > %i deep\n", pfrom.GetId(), MAX_BLOCKTXN_DEPTH);
CInv inv;
WITH_LOCK(cs_main, inv.type = State(pfrom.GetId())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK);
inv.hash = req.blockhash;
WITH_LOCK(peer->m_getdata_requests_mutex, peer->m_getdata_requests.push_back(inv));
// The message processing loop will go around again (without pausing) and we'll respond then
return;
}

Expand Down Expand Up @@ -2961,7 +2977,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(txid, i));
if (it_by_prev != mapOrphanTransactionsByPrev.end()) {
for (const auto& elem : it_by_prev->second) {
pfrom.orphan_work_set.insert(elem->first);
peer->m_orphan_work_set.insert(elem->first);
}
}
}
Expand All @@ -2978,7 +2994,7 @@ void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDat
}

// Recursively process any orphan transactions that depended on this one
ProcessOrphanTx(pfrom.orphan_work_set);
ProcessOrphanTx(peer->m_orphan_work_set);
}
else if (state.GetResult() == TxValidationResult::TX_MISSING_INPUTS)
{
Expand Down Expand Up @@ -3773,21 +3789,37 @@ bool PeerManager::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgP
{
bool fMoreWork = false;

if (!pfrom->vRecvGetData.empty())
ProcessGetData(*pfrom, m_chainparams, m_connman, m_mempool, interruptMsgProc);
PeerRef peer = GetPeerRef(pfrom->GetId());
if (peer == nullptr) return false;

if (!pfrom->orphan_work_set.empty()) {
{
LOCK(peer->m_getdata_requests_mutex);
if (!peer->m_getdata_requests.empty()) {
ProcessGetData(*pfrom, *peer, m_chainparams, m_connman, m_mempool, interruptMsgProc);
}
}

{
LOCK2(cs_main, g_cs_orphans);
ProcessOrphanTx(pfrom->orphan_work_set);
if (!peer->m_orphan_work_set.empty()) {
ProcessOrphanTx(peer->m_orphan_work_set);
}
}

if (pfrom->fDisconnect)
return false;

// this maintains the order of responses
// and prevents vRecvGetData to grow unbounded
if (!pfrom->vRecvGetData.empty()) return true;
if (!pfrom->orphan_work_set.empty()) return true;
// and prevents m_getdata_requests from growing unbounded
{
LOCK(peer->m_getdata_requests_mutex);
if (!peer->m_getdata_requests.empty()) return true;
}

{
LOCK(g_cs_orphans);
if (!peer->m_orphan_work_set.empty()) return true;
}

// Don't bother if send buffer is too full to respond anyway
if (pfrom->fPauseSend)
Expand All @@ -3814,10 +3846,11 @@ bool PeerManager::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgP

try {
ProcessMessage(*pfrom, msg_type, msg.m_recv, msg.m_time, interruptMsgProc);
if (interruptMsgProc)
return false;
if (!pfrom->vRecvGetData.empty())
fMoreWork = true;
if (interruptMsgProc) return false;
{
LOCK(peer->m_getdata_requests_mutex);
if (!peer->m_getdata_requests.empty()) fMoreWork = true;
}
} catch (const std::exception& e) {
LogPrint(BCLog::NET, "%s(%s, %u bytes): Exception '%s' (%s) caught\n", __func__, SanitizeString(msg_type), nMessageSize, e.what(), typeid(e).name());
} catch (...) {
Expand Down

0 comments on commit c92aa83

Please sign in to comment.