From 1cd350fca2d625bea87c28d3af7774c9f3f2171d Mon Sep 17 00:00:00 2001 From: setunapo Date: Tue, 5 Jul 2022 08:52:16 +0800 Subject: [PATCH] replace channel by RWMutex for a few triePrefetcher APIs For APIs like: trie(), copy(), used(), it is simpler and more efficient to use a RWMutex instead of channel communicaton. Since the mainLoop would be busy handling trie request, while these trie request can be processed in parallism. We would only keep prefetch and close within the mainLoop, since they could update the fetchers --- core/state/trie_prefetcher.go | 95 ++++++++++------------------------- 1 file changed, 27 insertions(+), 68 deletions(-) diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index 8da2c01639..91b944d589 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -41,16 +41,6 @@ type prefetchMsg struct { keys [][]byte } -type usedMsg struct { - root common.Hash - used [][]byte -} - -type trieMsg struct { - root common.Hash - resultChan chan *subfetcher -} - // triePrefetcher is an active prefetcher, which receives accounts or storage // items and does trie-loading of them. The goal is to get as much useful content // into the caches as possible. @@ -65,11 +55,8 @@ type triePrefetcher struct { closed int32 closeMainChan chan struct{} // it is to inform the mainLoop closeMainDoneChan chan struct{} - copyChan chan struct{} - copyDoneChan chan *triePrefetcher + fetchersMutex sync.RWMutex prefetchChan chan *prefetchMsg // no need to wait for return - trieChan chan *trieMsg - usedChan chan *usedMsg // no need to wait for return abortChan chan *subfetcher closeAbortChan chan struct{} // it is used to inform abortLoop @@ -97,11 +84,7 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre closeMainChan: make(chan struct{}), closeMainDoneChan: make(chan struct{}), - copyChan: make(chan struct{}, concurrentChanSize), - copyDoneChan: make(chan *triePrefetcher, concurrentChanSize), prefetchChan: make(chan *prefetchMsg, concurrentChanSize), - trieChan: make(chan *trieMsg, concurrentChanSize), - usedChan: make(chan *usedMsg, concurrentChanSize), deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil), accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil), @@ -121,34 +104,16 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre func (p *triePrefetcher) mainLoop() { for { select { - case <-p.copyChan: - fetcherCopied := &triePrefetcher{ - db: p.db, - root: p.root, - fetches: make(map[common.Hash]Trie, len(p.fetchers)), - } - // we're copying an active fetcher, retrieve the current states - for root, fetcher := range p.fetchers { - fetcherCopied.fetches[root] = fetcher.peek() - } - p.copyDoneChan <- fetcherCopied - case pMsg := <-p.prefetchChan: fetcher := p.fetchers[pMsg.root] if fetcher == nil { fetcher = newSubfetcher(p.db, pMsg.root, pMsg.accountHash) + p.fetchersMutex.Lock() p.fetchers[pMsg.root] = fetcher + p.fetchersMutex.Unlock() } fetcher.schedule(pMsg.keys) - case tireMsg := <-p.trieChan: - tireMsg.resultChan <- p.fetchers[tireMsg.root] - - case uMsg := <-p.usedChan: - if fetcher := p.fetchers[uMsg.root]; fetcher != nil { - fetcher.used = uMsg.used - } - case <-p.closeMainChan: for _, fetcher := range p.fetchers { p.abortChan <- fetcher // safe to do multiple times @@ -177,14 +142,14 @@ func (p *triePrefetcher) mainLoop() { } close(p.closeAbortChan) close(p.closeMainDoneChan) + p.fetchersMutex.Lock() p.fetchers = nil + p.fetchersMutex.Unlock() + // drain all the channels before quit the loop for { select { - case <-p.copyChan: case <-p.prefetchChan: - case <-p.trieChan: - case <-p.usedChan: default: return } @@ -238,24 +203,28 @@ func (p *triePrefetcher) copy() *triePrefetcher { } return fetcherCopied } - p.copyChan <- struct{}{} + select { case <-p.closeMainChan: - select { - case <-p.copyChan: // to discard the message sent - default: + // for closed trie prefetcher, the fetches should not be nil + fetcherCopied := &triePrefetcher{ + db: p.db, + root: p.root, + fetches: make(map[common.Hash]Trie), } + return fetcherCopied + default: + p.fetchersMutex.RLock() fetcherCopied := &triePrefetcher{ db: p.db, root: p.root, - fetches: make(map[common.Hash]Trie, len(p.fetches)), + fetches: make(map[common.Hash]Trie, len(p.fetchers)), } - // for closed trie prefetcher, retrieve the current states + // we're copying an active fetcher, retrieve the current states for root, fetcher := range p.fetchers { fetcherCopied.fetches[root] = fetcher.peek() } - return fetcherCopied - case fetcherCopied := <-p.copyDoneChan: + p.fetchersMutex.RUnlock() return fetcherCopied } } @@ -268,7 +237,6 @@ func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte, accountHash c } select { case <-p.closeMainChan: // skip closed trie prefetcher - return case p.prefetchChan <- &prefetchMsg{root, accountHash, keys}: } } @@ -285,19 +253,9 @@ func (p *triePrefetcher) trie(root common.Hash) Trie { return p.db.CopyTrie(trie) } - var fetcher *subfetcher - // currentTrieChan is to make sure we receive root's fetcher in concurrency mode. - currentTrieChan := make(chan *subfetcher) - p.trieChan <- &trieMsg{root, currentTrieChan} - select { - case <-p.closeMainChan: - select { - case <-p.trieChan: - default: - } - fetcher = p.fetchers[root] - case fetcher = <-currentTrieChan: - } + p.fetchersMutex.RLock() + fetcher := p.fetchers[root] + p.fetchersMutex.Unlock() if fetcher == nil { p.deliveryMissMeter.Mark(1) return nil @@ -329,12 +287,13 @@ func (p *triePrefetcher) used(root common.Hash, used [][]byte) { return } select { - case <-p.closeAbortChan: - select { - case <-p.usedChan: - default: + case <-p.closeMainChan: + default: + p.fetchersMutex.RLock() + if fetcher := p.fetchers[root]; fetcher != nil { + fetcher.used = used } - case p.usedChan <- &usedMsg{root, used}: + p.fetchersMutex.RUnlock() } }