From 96fd8b060431bff9e07dfe2a0ee6a5a70095d0bc Mon Sep 17 00:00:00 2001 From: setunapo Date: Thu, 30 Jun 2022 16:59:15 +0800 Subject: [PATCH 01/10] Redesign triePrefetcher to make it thread safe There are two types of triePrefetcher instance: 1. Newly created triePrefetcher: the important one, it performs trie prefetching to speed up the validation phase. 2. Copied triePrefetcher: it only copies the prefetched trie information and does no prefetching at all; the copied tries are all kept in p.fetches. Here we improve the newly created one to make it concurrency safe, while the copied one's behavior stays unchanged (its logic is very simple). As commented in the triePrefetcher struct, its APIs are not thread safe, so callers had to ensure a created triePrefetcher was only used within a single goroutine. Since we now want to use the triePrefetcher concurrently, it has to be redesigned for concurrent access. The design is simple: ** start a mainLoop that does all the work; the APIs just send channel messages. Others: ** remove the metrics copy, since it is useless for a copied triePrefetcher ** for trie(), only fetch the subfetcher through the channel, to reduce the workload of the mainLoop --- core/state/trie_prefetcher.go | 247 +++++++++++++++++++++++----------- 1 file changed, 168 insertions(+), 79 deletions(-) diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index d92648b4dc..b3bd5e4bc4 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -24,13 +24,27 @@ import ( "github.com/ethereum/go-ethereum/metrics" ) -const abortChanSize = 64 +const ( + abortChanSize = 64 + concurrentChanSize = 10 +) var ( // triePrefetchMetricsPrefix is the prefix under which to publis the metrics. triePrefetchMetricsPrefix = "trie/prefetch/" ) +type prefetchMsg struct { + root common.Hash + accountHash common.Hash + keys [][]byte +} + +type usedMsg struct { + root common.Hash + used [][]byte +} + // triePrefetcher is an active prefetcher, which receives accounts or storage // items and does trie-loading of them. The goal is to get as much useful content // into the caches as possible.
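The core idea of this first patch — a single mainLoop goroutine owns all mutable prefetcher state, and the public APIs only exchange messages with it over channels — is easier to see outside the diff. Below is a minimal, self-contained Go sketch of that actor-style shape; every name in it (loopOwner, prefetchReq, and so on) is invented for illustration, and none of it is the go-ethereum code being patched. It also reproduces the shared response channel used by this patch, whose limitation a later patch in the series (PATCH 03) removes:

package main

import "fmt"

// prefetchReq and loopOwner are invented names for this sketch only.
type prefetchReq struct {
	root string
	keys []string
}

type loopOwner struct {
	tries map[string][]string // owned exclusively by mainLoop, so no lock needed

	prefetchCh chan prefetchReq // fire-and-forget, buffered
	trieCh     chan string      // request: which root to look up
	trieDoneCh chan []string    // response: shared by all trie() callers
	closeCh    chan struct{}
	closeDone  chan struct{}
}

func newLoopOwner() *loopOwner {
	o := &loopOwner{
		tries:      make(map[string][]string),
		prefetchCh: make(chan prefetchReq, 10),
		trieCh:     make(chan string),
		trieDoneCh: make(chan []string),
		closeCh:    make(chan struct{}),
		closeDone:  make(chan struct{}),
	}
	go o.mainLoop()
	return o
}

// mainLoop is the only goroutine that touches o.tries; the exported methods
// below never access the map directly, they only pass messages.
func (o *loopOwner) mainLoop() {
	for {
		select {
		case req := <-o.prefetchCh:
			o.tries[req.root] = append(o.tries[req.root], req.keys...)
		case root := <-o.trieCh:
			// One shared response channel only works while trie() callers do
			// not overlap; PATCH 03 replaces it with a per-request channel.
			o.trieDoneCh <- o.tries[root]
		case <-o.closeCh:
			close(o.closeDone)
			return
		}
	}
}

func (o *loopOwner) prefetch(root string, keys []string) {
	o.prefetchCh <- prefetchReq{root: root, keys: keys}
}

func (o *loopOwner) trie(root string) []string {
	o.trieCh <- root
	return <-o.trieDoneCh
}

func (o *loopOwner) close() {
	close(o.closeCh)
	<-o.closeDone
}

func main() {
	o := newLoopOwner()
	o.prefetch("root-1", []string{"key-a", "key-b"})
	// prefetch is asynchronous, so the keys may not be visible yet; the series
	// deals with exactly this ordering in its test changes (PATCH 10).
	fmt.Println("prefetched:", o.trie("root-1"))
	o.close()
}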
@@ -42,8 +56,17 @@ type triePrefetcher struct { fetches map[common.Hash]Trie // Partially or fully fetcher tries fetchers map[common.Hash]*subfetcher // Subfetchers for each trie - abortChan chan *subfetcher - closeChan chan struct{} + closeMainChan chan struct{} // it is to inform the mainLoop + closeMainDoneChan chan struct{} + copyChan chan struct{} + copyDoneChan chan *triePrefetcher + prefetchChan chan *prefetchMsg // no need to wait for return + trieChan chan *common.Hash + trieDoneChan chan *subfetcher + usedChan chan *usedMsg // no need to wait for return + + abortChan chan *subfetcher + closeAbortChan chan struct{} // it is used to inform abortLoop deliveryMissMeter metrics.Meter accountLoadMeter metrics.Meter @@ -60,11 +83,21 @@ type triePrefetcher struct { func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher { prefix := triePrefetchMetricsPrefix + namespace p := &triePrefetcher{ - db: db, - root: root, - fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map - abortChan: make(chan *subfetcher, abortChanSize), - closeChan: make(chan struct{}), + db: db, + root: root, + fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map + + abortChan: make(chan *subfetcher, abortChanSize), + closeAbortChan: make(chan struct{}), + + closeMainChan: make(chan struct{}), + closeMainDoneChan: make(chan struct{}), + prefetchChan: make(chan *prefetchMsg, concurrentChanSize), + trieChan: make(chan *common.Hash, concurrentChanSize), + trieDoneChan: make(chan *subfetcher, concurrentChanSize), + usedChan: make(chan *usedMsg, concurrentChanSize), + copyChan: make(chan struct{}, concurrentChanSize), + copyDoneChan: make(chan *triePrefetcher, concurrentChanSize), deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil), accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil), @@ -77,15 +110,109 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil), } go p.abortLoop() + go p.mainLoop() return p } +func (p *triePrefetcher) mainLoop() { + // a series of anonymous functions which are concurrent unsafe, + // to avoid being accessed outside of the mainloop + copyFunc := func() *triePrefetcher { + copy := &triePrefetcher{ + db: p.db, + root: p.root, + fetches: make(map[common.Hash]Trie, len(p.fetches)), + abortChan: make(chan *subfetcher), + closeAbortChan: make(chan struct{}), + } + // we're copying an active fetcher, retrieve the current states + for root, fetcher := range p.fetchers { + copy.fetches[root] = fetcher.peek() + } + return copy + } + + closeFunc := func() { + for _, fetcher := range p.fetchers { + p.abortChan <- fetcher // safe to do multiple times + <-fetcher.term + if metrics.EnabledExpensive { + if fetcher.root == p.root { + p.accountLoadMeter.Mark(int64(len(fetcher.seen))) + p.accountDupMeter.Mark(int64(fetcher.dups)) + p.accountSkipMeter.Mark(int64(len(fetcher.tasks))) + + for _, key := range fetcher.used { + delete(fetcher.seen, string(key)) + } + p.accountWasteMeter.Mark(int64(len(fetcher.seen))) + } else { + p.storageLoadMeter.Mark(int64(len(fetcher.seen))) + p.storageDupMeter.Mark(int64(fetcher.dups)) + p.storageSkipMeter.Mark(int64(len(fetcher.tasks))) + + for _, key := range fetcher.used { + delete(fetcher.seen, string(key)) + } + p.storageWasteMeter.Mark(int64(len(fetcher.seen))) + } + } + } + close(p.closeAbortChan) + // Clear out all fetchers 
(will crash on a second call, deliberate) + p.fetchers = nil + } + + prefetchFunc := func(root common.Hash, keys [][]byte, accountHash common.Hash) { + fetcher := p.fetchers[root] + if fetcher == nil { + fetcher = newSubfetcher(p.db, root, accountHash) + p.fetchers[root] = fetcher + } + fetcher.schedule(keys) + } + + trieFunc := func(root common.Hash) *subfetcher { + // Otherwise the prefetcher is active, bail if no trie was prefetched for this root + fetcher := p.fetchers[root] + if fetcher == nil { + p.deliveryMissMeter.Mark(1) + return nil + } + return fetcher + } + + usedFunc := func(root common.Hash, used [][]byte) { + if fetcher := p.fetchers[root]; fetcher != nil { + fetcher.used = used + } + } + + // the main loop is executed from here + for { + select { + case <-p.copyChan: + p.copyDoneChan <- copyFunc() + case pMsg := <-p.prefetchChan: + prefetchFunc(pMsg.root, pMsg.keys, pMsg.accountHash) + case tireHash := <-p.trieChan: + p.trieDoneChan <- trieFunc(*tireHash) + case used := <-p.usedChan: + usedFunc(used.root, used.used) + case <-p.closeMainChan: + closeFunc() + close(p.closeMainDoneChan) + return + } + } +} + func (p *triePrefetcher) abortLoop() { for { select { case fetcher := <-p.abortChan: fetcher.abort() - case <-p.closeChan: + case <-p.closeAbortChan: // drain fetcher channel for { select { @@ -99,73 +226,45 @@ func (p *triePrefetcher) abortLoop() { } } -// close iterates over all the subfetchers, aborts any that were left spinning -// and reports the stats to the metrics subsystem. -func (p *triePrefetcher) close() { - for _, fetcher := range p.fetchers { - p.abortChan <- fetcher // safe to do multiple times - <-fetcher.term - if metrics.EnabledExpensive { - if fetcher.root == p.root { - p.accountLoadMeter.Mark(int64(len(fetcher.seen))) - p.accountDupMeter.Mark(int64(fetcher.dups)) - p.accountSkipMeter.Mark(int64(len(fetcher.tasks))) - - for _, key := range fetcher.used { - delete(fetcher.seen, string(key)) - } - p.accountWasteMeter.Mark(int64(len(fetcher.seen))) - } else { - p.storageLoadMeter.Mark(int64(len(fetcher.seen))) - p.storageDupMeter.Mark(int64(fetcher.dups)) - p.storageSkipMeter.Mark(int64(len(fetcher.tasks))) - - for _, key := range fetcher.used { - delete(fetcher.seen, string(key)) - } - p.storageWasteMeter.Mark(int64(len(fetcher.seen))) - } - } - } - close(p.closeChan) - // Clear out all fetchers (will crash on a second call, deliberate) - p.fetchers = nil -} - // copy creates a deep-but-inactive copy of the trie prefetcher. Any trie data // already loaded will be copied over, but no goroutines will be started. This // is mostly used in the miner which creates a copy of it's actively mutated // state to be sealed while it may further mutate the state. 
func (p *triePrefetcher) copy() *triePrefetcher { - copy := &triePrefetcher{ - db: p.db, - root: p.root, - fetches: make(map[common.Hash]Trie), // Active prefetchers use the fetches map - - abortChan: make(chan *subfetcher), - closeChan: make(chan struct{}), - deliveryMissMeter: p.deliveryMissMeter, - accountLoadMeter: p.accountLoadMeter, - accountDupMeter: p.accountDupMeter, - accountSkipMeter: p.accountSkipMeter, - accountWasteMeter: p.accountWasteMeter, - storageLoadMeter: p.storageLoadMeter, - storageDupMeter: p.storageDupMeter, - storageSkipMeter: p.storageSkipMeter, - storageWasteMeter: p.storageWasteMeter, - } // If the prefetcher is already a copy, duplicate the data if p.fetches != nil { + copy := &triePrefetcher{ + db: p.db, + root: p.root, + fetches: make(map[common.Hash]Trie, len(p.fetches)), + abortChan: make(chan *subfetcher), + closeAbortChan: make(chan struct{}), + } + // p.fetches is safe to be accessed outside of mainloop + // if the triePrefetcher is a active, fetches is not will in mainLoop + // otherwise, inactive triePrefetcher is readonly, it has no loop for root, fetch := range p.fetches { copy.fetches[root] = p.db.CopyTrie(fetch) } return copy } - // Otherwise we're copying an active fetcher, retrieve the current states - for root, fetcher := range p.fetchers { - copy.fetches[root] = fetcher.peek() + + p.copyChan <- struct{}{} + return <-p.copyDoneChan +} + +// close iterates over all the subfetchers, aborts any that were left spinning +// and reports the stats to the metrics subsystem. +// Attention, this API is not thread safe: double close() +func (p *triePrefetcher) close() { + select { + case <-p.closeMainChan: // already closed + return + default: + close(p.closeMainChan) + // wait until all subfetcher are stopped + <-p.closeMainDoneChan } - return copy } // prefetch schedules a batch of trie items to prefetch. @@ -174,13 +273,7 @@ func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte, accountHash c if p.fetches != nil { return } - // Active fetcher, schedule the retrievals - fetcher := p.fetchers[root] - if fetcher == nil { - fetcher = newSubfetcher(p.db, root, accountHash) - p.fetchers[root] = fetcher - } - fetcher.schedule(keys) + p.prefetchChan <- &prefetchMsg{root, accountHash, keys} } // trie returns the trie matching the root hash, or nil if the prefetcher doesn't @@ -195,12 +288,10 @@ func (p *triePrefetcher) trie(root common.Hash) Trie { } return p.db.CopyTrie(trie) } - // Otherwise the prefetcher is active, bail if no trie was prefetched for this root - fetcher := p.fetchers[root] - if fetcher == nil { - p.deliveryMissMeter.Mark(1) - return nil - } + + p.trieChan <- &root + fetcher := <-p.trieDoneChan + // Interrupt the prefetcher if it's by any chance still running and return // a copy of any pre-loaded trie. p.abortChan <- fetcher // safe to do multiple times @@ -216,9 +307,7 @@ func (p *triePrefetcher) trie(root common.Hash) Trie { // used marks a batch of state items used to allow creating statistics as to // how useful or wasteful the prefetcher is. 
func (p *triePrefetcher) used(root common.Hash, used [][]byte) { - if fetcher := p.fetchers[root]; fetcher != nil { - fetcher.used = used - } + p.usedChan <- &usedMsg{root, used} } // subfetcher is a trie fetcher goroutine responsible for pulling entries for a From 1a4d369c5474859912bd34de057fc041e02b54d7 Mon Sep 17 00:00:00 2001 From: setunapo Date: Thu, 30 Jun 2022 18:12:36 +0800 Subject: [PATCH 02/10] some code enhancement for triePrefetcher redesign --- core/state/trie_prefetcher.go | 181 +++++++++++++++------------------- 1 file changed, 79 insertions(+), 102 deletions(-) diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index b3bd5e4bc4..0054f898d0 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -83,10 +83,9 @@ type triePrefetcher struct { func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher { prefix := triePrefetchMetricsPrefix + namespace p := &triePrefetcher{ - db: db, - root: root, - fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map - + db: db, + root: root, + fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map abortChan: make(chan *subfetcher, abortChanSize), closeAbortChan: make(chan struct{}), @@ -115,93 +114,70 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre } func (p *triePrefetcher) mainLoop() { - // a series of anonymous functions which are concurrent unsafe, - // to avoid being accessed outside of the mainloop - copyFunc := func() *triePrefetcher { - copy := &triePrefetcher{ - db: p.db, - root: p.root, - fetches: make(map[common.Hash]Trie, len(p.fetches)), - abortChan: make(chan *subfetcher), - closeAbortChan: make(chan struct{}), - } - // we're copying an active fetcher, retrieve the current states - for root, fetcher := range p.fetchers { - copy.fetches[root] = fetcher.peek() - } - return copy - } - - closeFunc := func() { - for _, fetcher := range p.fetchers { - p.abortChan <- fetcher // safe to do multiple times - <-fetcher.term - if metrics.EnabledExpensive { - if fetcher.root == p.root { - p.accountLoadMeter.Mark(int64(len(fetcher.seen))) - p.accountDupMeter.Mark(int64(fetcher.dups)) - p.accountSkipMeter.Mark(int64(len(fetcher.tasks))) - - for _, key := range fetcher.used { - delete(fetcher.seen, string(key)) - } - p.accountWasteMeter.Mark(int64(len(fetcher.seen))) - } else { - p.storageLoadMeter.Mark(int64(len(fetcher.seen))) - p.storageDupMeter.Mark(int64(fetcher.dups)) - p.storageSkipMeter.Mark(int64(len(fetcher.tasks))) - - for _, key := range fetcher.used { - delete(fetcher.seen, string(key)) - } - p.storageWasteMeter.Mark(int64(len(fetcher.seen))) - } - } - } - close(p.closeAbortChan) - // Clear out all fetchers (will crash on a second call, deliberate) - p.fetchers = nil - } - - prefetchFunc := func(root common.Hash, keys [][]byte, accountHash common.Hash) { - fetcher := p.fetchers[root] - if fetcher == nil { - fetcher = newSubfetcher(p.db, root, accountHash) - p.fetchers[root] = fetcher - } - fetcher.schedule(keys) - } - - trieFunc := func(root common.Hash) *subfetcher { - // Otherwise the prefetcher is active, bail if no trie was prefetched for this root - fetcher := p.fetchers[root] - if fetcher == nil { - p.deliveryMissMeter.Mark(1) - return nil - } - return fetcher - } - - usedFunc := func(root common.Hash, used [][]byte) { - if fetcher := p.fetchers[root]; fetcher != nil { - fetcher.used = used - } - } - - // the main loop is executed from here for { 
select { case <-p.copyChan: - p.copyDoneChan <- copyFunc() + copy := &triePrefetcher{ + db: p.db, + root: p.root, + fetches: make(map[common.Hash]Trie, len(p.fetchers)), + } + // we're copying an active fetcher, retrieve the current states + for root, fetcher := range p.fetchers { + copy.fetches[root] = fetcher.peek() + } + p.copyDoneChan <- copy + case pMsg := <-p.prefetchChan: - prefetchFunc(pMsg.root, pMsg.keys, pMsg.accountHash) + fetcher := p.fetchers[pMsg.root] + if fetcher == nil { + fetcher = newSubfetcher(p.db, pMsg.root, pMsg.accountHash) + p.fetchers[pMsg.root] = fetcher + } + fetcher.schedule(pMsg.keys) + case tireHash := <-p.trieChan: - p.trieDoneChan <- trieFunc(*tireHash) - case used := <-p.usedChan: - usedFunc(used.root, used.used) + fetcher := p.fetchers[*tireHash] + // bail if no trie was prefetched for this root + if fetcher == nil { + p.deliveryMissMeter.Mark(1) + } + p.trieDoneChan <- fetcher + + case uMsg := <-p.usedChan: + if fetcher := p.fetchers[uMsg.root]; fetcher != nil { + fetcher.used = uMsg.used + } + case <-p.closeMainChan: - closeFunc() + for _, fetcher := range p.fetchers { + p.abortChan <- fetcher // safe to do multiple times + <-fetcher.term + if metrics.EnabledExpensive { + if fetcher.root == p.root { + p.accountLoadMeter.Mark(int64(len(fetcher.seen))) + p.accountDupMeter.Mark(int64(fetcher.dups)) + p.accountSkipMeter.Mark(int64(len(fetcher.tasks))) + + for _, key := range fetcher.used { + delete(fetcher.seen, string(key)) + } + p.accountWasteMeter.Mark(int64(len(fetcher.seen))) + } else { + p.storageLoadMeter.Mark(int64(len(fetcher.seen))) + p.storageDupMeter.Mark(int64(fetcher.dups)) + p.storageSkipMeter.Mark(int64(len(fetcher.tasks))) + + for _, key := range fetcher.used { + delete(fetcher.seen, string(key)) + } + p.storageWasteMeter.Mark(int64(len(fetcher.seen))) + } + } + } + close(p.closeAbortChan) close(p.closeMainDoneChan) + p.fetchers = nil return } } @@ -226,6 +202,20 @@ func (p *triePrefetcher) abortLoop() { } } +// close iterates over all the subfetchers, aborts any that were left spinning +// and reports the stats to the metrics subsystem. +// Attention, this API is not thread safe: double close() +func (p *triePrefetcher) close() { + select { + case <-p.closeMainChan: // already closed + return + default: + close(p.closeMainChan) + // wait until all subfetcher are stopped + <-p.closeMainDoneChan + } +} + // copy creates a deep-but-inactive copy of the trie prefetcher. Any trie data // already loaded will be copied over, but no goroutines will be started. This // is mostly used in the miner which creates a copy of it's actively mutated @@ -241,8 +231,8 @@ func (p *triePrefetcher) copy() *triePrefetcher { closeAbortChan: make(chan struct{}), } // p.fetches is safe to be accessed outside of mainloop - // if the triePrefetcher is a active, fetches is not will in mainLoop - // otherwise, inactive triePrefetcher is readonly, it has no loop + // if the triePrefetcher is active, fetches will not be used in mainLoop + // otherwise, inactive triePrefetcher is readonly, it won't modify fetches for root, fetch := range p.fetches { copy.fetches[root] = p.db.CopyTrie(fetch) } @@ -253,20 +243,6 @@ func (p *triePrefetcher) copy() *triePrefetcher { return <-p.copyDoneChan } -// close iterates over all the subfetchers, aborts any that were left spinning -// and reports the stats to the metrics subsystem. 
-// Attention, this API is not thread safe: double close() -func (p *triePrefetcher) close() { - select { - case <-p.closeMainChan: // already closed - return - default: - close(p.closeMainChan) - // wait until all subfetcher are stopped - <-p.closeMainDoneChan - } -} - // prefetch schedules a batch of trie items to prefetch. func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte, accountHash common.Hash) { // If the prefetcher is an inactive one, bail out @@ -283,7 +259,6 @@ func (p *triePrefetcher) trie(root common.Hash) Trie { if p.fetches != nil { trie := p.fetches[root] if trie == nil { - p.deliveryMissMeter.Mark(1) return nil } return p.db.CopyTrie(trie) @@ -291,7 +266,9 @@ func (p *triePrefetcher) trie(root common.Hash) Trie { p.trieChan <- &root fetcher := <-p.trieDoneChan - + if fetcher == nil { + return nil + } // Interrupt the prefetcher if it's by any chance still running and return // a copy of any pre-loaded trie. p.abortChan <- fetcher // safe to do multiple times From b673fdf4ba6685f34116a336ed2c425bff49032d Mon Sep 17 00:00:00 2001 From: setunapo Date: Thu, 30 Jun 2022 19:55:39 +0800 Subject: [PATCH 03/10] some fixup: rename, temporary trie chan for concurrent safe. --- core/state/trie_prefetcher.go | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index 0054f898d0..30cacd728c 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -45,6 +45,11 @@ type usedMsg struct { used [][]byte } +type trieMsg struct { + root common.Hash + resultChan chan *subfetcher +} + // triePrefetcher is an active prefetcher, which receives accounts or storage // items and does trie-loading of them. The goal is to get as much useful content // into the caches as possible. 
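The switch from a shared trieDoneChan to a trieMsg that carries its own resultChan is the usual request/reply refinement: with one shared response channel, two goroutines calling trie() concurrently could consume each other's replies, whereas a per-request channel ties every reply to its requester. A minimal sketch of the per-request pattern (hypothetical names, not the patch's code):

package main

import (
	"fmt"
	"sync"
)

// lookupMsg mirrors the idea behind trieMsg: every request carries its own
// reply channel, so concurrent callers can never steal each other's results.
// All names here are invented for the sketch.
type lookupMsg struct {
	key   string
	reply chan int
}

func main() {
	requests := make(chan lookupMsg)
	table := map[string]int{"a": 1, "b": 2}

	// Single owner goroutine, standing in for mainLoop.
	go func() {
		for msg := range requests {
			msg.reply <- table[msg.key]
		}
	}()

	var wg sync.WaitGroup
	for _, k := range []string{"a", "b"} {
		k := k
		wg.Add(1)
		go func() {
			defer wg.Done()
			msg := lookupMsg{key: k, reply: make(chan int, 1)}
			requests <- msg
			fmt.Println(k, "->", <-msg.reply) // each caller reads only its own reply
		}()
	}
	wg.Wait()
	close(requests)
}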
@@ -61,8 +66,7 @@ type triePrefetcher struct { copyChan chan struct{} copyDoneChan chan *triePrefetcher prefetchChan chan *prefetchMsg // no need to wait for return - trieChan chan *common.Hash - trieDoneChan chan *subfetcher + trieChan chan *trieMsg usedChan chan *usedMsg // no need to wait for return abortChan chan *subfetcher @@ -92,8 +96,7 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre closeMainChan: make(chan struct{}), closeMainDoneChan: make(chan struct{}), prefetchChan: make(chan *prefetchMsg, concurrentChanSize), - trieChan: make(chan *common.Hash, concurrentChanSize), - trieDoneChan: make(chan *subfetcher, concurrentChanSize), + trieChan: make(chan *trieMsg, concurrentChanSize), usedChan: make(chan *usedMsg, concurrentChanSize), copyChan: make(chan struct{}, concurrentChanSize), copyDoneChan: make(chan *triePrefetcher, concurrentChanSize), @@ -136,13 +139,13 @@ func (p *triePrefetcher) mainLoop() { } fetcher.schedule(pMsg.keys) - case tireHash := <-p.trieChan: - fetcher := p.fetchers[*tireHash] + case tireMsg := <-p.trieChan: + fetcher := p.fetchers[tireMsg.root] // bail if no trie was prefetched for this root if fetcher == nil { p.deliveryMissMeter.Mark(1) } - p.trieDoneChan <- fetcher + tireMsg.resultChan <- fetcher case uMsg := <-p.usedChan: if fetcher := p.fetchers[uMsg.root]; fetcher != nil { @@ -264,8 +267,10 @@ func (p *triePrefetcher) trie(root common.Hash) Trie { return p.db.CopyTrie(trie) } - p.trieChan <- &root - fetcher := <-p.trieDoneChan + trieChan := make(chan *subfetcher) + defer func() { close(trieChan) }() + p.trieChan <- &trieMsg{root, trieChan} + fetcher := <-trieChan if fetcher == nil { return nil } From f1c4e8b44991e2a303d9d3ea241aedbc0b600f0d Mon Sep 17 00:00:00 2001 From: setunapo Date: Fri, 1 Jul 2022 15:00:41 +0800 Subject: [PATCH 04/10] fix review comments --- core/state/trie_prefetcher.go | 48 +++++++++++++++++++++++------------ 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index 30cacd728c..ad2fded4be 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -120,16 +120,16 @@ func (p *triePrefetcher) mainLoop() { for { select { case <-p.copyChan: - copy := &triePrefetcher{ + fetcherCopied := &triePrefetcher{ db: p.db, root: p.root, fetches: make(map[common.Hash]Trie, len(p.fetchers)), } // we're copying an active fetcher, retrieve the current states for root, fetcher := range p.fetchers { - copy.fetches[root] = fetcher.peek() + fetcherCopied.fetches[root] = fetcher.peek() } - p.copyDoneChan <- copy + p.copyDoneChan <- fetcherCopied case pMsg := <-p.prefetchChan: fetcher := p.fetchers[pMsg.root] @@ -140,12 +140,7 @@ func (p *triePrefetcher) mainLoop() { fetcher.schedule(pMsg.keys) case tireMsg := <-p.trieChan: - fetcher := p.fetchers[tireMsg.root] - // bail if no trie was prefetched for this root - if fetcher == nil { - p.deliveryMissMeter.Mark(1) - } - tireMsg.resultChan <- fetcher + tireMsg.resultChan <- p.fetchers[tireMsg.root] case uMsg := <-p.usedChan: if fetcher := p.fetchers[uMsg.root]; fetcher != nil { @@ -181,7 +176,17 @@ func (p *triePrefetcher) mainLoop() { close(p.closeAbortChan) close(p.closeMainDoneChan) p.fetchers = nil - return + // drain all the channels before quit the loop + for { + select { + case <-p.copyChan: + case <-p.prefetchChan: + case <-p.trieChan: + case <-p.usedChan: + default: + return + } + } } } } @@ -209,13 +214,16 @@ func (p *triePrefetcher) abortLoop() { // and reports the stats 
to the metrics subsystem. // Attention, this API is not thread safe: double close() func (p *triePrefetcher) close() { + // If the prefetcher is an inactive one, bail out + if p.fetches != nil { + return + } select { - case <-p.closeMainChan: // already closed + case <-p.closeMainChan: // skip if already closed return default: close(p.closeMainChan) - // wait until all subfetcher are stopped - <-p.closeMainDoneChan + <-p.closeMainDoneChan // wait until all subfetcher are stopped } } @@ -226,7 +234,7 @@ func (p *triePrefetcher) close() { func (p *triePrefetcher) copy() *triePrefetcher { // If the prefetcher is already a copy, duplicate the data if p.fetches != nil { - copy := &triePrefetcher{ + fetcherCopied := &triePrefetcher{ db: p.db, root: p.root, fetches: make(map[common.Hash]Trie, len(p.fetches)), @@ -237,9 +245,9 @@ func (p *triePrefetcher) copy() *triePrefetcher { // if the triePrefetcher is active, fetches will not be used in mainLoop // otherwise, inactive triePrefetcher is readonly, it won't modify fetches for root, fetch := range p.fetches { - copy.fetches[root] = p.db.CopyTrie(fetch) + fetcherCopied.fetches[root] = p.db.CopyTrie(fetch) } - return copy + return fetcherCopied } p.copyChan <- struct{}{} @@ -272,6 +280,7 @@ func (p *triePrefetcher) trie(root common.Hash) Trie { p.trieChan <- &trieMsg{root, trieChan} fetcher := <-trieChan if fetcher == nil { + p.deliveryMissMeter.Mark(1) return nil } // Interrupt the prefetcher if it's by any chance still running and return @@ -289,6 +298,13 @@ func (p *triePrefetcher) trie(root common.Hash) Trie { // used marks a batch of state items used to allow creating statistics as to // how useful or wasteful the prefetcher is. func (p *triePrefetcher) used(root common.Hash, used [][]byte) { + if !metrics.EnabledExpensive { + return + } + // If the prefetcher is an inactive one, bail out + if p.fetches != nil { + return + } p.usedChan <- &usedMsg{root, used} } From 20e4a720357f1af6fe0d246a70be24da130b1dbc Mon Sep 17 00:00:00 2001 From: setunapo Date: Mon, 4 Jul 2022 10:08:30 +0800 Subject: [PATCH 05/10] add some protection in case the trie prefetcher is already stopped --- core/state/trie_prefetcher.go | 57 ++++++++++++++++++++++++++++------- 1 file changed, 46 insertions(+), 11 deletions(-) diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index ad2fded4be..8f271724a0 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -250,8 +250,22 @@ func (p *triePrefetcher) copy() *triePrefetcher { return fetcherCopied } - p.copyChan <- struct{}{} - return <-p.copyDoneChan + select { + case <-p.closeMainChan: + fetcherCopied := &triePrefetcher{ + db: p.db, + root: p.root, + fetches: make(map[common.Hash]Trie, len(p.fetches)), + } + // for closed trie prefetcher, retrieve the current states + for root, fetcher := range p.fetchers { + fetcherCopied.fetches[root] = fetcher.peek() + } + return fetcherCopied + default: + p.copyChan <- struct{}{} + return <-p.copyDoneChan + } } // prefetch schedules a batch of trie items to prefetch. 
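The protection added here (and tightened in the following patches) follows a common Go shutdown idiom: a closed channel acts as a broadcast "stopped" flag, and every send toward the main loop is wrapped in a select so callers can never block on a goroutine that has already exited. A standalone sketch of that guard, with invented names; note that the select/default close below is still racy if two goroutines call close() at the same time, which is exactly the hole PATCH 06 later plugs with an atomic CAS:

package main

import "fmt"

// worker shows the guard pattern: a closed "stop" channel is a broadcast flag,
// and sends toward the loop are wrapped in a select so callers cannot block
// once the loop has exited. Names are invented for the sketch.
type worker struct {
	jobs chan string
	stop chan struct{}
	done chan struct{}
}

func newWorker() *worker {
	w := &worker{
		jobs: make(chan string, 4),
		stop: make(chan struct{}),
		done: make(chan struct{}),
	}
	go func() {
		defer close(w.done)
		for {
			select {
			case j := <-w.jobs:
				fmt.Println("processed", j)
			case <-w.stop:
				return
			}
		}
	}()
	return w
}

// submit can never block forever: once w.stop is closed the first case is
// always ready, so at worst the job is dropped instead of wedging the caller.
func (w *worker) submit(job string) {
	select {
	case <-w.stop:
	case w.jobs <- job:
	}
}

// close is idempotent for sequential callers; two goroutines racing into the
// default branch could still double-close, which is what PATCH 06 fixes with
// atomic.CompareAndSwapInt32.
func (w *worker) close() {
	select {
	case <-w.stop:
	default:
		close(w.stop)
	}
	<-w.done
}

func main() {
	w := newWorker()
	w.submit("job-1")
	w.close()
	w.submit("job-2") // does not block even though the loop has stopped
	w.close()         // safe to call a second time
}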
@@ -260,7 +274,12 @@ func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte, accountHash c if p.fetches != nil { return } - p.prefetchChan <- &prefetchMsg{root, accountHash, keys} + select { + case <-p.closeMainChan: // skip closed trie prefetcher + return + default: + p.prefetchChan <- &prefetchMsg{root, accountHash, keys} + } } // trie returns the trie matching the root hash, or nil if the prefetcher doesn't @@ -275,17 +294,29 @@ func (p *triePrefetcher) trie(root common.Hash) Trie { return p.db.CopyTrie(trie) } - trieChan := make(chan *subfetcher) - defer func() { close(trieChan) }() - p.trieChan <- &trieMsg{root, trieChan} - fetcher := <-trieChan + var fetcher *subfetcher + select { + case <-p.closeMainChan: + fetcher = p.fetchers[root] + default: + // currentTrieChan is to make sure we receive root's fetcher in concurrency mode. + currentTrieChan := make(chan *subfetcher) + defer func() { close(currentTrieChan) }() + p.trieChan <- &trieMsg{root, currentTrieChan} + fetcher = <-currentTrieChan + } if fetcher == nil { p.deliveryMissMeter.Mark(1) return nil } - // Interrupt the prefetcher if it's by any chance still running and return - // a copy of any pre-loaded trie. - p.abortChan <- fetcher // safe to do multiple times + + select { + case <-p.closeAbortChan: + default: + // Interrupt the prefetcher if it's by any chance still running and return + // a copy of any pre-loaded trie. + p.abortChan <- fetcher // safe to do multiple times + } trie := fetcher.peek() if trie == nil { @@ -305,7 +336,11 @@ func (p *triePrefetcher) used(root common.Hash, used [][]byte) { if p.fetches != nil { return } - p.usedChan <- &usedMsg{root, used} + select { + case <-p.closeAbortChan: + default: + p.usedChan <- &usedMsg{root, used} + } } // subfetcher is a trie fetcher goroutine responsible for pulling entries for a From 704493ad550739bafd869176fa5548b8455d5a13 Mon Sep 17 00:00:00 2001 From: setunapo Date: Mon, 4 Jul 2022 20:32:39 +0800 Subject: [PATCH 06/10] fix review comments ** make close concurrent safe ** fix potential deadlock --- core/state/trie_prefetcher.go | 63 +++++++++++++++++------------------ 1 file changed, 30 insertions(+), 33 deletions(-) diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index 8f271724a0..9cab2e1a8a 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -18,6 +18,7 @@ package state import ( "sync" + "sync/atomic" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" @@ -61,6 +62,7 @@ type triePrefetcher struct { fetches map[common.Hash]Trie // Partially or fully fetcher tries fetchers map[common.Hash]*subfetcher // Subfetchers for each trie + closed int32 closeMainChan chan struct{} // it is to inform the mainLoop closeMainDoneChan chan struct{} copyChan chan struct{} @@ -95,11 +97,11 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre closeMainChan: make(chan struct{}), closeMainDoneChan: make(chan struct{}), + copyChan: make(chan struct{}, concurrentChanSize), + copyDoneChan: make(chan *triePrefetcher, concurrentChanSize), prefetchChan: make(chan *prefetchMsg, concurrentChanSize), trieChan: make(chan *trieMsg, concurrentChanSize), usedChan: make(chan *usedMsg, concurrentChanSize), - copyChan: make(chan struct{}, concurrentChanSize), - copyDoneChan: make(chan *triePrefetcher, concurrentChanSize), deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil), accountLoadMeter: 
metrics.GetOrRegisterMeter(prefix+"/account/load", nil), @@ -197,15 +199,7 @@ func (p *triePrefetcher) abortLoop() { case fetcher := <-p.abortChan: fetcher.abort() case <-p.closeAbortChan: - // drain fetcher channel - for { - select { - case fetcher := <-p.abortChan: - fetcher.abort() - default: - return - } - } + return } } } @@ -218,10 +212,7 @@ func (p *triePrefetcher) close() { if p.fetches != nil { return } - select { - case <-p.closeMainChan: // skip if already closed - return - default: + if atomic.CompareAndSwapInt32(&p.closed, 0, 1) { close(p.closeMainChan) <-p.closeMainDoneChan // wait until all subfetcher are stopped } @@ -249,9 +240,13 @@ func (p *triePrefetcher) copy() *triePrefetcher { } return fetcherCopied } - + p.copyChan <- struct{}{} select { case <-p.closeMainChan: + select { + case <-p.copyChan: // to discard the message sent + default: + } fetcherCopied := &triePrefetcher{ db: p.db, root: p.root, @@ -262,9 +257,8 @@ func (p *triePrefetcher) copy() *triePrefetcher { fetcherCopied.fetches[root] = fetcher.peek() } return fetcherCopied - default: - p.copyChan <- struct{}{} - return <-p.copyDoneChan + case fetcherCopied := <-p.copyDoneChan: + return fetcherCopied } } @@ -277,8 +271,7 @@ func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte, accountHash c select { case <-p.closeMainChan: // skip closed trie prefetcher return - default: - p.prefetchChan <- &prefetchMsg{root, accountHash, keys} + case p.prefetchChan <- &prefetchMsg{root, accountHash, keys}: } } @@ -295,27 +288,28 @@ func (p *triePrefetcher) trie(root common.Hash) Trie { } var fetcher *subfetcher + // currentTrieChan is to make sure we receive root's fetcher in concurrency mode. + currentTrieChan := make(chan *subfetcher) + p.trieChan <- &trieMsg{root, currentTrieChan} select { case <-p.closeMainChan: + select { + case <-p.trieChan: + default: + } fetcher = p.fetchers[root] - default: - // currentTrieChan is to make sure we receive root's fetcher in concurrency mode. - currentTrieChan := make(chan *subfetcher) - defer func() { close(currentTrieChan) }() - p.trieChan <- &trieMsg{root, currentTrieChan} - fetcher = <-currentTrieChan + case fetcher = <-currentTrieChan: } if fetcher == nil { p.deliveryMissMeter.Mark(1) return nil } + // Interrupt the prefetcher if it's by any chance still running and return + // a copy of any pre-loaded trie. select { case <-p.closeAbortChan: - default: - // Interrupt the prefetcher if it's by any chance still running and return - // a copy of any pre-loaded trie. - p.abortChan <- fetcher // safe to do multiple times + case p.abortChan <- fetcher: // safe to do multiple times } trie := fetcher.peek() @@ -338,8 +332,11 @@ func (p *triePrefetcher) used(root common.Hash, used [][]byte) { } select { case <-p.closeAbortChan: - default: - p.usedChan <- &usedMsg{root, used} + select { + case <-p.usedChan: + default: + } + case p.usedChan <- &usedMsg{root, used}: } } From 8b5544a163974d80efa46b5454726c53f1361c23 Mon Sep 17 00:00:00 2001 From: setunapo Date: Tue, 5 Jul 2022 08:52:16 +0800 Subject: [PATCH 07/10] replace channel by RWMutex for a few triePrefetcher APIs For APIs like: trie(), copy(), used(), it is simpler and more efficient to use a RWMutex instead of channel communicaton. Since the mainLoop would be busy handling trie request, while these trie request can be processed in parallism. 
We would only keep prefetch and close within the mainLoop, since they could update the fetchers --- core/state/trie_prefetcher.go | 95 ++++++++++------------------------- 1 file changed, 27 insertions(+), 68 deletions(-) diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index 9cab2e1a8a..d594de6679 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -41,16 +41,6 @@ type prefetchMsg struct { keys [][]byte } -type usedMsg struct { - root common.Hash - used [][]byte -} - -type trieMsg struct { - root common.Hash - resultChan chan *subfetcher -} - // triePrefetcher is an active prefetcher, which receives accounts or storage // items and does trie-loading of them. The goal is to get as much useful content // into the caches as possible. @@ -65,11 +55,8 @@ type triePrefetcher struct { closed int32 closeMainChan chan struct{} // it is to inform the mainLoop closeMainDoneChan chan struct{} - copyChan chan struct{} - copyDoneChan chan *triePrefetcher + fetchersMutex sync.RWMutex prefetchChan chan *prefetchMsg // no need to wait for return - trieChan chan *trieMsg - usedChan chan *usedMsg // no need to wait for return abortChan chan *subfetcher closeAbortChan chan struct{} // it is used to inform abortLoop @@ -97,11 +84,7 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre closeMainChan: make(chan struct{}), closeMainDoneChan: make(chan struct{}), - copyChan: make(chan struct{}, concurrentChanSize), - copyDoneChan: make(chan *triePrefetcher, concurrentChanSize), prefetchChan: make(chan *prefetchMsg, concurrentChanSize), - trieChan: make(chan *trieMsg, concurrentChanSize), - usedChan: make(chan *usedMsg, concurrentChanSize), deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil), accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil), @@ -121,34 +104,16 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre func (p *triePrefetcher) mainLoop() { for { select { - case <-p.copyChan: - fetcherCopied := &triePrefetcher{ - db: p.db, - root: p.root, - fetches: make(map[common.Hash]Trie, len(p.fetchers)), - } - // we're copying an active fetcher, retrieve the current states - for root, fetcher := range p.fetchers { - fetcherCopied.fetches[root] = fetcher.peek() - } - p.copyDoneChan <- fetcherCopied - case pMsg := <-p.prefetchChan: fetcher := p.fetchers[pMsg.root] if fetcher == nil { fetcher = newSubfetcher(p.db, pMsg.root, pMsg.accountHash) + p.fetchersMutex.Lock() p.fetchers[pMsg.root] = fetcher + p.fetchersMutex.Unlock() } fetcher.schedule(pMsg.keys) - case tireMsg := <-p.trieChan: - tireMsg.resultChan <- p.fetchers[tireMsg.root] - - case uMsg := <-p.usedChan: - if fetcher := p.fetchers[uMsg.root]; fetcher != nil { - fetcher.used = uMsg.used - } - case <-p.closeMainChan: for _, fetcher := range p.fetchers { p.abortChan <- fetcher // safe to do multiple times @@ -177,14 +142,14 @@ func (p *triePrefetcher) mainLoop() { } close(p.closeAbortChan) close(p.closeMainDoneChan) + p.fetchersMutex.Lock() p.fetchers = nil + p.fetchersMutex.Unlock() + // drain all the channels before quit the loop for { select { - case <-p.copyChan: case <-p.prefetchChan: - case <-p.trieChan: - case <-p.usedChan: default: return } @@ -240,24 +205,28 @@ func (p *triePrefetcher) copy() *triePrefetcher { } return fetcherCopied } - p.copyChan <- struct{}{} + select { case <-p.closeMainChan: - select { - case <-p.copyChan: // to discard the message sent - default: + // for closed trie 
prefetcher, the fetches should not be nil + fetcherCopied := &triePrefetcher{ + db: p.db, + root: p.root, + fetches: make(map[common.Hash]Trie), } + return fetcherCopied + default: + p.fetchersMutex.RLock() fetcherCopied := &triePrefetcher{ db: p.db, root: p.root, - fetches: make(map[common.Hash]Trie, len(p.fetches)), + fetches: make(map[common.Hash]Trie, len(p.fetchers)), } - // for closed trie prefetcher, retrieve the current states + // we're copying an active fetcher, retrieve the current states for root, fetcher := range p.fetchers { fetcherCopied.fetches[root] = fetcher.peek() } - return fetcherCopied - case fetcherCopied := <-p.copyDoneChan: + p.fetchersMutex.RUnlock() return fetcherCopied } } @@ -270,7 +239,6 @@ func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte, accountHash c } select { case <-p.closeMainChan: // skip closed trie prefetcher - return case p.prefetchChan <- &prefetchMsg{root, accountHash, keys}: } } @@ -287,19 +255,9 @@ func (p *triePrefetcher) trie(root common.Hash) Trie { return p.db.CopyTrie(trie) } - var fetcher *subfetcher - // currentTrieChan is to make sure we receive root's fetcher in concurrency mode. - currentTrieChan := make(chan *subfetcher) - p.trieChan <- &trieMsg{root, currentTrieChan} - select { - case <-p.closeMainChan: - select { - case <-p.trieChan: - default: - } - fetcher = p.fetchers[root] - case fetcher = <-currentTrieChan: - } + p.fetchersMutex.RLock() + fetcher := p.fetchers[root] + p.fetchersMutex.RUnlock() if fetcher == nil { p.deliveryMissMeter.Mark(1) return nil @@ -331,12 +289,13 @@ func (p *triePrefetcher) used(root common.Hash, used [][]byte) { return } select { - case <-p.closeAbortChan: - select { - case <-p.usedChan: - default: + case <-p.closeMainChan: + default: + p.fetchersMutex.RLock() + if fetcher := p.fetchers[root]; fetcher != nil { + fetcher.used = used } - case p.usedChan <- &usedMsg{root, used}: + p.fetchersMutex.RUnlock() } } From 593ef70ffeb5f7c243766147eb1e681695c05810 Mon Sep 17 00:00:00 2001 From: setunapo Date: Tue, 5 Jul 2022 12:19:59 +0800 Subject: [PATCH 08/10] add lock for subfecter.used access to make it concurrent safe --- core/state/trie_prefetcher.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index d594de6679..30cb03306a 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -123,19 +123,22 @@ func (p *triePrefetcher) mainLoop() { p.accountLoadMeter.Mark(int64(len(fetcher.seen))) p.accountDupMeter.Mark(int64(fetcher.dups)) p.accountSkipMeter.Mark(int64(len(fetcher.tasks))) - + fetcher.lock.Lock() for _, key := range fetcher.used { delete(fetcher.seen, string(key)) } + fetcher.lock.Unlock() p.accountWasteMeter.Mark(int64(len(fetcher.seen))) } else { p.storageLoadMeter.Mark(int64(len(fetcher.seen))) p.storageDupMeter.Mark(int64(fetcher.dups)) p.storageSkipMeter.Mark(int64(len(fetcher.tasks))) + fetcher.lock.Lock() for _, key := range fetcher.used { delete(fetcher.seen, string(key)) } + fetcher.lock.Unlock() p.storageWasteMeter.Mark(int64(len(fetcher.seen))) } } @@ -171,7 +174,6 @@ func (p *triePrefetcher) abortLoop() { // close iterates over all the subfetchers, aborts any that were left spinning // and reports the stats to the metrics subsystem. 
-// Attention, this API is not thread safe: double close() func (p *triePrefetcher) close() { // If the prefetcher is an inactive one, bail out if p.fetches != nil { @@ -293,7 +295,9 @@ func (p *triePrefetcher) used(root common.Hash, used [][]byte) { default: p.fetchersMutex.RLock() if fetcher := p.fetchers[root]; fetcher != nil { + fetcher.lock.Lock() fetcher.used = used + fetcher.lock.Unlock() } p.fetchersMutex.RUnlock() } From 03b8f9700324b0580b46a7d4f29bc1337f0bae93 Mon Sep 17 00:00:00 2001 From: setunapo Date: Wed, 6 Jul 2022 11:45:07 +0800 Subject: [PATCH 09/10] no need to create channel for copied triePrefetcher --- core/state/trie_prefetcher.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index 30cb03306a..e42628dc67 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -193,11 +193,9 @@ func (p *triePrefetcher) copy() *triePrefetcher { // If the prefetcher is already a copy, duplicate the data if p.fetches != nil { fetcherCopied := &triePrefetcher{ - db: p.db, - root: p.root, - fetches: make(map[common.Hash]Trie, len(p.fetches)), - abortChan: make(chan *subfetcher), - closeAbortChan: make(chan struct{}), + db: p.db, + root: p.root, + fetches: make(map[common.Hash]Trie, len(p.fetches)), } // p.fetches is safe to be accessed outside of mainloop // if the triePrefetcher is active, fetches will not be used in mainLoop From 936530beb7a99fe419da0351c63a4d58238fa4b1 Mon Sep 17 00:00:00 2001 From: setunapo Date: Wed, 6 Jul 2022 17:48:54 +0800 Subject: [PATCH 10/10] fix trie_prefetcher_test.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit trie prefetcher’s behavior has changed, prefetch() won't create subfetcher immediately. 
it is reasonable, but break the UT, to fix the failed UT --- core/state/trie_prefetcher_test.go | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/core/state/trie_prefetcher_test.go b/core/state/trie_prefetcher_test.go index c7f362e84f..aa178dc9d0 100644 --- a/core/state/trie_prefetcher_test.go +++ b/core/state/trie_prefetcher_test.go @@ -43,23 +43,33 @@ func filledStateDB() *StateDB { return state } +func prefetchGuaranteed(prefetcher *triePrefetcher, root common.Hash, keys [][]byte, accountHash common.Hash) { + prefetcher.prefetch(root, keys, accountHash) + for { + if len(prefetcher.prefetchChan) == 0 { + return + } + time.Sleep(1 * time.Millisecond) + } +} + func TestCopyAndClose(t *testing.T) { db := filledStateDB() prefetcher := newTriePrefetcher(db.db, db.originalRoot, "") skey := common.HexToHash("aaa") - prefetcher.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{}) - prefetcher.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{}) + prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{}) + prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{}) time.Sleep(1 * time.Second) a := prefetcher.trie(db.originalRoot) - prefetcher.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{}) + prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{}) b := prefetcher.trie(db.originalRoot) cpy := prefetcher.copy() - cpy.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{}) - cpy.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{}) + prefetchGuaranteed(cpy, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{}) + prefetchGuaranteed(cpy, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{}) c := cpy.trie(db.originalRoot) prefetcher.close() cpy2 := cpy.copy() - cpy2.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{}) + prefetchGuaranteed(cpy2, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{}) d := cpy2.trie(db.originalRoot) cpy.close() cpy2.close() @@ -72,7 +82,7 @@ func TestUseAfterClose(t *testing.T) { db := filledStateDB() prefetcher := newTriePrefetcher(db.db, db.originalRoot, "") skey := common.HexToHash("aaa") - prefetcher.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{}) + prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{}) a := prefetcher.trie(db.originalRoot) prefetcher.close() b := prefetcher.trie(db.originalRoot) @@ -88,7 +98,7 @@ func TestCopyClose(t *testing.T) { db := filledStateDB() prefetcher := newTriePrefetcher(db.db, db.originalRoot, "") skey := common.HexToHash("aaa") - prefetcher.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{}) + prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{}) cpy := prefetcher.copy() a := prefetcher.trie(db.originalRoot) b := cpy.trie(db.originalRoot)
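Taken together, the later patches settle on a hybrid: prefetch() and close() still go through the main loop, while the read-mostly APIs (trie(), copy(), used()) read the fetchers map under a sync.RWMutex, and close() is made idempotent with atomic.CompareAndSwapInt32. The sketch below shows that combination in isolation (illustrative names only, not the patched code): concurrent closers race through a CAS, readers take cheap read locks, and a single writer goroutine stands in for mainLoop.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// registry sketches the final shape: readers take an RWMutex instead of
// queueing behind the main loop, and close() is made idempotent with an
// atomic compare-and-swap. Names are invented for the sketch.
type registry struct {
	mu      sync.RWMutex
	entries map[string]int

	closed    int32
	closeCh   chan struct{}
	closeDone chan struct{}
}

func newRegistry() *registry {
	r := &registry{
		entries:   make(map[string]int),
		closeCh:   make(chan struct{}),
		closeDone: make(chan struct{}),
	}
	go func() { // stand-in for mainLoop: the only writer of r.entries
		defer close(r.closeDone)
		for i := 0; i < 100; i++ {
			r.mu.Lock()
			r.entries[fmt.Sprintf("key-%d", i%3)] = i
			r.mu.Unlock()
		}
		<-r.closeCh // park until asked to shut down
	}()
	return r
}

// get is the analogue of trie()/used(): a read lock is enough, and many
// readers can proceed in parallel without going through the main loop.
func (r *registry) get(key string) (int, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	v, ok := r.entries[key]
	return v, ok
}

// close may be called from several goroutines: only the CAS winner closes the
// channel, everyone else just waits for the shutdown to finish.
func (r *registry) close() {
	if atomic.CompareAndSwapInt32(&r.closed, 0, 1) {
		close(r.closeCh)
	}
	<-r.closeDone
}

func main() {
	r := newRegistry()
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); r.close() }() // concurrent close() calls are safe
	}
	wg.Wait()
	fmt.Println(r.get("key-0")) // prints "99 true": all writes finished before close returned
}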