diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index 8da2c01639..91b944d589 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -41,16 +41,6 @@ type prefetchMsg struct { keys [][]byte } -type usedMsg struct { - root common.Hash - used [][]byte -} - -type trieMsg struct { - root common.Hash - resultChan chan *subfetcher -} - // triePrefetcher is an active prefetcher, which receives accounts or storage // items and does trie-loading of them. The goal is to get as much useful content // into the caches as possible. @@ -65,11 +55,8 @@ type triePrefetcher struct { closed int32 closeMainChan chan struct{} // it is to inform the mainLoop closeMainDoneChan chan struct{} - copyChan chan struct{} - copyDoneChan chan *triePrefetcher + fetchersMutex sync.RWMutex prefetchChan chan *prefetchMsg // no need to wait for return - trieChan chan *trieMsg - usedChan chan *usedMsg // no need to wait for return abortChan chan *subfetcher closeAbortChan chan struct{} // it is used to inform abortLoop @@ -97,11 +84,7 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre closeMainChan: make(chan struct{}), closeMainDoneChan: make(chan struct{}), - copyChan: make(chan struct{}, concurrentChanSize), - copyDoneChan: make(chan *triePrefetcher, concurrentChanSize), prefetchChan: make(chan *prefetchMsg, concurrentChanSize), - trieChan: make(chan *trieMsg, concurrentChanSize), - usedChan: make(chan *usedMsg, concurrentChanSize), deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil), accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil), @@ -121,34 +104,16 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre func (p *triePrefetcher) mainLoop() { for { select { - case <-p.copyChan: - fetcherCopied := &triePrefetcher{ - db: p.db, - root: p.root, - fetches: make(map[common.Hash]Trie, len(p.fetchers)), - } - // we're copying an active fetcher, retrieve the current states - for root, fetcher := range p.fetchers { - fetcherCopied.fetches[root] = fetcher.peek() - } - p.copyDoneChan <- fetcherCopied - case pMsg := <-p.prefetchChan: fetcher := p.fetchers[pMsg.root] if fetcher == nil { fetcher = newSubfetcher(p.db, pMsg.root, pMsg.accountHash) + p.fetchersMutex.Lock() p.fetchers[pMsg.root] = fetcher + p.fetchersMutex.Unlock() } fetcher.schedule(pMsg.keys) - case tireMsg := <-p.trieChan: - tireMsg.resultChan <- p.fetchers[tireMsg.root] - - case uMsg := <-p.usedChan: - if fetcher := p.fetchers[uMsg.root]; fetcher != nil { - fetcher.used = uMsg.used - } - case <-p.closeMainChan: for _, fetcher := range p.fetchers { p.abortChan <- fetcher // safe to do multiple times @@ -177,14 +142,14 @@ func (p *triePrefetcher) mainLoop() { } close(p.closeAbortChan) close(p.closeMainDoneChan) + p.fetchersMutex.Lock() p.fetchers = nil + p.fetchersMutex.Unlock() + // drain all the channels before quit the loop for { select { - case <-p.copyChan: case <-p.prefetchChan: - case <-p.trieChan: - case <-p.usedChan: default: return } @@ -238,24 +203,28 @@ func (p *triePrefetcher) copy() *triePrefetcher { } return fetcherCopied } - p.copyChan <- struct{}{} + select { case <-p.closeMainChan: - select { - case <-p.copyChan: // to discard the message sent - default: + // for closed trie prefetcher, the fetches should not be nil + fetcherCopied := &triePrefetcher{ + db: p.db, + root: p.root, + fetches: make(map[common.Hash]Trie), } + return fetcherCopied + default: + p.fetchersMutex.RLock() fetcherCopied := &triePrefetcher{ db: p.db, root: p.root, - fetches: make(map[common.Hash]Trie, len(p.fetches)), + fetches: make(map[common.Hash]Trie, len(p.fetchers)), } - // for closed trie prefetcher, retrieve the current states + // we're copying an active fetcher, retrieve the current states for root, fetcher := range p.fetchers { fetcherCopied.fetches[root] = fetcher.peek() } - return fetcherCopied - case fetcherCopied := <-p.copyDoneChan: + p.fetchersMutex.RUnlock() return fetcherCopied } } @@ -268,7 +237,6 @@ func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte, accountHash c } select { case <-p.closeMainChan: // skip closed trie prefetcher - return case p.prefetchChan <- &prefetchMsg{root, accountHash, keys}: } } @@ -285,19 +253,9 @@ func (p *triePrefetcher) trie(root common.Hash) Trie { return p.db.CopyTrie(trie) } - var fetcher *subfetcher - // currentTrieChan is to make sure we receive root's fetcher in concurrency mode. - currentTrieChan := make(chan *subfetcher) - p.trieChan <- &trieMsg{root, currentTrieChan} - select { - case <-p.closeMainChan: - select { - case <-p.trieChan: - default: - } - fetcher = p.fetchers[root] - case fetcher = <-currentTrieChan: - } + p.fetchersMutex.RLock() + fetcher := p.fetchers[root] + p.fetchersMutex.Unlock() if fetcher == nil { p.deliveryMissMeter.Mark(1) return nil @@ -329,12 +287,13 @@ func (p *triePrefetcher) used(root common.Hash, used [][]byte) { return } select { - case <-p.closeAbortChan: - select { - case <-p.usedChan: - default: + case <-p.closeMainChan: + default: + p.fetchersMutex.RLock() + if fetcher := p.fetchers[root]; fetcher != nil { + fetcher.used = used } - case p.usedChan <- &usedMsg{root, used}: + p.fetchersMutex.RUnlock() } }