Skip to content

Commit

Permalink
add some protection in case the trie prefetcher is already stopped
Browse files Browse the repository at this point in the history
  • Loading branch information
setunapo committed Jul 4, 2022
1 parent 3a9b7ac commit faf01f0
Showing 1 changed file with 46 additions and 11 deletions.
57 changes: 46 additions & 11 deletions core/state/trie_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,22 @@ func (p *triePrefetcher) copy() *triePrefetcher {
return fetcherCopied
}

p.copyChan <- struct{}{}
return <-p.copyDoneChan
select {
case <-p.closeMainChan:
fetcherCopied := &triePrefetcher{
db: p.db,
root: p.root,
fetches: make(map[common.Hash]Trie, len(p.fetches)),
}
// for closed trie prefetcher, retrieve the current states
for root, fetcher := range p.fetchers {
fetcherCopied.fetches[root] = fetcher.peek()
}
return fetcherCopied
default:
p.copyChan <- struct{}{}
return <-p.copyDoneChan
}
}

// prefetch schedules a batch of trie items to prefetch.
Expand All @@ -258,7 +272,12 @@ func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte, accountHash c
if p.fetches != nil {
return
}
p.prefetchChan <- &prefetchMsg{root, accountHash, keys}
select {
case <-p.closeMainChan: // skip closed trie prefetcher
return
default:
p.prefetchChan <- &prefetchMsg{root, accountHash, keys}
}
}

// trie returns the trie matching the root hash, or nil if the prefetcher doesn't
Expand All @@ -273,17 +292,29 @@ func (p *triePrefetcher) trie(root common.Hash) Trie {
return p.db.CopyTrie(trie)
}

trieChan := make(chan *subfetcher)
defer func() { close(trieChan) }()
p.trieChan <- &trieMsg{root, trieChan}
fetcher := <-trieChan
var fetcher *subfetcher
select {
case <-p.closeMainChan:
fetcher = p.fetchers[root]
default:
// currentTrieChan is to make sure we receive root's fetcher in concurrency mode.
currentTrieChan := make(chan *subfetcher)
defer func() { close(currentTrieChan) }()
p.trieChan <- &trieMsg{root, currentTrieChan}
fetcher = <-currentTrieChan
}
if fetcher == nil {
p.deliveryMissMeter.Mark(1)
return nil
}
// Interrupt the prefetcher if it's by any chance still running and return
// a copy of any pre-loaded trie.
p.abortChan <- fetcher // safe to do multiple times

select {
case <-p.closeAbortChan:
default:
// Interrupt the prefetcher if it's by any chance still running and return
// a copy of any pre-loaded trie.
p.abortChan <- fetcher // safe to do multiple times
}

trie := fetcher.peek()
if trie == nil {
Expand All @@ -303,7 +334,11 @@ func (p *triePrefetcher) used(root common.Hash, used [][]byte) {
if p.fetches != nil {
return
}
p.usedChan <- &usedMsg{root, used}
select {
case <-p.closeAbortChan:
default:
p.usedChan <- &usedMsg{root, used}
}
}

// subfetcher is a trie fetcher goroutine responsible for pulling entries for a
Expand Down

0 comments on commit faf01f0

Please sign in to comment.