From 2418c3ef2e6f74fd6e7575b743fc1da4b53ab972 Mon Sep 17 00:00:00 2001 From: Matt Kocubinski Date: Wed, 26 Oct 2022 08:04:34 -0600 Subject: [PATCH] feat: nonce (sequence number) based mempool (#13645) simple default mempool implementation Co-authored-by: Jeancarlo --- baseapp/baseapp.go | 7 +- go.mod | 1 + go.sum | 4 + simapp/go.mod | 1 + simapp/go.sum | 4 + tests/go.mod | 1 + tests/go.sum | 4 + types/{ => mempool}/mempool.go | 30 ++- types/mempool/mempool_test.go | 388 ++++++++++++++++++++++++++++++++ types/mempool/nonce.go | 111 +++++++++ types/mempool/skip_list_test.go | 55 +++++ x/auth/tx/builder.go | 35 ++- 12 files changed, 625 insertions(+), 16 deletions(-) rename types/{ => mempool}/mempool.go (57%) create mode 100644 types/mempool/mempool_test.go create mode 100644 types/mempool/nonce.go create mode 100644 types/mempool/skip_list_test.go diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index 49a5c398acce..2ff90cb8effb 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -5,7 +5,6 @@ import ( "sort" "strings" - "github.com/cosmos/gogoproto/proto" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/crypto/tmhash" "github.com/tendermint/tendermint/libs/log" @@ -20,6 +19,8 @@ import ( storetypes "github.com/cosmos/cosmos-sdk/store/types" sdk "github.com/cosmos/cosmos-sdk/types" sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" + "github.com/cosmos/cosmos-sdk/types/mempool" + "github.com/cosmos/gogoproto/proto" ) const ( @@ -56,7 +57,7 @@ type BaseApp struct { //nolint: maligned interfaceRegistry codectypes.InterfaceRegistry txDecoder sdk.TxDecoder // unmarshal []byte into sdk.Tx - mempool sdk.Mempool // application side mempool + mempool mempool.Mempool // application side mempool anteHandler sdk.AnteHandler // ante handler for fee and auth postHandler sdk.AnteHandler // post handler, optional, e.g. for tips initChainer sdk.InitChainer // initialize state with validators and state blob @@ -655,7 +656,7 @@ func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, re // TODO remove nil check when implemented if mode == runTxModeCheck && app.mempool != nil { - err = app.mempool.Insert(ctx, tx.(sdk.MempoolTx)) + err = app.mempool.Insert(ctx, tx.(mempool.Tx)) if err != nil { return gInfo, nil, anteEvents, priority, err } diff --git a/go.mod b/go.mod index e79cdf64bd61..c77bf164a9c8 100644 --- a/go.mod +++ b/go.mod @@ -34,6 +34,7 @@ require ( github.com/hashicorp/go-getter v1.6.2 github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d github.com/hdevalence/ed25519consensus v0.0.0-20220222234857-c00d1f31bab3 + github.com/huandu/skiplist v1.2.0 github.com/improbable-eng/grpc-web v0.15.0 github.com/jhump/protoreflect v1.12.1-0.20220721211354-060cc04fc18b github.com/magiconair/properties v1.8.6 diff --git a/go.sum b/go.sum index 8b41accbb156..538be98dfa3c 100644 --- a/go.sum +++ b/go.sum @@ -492,6 +492,10 @@ github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/J github.com/hdevalence/ed25519consensus v0.0.0-20220222234857-c00d1f31bab3 h1:aSVUgRRRtOrZOC1fYmY9gV0e9z/Iu+xNVSASWjsuyGU= github.com/hdevalence/ed25519consensus v0.0.0-20220222234857-c00d1f31bab3/go.mod h1:5PC6ZNPde8bBqU/ewGZig35+UIZtw9Ytxez8/q5ZyFE= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/huandu/go-assert v1.1.5 h1:fjemmA7sSfYHJD7CUqs9qTwwfdNAx7/j2/ZlHXzNB3c= +github.com/huandu/go-assert v1.1.5/go.mod h1:yOLvuqZwmcHIC5rIzrBhT7D3Q9c3GFnd0JrPVhn/06U= +github.com/huandu/skiplist v1.2.0 h1:gox56QD77HzSC0w+Ws3MH3iie755GBJU1OER3h5VsYw= +github.com/huandu/skiplist v1.2.0/go.mod h1:7v3iFjLcSAzO4fN5B8dvebvo/qsfumiLiDXMrPiHF9w= github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= diff --git a/simapp/go.mod b/simapp/go.mod index 8fb0ba56d150..92a9472f85e7 100644 --- a/simapp/go.mod +++ b/simapp/go.mod @@ -92,6 +92,7 @@ require ( github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/hdevalence/ed25519consensus v0.0.0-20220222234857-c00d1f31bab3 // indirect + github.com/huandu/skiplist v1.2.0 // indirect github.com/improbable-eng/grpc-web v0.15.0 // indirect github.com/inconshreveable/mousetrap v1.0.1 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect diff --git a/simapp/go.sum b/simapp/go.sum index 69c04ba7522e..c88b23292aef 100644 --- a/simapp/go.sum +++ b/simapp/go.sum @@ -488,6 +488,10 @@ github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/J github.com/hdevalence/ed25519consensus v0.0.0-20220222234857-c00d1f31bab3 h1:aSVUgRRRtOrZOC1fYmY9gV0e9z/Iu+xNVSASWjsuyGU= github.com/hdevalence/ed25519consensus v0.0.0-20220222234857-c00d1f31bab3/go.mod h1:5PC6ZNPde8bBqU/ewGZig35+UIZtw9Ytxez8/q5ZyFE= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/huandu/go-assert v1.1.5 h1:fjemmA7sSfYHJD7CUqs9qTwwfdNAx7/j2/ZlHXzNB3c= +github.com/huandu/go-assert v1.1.5/go.mod h1:yOLvuqZwmcHIC5rIzrBhT7D3Q9c3GFnd0JrPVhn/06U= +github.com/huandu/skiplist v1.2.0 h1:gox56QD77HzSC0w+Ws3MH3iie755GBJU1OER3h5VsYw= +github.com/huandu/skiplist v1.2.0/go.mod h1:7v3iFjLcSAzO4fN5B8dvebvo/qsfumiLiDXMrPiHF9w= github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= diff --git a/tests/go.mod b/tests/go.mod index 860de9638e54..f7bc18c1031e 100644 --- a/tests/go.mod +++ b/tests/go.mod @@ -92,6 +92,7 @@ require ( github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/hdevalence/ed25519consensus v0.0.0-20220222234857-c00d1f31bab3 // indirect + github.com/huandu/skiplist v1.2.0 // indirect github.com/improbable-eng/grpc-web v0.15.0 // indirect github.com/inconshreveable/mousetrap v1.0.1 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect diff --git a/tests/go.sum b/tests/go.sum index 5afcf2406bdf..0cb8414181bd 100644 --- a/tests/go.sum +++ b/tests/go.sum @@ -490,6 +490,10 @@ github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/J github.com/hdevalence/ed25519consensus v0.0.0-20220222234857-c00d1f31bab3 h1:aSVUgRRRtOrZOC1fYmY9gV0e9z/Iu+xNVSASWjsuyGU= github.com/hdevalence/ed25519consensus v0.0.0-20220222234857-c00d1f31bab3/go.mod h1:5PC6ZNPde8bBqU/ewGZig35+UIZtw9Ytxez8/q5ZyFE= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/huandu/go-assert v1.1.5 h1:fjemmA7sSfYHJD7CUqs9qTwwfdNAx7/j2/ZlHXzNB3c= +github.com/huandu/go-assert v1.1.5/go.mod h1:yOLvuqZwmcHIC5rIzrBhT7D3Q9c3GFnd0JrPVhn/06U= +github.com/huandu/skiplist v1.2.0 h1:gox56QD77HzSC0w+Ws3MH3iie755GBJU1OER3h5VsYw= +github.com/huandu/skiplist v1.2.0/go.mod h1:7v3iFjLcSAzO4fN5B8dvebvo/qsfumiLiDXMrPiHF9w= github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= diff --git a/types/mempool.go b/types/mempool/mempool.go similarity index 57% rename from types/mempool.go rename to types/mempool/mempool.go index 1ea0b4da9c0c..f32150d6f41e 100644 --- a/types/mempool.go +++ b/types/mempool/mempool.go @@ -1,31 +1,41 @@ -package types +package mempool -// MempoolTx we define an app-side mempool transaction interface that is as +import ( + "errors" + + "github.com/cosmos/cosmos-sdk/types" +) + +// Tx defines an app-side mempool transaction interface that is as // minimal as possible, only requiring applications to define the size of the -// transaction to be used when reaping and getting the transaction itself. +// transaction to be used when inserting, selecting, and deleting the transaction. // Interface type casting can be used in the actual app-side mempool implementation. -type MempoolTx interface { - Tx +type Tx interface { + types.Tx // Size returns the size of the transaction in bytes. - Size() int + Size() int64 } type Mempool interface { - // Insert attempts to insert a MempoolTx into the app-side mempool returning + // Insert attempts to insert a Tx into the app-side mempool returning // an error upon failure. - Insert(Context, MempoolTx) error + Insert(types.Context, Tx) error // Select returns the next set of available transactions from the app-side // mempool, up to maxBytes or until the mempool is empty. The application can // decide to return transactions from its own mempool, from the incoming // txs, or some combination of both. - Select(ctx Context, txs [][]byte, maxBytes int) ([]MempoolTx, error) + Select(txs [][]byte, maxBytes int64) ([]Tx, error) // CountTx returns the number of transactions currently in the mempool. CountTx() int // Remove attempts to remove a transaction from the mempool, returning an error // upon failure. - Remove(Context, MempoolTx) error + Remove(Tx) error } + +var ErrTxNotFound = errors.New("tx not found in mempool") + +type Factory func() Mempool diff --git a/types/mempool/mempool_test.go b/types/mempool/mempool_test.go new file mode 100644 index 000000000000..0e85bc83ab97 --- /dev/null +++ b/types/mempool/mempool_test.go @@ -0,0 +1,388 @@ +package mempool_test + +import ( + "fmt" + "math/rand" + "testing" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/tendermint/tendermint/libs/log" + tmproto "github.com/tendermint/tendermint/proto/tendermint/types" + + cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/types/mempool" + simtypes "github.com/cosmos/cosmos-sdk/types/simulation" + txsigning "github.com/cosmos/cosmos-sdk/types/tx/signing" + "github.com/cosmos/cosmos-sdk/x/auth/signing" +) + +// testPubKey is a dummy implementation of PubKey used for testing. +type testPubKey struct { + address sdk.AccAddress +} + +func (t testPubKey) Reset() { panic("not implemented") } + +func (t testPubKey) String() string { panic("not implemented") } + +func (t testPubKey) ProtoMessage() { panic("not implemented") } + +func (t testPubKey) Address() cryptotypes.Address { return t.address.Bytes() } + +func (t testPubKey) Bytes() []byte { panic("not implemented") } + +func (t testPubKey) VerifySignature(msg []byte, sig []byte) bool { panic("not implemented") } + +func (t testPubKey) Equals(key cryptotypes.PubKey) bool { panic("not implemented") } + +func (t testPubKey) Type() string { panic("not implemented") } + +// testTx is a dummy implementation of Tx used for testing. +type testTx struct { + id int + priority int64 + nonce uint64 + address sdk.AccAddress +} + +func (tx testTx) GetSigners() []sdk.AccAddress { panic("not implemented") } + +func (tx testTx) GetPubKeys() ([]cryptotypes.PubKey, error) { panic("not implemented") } + +func (tx testTx) GetSignaturesV2() (res []txsigning.SignatureV2, err error) { + res = append(res, txsigning.SignatureV2{ + PubKey: testPubKey{address: tx.address}, + Data: nil, + Sequence: tx.nonce}) + + return res, nil +} + +var ( + _ sdk.Tx = (*testTx)(nil) + _ mempool.Tx = (*testTx)(nil) + _ signing.SigVerifiableTx = (*testTx)(nil) + _ cryptotypes.PubKey = (*testPubKey)(nil) +) + +func (tx testTx) Size() int64 { return 1 } + +func (tx testTx) GetMsgs() []sdk.Msg { return nil } + +func (tx testTx) ValidateBasic() error { return nil } + +func (tx testTx) String() string { + return fmt.Sprintf("tx a: %s, p: %d, n: %d", tx.address, tx.priority, tx.nonce) +} + +type sigErrTx struct { + getSigs func() ([]txsigning.SignatureV2, error) +} + +func (_ sigErrTx) Size() int64 { return 0 } + +func (_ sigErrTx) GetMsgs() []sdk.Msg { return nil } + +func (_ sigErrTx) ValidateBasic() error { return nil } + +func (_ sigErrTx) GetSigners() []sdk.AccAddress { return nil } + +func (_ sigErrTx) GetPubKeys() ([]cryptotypes.PubKey, error) { return nil, nil } + +func (t sigErrTx) GetSignaturesV2() ([]txsigning.SignatureV2, error) { return t.getSigs() } + +func (s *MempoolTestSuite) TestDefaultMempool() { + t := s.T() + ctx := sdk.NewContext(nil, tmproto.Header{}, false, log.NewNopLogger()) + accounts := simtypes.RandomAccounts(rand.New(rand.NewSource(0)), 10) + txCount := 1000 + var txs []testTx + + for i := 0; i < txCount; i++ { + acc := accounts[i%len(accounts)] + tx := testTx{ + address: acc.Address, + priority: rand.Int63(), + } + txs = append(txs, tx) + } + + // same sender-nonce just overwrites a tx + for _, tx := range txs { + ctx = ctx.WithPriority(tx.priority) + err := s.mempool.Insert(ctx, tx) + require.NoError(t, err) + } + require.Equal(t, len(accounts), s.mempool.CountTx()) + + // distinct sender-nonce should not overwrite a tx + s.resetMempool() + for i, tx := range txs { + tx.nonce = uint64(i) + err := s.mempool.Insert(ctx, tx) + require.NoError(t, err) + } + require.Equal(t, txCount, s.mempool.CountTx()) + + sel, err := s.mempool.Select(nil, 13) + require.NoError(t, err) + require.Equal(t, 13, len(sel)) + + // a tx which does not implement SigVerifiableTx should not be inserted + tx := &sigErrTx{getSigs: func() ([]txsigning.SignatureV2, error) { + return nil, fmt.Errorf("error") + }} + require.Error(t, s.mempool.Insert(ctx, tx)) + require.Error(t, s.mempool.Remove(tx)) + tx.getSigs = func() ([]txsigning.SignatureV2, error) { + return nil, nil + } + require.Error(t, s.mempool.Insert(ctx, tx)) + require.Error(t, s.mempool.Remove(tx)) + + // removing a tx not in the mempool should error + s.resetMempool() + require.NoError(t, s.mempool.Insert(ctx, txs[0])) + require.ErrorIs(t, s.mempool.Remove(txs[1]), mempool.ErrTxNotFound) + + // inserting a tx with a different priority should overwrite the old tx + newPriorityTx := testTx{ + address: txs[0].address, + priority: txs[0].priority + 1, + nonce: txs[0].nonce, + } + require.NoError(t, s.mempool.Insert(ctx, newPriorityTx)) + require.Equal(t, 1, s.mempool.CountTx()) +} + +type txSpec struct { + i int + p int + n int + a sdk.AccAddress +} + +func (tx txSpec) String() string { + return fmt.Sprintf("[tx i: %d, a: %s, p: %d, n: %d]", tx.i, tx.a, tx.p, tx.n) +} + +func (s *MempoolTestSuite) TestTxOrder() { + t := s.T() + ctx := sdk.NewContext(nil, tmproto.Header{}, false, log.NewNopLogger()) + accounts := simtypes.RandomAccounts(rand.New(rand.NewSource(0)), 5) + sa := accounts[0].Address + sb := accounts[1].Address + sc := accounts[2].Address + + tests := []struct { + txs []txSpec + order []int + fail bool + }{ + { + txs: []txSpec{ + {p: 21, n: 4, a: sa}, + {p: 8, n: 3, a: sa}, + {p: 6, n: 2, a: sa}, + {p: 15, n: 1, a: sb}, + {p: 20, n: 1, a: sa}, + }, + order: []int{3, 4, 2, 1, 0}, + }, + { + txs: []txSpec{ + {p: 3, n: 0, a: sa}, + {p: 5, n: 1, a: sa}, + {p: 9, n: 2, a: sa}, + {p: 6, n: 0, a: sb}, + {p: 5, n: 1, a: sb}, + {p: 8, n: 2, a: sb}, + }, + order: []int{3, 0, 4, 1, 5, 2}, + }, + { + txs: []txSpec{ + {p: 21, n: 4, a: sa}, + {p: 15, n: 1, a: sb}, + {p: 20, n: 1, a: sa}, + }, + order: []int{1, 2, 0}, + }, + { + txs: []txSpec{ + {p: 50, n: 3, a: sa}, + {p: 30, n: 2, a: sa}, + {p: 10, n: 1, a: sa}, + {p: 15, n: 1, a: sb}, + {p: 21, n: 2, a: sb}, + }, + order: []int{3, 2, 4, 1, 0}, + }, + { + txs: []txSpec{ + {p: 50, n: 3, a: sa}, + {p: 10, n: 2, a: sa}, + {p: 99, n: 1, a: sa}, + {p: 15, n: 1, a: sb}, + {p: 8, n: 2, a: sb}, + }, + order: []int{3, 2, 4, 1, 0}, + }, + { + txs: []txSpec{ + {p: 30, a: sa, n: 2}, + {p: 20, a: sb, n: 1}, + {p: 15, a: sa, n: 1}, + {p: 10, a: sa, n: 0}, + {p: 8, a: sb, n: 0}, + {p: 6, a: sa, n: 3}, + {p: 4, a: sb, n: 3}, + }, + order: []int{4, 3, 1, 2, 0, 6, 5}, + }, + { + txs: []txSpec{ + {p: 30, n: 2, a: sa}, + {p: 20, a: sb, n: 1}, + {p: 15, a: sa, n: 1}, + {p: 10, a: sa, n: 0}, + {p: 8, a: sb, n: 0}, + {p: 6, a: sa, n: 3}, + {p: 4, a: sb, n: 3}, + {p: 2, a: sc, n: 0}, + {p: 7, a: sc, n: 3}, + }, + order: []int{4, 3, 7, 1, 2, 0, 6, 5, 8}, + }, + { + txs: []txSpec{ + {p: 6, n: 1, a: sa}, + {p: 10, n: 2, a: sa}, + {p: 5, n: 1, a: sb}, + {p: 99, n: 2, a: sb}, + }, + order: []int{2, 0, 3, 1}, + }, + { + // if all txs have the same priority they will be ordered lexically sender address, and nonce with the + // sender. + txs: []txSpec{ + {p: 10, n: 7, a: sc}, + {p: 10, n: 8, a: sc}, + {p: 10, n: 9, a: sc}, + {p: 10, n: 1, a: sa}, + {p: 10, n: 2, a: sa}, + {p: 10, n: 3, a: sa}, + {p: 10, n: 4, a: sb}, + {p: 10, n: 5, a: sb}, + {p: 10, n: 6, a: sb}, + }, + order: []int{3, 4, 5, 6, 7, 8, 0, 1, 2}, + }, + /* + The next 4 tests are different permutations of the same set: + + {p: 5, n: 1, a: sa}, + {p: 10, n: 2, a: sa}, + {p: 20, n: 2, a: sb}, + {p: 5, n: 1, a: sb}, + {p: 99, n: 2, a: sc}, + {p: 5, n: 1, a: sc}, + + which exercises the actions required to resolve priority ties. + */ + { + txs: []txSpec{ + {p: 5, n: 1, a: sa}, + {p: 10, n: 2, a: sa}, + {p: 5, n: 1, a: sb}, + {p: 99, n: 2, a: sb}, + }, + order: []int{2, 0, 3, 1}, + }, + { + txs: []txSpec{ + {p: 5, n: 1, a: sa}, + {p: 10, n: 2, a: sa}, + {p: 20, n: 2, a: sb}, + {p: 5, n: 1, a: sb}, + {p: 99, n: 2, a: sc}, + {p: 5, n: 1, a: sc}, + }, + order: []int{3, 0, 5, 2, 1, 4}, + }, + { + txs: []txSpec{ + {p: 5, n: 1, a: sa}, + {p: 10, n: 2, a: sa}, + {p: 5, n: 1, a: sb}, + {p: 20, n: 2, a: sb}, + {p: 5, n: 1, a: sc}, + {p: 99, n: 2, a: sc}, + }, + order: []int{2, 0, 4, 3, 1, 5}, + }, + { + txs: []txSpec{ + {p: 5, n: 1, a: sa}, + {p: 10, n: 2, a: sa}, + {p: 5, n: 1, a: sc}, + {p: 20, n: 2, a: sc}, + {p: 5, n: 1, a: sb}, + {p: 99, n: 2, a: sb}, + }, + order: []int{4, 0, 2, 5, 1, 3}, + }, + } + for i, tt := range tests { + t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) { + pool := s.mempool + + // create test txs and insert into mempool + for i, ts := range tt.txs { + tx := testTx{id: i, priority: int64(ts.p), nonce: uint64(ts.n), address: ts.a} + c := ctx.WithPriority(tx.priority) + err := pool.Insert(c, tx) + require.NoError(t, err) + } + + orderedTxs, err := pool.Select(nil, 1000) + require.NoError(t, err) + var txOrder []int + for _, tx := range orderedTxs { + txOrder = append(txOrder, tx.(testTx).id) + } + for _, tx := range orderedTxs { + require.NoError(t, pool.Remove(tx)) + } + + require.Equal(t, tt.order, txOrder) + require.Equal(t, 0, pool.CountTx()) + }) + } +} + +type MempoolTestSuite struct { + suite.Suite + numTxs int + numAccounts int + iterations int + mempool mempool.Mempool +} + +func (s *MempoolTestSuite) resetMempool() { + s.iterations = 0 + s.mempool = mempool.NewNonceMempool() +} + +func (s *MempoolTestSuite) SetupTest() { + s.numTxs = 1000 + s.numAccounts = 100 + s.resetMempool() +} + +func TestMempoolTestSuite(t *testing.T) { + suite.Run(t, new(MempoolTestSuite)) +} diff --git a/types/mempool/nonce.go b/types/mempool/nonce.go new file mode 100644 index 000000000000..da7f10d8d193 --- /dev/null +++ b/types/mempool/nonce.go @@ -0,0 +1,111 @@ +package mempool + +import ( + "fmt" + + huandu "github.com/huandu/skiplist" + + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/x/auth/signing" +) + +// nonceMempool is a mempool that keeps transactions sorted by nonce. Transactions with the lowest nonce globally +// are prioritized. Transactions with the same nonce are prioritized by sender address. Fee/gas based +// prioritization is not supported. +type nonceMempool struct { + txQueue *huandu.SkipList +} + +type txKey struct { + nonce uint64 + sender string +} + +// txKeyLessNonce compares two txKeys by nonce then by sender address. +func txKeyLessNonce(a, b any) int { + keyA := a.(txKey) + keyB := b.(txKey) + + res := huandu.Uint64.Compare(keyB.nonce, keyA.nonce) + if res != 0 { + return res + } + + return huandu.String.Compare(keyB.sender, keyA.sender) +} + +func NewNonceMempool() Mempool { + sp := &nonceMempool{ + txQueue: huandu.New(huandu.LessThanFunc(txKeyLessNonce)), + } + + return sp +} + +// Insert adds a tx to the mempool. It returns an error if the tx does not have at least one signer. +// priority is ignored. +func (sp nonceMempool) Insert(_ sdk.Context, tx Tx) error { + sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2() + if err != nil { + return err + } + if len(sigs) == 0 { + return fmt.Errorf("tx must have at least one signer") + } + + sig := sigs[0] + sender := sig.PubKey.Address().String() + nonce := sig.Sequence + tk := txKey{nonce: nonce, sender: sender} + sp.txQueue.Set(tk, tx) + return nil +} + +// Select returns txs from the mempool with the lowest nonce globally first. A sender's txs will always be returned +// in nonce order. +func (sp nonceMempool) Select(_ [][]byte, maxBytes int64) ([]Tx, error) { + var ( + txBytes int64 + selectedTxs []Tx + ) + + currentTx := sp.txQueue.Front() + for currentTx != nil { + mempoolTx := currentTx.Value.(Tx) + + if txBytes += mempoolTx.Size(); txBytes <= maxBytes { + selectedTxs = append(selectedTxs, mempoolTx) + } else { + return selectedTxs, nil + } + currentTx = currentTx.Next() + } + return selectedTxs, nil +} + +// CountTx returns the number of txs in the mempool. +func (sp nonceMempool) CountTx() int { + return sp.txQueue.Len() +} + +// Remove removes a tx from the mempool. It returns an error if the tx does not have at least one signer or the tx +// was not found in the pool. +func (sp nonceMempool) Remove(tx Tx) error { + sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2() + if err != nil { + return err + } + if len(sigs) == 0 { + return fmt.Errorf("tx must have at least one signer") + } + + sig := sigs[0] + sender := sig.PubKey.Address().String() + nonce := sig.Sequence + tk := txKey{nonce: nonce, sender: sender} + res := sp.txQueue.Remove(tk) + if res == nil { + return ErrTxNotFound + } + return nil +} diff --git a/types/mempool/skip_list_test.go b/types/mempool/skip_list_test.go new file mode 100644 index 000000000000..f13f297f12e2 --- /dev/null +++ b/types/mempool/skip_list_test.go @@ -0,0 +1,55 @@ +package mempool_test + +import ( + "testing" + + huandu "github.com/huandu/skiplist" + "github.com/stretchr/testify/require" +) + +type collisionKey struct { + a int + b int +} + +func TestSkipListCollisions(t *testing.T) { + integerList := huandu.New(huandu.Int) + + integerList.Set(1, 1) + integerList.Set(2, 2) + integerList.Set(3, 3) + + k := integerList.Front() + i := 1 + for k != nil { + require.Equal(t, i, k.Key()) + require.Equal(t, i, k.Value) + i++ + k = k.Next() + } + + // a duplicate key will overwrite the previous value + integerList.Set(1, 4) + require.Equal(t, 3, integerList.Len()) + require.Equal(t, 4, integerList.Get(1).Value) + + // prove this again with a compound key + compoundList := huandu.New(huandu.LessThanFunc(func(x, y any) int { + kx := x.(collisionKey) + ky := y.(collisionKey) + if kx.a == ky.a { + return huandu.Int.Compare(kx.b, ky.b) + } + return huandu.Int.Compare(kx.a, ky.a) + })) + + compoundList.Set(collisionKey{a: 1, b: 1}, 1) + compoundList.Set(collisionKey{a: 1, b: 2}, 2) + compoundList.Set(collisionKey{a: 1, b: 3}, 3) + + require.Equal(t, 3, compoundList.Len()) + compoundList.Set(collisionKey{a: 1, b: 2}, 4) + require.Equal(t, 4, compoundList.Get(collisionKey{a: 1, b: 2}).Value) + compoundList.Set(collisionKey{a: 2, b: 2}, 5) + require.Equal(t, 4, compoundList.Len()) +} diff --git a/x/auth/tx/builder.go b/x/auth/tx/builder.go index 9579865d9fd2..604123167034 100644 --- a/x/auth/tx/builder.go +++ b/x/auth/tx/builder.go @@ -9,6 +9,7 @@ import ( cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" sdk "github.com/cosmos/cosmos-sdk/types" sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" + "github.com/cosmos/cosmos-sdk/types/mempool" "github.com/cosmos/cosmos-sdk/types/tx" "github.com/cosmos/cosmos-sdk/types/tx/signing" "github.com/cosmos/cosmos-sdk/x/auth/ante" @@ -31,6 +32,8 @@ type wrapper struct { authInfoBz []byte txBodyHasUnknownNonCriticals bool + + txSize int64 } var ( @@ -40,7 +43,7 @@ var ( _ ante.HasExtensionOptionsTx = &wrapper{} _ ExtensionOptionsTxBuilder = &wrapper{} _ tx.TipTx = &wrapper{} - _ sdk.MempoolTx = &wrapper{} + _ mempool.Tx = &wrapper{} ) // ExtensionOptionsTxBuilder defines a TxBuilder that can also set extensions. @@ -63,8 +66,10 @@ func newBuilder(cdc codec.Codec) *wrapper { } } -func (w *wrapper) Size() int { - panic("not yet implemented") +// Size returns the size of the transaction, but is only correct immediately after decoding a proto-marshal transaction. +// It should not be used in any other cases. +func (w *wrapper) Size() int64 { + return w.txSize } func (w *wrapper) GetMsgs() []sdk.Msg { @@ -216,6 +221,8 @@ func (w *wrapper) SetMsgs(msgs ...sdk.Msg) error { // set bodyBz to nil because the cached bodyBz no longer matches tx.Body w.bodyBz = nil + // set txSize to 0 because it is no longer correct + w.txSize = 0 return nil } @@ -226,6 +233,8 @@ func (w *wrapper) SetTimeoutHeight(height uint64) { // set bodyBz to nil because the cached bodyBz no longer matches tx.Body w.bodyBz = nil + // set txSize to 0 because it is no longer correct + w.txSize = 0 } func (w *wrapper) SetMemo(memo string) { @@ -233,6 +242,8 @@ func (w *wrapper) SetMemo(memo string) { // set bodyBz to nil because the cached bodyBz no longer matches tx.Body w.bodyBz = nil + // set txSize to 0 because it is no longer correct + w.txSize = 0 } func (w *wrapper) SetGasLimit(limit uint64) { @@ -244,6 +255,8 @@ func (w *wrapper) SetGasLimit(limit uint64) { // set authInfoBz to nil because the cached authInfoBz no longer matches tx.AuthInfo w.authInfoBz = nil + // set txSize to 0 because it is no longer correct + w.txSize = 0 } func (w *wrapper) SetFeeAmount(coins sdk.Coins) { @@ -255,6 +268,8 @@ func (w *wrapper) SetFeeAmount(coins sdk.Coins) { // set authInfoBz to nil because the cached authInfoBz no longer matches tx.AuthInfo w.authInfoBz = nil + // set txSize to 0 because it is no longer correct + w.txSize = 0 } func (w *wrapper) SetTip(tip *tx.Tip) { @@ -262,6 +277,8 @@ func (w *wrapper) SetTip(tip *tx.Tip) { // set authInfoBz to nil because the cached authInfoBz no longer matches tx.AuthInfo w.authInfoBz = nil + // set txSize to 0 because it is no longer correct + w.txSize = 0 } func (w *wrapper) SetFeePayer(feePayer sdk.AccAddress) { @@ -273,6 +290,8 @@ func (w *wrapper) SetFeePayer(feePayer sdk.AccAddress) { // set authInfoBz to nil because the cached authInfoBz no longer matches tx.AuthInfo w.authInfoBz = nil + // set txSize to 0 because it is no longer correct + w.txSize = 0 } func (w *wrapper) SetFeeGranter(feeGranter sdk.AccAddress) { @@ -284,6 +303,8 @@ func (w *wrapper) SetFeeGranter(feeGranter sdk.AccAddress) { // set authInfoBz to nil because the cached authInfoBz no longer matches tx.AuthInfo w.authInfoBz = nil + // set txSize to 0 because it is no longer correct + w.txSize = 0 } func (w *wrapper) SetSignatures(signatures ...signing.SignatureV2) error { @@ -315,6 +336,8 @@ func (w *wrapper) setSignerInfos(infos []*tx.SignerInfo) { w.tx.AuthInfo.SignerInfos = infos // set authInfoBz to nil because the cached authInfoBz no longer matches tx.AuthInfo w.authInfoBz = nil + // set txSize to 0 because it is no longer correct + w.txSize = 0 } func (w *wrapper) setSignerInfoAtIndex(index int, info *tx.SignerInfo) { @@ -325,6 +348,8 @@ func (w *wrapper) setSignerInfoAtIndex(index int, info *tx.SignerInfo) { w.tx.AuthInfo.SignerInfos[index] = info // set authInfoBz to nil because the cached authInfoBz no longer matches tx.AuthInfo w.authInfoBz = nil + // set txSize to 0 because it is no longer correct + w.txSize = 0 } func (w *wrapper) setSignatures(sigs [][]byte) { @@ -371,11 +396,15 @@ func (w *wrapper) GetNonCriticalExtensionOptions() []*codectypes.Any { func (w *wrapper) SetExtensionOptions(extOpts ...*codectypes.Any) { w.tx.Body.ExtensionOptions = extOpts w.bodyBz = nil + // set txSize to 0 because it is no longer correct + w.txSize = 0 } func (w *wrapper) SetNonCriticalExtensionOptions(extOpts ...*codectypes.Any) { w.tx.Body.NonCriticalExtensionOptions = extOpts w.bodyBz = nil + // set txSize to 0 because it is no longer correct + w.txSize = 0 } func (w *wrapper) AddAuxSignerData(data tx.AuxSignerData) error {