Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

netsync: Re-request data sooner after peer disconnect #3067

Merged
merged 1 commit into from
Mar 8, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 52 additions & 7 deletions internal/netsync/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,19 +650,64 @@ func (m *SyncManager) handleDonePeerMsg(p *peerpkg.Peer) {
// Remove the peer from the list of candidate peers.
delete(m.peers, p)

// Remove requested transactions from the global map so that they will
// be fetched from elsewhere.
// Re-request in-flight blocks and transactions that were not received
// by the disconnected peer if the data was announced by another peer.
// Remove the data from the manager's requested data maps if no other
// peers have announced the data.
requestQueues := make(map[*peerpkg.Peer][]*wire.InvVect)
var inv wire.InvVect
inv.Type = wire.InvTypeTx
TxHashes:
for txHash := range peer.requestedTxns {
inv.Hash = txHash
for pp, mgrPeer := range m.peers {
if !pp.IsKnownInventory(&inv) {
continue
}
inv := inv
invs := append(requestQueues[pp], &inv)
requestQueues[pp] = invs
mgrPeer.requestedTxns[txHash] = struct{}{}
continue TxHashes
}
// No peers found that have announced this data.
delete(m.requestedTxns, txHash)
}

// Remove requested blocks from the global map so that they will be
// fetched from elsewhere.
// TODO(oga) we could possibly here check which peers have these blocks
// and request them now to speed things up a little.
inv.Type = wire.InvTypeBlock
BlockHashes:
for blockHash := range peer.requestedBlocks {
inv.Hash = blockHash
for pp, mgrPeer := range m.peers {
if !pp.IsKnownInventory(&inv) {
continue
}
inv := inv
invs := append(requestQueues[pp], &inv)
requestQueues[pp] = invs
mgrPeer.requestedBlocks[blockHash] = struct{}{}
continue BlockHashes
}
// No peers found that have announced this data.
delete(m.requestedBlocks, blockHash)
}
for pp, requestQueue := range requestQueues {
var numRequested int32
gdmsg := wire.NewMsgGetData()
for _, iv := range requestQueue {
gdmsg.AddInvVect(iv)
numRequested++
if numRequested == wire.MaxInvPerMsg {
// Send full getdata message and reset.
pp.QueueMessage(gdmsg, nil)
gdmsg = wire.NewMsgGetData()
numRequested = 0
}
}

if len(gdmsg.InvList) > 0 {
pp.QueueMessage(gdmsg, nil)
}
}

// Attempt to find a new peer to sync from and reset the final requested
// block when the quitting peer is the sync peer.
Expand Down