Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: fix race conditions in txpool #23474

Merged
merged 6 commits into from
Aug 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions core/tx_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"math"
"math/big"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -478,9 +480,10 @@ func (h *priceHeap) Pop() interface{} {
// better candidates for inclusion while in other cases (at the top of the baseFee peak)
// the floating heap is better. When baseFee is decreasing they behave similarly.
type txPricedList struct {
all *txLookup // Pointer to the map of all transactions
urgent, floating priceHeap // Heaps of prices of all the stored **remote** transactions
stales int // Number of stale price points to (re-heap trigger)
all *txLookup // Pointer to the map of all transactions
urgent, floating priceHeap // Heaps of prices of all the stored **remote** transactions
stales int64 // Number of stale price points to (re-heap trigger)
reheapMu sync.Mutex // Mutex asserts that only one routine is reheaping the list
}

const (
Expand Down Expand Up @@ -510,8 +513,8 @@ func (l *txPricedList) Put(tx *types.Transaction, local bool) {
// the heap if a large enough ratio of transactions go stale.
func (l *txPricedList) Removed(count int) {
// Bump the stale counter, but exit if still too low (< 25%)
l.stales += count
if l.stales <= (len(l.urgent.list)+len(l.floating.list))/4 {
stales := atomic.AddInt64(&l.stales, int64(count))
if int(stales) <= (len(l.urgent.list)+len(l.floating.list))/4 {
MariusVanDerWijden marked this conversation as resolved.
Show resolved Hide resolved
return
}
// Seems we've reached a critical number of stale transactions, reheap
Expand All @@ -535,7 +538,7 @@ func (l *txPricedList) underpricedFor(h *priceHeap, tx *types.Transaction) bool
for len(h.list) > 0 {
head := h.list[0]
if l.all.GetRemote(head.Hash()) == nil { // Removed or migrated
l.stales--
atomic.AddInt64(&l.stales, -1)
heap.Pop(h)
continue
}
Expand All @@ -561,7 +564,7 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool)
// Discard stale transactions if found during cleanup
tx := heap.Pop(&l.urgent).(*types.Transaction)
if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
l.stales--
atomic.AddInt64(&l.stales, -1)
continue
}
// Non stale transaction found, move to floating heap
Expand All @@ -574,7 +577,7 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool)
// Discard stale transactions if found during cleanup
tx := heap.Pop(&l.floating).(*types.Transaction)
if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
l.stales--
atomic.AddInt64(&l.stales, -1)
continue
}
// Non stale transaction found, discard it
Expand All @@ -594,8 +597,10 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool)

// Reheap forcibly rebuilds the heap based on the current remote transaction set.
func (l *txPricedList) Reheap() {
l.reheapMu.Lock()
defer l.reheapMu.Unlock()
start := time.Now()
l.stales = 0
atomic.StoreInt64(&l.stales, 0)
l.urgent.list = make([]*types.Transaction, 0, l.all.RemoteCount())
l.all.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool {
l.urgent.list = append(l.urgent.list, tx)
Expand Down
7 changes: 6 additions & 1 deletion core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"math/big"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -264,6 +265,7 @@ type TxPool struct {
reorgDoneCh chan chan struct{}
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
wg sync.WaitGroup // tracks loop, scheduleReorgLoop
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)

changesSinceReorg int // A counter for how many drops we've performed in-between reorg.
}
Expand Down Expand Up @@ -294,6 +296,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
queueTxEventCh: make(chan *types.Transaction),
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
initDoneCh: make(chan struct{}),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
}
pool.locals = newAccountSet(pool.signer)
Expand Down Expand Up @@ -347,6 +350,8 @@ func (pool *TxPool) loop() {
defer evict.Stop()
defer journal.Stop()

// Notify tests that the init phase is done
close(pool.initDoneCh)
for {
select {
// Handle ChainHeadEvent
Expand All @@ -365,8 +370,8 @@ func (pool *TxPool) loop() {
case <-report.C:
pool.mu.RLock()
pending, queued := pool.stats()
stales := pool.priced.stales
pool.mu.RUnlock()
stales := int(atomic.LoadInt64(&pool.priced.stales))

if pending != prevPending || queued != prevQueued || stales != prevStales {
log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales)
Expand Down
7 changes: 5 additions & 2 deletions core/tx_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"math/big"
"math/rand"
"os"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -64,7 +65,7 @@ type testBlockChain struct {

func (bc *testBlockChain) CurrentBlock() *types.Block {
return types.NewBlock(&types.Header{
GasLimit: bc.gasLimit,
GasLimit: atomic.LoadUint64(&bc.gasLimit),
}, nil, nil, nil, trie.NewStackTrie(nil))
}

Expand Down Expand Up @@ -123,6 +124,8 @@ func setupTxPoolWithConfig(config *params.ChainConfig) (*TxPool, *ecdsa.PrivateK
key, _ := crypto.GenerateKey()
pool := NewTxPool(testTxPoolConfig, config, blockchain)

// wait for the pool to initialize
<-pool.initDoneCh
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this change. What's the point of waiting the loop function to start up here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise the resetState function race with the loop function

return pool, key
}

Expand Down Expand Up @@ -625,7 +628,7 @@ func TestTransactionDropping(t *testing.T) {
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 4)
}
// Reduce the block gas limit, check that invalidated transactions are dropped
pool.chain.(*testBlockChain).gasLimit = 100
atomic.StoreUint64(&pool.chain.(*testBlockChain).gasLimit, 100)
<-pool.requestReset(nil, nil)

if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok {
Expand Down