[R4R]: Redesign triePrefetcher to make it thread safe #972

Merged
10 commits merged on Jul 7, 2022
232 changes: 156 additions & 76 deletions core/state/trie_prefetcher.go
@@ -18,19 +18,29 @@ package state

import (
"sync"
"sync/atomic"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
)

const abortChanSize = 64
const (
abortChanSize = 64
concurrentChanSize = 10
)

var (
// triePrefetchMetricsPrefix is the prefix under which to publish the metrics.
triePrefetchMetricsPrefix = "trie/prefetch/"
)

type prefetchMsg struct {
root common.Hash
accountHash common.Hash
keys [][]byte
}

// triePrefetcher is an active prefetcher, which receives accounts or storage
// items and does trie-loading of them. The goal is to get as much useful content
// into the caches as possible.
@@ -42,8 +52,14 @@ type triePrefetcher struct {
fetches map[common.Hash]Trie // Partially or fully fetcher tries
fetchers map[common.Hash]*subfetcher // Subfetchers for each trie

abortChan chan *subfetcher
closeChan chan struct{}
closed int32
closeMainChan chan struct{} // closed to signal mainLoop to shut down
closeMainDoneChan chan struct{}
fetchersMutex sync.RWMutex
prefetchChan chan *prefetchMsg // no need to wait for return

abortChan chan *subfetcher
closeAbortChan chan struct{} // closed to signal abortLoop to exit

deliveryMissMeter metrics.Meter
accountLoadMeter metrics.Meter
@@ -60,11 +76,15 @@ type triePrefetcher struct {
func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher {
prefix := triePrefetchMetricsPrefix + namespace
p := &triePrefetcher{
db: db,
root: root,
fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map
abortChan: make(chan *subfetcher, abortChanSize),
closeChan: make(chan struct{}),
db: db,
root: root,
fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map
abortChan: make(chan *subfetcher, abortChanSize),
closeAbortChan: make(chan struct{}),

closeMainChan: make(chan struct{}),
closeMainDoneChan: make(chan struct{}),
prefetchChan: make(chan *prefetchMsg, concurrentChanSize),

deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil),
accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil),
@@ -77,20 +97,62 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher {
storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),
}
go p.abortLoop()
go p.mainLoop()
return p
}

func (p *triePrefetcher) abortLoop() {
func (p *triePrefetcher) mainLoop() {
for {
select {
case fetcher := <-p.abortChan:
fetcher.abort()
case <-p.closeChan:
// drain fetcher channel
case pMsg := <-p.prefetchChan:
fetcher := p.fetchers[pMsg.root]
if fetcher == nil {
fetcher = newSubfetcher(p.db, pMsg.root, pMsg.accountHash)
p.fetchersMutex.Lock()
p.fetchers[pMsg.root] = fetcher
p.fetchersMutex.Unlock()
}
fetcher.schedule(pMsg.keys)

case <-p.closeMainChan:
for _, fetcher := range p.fetchers {
p.abortChan <- fetcher // safe to do multiple times
<-fetcher.term
if metrics.EnabledExpensive {
if fetcher.root == p.root {
p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
p.accountDupMeter.Mark(int64(fetcher.dups))
p.accountSkipMeter.Mark(int64(len(fetcher.tasks)))
fetcher.lock.Lock()
for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
}
fetcher.lock.Unlock()
p.accountWasteMeter.Mark(int64(len(fetcher.seen)))
} else {
p.storageLoadMeter.Mark(int64(len(fetcher.seen)))
p.storageDupMeter.Mark(int64(fetcher.dups))
p.storageSkipMeter.Mark(int64(len(fetcher.tasks)))

fetcher.lock.Lock()
for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
}
fetcher.lock.Unlock()
p.storageWasteMeter.Mark(int64(len(fetcher.seen)))
}
}
}
close(p.closeAbortChan)
close(p.closeMainDoneChan)
p.fetchersMutex.Lock()
p.fetchers = nil
p.fetchersMutex.Unlock()

// drain all the channels before quitting the loop
for {
select {
case fetcher := <-p.abortChan:
fetcher.abort()
case <-p.prefetchChan:
default:
return
}
@@ -99,73 +161,74 @@ func (p *triePrefetcher) abortLoop() {
}
}

func (p *triePrefetcher) abortLoop() {
for {
select {
case fetcher := <-p.abortChan:
fetcher.abort()
case <-p.closeAbortChan:
return
}
}
}
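
The core of the redesign shown above is ownership: only mainLoop creates entries in p.fetchers (readers take fetchersMutex), and callers communicate with it over prefetchChan, abortChan and the closeMainChan/closeMainDoneChan handshake. The snippet below is a minimal, self-contained sketch of that single-owner-goroutine pattern with a drain-then-acknowledge shutdown; the names (store, putCh, closeMainCh and so on) are illustrative and are not part of this PR.

```go
package main

import "fmt"

type putMsg struct {
	key string
	val int
}

// store's map is written only by mainLoop; other goroutines never touch it
// directly, they only send messages over channels.
type store struct {
	data          map[string]int
	putCh         chan putMsg
	closeMainCh   chan struct{} // closed to tell mainLoop to stop
	closeMainDone chan struct{} // closed by mainLoop once it has drained and exited
}

func newStore() *store {
	s := &store{
		data:          make(map[string]int),
		putCh:         make(chan putMsg, 10),
		closeMainCh:   make(chan struct{}),
		closeMainDone: make(chan struct{}),
	}
	go s.mainLoop()
	return s
}

func (s *store) mainLoop() {
	for {
		select {
		case m := <-s.putCh:
			s.data[m.key] = m.val // safe: only this goroutine writes the map
		case <-s.closeMainCh:
			// Drain any buffered requests before quitting, mirroring the
			// drain loop in the redesigned triePrefetcher.
			for {
				select {
				case m := <-s.putCh:
					s.data[m.key] = m.val
				default:
					close(s.closeMainDone)
					return
				}
			}
		}
	}
}

// put is safe to call from any goroutine; once shutdown has begun it drops
// the request instead of blocking.
func (s *store) put(key string, val int) {
	select {
	case <-s.closeMainCh:
	case s.putCh <- putMsg{key, val}:
	}
}

// close is called only once in this sketch; the PR additionally guards its
// close with a CompareAndSwap.
func (s *store) close() {
	close(s.closeMainCh)
	<-s.closeMainDone // wait until mainLoop has fully stopped
}

func main() {
	s := newStore()
	s.put("a", 1)
	s.put("b", 2)
	s.close()
	fmt.Println(len(s.data)) // 2: both puts were buffered before close and drained by mainLoop
}
```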

// close iterates over all the subfetchers, aborts any that were left spinning
// and reports the stats to the metrics subsystem.
func (p *triePrefetcher) close() {
for _, fetcher := range p.fetchers {
p.abortChan <- fetcher // safe to do multiple times
<-fetcher.term
if metrics.EnabledExpensive {
if fetcher.root == p.root {
p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
p.accountDupMeter.Mark(int64(fetcher.dups))
p.accountSkipMeter.Mark(int64(len(fetcher.tasks)))

for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
}
p.accountWasteMeter.Mark(int64(len(fetcher.seen)))
} else {
p.storageLoadMeter.Mark(int64(len(fetcher.seen)))
p.storageDupMeter.Mark(int64(fetcher.dups))
p.storageSkipMeter.Mark(int64(len(fetcher.tasks)))

for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
}
p.storageWasteMeter.Mark(int64(len(fetcher.seen)))
}
}
// If the prefetcher is an inactive one, bail out
if p.fetches != nil {
return
}
if atomic.CompareAndSwapInt32(&p.closed, 0, 1) {
close(p.closeMainChan)
<-p.closeMainDoneChan // wait until all subfetchers have stopped
}
close(p.closeChan)
// Clear out all fetchers (will crash on a second call, deliberate)
p.fetchers = nil
}
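
close is now tolerant of being reached more than once: an atomic CompareAndSwap on p.closed ensures only the first call closes closeMainChan and waits for the handshake, instead of crashing on a double close. Below is a minimal sketch of that guard; the closer type and its done channel are invented for illustration and are not the PR's API.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// closer demonstrates a CAS-guarded shutdown: whichever caller wins the
// CompareAndSwap closes the channel, and every caller waits on it, so a
// second call no longer panics with a double channel close.
type closer struct {
	closed int32
	done   chan struct{}
}

func (c *closer) close() {
	if atomic.CompareAndSwapInt32(&c.closed, 0, 1) {
		close(c.done) // only the CAS winner closes the channel
	}
	<-c.done // all callers return only after shutdown has happened
}

func main() {
	c := &closer{done: make(chan struct{})}
	c.close()
	c.close() // second call is a harmless no-op
	fmt.Println("closed exactly once")
}
```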

// copy creates a deep-but-inactive copy of the trie prefetcher. Any trie data
// already loaded will be copied over, but no goroutines will be started. This
// is mostly used in the miner which creates a copy of its actively mutated
// state to be sealed while it may further mutate the state.
func (p *triePrefetcher) copy() *triePrefetcher {
copy := &triePrefetcher{
db: p.db,
root: p.root,
fetches: make(map[common.Hash]Trie), // Active prefetchers use the fetches map

abortChan: make(chan *subfetcher),
closeChan: make(chan struct{}),
deliveryMissMeter: p.deliveryMissMeter,
accountLoadMeter: p.accountLoadMeter,
accountDupMeter: p.accountDupMeter,
accountSkipMeter: p.accountSkipMeter,
accountWasteMeter: p.accountWasteMeter,
storageLoadMeter: p.storageLoadMeter,
storageDupMeter: p.storageDupMeter,
storageSkipMeter: p.storageSkipMeter,
storageWasteMeter: p.storageWasteMeter,
}
// If the prefetcher is already a copy, duplicate the data
if p.fetches != nil {
fetcherCopied := &triePrefetcher{
db: p.db,
root: p.root,
fetches: make(map[common.Hash]Trie, len(p.fetches)),
}
// p.fetches is safe to access outside of mainLoop:
// if the triePrefetcher is active, fetches is not used in mainLoop;
// otherwise, the inactive triePrefetcher is read-only and never modifies fetches
for root, fetch := range p.fetches {
copy.fetches[root] = p.db.CopyTrie(fetch)
fetcherCopied.fetches[root] = p.db.CopyTrie(fetch)
}
return copy
return fetcherCopied
}
// Otherwise we're copying an active fetcher, retrieve the current states
for root, fetcher := range p.fetchers {
copy.fetches[root] = fetcher.peek()

select {
case <-p.closeMainChan:
// for a closed trie prefetcher, fetches should not be nil
fetcherCopied := &triePrefetcher{
db: p.db,
root: p.root,
fetches: make(map[common.Hash]Trie),
}
return fetcherCopied
default:
p.fetchersMutex.RLock()
fetcherCopied := &triePrefetcher{
db: p.db,
root: p.root,
fetches: make(map[common.Hash]Trie, len(p.fetchers)),
}
// we're copying an active fetcher, retrieve the current states
for root, fetcher := range p.fetchers {
fetcherCopied.fetches[root] = fetcher.peek()
}
p.fetchersMutex.RUnlock()
return fetcherCopied
}
return copy
}

// prefetch schedules a batch of trie items to prefetch.
@@ -174,13 +237,10 @@ func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte, accountHash common.Hash) {
if p.fetches != nil {
return
}
// Active fetcher, schedule the retrievals
fetcher := p.fetchers[root]
if fetcher == nil {
fetcher = newSubfetcher(p.db, root, accountHash)
p.fetchers[root] = fetcher
select {
case <-p.closeMainChan: // skip closed trie prefetcher
case p.prefetchChan <- &prefetchMsg{root, accountHash, keys}:
}
fetcher.schedule(keys)
}
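
prefetch itself no longer touches the fetchers map at all: it either hands the request to mainLoop over prefetchChan or drops it when the prefetcher has already been closed. The sketch below isolates that send-unless-closed select idiom; trySend, msgCh and closedCh are illustrative names rather than the PR's API.

```go
package main

import "fmt"

// trySend forwards msg to a worker unless shutdown has already begun.
// It reports whether the message was handed off.
func trySend(msgCh chan<- string, closedCh <-chan struct{}, msg string) bool {
	select {
	case <-closedCh:
		return false // closed: drop the request rather than block forever
	case msgCh <- msg:
		return true
	}
}

func main() {
	msgCh := make(chan string, 1)
	closedCh := make(chan struct{})

	fmt.Println(trySend(msgCh, closedCh, "first")) // true: the buffered send succeeds
	close(closedCh)
	// false: closedCh is now closed and the buffer is full, so the send case
	// cannot be chosen.
	fmt.Println(trySend(msgCh, closedCh, "second"))
}
```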

// trie returns the trie matching the root hash, or nil if the prefetcher doesn't
@@ -190,20 +250,25 @@ func (p *triePrefetcher) trie(root common.Hash) Trie {
if p.fetches != nil {
trie := p.fetches[root]
if trie == nil {
p.deliveryMissMeter.Mark(1)
return nil
}
return p.db.CopyTrie(trie)
}
// Otherwise the prefetcher is active, bail if no trie was prefetched for this root

p.fetchersMutex.RLock()
fetcher := p.fetchers[root]
p.fetchersMutex.RUnlock()
if fetcher == nil {
p.deliveryMissMeter.Mark(1)
return nil
}

// Interrupt the prefetcher if it's by any chance still running and return
// a copy of any pre-loaded trie.
p.abortChan <- fetcher // safe to do multiple times
select {
case <-p.closeAbortChan:
case p.abortChan <- fetcher: // safe to do multiple times
}

trie := fetcher.peek()
if trie == nil {
@@ -216,8 +281,23 @@ func (p *triePrefetcher) trie(root common.Hash) Trie {
// used marks a batch of state items used to allow creating statistics as to
// how useful or wasteful the prefetcher is.
func (p *triePrefetcher) used(root common.Hash, used [][]byte) {
if fetcher := p.fetchers[root]; fetcher != nil {
fetcher.used = used
if !metrics.EnabledExpensive {
return
}
// If the prefetcher is an inactive one, bail out
if p.fetches != nil {
return
}
select {
case <-p.closeMainChan:
default:
p.fetchersMutex.RLock()
if fetcher := p.fetchers[root]; fetcher != nil {
fetcher.lock.Lock()
fetcher.used = used
fetcher.lock.Unlock()
}
p.fetchersMutex.RUnlock()
}
}

26 changes: 18 additions & 8 deletions core/state/trie_prefetcher_test.go
@@ -43,23 +43,33 @@ func filledStateDB() *StateDB {
return state
}

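// prefetchGuaranteed schedules the keys and then spins until prefetchChan has
// been drained, so the request has reached mainLoop before the test continues.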
func prefetchGuaranteed(prefetcher *triePrefetcher, root common.Hash, keys [][]byte, accountHash common.Hash) {
prefetcher.prefetch(root, keys, accountHash)
for {
if len(prefetcher.prefetchChan) == 0 {
return
}
time.Sleep(1 * time.Millisecond)
}
}

func TestCopyAndClose(t *testing.T) {
db := filledStateDB()
prefetcher := newTriePrefetcher(db.db, db.originalRoot, "")
skey := common.HexToHash("aaa")
prefetcher.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
prefetcher.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
time.Sleep(1 * time.Second)
a := prefetcher.trie(db.originalRoot)
prefetcher.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
b := prefetcher.trie(db.originalRoot)
cpy := prefetcher.copy()
cpy.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
cpy.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
prefetchGuaranteed(cpy, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
prefetchGuaranteed(cpy, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
c := cpy.trie(db.originalRoot)
prefetcher.close()
cpy2 := cpy.copy()
cpy2.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
prefetchGuaranteed(cpy2, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
d := cpy2.trie(db.originalRoot)
cpy.close()
cpy2.close()
@@ -72,7 +82,7 @@ func TestUseAfterClose(t *testing.T) {
db := filledStateDB()
prefetcher := newTriePrefetcher(db.db, db.originalRoot, "")
skey := common.HexToHash("aaa")
prefetcher.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
a := prefetcher.trie(db.originalRoot)
prefetcher.close()
b := prefetcher.trie(db.originalRoot)
@@ -88,7 +98,7 @@ func TestCopyClose(t *testing.T) {
db := filledStateDB()
prefetcher := newTriePrefetcher(db.db, db.originalRoot, "")
skey := common.HexToHash("aaa")
prefetcher.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
cpy := prefetcher.copy()
a := prefetcher.trie(db.originalRoot)
b := cpy.trie(db.originalRoot)