diff --git a/docs/docs/building-apps/02-app-mempool.md b/docs/docs/building-apps/02-app-mempool.md index 34b7b33667fa..f365fa074894 100644 --- a/docs/docs/building-apps/02-app-mempool.md +++ b/docs/docs/building-apps/02-app-mempool.md @@ -154,6 +154,9 @@ It is an integer value that sets the mempool in one of three modes, *bounded*, * #### Callback -Allow to set a callback to be called when a transaction is read from the mempool. +The priority nonce mempool provides mempool options allowing the application sets callback(s). + +* **OnRead**: Set a callback to be called when a transaction is read from the mempool. +* **TxReplacement**: Sets a callback to be called when duplicated transaction nonce detected during mempool insert. Application can define a transaction replacement rule based on tx priority or certain transaction fields. More information on the SDK mempool implementation can be found in the [godocs](https://pkg.go.dev/github.com/cosmos/cosmos-sdk/types/mempool). diff --git a/types/mempool/priority_nonce.go b/types/mempool/priority_nonce.go index eba8311b3205..d5cee302862f 100644 --- a/types/mempool/priority_nonce.go +++ b/types/mempool/priority_nonce.go @@ -29,6 +29,7 @@ type priorityNonceMempool struct { senderIndices map[string]*skiplist.SkipList scores map[txMeta]txMeta onRead func(tx sdk.Tx) + txReplacement func(op, np int64, oTx, nTx sdk.Tx) bool maxTx int } @@ -92,6 +93,14 @@ func PriorityNonceWithOnRead(onRead func(tx sdk.Tx)) PriorityNonceMempoolOption } } +// PriorityNonceWithTxReplacement sets a callback to be called when duplicated transaction nonce detected during mempool insert. +// Application can define a transaction replacement rule based on tx priority or certain transaction fields. +func PriorityNonceWithTxReplacement(txReplacementRule func(op, np int64, oTx, nTx sdk.Tx) bool) PriorityNonceMempoolOption { + return func(mp *priorityNonceMempool) { + mp.txReplacement = txReplacementRule + } +} + // PriorityNonceWithMaxTx sets the maximum number of transactions allowed in the mempool with the semantics: // // <0: disabled, `Insert` is a no-op @@ -166,12 +175,6 @@ func (mp *priorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error { mp.senderIndices[sender] = senderIndex } - mp.priorityCounts[priority]++ - - // Since senderIndex is scored by nonce, a changed priority will overwrite the - // existing key. - key.senderElement = senderIndex.Set(key, tx) - // Since mp.priorityIndex is scored by priority, then sender, then nonce, a // changed priority will create a new key, so we must remove the old key and // re-insert it to avoid having the same tx with different priorityIndex indexed @@ -181,6 +184,16 @@ func (mp *priorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error { // changes. sk := txMeta{nonce: nonce, sender: sender} if oldScore, txExists := mp.scores[sk]; txExists { + if mp.txReplacement != nil && !mp.txReplacement(oldScore.priority, priority, senderIndex.Get(key).Value.(sdk.Tx), tx) { + return fmt.Errorf( + "tx doesn't fit the replacement rule, oldPriority: %v, newPriority: %v, oldTx: %v, newTx: %v", + oldScore.priority, + priority, + senderIndex.Get(key).Value.(sdk.Tx), + tx, + ) + } + mp.priorityIndex.Remove(txMeta{ nonce: nonce, sender: sender, @@ -190,6 +203,12 @@ func (mp *priorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error { mp.priorityCounts[oldScore.priority]-- } + mp.priorityCounts[priority]++ + + // Since senderIndex is scored by nonce, a changed priority will overwrite the + // existing key. + key.senderElement = senderIndex.Set(key, tx) + mp.scores[sk] = txMeta{priority: priority} mp.priorityIndex.Set(key, tx) diff --git a/types/mempool/priority_nonce_test.go b/types/mempool/priority_nonce_test.go index 220fcfb18cf6..2829c4dee9cc 100644 --- a/types/mempool/priority_nonce_test.go +++ b/types/mempool/priority_nonce_test.go @@ -639,3 +639,55 @@ func TestTxLimit(t *testing.T) { require.Equal(t, 0, mp.CountTx()) } } + +func TestTxReplacement(t *testing.T) { + accounts := simtypes.RandomAccounts(rand.New(rand.NewSource(0)), 1) + ctx := sdk.NewContext(nil, tmproto.Header{}, false, log.NewNopLogger()) + sa := accounts[0].Address + + txs := []testTx{ + {priority: 20, nonce: 1, address: sa}, + {priority: 15, nonce: 1, address: sa}, // priority is less than the first Tx, failed tx replacement when the option enabled. + {priority: 23, nonce: 1, address: sa}, // priority is not 20% more than the first Tx, failed tx replacement when the option enabled. + {priority: 24, nonce: 1, address: sa}, // priority is 20% more than the first Tx, the first tx will be replaced. + } + + // test Priority with default mempool + mp := mempool.NewPriorityMempool() + for _, tx := range txs { + c := ctx.WithPriority(tx.priority) + require.NoError(t, mp.Insert(c, tx)) + require.Equal(t, 1, mp.CountTx()) + + iter := mp.Select(ctx, nil) + require.Equal(t, tx, iter.Tx()) + } + + // test Priority with TxReplacement + // we set a TestTxReplacement rule which the priority of the new Tx must be 20% more than the priority of the old Tx + // otherwise, the Insert will return error + feeBump := 20 + mp = mempool.NewPriorityMempool(mempool.PriorityNonceWithTxReplacement(func(op, np int64, oTx, nTx sdk.Tx) bool { + threshold := int64(100 + feeBump) + return np >= op*threshold/100 + })) + + c := ctx.WithPriority(txs[0].priority) + require.NoError(t, mp.Insert(c, txs[0])) + require.Equal(t, 1, mp.CountTx()) + + c = ctx.WithPriority(txs[1].priority) + require.Error(t, mp.Insert(c, txs[1])) + require.Equal(t, 1, mp.CountTx()) + + c = ctx.WithPriority(txs[2].priority) + require.Error(t, mp.Insert(c, txs[2])) + require.Equal(t, 1, mp.CountTx()) + + c = ctx.WithPriority(txs[3].priority) + require.NoError(t, mp.Insert(c, txs[3])) + require.Equal(t, 1, mp.CountTx()) + + iter := mp.Select(ctx, nil) + require.Equal(t, txs[3], iter.Tx()) +}