diff --git a/core/rawdb/chain_iterator.go b/core/rawdb/chain_iterator.go index 759e5913d1..c2343f937f 100644 --- a/core/rawdb/chain_iterator.go +++ b/core/rawdb/chain_iterator.go @@ -178,7 +178,7 @@ func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool // // There is a passed channel, the whole procedure will be interrupted if any // signal received. -func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool, report bool) { +func indexTransactions(db ethdb.Database, from uint64, to uint64, lastTailCh chan *uint64, interrupt chan struct{}, hook func(uint64) bool, report bool) { // short circuit for invalid range if from >= to { return @@ -240,6 +240,9 @@ func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan log.Crit("Failed writing batch to db", "error", err) return } + if lastTailCh != nil { + lastTailCh <- &lastNum + } logger := log.Debug if report { logger = log.Info @@ -261,20 +264,20 @@ func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan // // There is a passed channel, the whole procedure will be interrupted if any // signal received. -func IndexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, report bool) { - indexTransactions(db, from, to, interrupt, nil, report) +func IndexTransactions(db ethdb.Database, from uint64, to uint64, lastTailCh chan *uint64, interrupt chan struct{}, report bool) { + indexTransactions(db, from, to, lastTailCh, interrupt, nil, report) } // indexTransactionsForTesting is the internal debug version with an additional hook. func indexTransactionsForTesting(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) { - indexTransactions(db, from, to, interrupt, hook, false) + indexTransactions(db, from, to, nil, interrupt, hook, false) } // unindexTransactions removes txlookup indices of the specified block range. // // There is a passed channel, the whole procedure will be interrupted if any // signal received. -func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool, report bool) { +func unindexTransactions(db ethdb.Database, from uint64, to uint64, lastTailCh chan *uint64, interrupt chan struct{}, hook func(uint64) bool, report bool) { // short circuit for invalid range if from >= to { return @@ -336,6 +339,9 @@ func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt ch log.Crit("Failed writing batch to db", "error", err) return } + if lastTailCh != nil { + lastTailCh <- &nextNum + } logger := log.Debug if report { logger = log.Info @@ -353,11 +359,11 @@ func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt ch // // There is a passed channel, the whole procedure will be interrupted if any // signal received. -func UnindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, report bool) { - unindexTransactions(db, from, to, interrupt, nil, report) +func UnindexTransactions(db ethdb.Database, from uint64, to uint64, lastTailCh chan *uint64, interrupt chan struct{}, report bool) { + unindexTransactions(db, from, to, lastTailCh, interrupt, nil, report) } // unindexTransactionsForTesting is the internal debug version with an additional hook. func unindexTransactionsForTesting(db ethdb.Database, from uint64, to uint64, interrupt chan struct{}, hook func(uint64) bool) { - unindexTransactions(db, from, to, interrupt, hook, false) + unindexTransactions(db, from, to, nil, interrupt, hook, false) } diff --git a/core/rawdb/chain_iterator_test.go b/core/rawdb/chain_iterator_test.go index 78b0a82e10..7ec23d6885 100644 --- a/core/rawdb/chain_iterator_test.go +++ b/core/rawdb/chain_iterator_test.go @@ -162,18 +162,18 @@ func TestIndexTransactions(t *testing.T) { t.Fatalf("Transaction tail mismatch") } } - IndexTransactions(chainDb, 5, 11, nil, false) + IndexTransactions(chainDb, 5, 11, nil, nil, false) verify(5, 11, true, 5) verify(0, 5, false, 5) - IndexTransactions(chainDb, 0, 5, nil, false) + IndexTransactions(chainDb, 0, 5, nil, nil, false) verify(0, 11, true, 0) - UnindexTransactions(chainDb, 0, 5, nil, false) + UnindexTransactions(chainDb, 0, 5, nil, nil, false) verify(5, 11, true, 5) verify(0, 5, false, 5) - UnindexTransactions(chainDb, 5, 11, nil, false) + UnindexTransactions(chainDb, 5, 11, nil, nil, false) verify(0, 11, false, 11) // Testing corner cases @@ -190,7 +190,7 @@ func TestIndexTransactions(t *testing.T) { }) verify(9, 11, true, 9) verify(0, 9, false, 9) - IndexTransactions(chainDb, 0, 9, nil, false) + IndexTransactions(chainDb, 0, 9, nil, nil, false) signal = make(chan struct{}) var once2 sync.Once diff --git a/core/txindexer.go b/core/txindexer.go index 70fe5f3322..77b518dca1 100644 --- a/core/txindexer.go +++ b/core/txindexer.go @@ -76,7 +76,7 @@ func newTxIndexer(limit uint64, chain *BlockChain) *txIndexer { // run executes the scheduled indexing/unindexing task in a separate thread. // If the stop channel is closed, the task should be terminated as soon as // possible, the done channel will be closed once the task is finished. -func (indexer *txIndexer) run(tail *uint64, head uint64, stop chan struct{}, done chan struct{}) { +func (indexer *txIndexer) run(tail *uint64, head uint64, lastTailCh chan *uint64, stop chan struct{}, done chan struct{}) { defer func() { close(done) }() // Short circuit if chain is empty and nothing to index. @@ -91,7 +91,7 @@ func (indexer *txIndexer) run(tail *uint64, head uint64, stop chan struct{}, don if indexer.limit != 0 && head >= indexer.limit { from = head - indexer.limit + 1 } - rawdb.IndexTransactions(indexer.db, from, head+1, stop, true) + rawdb.IndexTransactions(indexer.db, from, head+1, lastTailCh, stop, true) return } // The tail flag is existent (which means indexes in [tail, head] should be @@ -105,7 +105,7 @@ func (indexer *txIndexer) run(tail *uint64, head uint64, stop chan struct{}, don if end > head+1 { end = head + 1 } - rawdb.IndexTransactions(indexer.db, 0, end, stop, true) + rawdb.IndexTransactions(indexer.db, 0, end, lastTailCh, stop, true) } return } @@ -113,10 +113,10 @@ func (indexer *txIndexer) run(tail *uint64, head uint64, stop chan struct{}, don // limit and the latest chain head. if head-indexer.limit+1 < *tail { // Reindex a part of missing indices and rewind index tail to HEAD-limit - rawdb.IndexTransactions(indexer.db, head-indexer.limit+1, *tail, stop, true) + rawdb.IndexTransactions(indexer.db, head-indexer.limit+1, *tail, lastTailCh, stop, true) } else { // Unindex a part of stale indices and forward index tail to HEAD-limit - rawdb.UnindexTransactions(indexer.db, *tail, head-indexer.limit+1, stop, false) + rawdb.UnindexTransactions(indexer.db, *tail, head-indexer.limit+1, lastTailCh, stop, false) } } @@ -127,10 +127,11 @@ func (indexer *txIndexer) loop(chain *BlockChain) { // Listening to chain events and manipulate the transaction indexes. var ( - stop chan struct{} // Non-nil if background routine is active. - done chan struct{} // Non-nil if background routine is active. - lastHead uint64 // The latest announced chain head (whose tx indexes are assumed created) - lastTail = rawdb.ReadTxIndexTail(indexer.db) // The oldest indexed block, nil means nothing indexed + stop chan struct{} // Non-nil if background routine is active. + done chan struct{} // Non-nil if background routine is active. + lastHead uint64 // The latest announced chain head (whose tx indexes are assumed created) + lastTail = rawdb.ReadTxIndexTail(indexer.db) // The oldest indexed block, nil means nothing indexed + lastTailCh = make(chan *uint64, 1) // Pass updated lastTail after indexing work, to avoid read from db headCh = make(chan ChainHeadEvent) sub = chain.SubscribeChainHeadEvent(headCh) @@ -143,7 +144,7 @@ func (indexer *txIndexer) loop(chain *BlockChain) { stop = make(chan struct{}) done = make(chan struct{}) lastHead = head.Number().Uint64() - go indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.NumberU64(), stop, done) + go indexer.run(lastTail, head.NumberU64(), lastTailCh, stop, done) } for { select { @@ -151,13 +152,16 @@ func (indexer *txIndexer) loop(chain *BlockChain) { if done == nil { stop = make(chan struct{}) done = make(chan struct{}) - go indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.Block.NumberU64(), stop, done) + go indexer.run(lastTail, head.Block.NumberU64(), lastTailCh, stop, done) } lastHead = head.Block.NumberU64() case <-done: + select { + case lastTail = <-lastTailCh: + default: + } stop = nil done = nil - lastTail = rawdb.ReadTxIndexTail(indexer.db) case ch := <-indexer.progress: ch <- indexer.report(lastHead, lastTail) case ch := <-indexer.term: @@ -168,6 +172,7 @@ func (indexer *txIndexer) loop(chain *BlockChain) { log.Info("Waiting background transaction indexer to exit") <-done } + close(lastTailCh) close(ch) return } diff --git a/core/txindexer_test.go b/core/txindexer_test.go index 7b5ff1f206..d2570da673 100644 --- a/core/txindexer_test.go +++ b/core/txindexer_test.go @@ -221,20 +221,20 @@ func TestTxIndexer(t *testing.T) { db: db, progress: make(chan chan TxIndexProgress), } - indexer.run(nil, 128, make(chan struct{}), make(chan struct{})) + indexer.run(nil, 128, nil, make(chan struct{}), make(chan struct{})) verify(db, c.tailA, indexer) indexer.limit = c.limitB - indexer.run(rawdb.ReadTxIndexTail(db), 128, make(chan struct{}), make(chan struct{})) + indexer.run(rawdb.ReadTxIndexTail(db), 128, nil, make(chan struct{}), make(chan struct{})) verify(db, c.tailB, indexer) indexer.limit = c.limitC - indexer.run(rawdb.ReadTxIndexTail(db), 128, make(chan struct{}), make(chan struct{})) + indexer.run(rawdb.ReadTxIndexTail(db), 128, nil, make(chan struct{}), make(chan struct{})) verify(db, c.tailC, indexer) // Recover all indexes indexer.limit = 0 - indexer.run(rawdb.ReadTxIndexTail(db), 128, make(chan struct{}), make(chan struct{})) + indexer.run(rawdb.ReadTxIndexTail(db), 128, nil, make(chan struct{}), make(chan struct{})) verify(db, 0, indexer) db.Close()