replace channel by RWMutex for a few triePrefetcher APIs
For APIs like trie(), copy(), and used(), it is simpler and more efficient to
use a RWMutex instead of channel communication: the mainLoop would otherwise be
busy serializing these requests, even though they can be processed in parallel.

Only prefetch and close are kept within the mainLoop, since they can update
the fetchers map.
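A minimal sketch of the locking scheme this change moves to, assuming a much-simplified prefetcher: read paths such as trie(), copy(), and used() take fetchersMutex with RLock/RUnlock and can run in parallel, while only the mainLoop-owned paths (prefetch, close) take the write lock to mutate the fetchers map. The prefetcher/fetcher types and the addFetcher/lookup helpers below are illustrative stand-ins, not the BSC implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// fetcher is a simplified stand-in for the subfetcher in the diff below.
type fetcher struct{ root string }

// prefetcher sketches the scheme adopted by this commit: a read-mostly map
// guarded by a sync.RWMutex instead of channel round-trips through mainLoop.
type prefetcher struct {
	fetchersMutex sync.RWMutex
	fetchers      map[string]*fetcher
}

// addFetcher models the mainLoop-side write path (prefetch/close):
// mutations of the map take the write lock.
func (p *prefetcher) addFetcher(root string) {
	p.fetchersMutex.Lock()
	defer p.fetchersMutex.Unlock()
	if p.fetchers == nil {
		p.fetchers = make(map[string]*fetcher)
	}
	p.fetchers[root] = &fetcher{root: root}
}

// lookup models the trie()-style read path: a read lock is enough, so many
// readers proceed in parallel without blocking on a single goroutine.
func (p *prefetcher) lookup(root string) *fetcher {
	p.fetchersMutex.RLock()
	defer p.fetchersMutex.RUnlock()
	return p.fetchers[root]
}

func main() {
	p := &prefetcher{}
	p.addFetcher("0xabc")

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Concurrent readers only contend on the RWMutex read lock,
			// not on the mainLoop's channel processing.
			if f := p.lookup("0xabc"); f != nil {
				fmt.Println("found fetcher for", f.root)
			}
		}()
	}
	wg.Wait()
}
```

The trade-off: readers no longer serialize through mainLoop's channels, at the cost of taking the write lock on the rarer paths that mutate the map.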
setunapo committed Jul 5, 2022
1 parent e2afc0e commit 1cd350f
Showing 1 changed file with 27 additions and 68 deletions.
95 changes: 27 additions & 68 deletions core/state/trie_prefetcher.go
@@ -41,16 +41,6 @@ type prefetchMsg struct {
keys [][]byte
}

type usedMsg struct {
root common.Hash
used [][]byte
}

type trieMsg struct {
root common.Hash
resultChan chan *subfetcher
}

// triePrefetcher is an active prefetcher, which receives accounts or storage
// items and does trie-loading of them. The goal is to get as much useful content
// into the caches as possible.
@@ -65,11 +55,8 @@ type triePrefetcher struct {
closed int32
closeMainChan chan struct{} // it is to inform the mainLoop
closeMainDoneChan chan struct{}
copyChan chan struct{}
copyDoneChan chan *triePrefetcher
fetchersMutex sync.RWMutex
prefetchChan chan *prefetchMsg // no need to wait for return
trieChan chan *trieMsg
usedChan chan *usedMsg // no need to wait for return

abortChan chan *subfetcher
closeAbortChan chan struct{} // it is used to inform abortLoop
@@ -97,11 +84,7 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre

closeMainChan: make(chan struct{}),
closeMainDoneChan: make(chan struct{}),
copyChan: make(chan struct{}, concurrentChanSize),
copyDoneChan: make(chan *triePrefetcher, concurrentChanSize),
prefetchChan: make(chan *prefetchMsg, concurrentChanSize),
trieChan: make(chan *trieMsg, concurrentChanSize),
usedChan: make(chan *usedMsg, concurrentChanSize),

deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil),
accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil),
@@ -121,34 +104,16 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre
func (p *triePrefetcher) mainLoop() {
for {
select {
case <-p.copyChan:
fetcherCopied := &triePrefetcher{
db: p.db,
root: p.root,
fetches: make(map[common.Hash]Trie, len(p.fetchers)),
}
// we're copying an active fetcher, retrieve the current states
for root, fetcher := range p.fetchers {
fetcherCopied.fetches[root] = fetcher.peek()
}
p.copyDoneChan <- fetcherCopied

case pMsg := <-p.prefetchChan:
fetcher := p.fetchers[pMsg.root]
if fetcher == nil {
fetcher = newSubfetcher(p.db, pMsg.root, pMsg.accountHash)
p.fetchersMutex.Lock()
p.fetchers[pMsg.root] = fetcher
p.fetchersMutex.Unlock()
}
fetcher.schedule(pMsg.keys)

case tireMsg := <-p.trieChan:
tireMsg.resultChan <- p.fetchers[tireMsg.root]

case uMsg := <-p.usedChan:
if fetcher := p.fetchers[uMsg.root]; fetcher != nil {
fetcher.used = uMsg.used
}

case <-p.closeMainChan:
for _, fetcher := range p.fetchers {
p.abortChan <- fetcher // safe to do multiple times
@@ -177,14 +142,14 @@ func (p *triePrefetcher) mainLoop() {
}
close(p.closeAbortChan)
close(p.closeMainDoneChan)
p.fetchersMutex.Lock()
p.fetchers = nil
p.fetchersMutex.Unlock()

// drain all the channels before quit the loop
for {
select {
case <-p.copyChan:
case <-p.prefetchChan:
case <-p.trieChan:
case <-p.usedChan:
default:
return
}
@@ -238,24 +203,28 @@ func (p *triePrefetcher) copy() *triePrefetcher {
}
return fetcherCopied
}
p.copyChan <- struct{}{}

select {
case <-p.closeMainChan:
select {
case <-p.copyChan: // to discard the message sent
default:
// for closed trie prefetcher, the fetches should not be nil
fetcherCopied := &triePrefetcher{
db: p.db,
root: p.root,
fetches: make(map[common.Hash]Trie),
}
return fetcherCopied
default:
p.fetchersMutex.RLock()
fetcherCopied := &triePrefetcher{
db: p.db,
root: p.root,
fetches: make(map[common.Hash]Trie, len(p.fetches)),
fetches: make(map[common.Hash]Trie, len(p.fetchers)),
}
// for closed trie prefetcher, retrieve the current states
// we're copying an active fetcher, retrieve the current states
for root, fetcher := range p.fetchers {
fetcherCopied.fetches[root] = fetcher.peek()
}
return fetcherCopied
case fetcherCopied := <-p.copyDoneChan:
p.fetchersMutex.RUnlock()
return fetcherCopied
}
}
@@ -268,7 +237,6 @@ func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte, accountHash c
}
select {
case <-p.closeMainChan: // skip closed trie prefetcher
return
case p.prefetchChan <- &prefetchMsg{root, accountHash, keys}:
}
}
@@ -285,19 +253,9 @@ func (p *triePrefetcher) trie(root common.Hash) Trie {
return p.db.CopyTrie(trie)
}

var fetcher *subfetcher
// currentTrieChan is to make sure we receive root's fetcher in concurrency mode.
currentTrieChan := make(chan *subfetcher)
p.trieChan <- &trieMsg{root, currentTrieChan}
select {
case <-p.closeMainChan:
select {
case <-p.trieChan:
default:
}
fetcher = p.fetchers[root]
case fetcher = <-currentTrieChan:
}
p.fetchersMutex.RLock()
fetcher := p.fetchers[root]
p.fetchersMutex.RUnlock()
if fetcher == nil {
p.deliveryMissMeter.Mark(1)
return nil
@@ -329,12 +287,13 @@ func (p *triePrefetcher) used(root common.Hash, used [][]byte) {
return
}
select {
case <-p.closeAbortChan:
select {
case <-p.usedChan:
default:
case <-p.closeMainChan:
default:
p.fetchersMutex.RLock()
if fetcher := p.fetchers[root]; fetcher != nil {
fetcher.used = used
}
case p.usedChan <- &usedMsg{root, used}:
p.fetchersMutex.RUnlock()
}
}
