Skip to content

Commit

Permalink
feat: cache lastTail in txindexer to avoid read from db
Browse files Browse the repository at this point in the history
  • Loading branch information
nolanxyg committed Aug 31, 2024
1 parent 4fe68ff commit 01e07b8
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 29 deletions.
22 changes: 14 additions & 8 deletions core/rawdb/chain_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool
//
// There is a passed channel, the whole procedure will be interrupted if any
// signal received.
func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool, report bool) {
func indexTransactions(db ethdb.Database, from uint64, to uint64, lastTailCh chan *uint64, interrupt chan struct{}, hook func(uint64) bool, report bool) {
// short circuit for invalid range
if from >= to {
return
Expand Down Expand Up @@ -240,6 +240,9 @@ func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan
log.Crit("Failed writing batch to db", "error", err)
return
}
if lastTailCh != nil {
lastTailCh <- &lastNum
}
logger := log.Debug
if report {
logger = log.Info
Expand All @@ -261,20 +264,20 @@ func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan
//
// There is a passed channel, the whole procedure will be interrupted if any
// signal received.
func IndexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, report bool) {
indexTransactions(db, from, to, interrupt, nil, report)
func IndexTransactions(db ethdb.Database, from uint64, to uint64, lastTailCh chan *uint64, interrupt chan struct{}, report bool) {
indexTransactions(db, from, to, lastTailCh, interrupt, nil, report)
}

// indexTransactionsForTesting is the internal debug version with an additional hook.
func indexTransactionsForTesting(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) {
indexTransactions(db, from, to, interrupt, hook, false)
indexTransactions(db, from, to, nil, interrupt, hook, false)
}

// unindexTransactions removes txlookup indices of the specified block range.
//
// There is a passed channel, the whole procedure will be interrupted if any
// signal received.
func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool, report bool) {
func unindexTransactions(db ethdb.Database, from uint64, to uint64, lastTailCh chan *uint64, interrupt chan struct{}, hook func(uint64) bool, report bool) {
// short circuit for invalid range
if from >= to {
return
Expand Down Expand Up @@ -336,6 +339,9 @@ func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt ch
log.Crit("Failed writing batch to db", "error", err)
return
}
if lastTailCh != nil {
lastTailCh <- &nextNum
}
logger := log.Debug
if report {
logger = log.Info
Expand All @@ -353,11 +359,11 @@ func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt ch
//
// There is a passed channel, the whole procedure will be interrupted if any
// signal received.
func UnindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, report bool) {
unindexTransactions(db, from, to, interrupt, nil, report)
func UnindexTransactions(db ethdb.Database, from uint64, to uint64, lastTailCh chan *uint64, interrupt chan struct{}, report bool) {
unindexTransactions(db, from, to, lastTailCh, interrupt, nil, report)
}

// unindexTransactionsForTesting is the internal debug version with an additional hook.
func unindexTransactionsForTesting(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) {
unindexTransactions(db, from, to, interrupt, hook, false)
unindexTransactions(db, from, to, nil, interrupt, hook, false)
}
10 changes: 5 additions & 5 deletions core/rawdb/chain_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,18 +162,18 @@ func TestIndexTransactions(t *testing.T) {
t.Fatalf("Transaction tail mismatch")
}
}
IndexTransactions(chainDb, 5, 11, nil, false)
IndexTransactions(chainDb, 5, 11, nil, nil, false)
verify(5, 11, true, 5)
verify(0, 5, false, 5)

IndexTransactions(chainDb, 0, 5, nil, false)
IndexTransactions(chainDb, 0, 5, nil, nil, false)
verify(0, 11, true, 0)

UnindexTransactions(chainDb, 0, 5, nil, false)
UnindexTransactions(chainDb, 0, 5, nil, nil, false)
verify(5, 11, true, 5)
verify(0, 5, false, 5)

UnindexTransactions(chainDb, 5, 11, nil, false)
UnindexTransactions(chainDb, 5, 11, nil, nil, false)
verify(0, 11, false, 11)

// Testing corner cases
Expand All @@ -190,7 +190,7 @@ func TestIndexTransactions(t *testing.T) {
})
verify(9, 11, true, 9)
verify(0, 9, false, 9)
IndexTransactions(chainDb, 0, 9, nil, false)
IndexTransactions(chainDb, 0, 9, nil, nil, false)

signal = make(chan struct{})
var once2 sync.Once
Expand Down
29 changes: 17 additions & 12 deletions core/txindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func newTxIndexer(limit uint64, chain *BlockChain) *txIndexer {
// run executes the scheduled indexing/unindexing task in a separate thread.
// If the stop channel is closed, the task should be terminated as soon as
// possible, the done channel will be closed once the task is finished.
func (indexer *txIndexer) run(tail *uint64, head uint64, stop chan struct{}, done chan struct{}) {
func (indexer *txIndexer) run(tail *uint64, head uint64, lastTailCh chan *uint64, stop chan struct{}, done chan struct{}) {
defer func() { close(done) }()

// Short circuit if chain is empty and nothing to index.
Expand All @@ -91,7 +91,7 @@ func (indexer *txIndexer) run(tail *uint64, head uint64, stop chan struct{}, don
if indexer.limit != 0 && head >= indexer.limit {
from = head - indexer.limit + 1
}
rawdb.IndexTransactions(indexer.db, from, head+1, stop, true)
rawdb.IndexTransactions(indexer.db, from, head+1, lastTailCh, stop, true)
return
}
// The tail flag is existent (which means indexes in [tail, head] should be
Expand All @@ -105,18 +105,18 @@ func (indexer *txIndexer) run(tail *uint64, head uint64, stop chan struct{}, don
if end > head+1 {
end = head + 1
}
rawdb.IndexTransactions(indexer.db, 0, end, stop, true)
rawdb.IndexTransactions(indexer.db, 0, end, lastTailCh, stop, true)
}
return
}
// The tail flag is existent, adjust the index range according to configured
// limit and the latest chain head.
if head-indexer.limit+1 < *tail {
// Reindex a part of missing indices and rewind index tail to HEAD-limit
rawdb.IndexTransactions(indexer.db, head-indexer.limit+1, *tail, stop, true)
rawdb.IndexTransactions(indexer.db, head-indexer.limit+1, *tail, lastTailCh, stop, true)
} else {
// Unindex a part of stale indices and forward index tail to HEAD-limit
rawdb.UnindexTransactions(indexer.db, *tail, head-indexer.limit+1, stop, false)
rawdb.UnindexTransactions(indexer.db, *tail, head-indexer.limit+1, lastTailCh, stop, false)
}
}

Expand All @@ -127,10 +127,11 @@ func (indexer *txIndexer) loop(chain *BlockChain) {

// Listening to chain events and manipulate the transaction indexes.
var (
stop chan struct{} // Non-nil if background routine is active.
done chan struct{} // Non-nil if background routine is active.
lastHead uint64 // The latest announced chain head (whose tx indexes are assumed created)
lastTail = rawdb.ReadTxIndexTail(indexer.db) // The oldest indexed block, nil means nothing indexed
stop chan struct{} // Non-nil if background routine is active.
done chan struct{} // Non-nil if background routine is active.
lastHead uint64 // The latest announced chain head (whose tx indexes are assumed created)
lastTail = rawdb.ReadTxIndexTail(indexer.db) // The oldest indexed block, nil means nothing indexed
lastTailCh = make(chan *uint64, 1) // Pass updated lastTail after indexing work, to avoid read from db

headCh = make(chan ChainHeadEvent)
sub = chain.SubscribeChainHeadEvent(headCh)
Expand All @@ -143,21 +144,24 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
stop = make(chan struct{})
done = make(chan struct{})
lastHead = head.Number().Uint64()
go indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.NumberU64(), stop, done)
go indexer.run(lastTail, head.NumberU64(), lastTailCh, stop, done)
}
for {
select {
case head := <-headCh:
if done == nil {
stop = make(chan struct{})
done = make(chan struct{})
go indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.Block.NumberU64(), stop, done)
go indexer.run(lastTail, head.Block.NumberU64(), lastTailCh, stop, done)
}
lastHead = head.Block.NumberU64()
case <-done:
select {
case lastTail = <-lastTailCh:
default:
}
stop = nil
done = nil
lastTail = rawdb.ReadTxIndexTail(indexer.db)
case ch := <-indexer.progress:
ch <- indexer.report(lastHead, lastTail)
case ch := <-indexer.term:
Expand All @@ -168,6 +172,7 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
log.Info("Waiting background transaction indexer to exit")
<-done
}
close(lastTailCh)
close(ch)
return
}
Expand Down
8 changes: 4 additions & 4 deletions core/txindexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,20 +221,20 @@ func TestTxIndexer(t *testing.T) {
db: db,
progress: make(chan chan TxIndexProgress),
}
indexer.run(nil, 128, make(chan struct{}), make(chan struct{}))
indexer.run(nil, 128, nil, make(chan struct{}), make(chan struct{}))
verify(db, c.tailA, indexer)

indexer.limit = c.limitB
indexer.run(rawdb.ReadTxIndexTail(db), 128, make(chan struct{}), make(chan struct{}))
indexer.run(rawdb.ReadTxIndexTail(db), 128, nil, make(chan struct{}), make(chan struct{}))
verify(db, c.tailB, indexer)

indexer.limit = c.limitC
indexer.run(rawdb.ReadTxIndexTail(db), 128, make(chan struct{}), make(chan struct{}))
indexer.run(rawdb.ReadTxIndexTail(db), 128, nil, make(chan struct{}), make(chan struct{}))
verify(db, c.tailC, indexer)

// Recover all indexes
indexer.limit = 0
indexer.run(rawdb.ReadTxIndexTail(db), 128, make(chan struct{}), make(chan struct{}))
indexer.run(rawdb.ReadTxIndexTail(db), 128, nil, make(chan struct{}), make(chan struct{}))
verify(db, 0, indexer)

db.Close()
Expand Down

0 comments on commit 01e07b8

Please sign in to comment.