Skip to content

Commit

Permalink
eth/fetcher: don't skip block/header when parent is not found
Browse files Browse the repository at this point in the history
Currently, we simply skip importing block/header when parent block/header is not
found. However, since multiple blocks can be imported in parallel, the not
found parent might be due to the fact that the parent import does not finish
yet. This leads to a suitation that the correct block in canonical chain is
skipped and the node gets stuck until the peer timeout. We observe this behavior
when there are reorgs and block import is time consuming.

This commit fixes it by not skipping these blocks (by not marking them as done)
and re-queueing these for future processing when finding that the parent is in
the queue already.
  • Loading branch information
minh-bq committed Nov 20, 2024
1 parent 584db0f commit 18066a3
Showing 1 changed file with 64 additions and 7 deletions.
71 changes: 64 additions & 7 deletions eth/fetcher/block_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package fetcher
import (
"errors"
"math/rand"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -179,9 +180,11 @@ type BlockFetcher struct {
completing map[common.Hash]*blockAnnounce // Blocks with headers, currently body-completing

// Block cache
queue *prque.Prque // Queue containing the import operations (block number sorted)
queues map[string]int // Per peer block counts to prevent memory exhaustion
queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports)
queueLock sync.Mutex // The lock to protect queue and queued from concurrent use
queue *prque.Prque // Queue containing the import operations (block number sorted) (must hold queueLock when accessing)
queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports) (must hold queueLock when accessing)

queues map[string]int // Per peer block counts to prevent memory exhaustion

// Callbacks
getHeader HeaderRetrievalFn // Retrieves a header from the local chain
Expand Down Expand Up @@ -361,6 +364,8 @@ func (f *BlockFetcher) loop() {
}
// Import any queued blocks that could potentially fit
height := f.chainHeight()
var insertedOps []*blockOrHeaderInject
f.queueLock.Lock()
for !f.queue.Empty() {
op := f.queue.PopItem().(*blockOrHeaderInject)
hash := op.hash()
Expand All @@ -381,6 +386,11 @@ func (f *BlockFetcher) loop() {
f.forgetBlock(hash)
continue
}
insertedOps = append(insertedOps, op)
}
f.queueLock.Unlock()

for _, op := range insertedOps {
if f.light {
f.importHeaders(op.origin, op.header)
} else {
Expand Down Expand Up @@ -442,7 +452,9 @@ func (f *BlockFetcher) loop() {
case hash := <-f.done:
// A pending import finished, remove all traces of the notification
f.forgetHash(hash)
f.queueLock.Lock()
f.forgetBlock(hash)
f.queueLock.Unlock()

case <-fetchTimer.C:
// At least one block's timer ran out, check for needing retrieval
Expand Down Expand Up @@ -533,8 +545,12 @@ func (f *BlockFetcher) loop() {
for _, header := range task.headers {
hash := header.Hash()

f.queueLock.Lock()
isNotQueued := f.queued[hash] == nil
f.queueLock.Unlock()

// Filter fetcher-requested headers from other synchronisation algorithms
if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && isNotQueued {
// If the delivered header does not match the promised number, drop the announcer
if header.Number.Uint64() != announce.number {
log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
Expand Down Expand Up @@ -628,7 +644,11 @@ func (f *BlockFetcher) loop() {
txnHash common.Hash // calculated lazily and reused
)
for hash, announce := range f.completing {
if f.queued[hash] != nil || announce.origin != task.peer {
f.queueLock.Lock()
isQueued := f.queued[hash] != nil
f.queueLock.Unlock()

if isQueued || announce.origin != task.peer {
continue
}
if uncleHash == (common.Hash{}) {
Expand Down Expand Up @@ -754,6 +774,7 @@ func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.B
return
}
// Schedule the block for future importing
f.queueLock.Lock()
if _, ok := f.queued[hash]; !ok {
op := &blockOrHeaderInject{origin: peer}
if header != nil {
Expand All @@ -770,6 +791,7 @@ func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.B
}
log.Debug("Queued delivered header or block", "peer", peer, "number", number, "hash", hash, "queued", f.queue.Size())
}
f.queueLock.Unlock()
}

// importHeaders spawns a new goroutine to run a header insertion into the chain.
Expand All @@ -780,13 +802,30 @@ func (f *BlockFetcher) importHeaders(peer string, header *types.Header) {
log.Debug("Importing propagated header", "peer", peer, "number", header.Number, "hash", hash)

go func() {
defer func() { f.done <- hash }()
f.queueLock.Lock()
parentHash := header.ParentHash
_, isParentQueued := f.queued[parentHash]

// If the parent's unknown, abort insertion
parent := f.getHeader(header.ParentHash)
if parent == nil {
log.Debug("Unknown parent of propagated header", "peer", peer, "number", header.Number, "hash", hash, "parent", header.ParentHash)
// This can be still a valid header but maybe the parent header parallel insert does not finish yet,
// put this header back to the queue for future processing
if isParentQueued {
op := &blockOrHeaderInject{
origin: peer,
header: header,
}
f.queue.Push(op, -int64(header.Number.Uint64()))
f.queueLock.Unlock()
}
return
}
f.queueLock.Unlock()

defer func() { f.done <- hash }()

// Validate the header and if something went wrong, drop the peer
if err := f.verifyHeader(header); err != nil && err != consensus.ErrFutureBlock {
log.Debug("Propagated header verification failed", "peer", peer, "number", header.Number, "hash", hash, "err", err)
Expand Down Expand Up @@ -814,14 +853,31 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block, sidecars []
// Run the import on a new thread
log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
go func() {
defer func() { f.done <- hash }()
f.queueLock.Lock()
parentHash := block.ParentHash()
_, isParentQueued := f.queued[parentHash]

// If the parent's unknown, abort insertion
parent := f.getBlock(block.ParentHash())
if parent == nil {
log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
// This can be still a valid block but maybe the parent block parallel insert does not finish yet,
// put this block back to the queue for future processing
if isParentQueued {
op := &blockOrHeaderInject{
origin: peer,
block: block,
sidecars: sidecars,
}
f.queue.Push(op, -int64(block.NumberU64()))
}
f.queueLock.Unlock()
return
}
f.queueLock.Unlock()

defer func() { f.done <- hash }()

// Quickly validate the header and propagate the block if it passes
err := f.verifyHeader(block.Header())
if err == nil {
Expand Down Expand Up @@ -906,6 +962,7 @@ func (f *BlockFetcher) forgetHash(hash common.Hash) {

// forgetBlock removes all traces of a queued block from the fetcher's internal
// state.
// The caller must hold the queueLock.
func (f *BlockFetcher) forgetBlock(hash common.Hash) {
if insert := f.queued[hash]; insert != nil {
f.queues[insert.origin]--
Expand Down

0 comments on commit 18066a3

Please sign in to comment.