diff --git a/eth/fetcher/block_fetcher.go b/eth/fetcher/block_fetcher.go index fd547ebce..46f98e2b3 100644 --- a/eth/fetcher/block_fetcher.go +++ b/eth/fetcher/block_fetcher.go @@ -20,6 +20,7 @@ package fetcher import ( "errors" "math/rand" + "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -179,9 +180,11 @@ type BlockFetcher struct { completing map[common.Hash]*blockAnnounce // Blocks with headers, currently body-completing // Block cache - queue *prque.Prque // Queue containing the import operations (block number sorted) - queues map[string]int // Per peer block counts to prevent memory exhaustion - queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports) + queueLock sync.Mutex // The lock to protect queue and queued from concurrent use + queue *prque.Prque // Queue containing the import operations (block number sorted) (must hold queueLock when accessing) + queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports) (must hold queueLock when accessing) + + queues map[string]int // Per peer block counts to prevent memory exhaustion // Callbacks getHeader HeaderRetrievalFn // Retrieves a header from the local chain @@ -361,6 +364,8 @@ func (f *BlockFetcher) loop() { } // Import any queued blocks that could potentially fit height := f.chainHeight() + var insertedOps []*blockOrHeaderInject + f.queueLock.Lock() for !f.queue.Empty() { op := f.queue.PopItem().(*blockOrHeaderInject) hash := op.hash() @@ -381,6 +386,11 @@ func (f *BlockFetcher) loop() { f.forgetBlock(hash) continue } + insertedOps = append(insertedOps, op) + } + f.queueLock.Unlock() + + for _, op := range insertedOps { if f.light { f.importHeaders(op.origin, op.header) } else { @@ -442,7 +452,9 @@ func (f *BlockFetcher) loop() { case hash := <-f.done: // A pending import finished, remove all traces of the notification f.forgetHash(hash) + f.queueLock.Lock() f.forgetBlock(hash) + f.queueLock.Unlock() case <-fetchTimer.C: // At least one block's timer ran out, check for needing retrieval @@ -533,8 +545,12 @@ func (f *BlockFetcher) loop() { for _, header := range task.headers { hash := header.Hash() + f.queueLock.Lock() + isNotQueued := f.queued[hash] == nil + f.queueLock.Unlock() + // Filter fetcher-requested headers from other synchronisation algorithms - if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil { + if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && isNotQueued { // If the delivered header does not match the promised number, drop the announcer if header.Number.Uint64() != announce.number { log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number) @@ -628,7 +644,11 @@ func (f *BlockFetcher) loop() { txnHash common.Hash // calculated lazily and reused ) for hash, announce := range f.completing { - if f.queued[hash] != nil || announce.origin != task.peer { + f.queueLock.Lock() + isQueued := f.queued[hash] != nil + f.queueLock.Unlock() + + if isQueued || announce.origin != task.peer { continue } if uncleHash == (common.Hash{}) { @@ -754,6 +774,7 @@ func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.B return } // Schedule the block for future importing + f.queueLock.Lock() if _, ok := f.queued[hash]; !ok { op := &blockOrHeaderInject{origin: peer} if header != nil { @@ -770,6 +791,7 @@ func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.B } log.Debug("Queued delivered header or block", "peer", peer, "number", number, "hash", hash, "queued", f.queue.Size()) } + f.queueLock.Unlock() } // importHeaders spawns a new goroutine to run a header insertion into the chain. @@ -780,13 +802,30 @@ func (f *BlockFetcher) importHeaders(peer string, header *types.Header) { log.Debug("Importing propagated header", "peer", peer, "number", header.Number, "hash", hash) go func() { - defer func() { f.done <- hash }() + f.queueLock.Lock() + parentHash := header.ParentHash + _, isParentQueued := f.queued[parentHash] + // If the parent's unknown, abort insertion parent := f.getHeader(header.ParentHash) if parent == nil { log.Debug("Unknown parent of propagated header", "peer", peer, "number", header.Number, "hash", hash, "parent", header.ParentHash) + // This can be still a valid header but maybe the parent header parallel insert does not finish yet, + // put this header back to the queue for future processing + if isParentQueued { + op := &blockOrHeaderInject{ + origin: peer, + header: header, + } + f.queue.Push(op, -int64(header.Number.Uint64())) + f.queueLock.Unlock() + } return } + f.queueLock.Unlock() + + defer func() { f.done <- hash }() + // Validate the header and if something went wrong, drop the peer if err := f.verifyHeader(header); err != nil && err != consensus.ErrFutureBlock { log.Debug("Propagated header verification failed", "peer", peer, "number", header.Number, "hash", hash, "err", err) @@ -814,14 +853,31 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block, sidecars [] // Run the import on a new thread log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash) go func() { - defer func() { f.done <- hash }() + f.queueLock.Lock() + parentHash := block.ParentHash() + _, isParentQueued := f.queued[parentHash] // If the parent's unknown, abort insertion parent := f.getBlock(block.ParentHash()) if parent == nil { log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash()) + // This can be still a valid block but maybe the parent block parallel insert does not finish yet, + // put this block back to the queue for future processing + if isParentQueued { + op := &blockOrHeaderInject{ + origin: peer, + block: block, + sidecars: sidecars, + } + f.queue.Push(op, -int64(block.NumberU64())) + } + f.queueLock.Unlock() return } + f.queueLock.Unlock() + + defer func() { f.done <- hash }() + // Quickly validate the header and propagate the block if it passes err := f.verifyHeader(block.Header()) if err == nil { @@ -906,6 +962,7 @@ func (f *BlockFetcher) forgetHash(hash common.Hash) { // forgetBlock removes all traces of a queued block from the fetcher's internal // state. +// The caller must hold the queueLock. func (f *BlockFetcher) forgetBlock(hash common.Hash) { if insert := f.queued[hash]; insert != nil { f.queues[insert.origin]--