Skip to content

Commit

Permalink
feat(mempool): tests and benchmark for mempool (#13273)
Browse files Browse the repository at this point in the history
* feat(mempool): tests and benchmark for mempool

* added base for select benchmark

* added simple select mempool test

* added select mempool test

* insert 100 and 1000

* t

* t

* t

* move mempool to its own package and fix a small nil pointer error

* Minor bug fixes for the sender pool;

* minor fixes to consume

* test

* t

* t

* t

* added gen tx with order

* t
  • Loading branch information
JeancarloBarrios authored Sep 22, 2022
1 parent 1798d68 commit 6be3812
Show file tree
Hide file tree
Showing 2 changed files with 220 additions and 0 deletions.
138 changes: 138 additions & 0 deletions types/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,3 +536,141 @@ func (smp statefulMempool) Remove(context types.Context, tx Tx) error {

return nil
}

// The complexity is O(log(N)). Implementation
type statefullPriorityKey struct {
hash [32]byte
priority int64
nonce uint64
}

type accountsHeadsKey struct {
sender string
priority int64
hash [32]byte
}

type AccountMemPool struct {
transactions *huandu.SkipList
currentKey accountsHeadsKey
currentItem *huandu.Element
sender string
}

// Push cannot be executed in the middle of a select
func (amp *AccountMemPool) Push(ctx types.Context, key statefullPriorityKey, tx Tx) {
amp.transactions.Set(key, tx)
amp.currentItem = amp.transactions.Back()
newKey := amp.currentItem.Key().(statefullPriorityKey)
amp.currentKey = accountsHeadsKey{hash: newKey.hash, sender: amp.sender, priority: newKey.priority}
}

func (amp *AccountMemPool) Pop() *Tx {
if amp.currentItem == nil {
return nil
}
itemToPop := amp.currentItem
amp.currentItem = itemToPop.Prev()
if amp.currentItem != nil {
newKey := amp.currentItem.Key().(statefullPriorityKey)
amp.currentKey = accountsHeadsKey{hash: newKey.hash, sender: amp.sender, priority: newKey.priority}
} else {
amp.currentKey = accountsHeadsKey{}
}
tx := itemToPop.Value.(Tx)
return &tx
}

type MemPoolI struct {
accountsHeads *huandu.SkipList
senders map[string]*AccountMemPool
}

func NewMemPoolI() MemPoolI {
return MemPoolI{
accountsHeads: huandu.New(huandu.LessThanFunc(priorityHuanduLess)),
senders: make(map[string]*AccountMemPool),
}
}

func (amp *MemPoolI) Insert(ctx types.Context, tx Tx) error {
senders := tx.(signing.SigVerifiableTx).GetSigners()
nonces, err := tx.(signing.SigVerifiableTx).GetSignaturesV2()
hashTx, ok := tx.(HashableTx)
if !ok {
return ErrNoTxHash
}

if err != nil {
return err
} else if len(senders) != len(nonces) {
return fmt.Errorf("number of senders (%d) does not match number of nonces (%d)", len(senders), len(nonces))
}
sender := senders[0].String()
nonce := nonces[0].Sequence

accountMeempool, ok := amp.senders[sender]
if !ok {
accountMeempool = &AccountMemPool{
transactions: huandu.New(huandu.LessThanFunc(nonceHuanduLess)),
sender: sender,
}
}
key := statefullPriorityKey{hash: hashTx.GetHash(), nonce: nonce, priority: ctx.Priority()}

prevKey := accountMeempool.currentKey
accountMeempool.Push(ctx, key, tx)

amp.accountsHeads.Remove(prevKey)
amp.accountsHeads.Set(accountMeempool.currentKey, accountMeempool)
amp.senders[sender] = accountMeempool
return nil

}

func (amp *MemPoolI) Select(_ types.Context, _ [][]byte, maxBytes int) ([]Tx, error) {
var selectedTxs []Tx
var txBytes int

currentAccount := amp.accountsHeads.Front()
for currentAccount != nil {
accountMemPool := currentAccount.Value.(*AccountMemPool)
//currentTx := accountMemPool.transactions.Front()
prevKey := accountMemPool.currentKey
tx := accountMemPool.Pop()
if tx == nil {
return selectedTxs, nil
}
mempoolTx := *tx
selectedTxs = append(selectedTxs, mempoolTx)
if txBytes += mempoolTx.Size(); txBytes >= maxBytes {
return selectedTxs, nil
}

amp.accountsHeads.Remove(prevKey)
amp.accountsHeads.Set(accountMemPool.currentKey, accountMemPool)
currentAccount = amp.accountsHeads.Front()
}
return selectedTxs, nil
}

func priorityHuanduLess(a, b interface{}) int {
keyA := a.(accountsHeadsKey)
keyB := b.(accountsHeadsKey)
if keyA.priority == keyB.priority {
return bytes.Compare(keyA.hash[:], keyB.hash[:])
} else {
if keyA.priority < keyB.priority {
return -1
} else {
return 1
}
}
}

func nonceHuanduLess(a, b interface{}) int {
keyA := a.(statefullPriorityKey)
keyB := b.(statefullPriorityKey)
uint64Compare := huandu.Uint64
return uint64Compare.Compare(keyA.nonce, keyB.nonce)
}
82 changes: 82 additions & 0 deletions types/mempool/mempool_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package mempool_test

import (
"fmt"
cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"
signing2 "github.com/cosmos/cosmos-sdk/types/tx/signing"
"github.com/cosmos/cosmos-sdk/x/auth/signing"
"math/rand"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/log"
Expand Down Expand Up @@ -156,6 +158,13 @@ func TestTxOrder(t *testing.T) {
}
}

func TestTxOrderN(t *testing.T) {
ctx := sdk.NewContext(nil, tmproto.Header{}, false, log.NewNopLogger())

ordered, shuffled := GenTxOrder(ctx, 5, 2)
fmt.Println(ordered, shuffled)
}

func simulateManyTx(ctx sdk.Context, n int) []sdk.Tx {
transactions := make([]sdk.Tx, n)
for i := 0; i < n; i++ {
Expand Down Expand Up @@ -193,3 +202,76 @@ func simulateTx(ctx sdk.Context) sdk.Tx {
)
return tx
}

type txWithPriority struct {
priority int64
tx sdk.Tx
address string
}

func GenTxOrder(ctx sdk.Context, nTx int, nSenders int) (ordered []txWithPriority, shuffled []txWithPriority) {
s := rand.NewSource(time.Now().UnixNano())
r := rand.New(s)
randomAccounts := simtypes.RandomAccounts(r, nSenders)
senderNonces := make(map[string]uint64)
senderLastPriority := make(map[string]int)
for _, acc := range randomAccounts {
address := acc.Address.String()
senderNonces[address] = 1
senderLastPriority[address] = 999999
}

for i := 0; i < nTx; i++ {
acc := randomAccounts[r.Intn(nSenders)]
accAddress := acc.Address.String()
accNonce := senderNonces[accAddress]
senderNonces[accAddress] += 1
lastPriority := senderLastPriority[accAddress]
txPriority := r.Intn(lastPriority)
if txPriority == 0 {
txPriority += 1
}
senderLastPriority[accAddress] = txPriority
tx := txWithPriority{
priority: int64(txPriority),
tx: simulateTx2(ctx, acc, accNonce),
address: accAddress,
}
ordered = append(ordered, tx)
}
for _, item := range ordered {
tx := txWithPriority{
priority: item.priority,
tx: item.tx,
address: item.address,
}
shuffled = append(shuffled, tx)
}
rand.Shuffle(len(shuffled), func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] })
return ordered, shuffled
}

func simulateTx2(ctx sdk.Context, acc simtypes.Account, nonce uint64) sdk.Tx {
s := rand.NewSource(1)
r := rand.New(s)
txGen := moduletestutil.MakeTestEncodingConfig().TxConfig
msg := group.MsgUpdateGroupMembers{
GroupId: 1,
Admin: acc.Address.String(),
MemberUpdates: []group.MemberRequest{},
}
fees, _ := simtypes.RandomFees(r, ctx, sdk.NewCoins(sdk.NewCoin("coin", sdk.NewInt(100000000))))

tx, _ := simtestutil.GenSignedMockTx(
r,
txGen,
[]sdk.Msg{&msg},
fees,
simtestutil.DefaultGenTxGas,
ctx.ChainID(),
[]uint64{authtypes.NewBaseAccountWithAddress(acc.Address).GetAccountNumber()},
[]uint64{nonce},
acc.PrivKey,
)
return tx
}

0 comments on commit 6be3812

Please sign in to comment.