From faf01f0b1725bc1e7cb12cc6ffe5ed95bd1f8688 Mon Sep 17 00:00:00 2001 From: setunapo Date: Mon, 4 Jul 2022 10:08:30 +0800 Subject: [PATCH] add some protection in case the trie prefetcher is already stopped --- core/state/trie_prefetcher.go | 57 ++++++++++++++++++++++++++++------- 1 file changed, 46 insertions(+), 11 deletions(-) diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index f70efc5b38..466eb9fdcb 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -248,8 +248,22 @@ func (p *triePrefetcher) copy() *triePrefetcher { return fetcherCopied } - p.copyChan <- struct{}{} - return <-p.copyDoneChan + select { + case <-p.closeMainChan: + fetcherCopied := &triePrefetcher{ + db: p.db, + root: p.root, + fetches: make(map[common.Hash]Trie, len(p.fetches)), + } + // for closed trie prefetcher, retrieve the current states + for root, fetcher := range p.fetchers { + fetcherCopied.fetches[root] = fetcher.peek() + } + return fetcherCopied + default: + p.copyChan <- struct{}{} + return <-p.copyDoneChan + } } // prefetch schedules a batch of trie items to prefetch. @@ -258,7 +272,12 @@ func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte, accountHash c if p.fetches != nil { return } - p.prefetchChan <- &prefetchMsg{root, accountHash, keys} + select { + case <-p.closeMainChan: // skip closed trie prefetcher + return + default: + p.prefetchChan <- &prefetchMsg{root, accountHash, keys} + } } // trie returns the trie matching the root hash, or nil if the prefetcher doesn't @@ -273,17 +292,29 @@ func (p *triePrefetcher) trie(root common.Hash) Trie { return p.db.CopyTrie(trie) } - trieChan := make(chan *subfetcher) - defer func() { close(trieChan) }() - p.trieChan <- &trieMsg{root, trieChan} - fetcher := <-trieChan + var fetcher *subfetcher + select { + case <-p.closeMainChan: + fetcher = p.fetchers[root] + default: + // currentTrieChan is to make sure we receive root's fetcher in concurrency mode. + currentTrieChan := make(chan *subfetcher) + defer func() { close(currentTrieChan) }() + p.trieChan <- &trieMsg{root, currentTrieChan} + fetcher = <-currentTrieChan + } if fetcher == nil { p.deliveryMissMeter.Mark(1) return nil } - // Interrupt the prefetcher if it's by any chance still running and return - // a copy of any pre-loaded trie. - p.abortChan <- fetcher // safe to do multiple times + + select { + case <-p.closeAbortChan: + default: + // Interrupt the prefetcher if it's by any chance still running and return + // a copy of any pre-loaded trie. + p.abortChan <- fetcher // safe to do multiple times + } trie := fetcher.peek() if trie == nil { @@ -303,7 +334,11 @@ func (p *triePrefetcher) used(root common.Hash, used [][]byte) { if p.fetches != nil { return } - p.usedChan <- &usedMsg{root, used} + select { + case <-p.closeAbortChan: + default: + p.usedChan <- &usedMsg{root, used} + } } // subfetcher is a trie fetcher goroutine responsible for pulling entries for a