Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mempool): priority nonce mempool option with tx replacement #14484

Merged
merged 12 commits into from
Jan 10, 2023
5 changes: 4 additions & 1 deletion docs/docs/building-apps/02-app-mempool.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ It is an integer value that sets the mempool in one of three modes, *bounded*, *

#### Callback

Allow to set a callback to be called when a transaction is read from the mempool.
The priority nonce mempool provides mempool options allowing the application sets callback(s).

* **OnRead**: Set a callback to be called when a transaction is read from the mempool.
* **TxReplacement**: Sets a callback to be called when duplicated transaction nonce detected during mempool insert. Application can define a transaction replacement rule based on tx priority or certain transaction fields.

More information on the SDK mempool implementation can be found in the [godocs](https://pkg.go.dev/github.com/cosmos/cosmos-sdk/types/mempool).
31 changes: 25 additions & 6 deletions types/mempool/priority_nonce.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type priorityNonceMempool struct {
senderIndices map[string]*skiplist.SkipList
scores map[txMeta]txMeta
onRead func(tx sdk.Tx)
txReplacement func(op, np int64, oTx, nTx sdk.Tx) bool
maxTx int
}

Expand Down Expand Up @@ -92,6 +93,14 @@ func PriorityNonceWithOnRead(onRead func(tx sdk.Tx)) PriorityNonceMempoolOption
}
}

// PriorityNonceWithTxReplacement sets a callback to be called when duplicated transaction nonce detected during mempool insert.
// Application can define a transaction replacement rule based on tx priority or certain transaction fields.
func PriorityNonceWithTxReplacement(txReplacementRule func(op, np int64, oTx, nTx sdk.Tx) bool) PriorityNonceMempoolOption {
return func(mp *priorityNonceMempool) {
mp.txReplacement = txReplacementRule
}
}

// PriorityNonceWithMaxTx sets the maximum number of transactions allowed in the mempool with the semantics:
//
// <0: disabled, `Insert` is a no-op
Expand Down Expand Up @@ -166,12 +175,6 @@ func (mp *priorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
mp.senderIndices[sender] = senderIndex
}

mp.priorityCounts[priority]++

// Since senderIndex is scored by nonce, a changed priority will overwrite the
// existing key.
key.senderElement = senderIndex.Set(key, tx)

// Since mp.priorityIndex is scored by priority, then sender, then nonce, a
// changed priority will create a new key, so we must remove the old key and
// re-insert it to avoid having the same tx with different priorityIndex indexed
Expand All @@ -181,6 +184,16 @@ func (mp *priorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
// changes.
sk := txMeta{nonce: nonce, sender: sender}
if oldScore, txExists := mp.scores[sk]; txExists {
if mp.txReplacement != nil && !mp.txReplacement(oldScore.priority, priority, senderIndex.Get(key).Value.(sdk.Tx), tx) {
return fmt.Errorf(
"tx doesn't fit the replacement rule, oldPriority: %v, newPriority: %v, oldTx: %v, newTx: %v",
oldScore.priority,
priority,
senderIndex.Get(key).Value.(sdk.Tx),
tx,
)
}

mp.priorityIndex.Remove(txMeta{
nonce: nonce,
sender: sender,
Expand All @@ -190,6 +203,12 @@ func (mp *priorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
mp.priorityCounts[oldScore.priority]--
}

mp.priorityCounts[priority]++

// Since senderIndex is scored by nonce, a changed priority will overwrite the
// existing key.
key.senderElement = senderIndex.Set(key, tx)

mp.scores[sk] = txMeta{priority: priority}
mp.priorityIndex.Set(key, tx)

Expand Down
52 changes: 52 additions & 0 deletions types/mempool/priority_nonce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,3 +638,55 @@ func TestTxLimit(t *testing.T) {
require.Equal(t, 0, mp.CountTx())
}
}

func TestTxReplacement(t *testing.T) {
accounts := simtypes.RandomAccounts(rand.New(rand.NewSource(0)), 1)
ctx := sdk.NewContext(nil, tmproto.Header{}, false, log.NewNopLogger())
sa := accounts[0].Address

txs := []testTx{
{priority: 20, nonce: 1, address: sa},
{priority: 15, nonce: 1, address: sa}, // priority is less than the first Tx, failed tx replacement when the option enabled.
{priority: 23, nonce: 1, address: sa}, // priority is not 20% more than the first Tx, failed tx replacement when the option enabled.
{priority: 24, nonce: 1, address: sa}, // priority is 20% more than the first Tx, the first tx will be replaced.
}

// test Priority with default mempool
mp := mempool.NewPriorityMempool()
for _, tx := range txs {
c := ctx.WithPriority(tx.priority)
require.NoError(t, mp.Insert(c, tx))
require.Equal(t, 1, mp.CountTx())

iter := mp.Select(ctx, nil)
require.Equal(t, tx, iter.Tx())
}

// test Priority with TxReplacement
// we set a TestTxReplacement rule which the priority of the new Tx must be 20% more than the priority of the old Tx
// otherwise, the Insert will return error
feeBump := 20
mp = mempool.NewPriorityMempool(mempool.PriorityNonceWithTxReplacement(func(op, np int64, oTx, nTx sdk.Tx) bool {
threshold := int64(100 + feeBump)
return np >= op*threshold/100
}))

c := ctx.WithPriority(txs[0].priority)
require.NoError(t, mp.Insert(c, txs[0]))
require.Equal(t, 1, mp.CountTx())

c = ctx.WithPriority(txs[1].priority)
require.Error(t, mp.Insert(c, txs[1]))
require.Equal(t, 1, mp.CountTx())

c = ctx.WithPriority(txs[2].priority)
require.Error(t, mp.Insert(c, txs[2]))
require.Equal(t, 1, mp.CountTx())

c = ctx.WithPriority(txs[3].priority)
require.NoError(t, mp.Insert(c, txs[3]))
require.Equal(t, 1, mp.CountTx())

iter := mp.Select(ctx, nil)
require.Equal(t, txs[3], iter.Tx())
}