Skip to content

Commit

Permalink
mempool: ensure evicted transactions are removed from the cache (#9000)
Browse files Browse the repository at this point in the history
In the original implementation transactions evicted for priority were also
removed from the cache. In addition, remove expired transactions from
the cache.

Related:

- Add Has method to cache implementations.
- Update tests to exercise this condition.
  • Loading branch information
M. J. Fromberger authored Jul 14, 2022
1 parent a1c8f8d commit b94470a
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 4 deletions.
13 changes: 13 additions & 0 deletions internal/mempool/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ type TxCache interface {

// Remove removes the given raw transaction from the cache.
Remove(tx types.Tx)

// Has reports whether tx is present in the cache. Checking for presence is
// not treated as an access of the value.
Has(tx types.Tx) bool
}

var _ TxCache = (*LRUTxCache)(nil)
Expand Down Expand Up @@ -97,6 +101,14 @@ func (c *LRUTxCache) Remove(tx types.Tx) {
}
}

func (c *LRUTxCache) Has(tx types.Tx) bool {
c.mtx.Lock()
defer c.mtx.Unlock()

_, ok := c.cacheMap[tx.Key()]
return ok
}

// NopTxCache defines a no-op raw transaction cache.
type NopTxCache struct{}

Expand All @@ -105,3 +117,4 @@ var _ TxCache = (*NopTxCache)(nil)
func (NopTxCache) Reset() {}
func (NopTxCache) Push(types.Tx) bool { return true }
func (NopTxCache) Remove(types.Tx) {}
func (NopTxCache) Has(types.Tx) bool { return false }
3 changes: 3 additions & 0 deletions internal/mempool/v1/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,7 @@ func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) {
"old_priority", w.priority,
)
txmp.removeTxByElement(vic)
txmp.cache.Remove(w.tx)
txmp.metrics.EvictedTxs.Add(1)

// We may not need to evict all the eligible transactions. Bail out
Expand Down Expand Up @@ -783,9 +784,11 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {
w := cur.Value.(*WrappedTx)
if txmp.config.TTLNumBlocks > 0 && (blockHeight-w.height) > txmp.config.TTLNumBlocks {
txmp.removeTxByElement(cur)
txmp.cache.Remove(w.tx)
txmp.metrics.EvictedTxs.Add(1)
} else if txmp.config.TTLDuration > 0 && now.Sub(w.timestamp) > txmp.config.TTLDuration {
txmp.removeTxByElement(cur)
txmp.cache.Remove(w.tx)
txmp.metrics.EvictedTxs.Add(1)
}
cur = next
Expand Down
12 changes: 8 additions & 4 deletions internal/mempool/v1/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func TestTxMempool_Size(t *testing.T) {
}

func TestTxMempool_Eviction(t *testing.T) {
txmp := setup(t, 0)
txmp := setup(t, 1000)
txmp.config.Size = 5
txmp.config.MaxTxsBytes = 60
txExists := func(spec string) bool {
Expand Down Expand Up @@ -238,6 +238,7 @@ func TestTxMempool_Eviction(t *testing.T) {
mustCheckTx(t, txmp, "key1=0000=25")
require.True(t, txExists("key1=0000=25"))
require.False(t, txExists(bigTx))
require.False(t, txmp.cache.Has([]byte(bigTx)))
require.Equal(t, int64(len("key1=0000=25")), txmp.SizeBytes())

// Now fill up the rest of the slots with other transactions.
Expand Down Expand Up @@ -521,10 +522,10 @@ func TestTxMempool_ConcurrentTxs(t *testing.T) {
}

func TestTxMempool_ExpiredTxs_Timestamp(t *testing.T) {
txmp := setup(t, 50)
txmp := setup(t, 5000)
txmp.config.TTLDuration = 5 * time.Millisecond

added1 := checkTxs(t, txmp, 25, 0)
added1 := checkTxs(t, txmp, 10, 0)
require.Equal(t, len(added1), txmp.Size())

// Wait a while, then add some more transactions that should not be expired
Expand All @@ -540,7 +541,7 @@ func TestTxMempool_ExpiredTxs_Timestamp(t *testing.T) {
// The exact intervals are not important except that the delta should be
// large relative to the cost of CheckTx (ms vs. ns is fine here).
time.Sleep(3 * time.Millisecond)
added2 := checkTxs(t, txmp, 25, 1)
added2 := checkTxs(t, txmp, 10, 1)

// Wait a while longer, so that the first batch will expire.
time.Sleep(3 * time.Millisecond)
Expand All @@ -555,6 +556,9 @@ func TestTxMempool_ExpiredTxs_Timestamp(t *testing.T) {
if _, ok := txmp.txByKey[tx.tx.Key()]; ok {
t.Errorf("Transaction %X should have been purged for TTL", tx.tx.Key())
}
if txmp.cache.Has(tx.tx) {
t.Errorf("Transaction %X should have been removed from the cache", tx.tx.Key())
}
}

// All the transactions added later should still be around.
Expand Down

0 comments on commit b94470a

Please sign in to comment.