Skip to content

Commit

Permalink
eth/fetcher: don't skip block/header when parent is not found
Browse files Browse the repository at this point in the history
Currently, we simply skip importing block/header when parent block/header is not
found. However, since multiple blocks can be imported in parallel, the not
found parent might be due to the fact that the parent import does not finish
yet. This leads to a suitation that the correct block in canonical chain is
skipped and the node gets stuck until the peer timeout. We observe this behavior
when there are reorgs and block import is time consuming.

This commit fixes it by creating a new queue for those missing parent blocks and
re-import them after the parent is imported.
  • Loading branch information
minh-bq committed Nov 21, 2024
1 parent 584db0f commit 7ee0cf9
Showing 1 changed file with 116 additions and 37 deletions.
153 changes: 116 additions & 37 deletions eth/fetcher/block_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package fetcher
import (
"errors"
"math/rand"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -32,10 +33,11 @@ import (
)

const (
lightTimeout = time.Millisecond // Time allowance before an announced header is explicitly requested
arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block/transaction is explicitly requested
gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches
fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block/transaction
lightTimeout = time.Millisecond // Time allowance before an announced header is explicitly requested
arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block/transaction is explicitly requested
gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches
fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block/transaction
cleanMissingParentInterval = 30 * time.Second // Interval to clean missing parent mapping
)

const (
Expand Down Expand Up @@ -183,6 +185,10 @@ type BlockFetcher struct {
queues map[string]int // Per peer block counts to prevent memory exhaustion
queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports)

missingParentLock sync.Mutex // Protect missingParent mapping from concurrent use
missingParent map[common.Hash][]common.Hash // Mapping from parent hash to slice of block hashes of missing parent blocks
importMissingParent chan common.Hash

// Callbacks
getHeader HeaderRetrievalFn // Retrieves a header from the local chain
getBlock blockRetrievalFn // Retrieves a block from the local chain
Expand All @@ -209,30 +215,32 @@ func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetr
) *BlockFetcher {

return &BlockFetcher{
light: light,
notify: make(chan *blockAnnounce),
inject: make(chan *blockOrHeaderInject),
headerFilter: make(chan chan *headerFilterTask),
bodyFilter: make(chan chan *bodyFilterTask),
done: make(chan common.Hash),
quit: make(chan struct{}),
announces: make(map[string]int),
announced: make(map[common.Hash][]*blockAnnounce),
fetching: make(map[common.Hash]*blockAnnounce),
fetched: make(map[common.Hash][]*blockAnnounce),
completing: make(map[common.Hash]*blockAnnounce),
queue: prque.New(nil),
queues: make(map[string]int),
queued: make(map[common.Hash]*blockOrHeaderInject),
getHeader: getHeader,
getBlock: getBlock,
verifyHeader: verifyHeader,
verifyBlobHeader: verifyBlobHeader,
broadcastBlock: broadcastBlock,
chainHeight: chainHeight,
insertHeaders: insertHeaders,
insertChain: insertChain,
dropPeer: dropPeer,
light: light,
notify: make(chan *blockAnnounce),
inject: make(chan *blockOrHeaderInject),
headerFilter: make(chan chan *headerFilterTask),
bodyFilter: make(chan chan *bodyFilterTask),
done: make(chan common.Hash),
quit: make(chan struct{}),
announces: make(map[string]int),
announced: make(map[common.Hash][]*blockAnnounce),
fetching: make(map[common.Hash]*blockAnnounce),
fetched: make(map[common.Hash][]*blockAnnounce),
completing: make(map[common.Hash]*blockAnnounce),
queue: prque.New(nil),
queues: make(map[string]int),
queued: make(map[common.Hash]*blockOrHeaderInject),
missingParent: make(map[common.Hash][]common.Hash),
importMissingParent: make(chan common.Hash, blockLimit),
getHeader: getHeader,
getBlock: getBlock,
verifyHeader: verifyHeader,
verifyBlobHeader: verifyBlobHeader,
broadcastBlock: broadcastBlock,
chainHeight: chainHeight,
insertHeaders: insertHeaders,
insertChain: insertChain,
dropPeer: dropPeer,
}
}

Expand Down Expand Up @@ -344,13 +352,15 @@ func (f *BlockFetcher) FilterBodies(peer string, transactions [][]*types.Transac
func (f *BlockFetcher) loop() {
// Iterate the block fetching until a quit is requested
var (
fetchTimer = time.NewTimer(0)
completeTimer = time.NewTimer(0)
fetchTimer = time.NewTimer(0)
completeTimer = time.NewTimer(0)
cleanMissingParentTicker = time.NewTicker(cleanMissingParentInterval)
)
<-fetchTimer.C // clear out the channel
<-completeTimer.C
defer fetchTimer.Stop()
defer completeTimer.Stop()
defer cleanMissingParentTicker.Stop()

for {
// Clean up any expired block fetches
Expand Down Expand Up @@ -378,7 +388,9 @@ func (f *BlockFetcher) loop() {
}
// Otherwise if fresh and still unknown, try and import
if (number+maxUncleDist < height) || (f.light && f.getHeader(hash) != nil) || (!f.light && f.getBlock(hash) != nil) {
f.missingParentLock.Lock()
f.forgetBlock(hash)
f.missingParentLock.Unlock()
continue
}
if f.light {
Expand Down Expand Up @@ -442,7 +454,9 @@ func (f *BlockFetcher) loop() {
case hash := <-f.done:
// A pending import finished, remove all traces of the notification
f.forgetHash(hash)
f.missingParentLock.Lock()
f.forgetBlock(hash)
f.missingParentLock.Unlock()

case <-fetchTimer.C:
// At least one block's timer ran out, check for needing retrieval
Expand Down Expand Up @@ -684,6 +698,28 @@ func (f *BlockFetcher) loop() {
f.enqueue(announce.origin, nil, block, sidecars)
}
}

case hash := <-f.importMissingParent:
if op := f.queued[hash]; op != nil {
if f.light {
f.importHeaders(op.origin, op.header)
} else {
f.importBlocks(op.origin, op.block, op.sidecars)
}
}
case <-cleanMissingParentTicker.C:
height := f.chainHeight()
f.missingParentLock.Lock()
for _, blocks := range f.missingParent {
for _, block := range blocks {
if op := f.queued[block]; op != nil {
if op.number()+maxUncleDist < height {
f.forgetBlock(block)
}
}
}
}
f.missingParentLock.Unlock()
}
}
}
Expand Down Expand Up @@ -780,13 +816,16 @@ func (f *BlockFetcher) importHeaders(peer string, header *types.Header) {
log.Debug("Importing propagated header", "peer", peer, "number", header.Number, "hash", hash)

go func() {
defer func() { f.done <- hash }()
// If the parent's unknown, abort insertion
// If the parent's unknown, queue for later processing when parent block is imported
parent := f.getHeader(header.ParentHash)
if parent == nil {
log.Debug("Unknown parent of propagated header", "peer", peer, "number", header.Number, "hash", hash, "parent", header.ParentHash)
f.missingParentLock.Lock()
f.missingParent[header.ParentHash] = append(f.missingParent[header.ParentHash], hash)
f.missingParentLock.Unlock()
return
}
defer func() { f.done <- hash }()
// Validate the header and if something went wrong, drop the peer
if err := f.verifyHeader(header); err != nil && err != consensus.ErrFutureBlock {
log.Debug("Propagated header verification failed", "peer", peer, "number", header.Number, "hash", hash, "err", err)
Expand All @@ -798,6 +837,14 @@ func (f *BlockFetcher) importHeaders(peer string, header *types.Header) {
log.Debug("Propagated header import failed", "peer", peer, "number", header.Number, "hash", hash, "err", err)
return
}
f.missingParentLock.Lock()
nextBlockHashes, ok := f.missingParent[hash]
f.missingParentLock.Unlock()
if ok {
for _, nextBlockHash := range nextBlockHashes {
f.importMissingParent <- nextBlockHash
}
}
// Invoke the testing hook if needed
if f.importedHook != nil {
f.importedHook(header, nil)
Expand All @@ -814,14 +861,17 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block, sidecars []
// Run the import on a new thread
log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
go func() {
defer func() { f.done <- hash }()

// If the parent's unknown, abort insertion
parent := f.getBlock(block.ParentHash())
// If the parent's unknown, queue for later processing when parent block is imported
parentHash := block.ParentHash()
parent := f.getBlock(parentHash)
if parent == nil {
log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", parentHash)
f.missingParentLock.Lock()
f.missingParent[parentHash] = append(f.missingParent[parentHash], hash)
f.missingParentLock.Unlock()
return
}
defer func() { f.done <- hash }()
// Quickly validate the header and propagate the block if it passes
err := f.verifyHeader(block.Header())
if err == nil {
Expand Down Expand Up @@ -853,6 +903,14 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block, sidecars []
blockAnnounceOutTimer.UpdateSince(block.ReceivedAt)
go f.broadcastBlock(block, nil, false)

f.missingParentLock.Lock()
nextBlockHashes, ok := f.missingParent[hash]
f.missingParentLock.Unlock()
if ok {
for _, nextBlockHash := range nextBlockHashes {
f.importMissingParent <- nextBlockHash
}
}
// Invoke the testing hook if needed
if f.importedHook != nil {
f.importedHook(nil, block)
Expand Down Expand Up @@ -906,12 +964,33 @@ func (f *BlockFetcher) forgetHash(hash common.Hash) {

// forgetBlock removes all traces of a queued block from the fetcher's internal
// state.
// The caller must hold the missingParentLock.
func (f *BlockFetcher) forgetBlock(hash common.Hash) {
if insert := f.queued[hash]; insert != nil {
f.queues[insert.origin]--
if f.queues[insert.origin] == 0 {
delete(f.queues, insert.origin)
}
delete(f.queued, hash)
var parentHash common.Hash
if f.light {
parentHash = insert.header.ParentHash
} else {
parentHash = insert.block.ParentHash()
}
blocks := f.missingParent[parentHash]
for i, block := range blocks {
if block == hash {
// Swap with the last element then decrease the length
blocks[i] = blocks[len(blocks)-1]
blocks = blocks[:len(blocks)-1]
break
}
}
if len(blocks) == 0 {
delete(f.missingParent, parentHash)
} else {
f.missingParent[parentHash] = blocks
}
}
}

0 comments on commit 7ee0cf9

Please sign in to comment.