diff --git a/internal/mempool/cache.go b/internal/mempool/cache.go index 3cd45d2bc5..deaae09e77 100644 --- a/internal/mempool/cache.go +++ b/internal/mempool/cache.go @@ -22,6 +22,10 @@ type TxCache interface { // Remove removes the given raw transaction from the cache. Remove(tx types.Tx) + + // Has reports whether tx is present in the cache. Checking for presence is + // not treated as an access of the value. + Has(tx types.Tx) bool } var _ TxCache = (*LRUTxCache)(nil) @@ -97,6 +101,14 @@ func (c *LRUTxCache) Remove(tx types.Tx) { } } +func (c *LRUTxCache) Has(tx types.Tx) bool { + c.mtx.Lock() + defer c.mtx.Unlock() + + _, ok := c.cacheMap[tx.Key()] + return ok +} + // NopTxCache defines a no-op raw transaction cache. type NopTxCache struct{} @@ -105,3 +117,4 @@ var _ TxCache = (*NopTxCache)(nil) func (NopTxCache) Reset() {} func (NopTxCache) Push(types.Tx) bool { return true } func (NopTxCache) Remove(types.Tx) {} +func (NopTxCache) Has(types.Tx) bool { return false } diff --git a/internal/mempool/v1/mempool.go b/internal/mempool/v1/mempool.go index 67c4c28589..622ee2c86b 100644 --- a/internal/mempool/v1/mempool.go +++ b/internal/mempool/v1/mempool.go @@ -597,6 +597,7 @@ func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) { "old_priority", w.priority, ) txmp.removeTxByElement(vic) + txmp.cache.Remove(w.tx) txmp.metrics.EvictedTxs.Add(1) // We may not need to evict all the eligible transactions. Bail out @@ -783,9 +784,11 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) { w := cur.Value.(*WrappedTx) if txmp.config.TTLNumBlocks > 0 && (blockHeight-w.height) > txmp.config.TTLNumBlocks { txmp.removeTxByElement(cur) + txmp.cache.Remove(w.tx) txmp.metrics.EvictedTxs.Add(1) } else if txmp.config.TTLDuration > 0 && now.Sub(w.timestamp) > txmp.config.TTLDuration { txmp.removeTxByElement(cur) + txmp.cache.Remove(w.tx) txmp.metrics.EvictedTxs.Add(1) } cur = next diff --git a/internal/mempool/v1/mempool_test.go b/internal/mempool/v1/mempool_test.go index b338301b8a..616264c70d 100644 --- a/internal/mempool/v1/mempool_test.go +++ b/internal/mempool/v1/mempool_test.go @@ -209,7 +209,7 @@ func TestTxMempool_Size(t *testing.T) { } func TestTxMempool_Eviction(t *testing.T) { - txmp := setup(t, 0) + txmp := setup(t, 1000) txmp.config.Size = 5 txmp.config.MaxTxsBytes = 60 txExists := func(spec string) bool { @@ -238,6 +238,7 @@ func TestTxMempool_Eviction(t *testing.T) { mustCheckTx(t, txmp, "key1=0000=25") require.True(t, txExists("key1=0000=25")) require.False(t, txExists(bigTx)) + require.False(t, txmp.cache.Has([]byte(bigTx))) require.Equal(t, int64(len("key1=0000=25")), txmp.SizeBytes()) // Now fill up the rest of the slots with other transactions. @@ -521,10 +522,10 @@ func TestTxMempool_ConcurrentTxs(t *testing.T) { } func TestTxMempool_ExpiredTxs_Timestamp(t *testing.T) { - txmp := setup(t, 50) + txmp := setup(t, 5000) txmp.config.TTLDuration = 5 * time.Millisecond - added1 := checkTxs(t, txmp, 25, 0) + added1 := checkTxs(t, txmp, 10, 0) require.Equal(t, len(added1), txmp.Size()) // Wait a while, then add some more transactions that should not be expired @@ -540,7 +541,7 @@ func TestTxMempool_ExpiredTxs_Timestamp(t *testing.T) { // The exact intervals are not important except that the delta should be // large relative to the cost of CheckTx (ms vs. ns is fine here). time.Sleep(3 * time.Millisecond) - added2 := checkTxs(t, txmp, 25, 1) + added2 := checkTxs(t, txmp, 10, 1) // Wait a while longer, so that the first batch will expire. time.Sleep(3 * time.Millisecond) @@ -555,6 +556,9 @@ func TestTxMempool_ExpiredTxs_Timestamp(t *testing.T) { if _, ok := txmp.txByKey[tx.tx.Key()]; ok { t.Errorf("Transaction %X should have been purged for TTL", tx.tx.Key()) } + if txmp.cache.Has(tx.tx) { + t.Errorf("Transaction %X should have been removed from the cache", tx.tx.Key()) + } } // All the transactions added later should still be around.