diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index f01e683ff2..fe156cd044 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -24,9 +24,9 @@ import ( "sort" "time" - mapset "github.com/deckarep/golang-set/v2" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/gopool" + "github.com/ethereum/go-ethereum/common/lru" "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" @@ -54,6 +54,10 @@ const ( // re-request them. maxTxUnderpricedSetSize = 32768 + // maxTxUnderpricedTimeout is the max time a transaction should be stuck in the underpriced set + // set as 5 minutes. + maxTxUnderpricedTimeout = 300 + // txArriveTimeout is the time allowance before an announced transaction is // explicitly requested. txArriveTimeout = 500 * time.Millisecond @@ -149,7 +153,7 @@ type TxFetcher struct { drop chan *txDrop quit chan struct{} - underpriced mapset.Set[common.Hash] // Transactions discarded as too cheap (don't re-fetch) + underpriced *lru.Cache[common.Hash, int64] // Transactions discarded as too cheap (don't re-fetch) // Stage 1: Waiting lists for newly discovered transactions that might be // broadcast without needing explicit request/reply round trips. @@ -203,7 +207,7 @@ func NewTxFetcherForTests( fetching: make(map[common.Hash]string), requests: make(map[string]*txRequest), alternates: make(map[common.Hash]map[string]struct{}), - underpriced: mapset.NewSet[common.Hash](), + underpriced: lru.NewCache[common.Hash, int64](maxTxUnderpricedSetSize), hasTx: hasTx, addTxs: addTxs, fetchTxs: fetchTxs, @@ -232,7 +236,7 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error { case f.hasTx(hash): duplicate++ - case f.underpriced.Contains(hash): + case f.isKnownUnderpriced(hash): underpriced++ default: @@ -258,6 +262,16 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error { } } +// isKnownUnderpriced reports whether a transaction hash was recently found to be underpriced. +func (f *TxFetcher) isKnownUnderpriced(hash common.Hash) bool { + prevTime, ok := f.underpriced.Peek(hash) + if ok && prevTime+maxTxUnderpricedTimeout < time.Now().Unix() { + f.underpriced.Remove(hash) + return false + } + return ok +} + // Enqueue imports a batch of received transaction into the transaction pool // and the fetcher. This method may be called by both transaction broadcasts and // direct request replies. The differentiation is important so the fetcher can @@ -300,10 +314,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) // Avoid re-request this transaction when we receive another // announcement. if errors.Is(err, txpool.ErrUnderpriced) || errors.Is(err, txpool.ErrReplaceUnderpriced) { - for f.underpriced.Cardinality() >= maxTxUnderpricedSetSize { - f.underpriced.Pop() - } - f.underpriced.Add(batch[j].Hash()) + f.underpriced.Add(batch[j].Hash(), batch[j].Time().Unix()) } // Track a few interesting failure types switch { diff --git a/eth/fetcher/tx_fetcher_test.go b/eth/fetcher/tx_fetcher_test.go index 1715def99c..980c1a6c26 100644 --- a/eth/fetcher/tx_fetcher_test.go +++ b/eth/fetcher/tx_fetcher_test.go @@ -1509,8 +1509,8 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) { } case isUnderpriced: - if fetcher.underpriced.Cardinality() != int(step) { - t.Errorf("step %d: underpriced set size mismatch: have %d, want %d", i, fetcher.underpriced.Cardinality(), step) + if fetcher.underpriced.Len() != int(step) { + t.Errorf("step %d: underpriced set size mismatch: have %d, want %d", i, fetcher.underpriced.Len(), step) } default: