From 6be3812ce1eae0aba3a90e42aab0c69a19b06056 Mon Sep 17 00:00:00 2001 From: Jeancarlo Barrios Date: Thu, 22 Sep 2022 13:56:19 -0600 Subject: [PATCH] feat(mempool): tests and benchmark for mempool (#13273) * feat(mempool): tests and benchmark for mempool * added base for select benchmark * added simple select mempool test * added select mempool test * insert 100 and 1000 * t * t * t * move mempool to its own package and fix a small nil pointer error * Minor bug fixes for the sender pool; * minor fixes to consume * test * t * t * t * added gen tx with order * t --- types/mempool/mempool.go | 138 ++++++++++++++++++++++++++++++++++ types/mempool/mempool_test.go | 82 ++++++++++++++++++++ 2 files changed, 220 insertions(+) diff --git a/types/mempool/mempool.go b/types/mempool/mempool.go index 67c5b6dad873..89e76340bb74 100644 --- a/types/mempool/mempool.go +++ b/types/mempool/mempool.go @@ -536,3 +536,141 @@ func (smp statefulMempool) Remove(context types.Context, tx Tx) error { return nil } + +// The complexity is O(log(N)). Implementation +type statefullPriorityKey struct { + hash [32]byte + priority int64 + nonce uint64 +} + +type accountsHeadsKey struct { + sender string + priority int64 + hash [32]byte +} + +type AccountMemPool struct { + transactions *huandu.SkipList + currentKey accountsHeadsKey + currentItem *huandu.Element + sender string +} + +// Push cannot be executed in the middle of a select +func (amp *AccountMemPool) Push(ctx types.Context, key statefullPriorityKey, tx Tx) { + amp.transactions.Set(key, tx) + amp.currentItem = amp.transactions.Back() + newKey := amp.currentItem.Key().(statefullPriorityKey) + amp.currentKey = accountsHeadsKey{hash: newKey.hash, sender: amp.sender, priority: newKey.priority} +} + +func (amp *AccountMemPool) Pop() *Tx { + if amp.currentItem == nil { + return nil + } + itemToPop := amp.currentItem + amp.currentItem = itemToPop.Prev() + if amp.currentItem != nil { + newKey := amp.currentItem.Key().(statefullPriorityKey) + amp.currentKey = accountsHeadsKey{hash: newKey.hash, sender: amp.sender, priority: newKey.priority} + } else { + amp.currentKey = accountsHeadsKey{} + } + tx := itemToPop.Value.(Tx) + return &tx +} + +type MemPoolI struct { + accountsHeads *huandu.SkipList + senders map[string]*AccountMemPool +} + +func NewMemPoolI() MemPoolI { + return MemPoolI{ + accountsHeads: huandu.New(huandu.LessThanFunc(priorityHuanduLess)), + senders: make(map[string]*AccountMemPool), + } +} + +func (amp *MemPoolI) Insert(ctx types.Context, tx Tx) error { + senders := tx.(signing.SigVerifiableTx).GetSigners() + nonces, err := tx.(signing.SigVerifiableTx).GetSignaturesV2() + hashTx, ok := tx.(HashableTx) + if !ok { + return ErrNoTxHash + } + + if err != nil { + return err + } else if len(senders) != len(nonces) { + return fmt.Errorf("number of senders (%d) does not match number of nonces (%d)", len(senders), len(nonces)) + } + sender := senders[0].String() + nonce := nonces[0].Sequence + + accountMeempool, ok := amp.senders[sender] + if !ok { + accountMeempool = &AccountMemPool{ + transactions: huandu.New(huandu.LessThanFunc(nonceHuanduLess)), + sender: sender, + } + } + key := statefullPriorityKey{hash: hashTx.GetHash(), nonce: nonce, priority: ctx.Priority()} + + prevKey := accountMeempool.currentKey + accountMeempool.Push(ctx, key, tx) + + amp.accountsHeads.Remove(prevKey) + amp.accountsHeads.Set(accountMeempool.currentKey, accountMeempool) + amp.senders[sender] = accountMeempool + return nil + +} + +func (amp *MemPoolI) Select(_ types.Context, _ [][]byte, maxBytes int) ([]Tx, error) { + var selectedTxs []Tx + var txBytes int + + currentAccount := amp.accountsHeads.Front() + for currentAccount != nil { + accountMemPool := currentAccount.Value.(*AccountMemPool) + //currentTx := accountMemPool.transactions.Front() + prevKey := accountMemPool.currentKey + tx := accountMemPool.Pop() + if tx == nil { + return selectedTxs, nil + } + mempoolTx := *tx + selectedTxs = append(selectedTxs, mempoolTx) + if txBytes += mempoolTx.Size(); txBytes >= maxBytes { + return selectedTxs, nil + } + + amp.accountsHeads.Remove(prevKey) + amp.accountsHeads.Set(accountMemPool.currentKey, accountMemPool) + currentAccount = amp.accountsHeads.Front() + } + return selectedTxs, nil +} + +func priorityHuanduLess(a, b interface{}) int { + keyA := a.(accountsHeadsKey) + keyB := b.(accountsHeadsKey) + if keyA.priority == keyB.priority { + return bytes.Compare(keyA.hash[:], keyB.hash[:]) + } else { + if keyA.priority < keyB.priority { + return -1 + } else { + return 1 + } + } +} + +func nonceHuanduLess(a, b interface{}) int { + keyA := a.(statefullPriorityKey) + keyB := b.(statefullPriorityKey) + uint64Compare := huandu.Uint64 + return uint64Compare.Compare(keyA.nonce, keyB.nonce) +} diff --git a/types/mempool/mempool_test.go b/types/mempool/mempool_test.go index d14a50ed6b17..e9c71f5fa74c 100644 --- a/types/mempool/mempool_test.go +++ b/types/mempool/mempool_test.go @@ -1,11 +1,13 @@ package mempool_test import ( + "fmt" cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" signing2 "github.com/cosmos/cosmos-sdk/types/tx/signing" "github.com/cosmos/cosmos-sdk/x/auth/signing" "math/rand" "testing" + "time" "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/libs/log" @@ -156,6 +158,13 @@ func TestTxOrder(t *testing.T) { } } +func TestTxOrderN(t *testing.T) { + ctx := sdk.NewContext(nil, tmproto.Header{}, false, log.NewNopLogger()) + + ordered, shuffled := GenTxOrder(ctx, 5, 2) + fmt.Println(ordered, shuffled) +} + func simulateManyTx(ctx sdk.Context, n int) []sdk.Tx { transactions := make([]sdk.Tx, n) for i := 0; i < n; i++ { @@ -193,3 +202,76 @@ func simulateTx(ctx sdk.Context) sdk.Tx { ) return tx } + +type txWithPriority struct { + priority int64 + tx sdk.Tx + address string +} + +func GenTxOrder(ctx sdk.Context, nTx int, nSenders int) (ordered []txWithPriority, shuffled []txWithPriority) { + s := rand.NewSource(time.Now().UnixNano()) + r := rand.New(s) + randomAccounts := simtypes.RandomAccounts(r, nSenders) + senderNonces := make(map[string]uint64) + senderLastPriority := make(map[string]int) + for _, acc := range randomAccounts { + address := acc.Address.String() + senderNonces[address] = 1 + senderLastPriority[address] = 999999 + } + + for i := 0; i < nTx; i++ { + acc := randomAccounts[r.Intn(nSenders)] + accAddress := acc.Address.String() + accNonce := senderNonces[accAddress] + senderNonces[accAddress] += 1 + lastPriority := senderLastPriority[accAddress] + txPriority := r.Intn(lastPriority) + if txPriority == 0 { + txPriority += 1 + } + senderLastPriority[accAddress] = txPriority + tx := txWithPriority{ + priority: int64(txPriority), + tx: simulateTx2(ctx, acc, accNonce), + address: accAddress, + } + ordered = append(ordered, tx) + } + for _, item := range ordered { + tx := txWithPriority{ + priority: item.priority, + tx: item.tx, + address: item.address, + } + shuffled = append(shuffled, tx) + } + rand.Shuffle(len(shuffled), func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] }) + return ordered, shuffled +} + +func simulateTx2(ctx sdk.Context, acc simtypes.Account, nonce uint64) sdk.Tx { + s := rand.NewSource(1) + r := rand.New(s) + txGen := moduletestutil.MakeTestEncodingConfig().TxConfig + msg := group.MsgUpdateGroupMembers{ + GroupId: 1, + Admin: acc.Address.String(), + MemberUpdates: []group.MemberRequest{}, + } + fees, _ := simtypes.RandomFees(r, ctx, sdk.NewCoins(sdk.NewCoin("coin", sdk.NewInt(100000000)))) + + tx, _ := simtestutil.GenSignedMockTx( + r, + txGen, + []sdk.Msg{&msg}, + fees, + simtestutil.DefaultGenTxGas, + ctx.ChainID(), + []uint64{authtypes.NewBaseAccountWithAddress(acc.Address).GetAccountNumber()}, + []uint64{nonce}, + acc.PrivKey, + ) + return tx +}