From 0444ed4200bb6047dab51dbf2c9a5b3e2f0b30eb Mon Sep 17 00:00:00 2001 From: Javad Date: Thu, 17 Oct 2024 22:54:22 +0330 Subject: [PATCH 1/5] chore: add store reader to tx_pool for consumpion --- node/node.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/node.go b/node/node.go index 6d88ae884..6c909240a 100644 --- a/node/node.go +++ b/node/node.go @@ -59,13 +59,13 @@ func NewNode(genDoc *genesis.Genesis, conf *config.Config, eventCh = nil } - txPool := txpool.NewTxPool(conf.TxPool, messageCh) - str, err := store.NewStore(conf.Store) if err != nil { return nil, err } + txPool := txpool.NewTxPool(conf.TxPool, messageCh, str) + st, err := state.LoadOrNewState(genDoc, valKeys, str, txPool, eventCh) if err != nil { return nil, err From 687716e356ae12ca775dd724230ddbbf319edc53 Mon Sep 17 00:00:00 2001 From: Javad Date: Thu, 17 Oct 2024 22:55:17 +0330 Subject: [PATCH 2/5] fix: used HandleCommittedBlock instead public remove tx --- state/state.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/state/state.go b/state/state.go index 1b1906906..2eebcfdd5 100644 --- a/state/state.go +++ b/state/state.go @@ -325,7 +325,6 @@ func (st *state) ProposeBlock(valKey *bls.ValidatorKey, rewardAddr crypto.Addres // Only one subsidy transaction per blk if txs[i].IsSubsidyTx() { st.logger.Error("found duplicated subsidy transaction", "tx", txs[i]) - st.txPool.RemoveTx(txs[i].ID()) txs.Remove(i) i-- @@ -434,9 +433,9 @@ func (st *state) CommitBlock(blk *block.Block, cert *certificate.BlockCertificat st.store.SaveBlock(blk, cert) - // Remove transactions from pool - for _, trx := range blk.Transactions() { - st.txPool.RemoveTx(trx.ID()) + // Remove transactions from pool and update consumption + if err = st.txPool.HandleCommittedBlock(blk); err != nil { + return err } if err := st.store.WriteBatch(); err != nil { From fe83c48f67b6aa3ddf18f277112805bb873ba77b Mon Sep 17 00:00:00 2001 From: Javad Date: Thu, 17 Oct 2024 22:55:45 +0330 Subject: [PATCH 3/5] feat: add calculating consumptional for new committed block --- txpool/config.go | 10 +++--- txpool/interface.go | 2 +- txpool/mock.go | 4 +++ txpool/txpool.go | 74 +++++++++++++++++++++++++++++++++++++------ txpool/txpool_test.go | 37 ++++++++++++++++++++-- 5 files changed, 109 insertions(+), 18 deletions(-) diff --git a/txpool/config.go b/txpool/config.go index 02b86242c..69042a329 100644 --- a/txpool/config.go +++ b/txpool/config.go @@ -5,8 +5,9 @@ import ( ) type Config struct { - MaxSize int `toml:"max_size"` - Fee *FeeConfig `toml:"fee"` + MaxSize int `toml:"max_size"` + ConsumptionBlocks uint32 `toml:"-"` + Fee *FeeConfig `toml:"fee"` } type FeeConfig struct { @@ -17,8 +18,9 @@ type FeeConfig struct { func DefaultConfig() *Config { return &Config{ - MaxSize: 1000, - Fee: DefaultFeeConfig(), + MaxSize: 1000, + ConsumptionBlocks: 8640, + Fee: DefaultFeeConfig(), } } diff --git a/txpool/interface.go b/txpool/interface.go index 895bd5a75..7c969c5bf 100644 --- a/txpool/interface.go +++ b/txpool/interface.go @@ -23,5 +23,5 @@ type TxPool interface { SetNewSandboxAndRecheck(sb sandbox.Sandbox) AppendTxAndBroadcast(trx *tx.Tx) error AppendTx(trx *tx.Tx) error - RemoveTx(id tx.ID) + HandleCommittedBlock(block *block.Block) error } diff --git a/txpool/mock.go b/txpool/mock.go index 5b5fa5ea6..212963cfc 100644 --- a/txpool/mock.go +++ b/txpool/mock.go @@ -79,6 +79,10 @@ func (m *MockTxPool) RemoveTx(id hash.Hash) { } } +func (m *MockTxPool) HandleCommittedBlock(_ *block.Block) error { + return nil +} + func (m *MockTxPool) PrepareBlockTransactions() block.Txs { txs := make([]*tx.Tx, m.Size()) copy(txs, m.Txs) diff --git a/txpool/txpool.go b/txpool/txpool.go index e0b725b7c..839349e58 100644 --- a/txpool/txpool.go +++ b/txpool/txpool.go @@ -4,6 +4,9 @@ import ( "fmt" "sync" + "github.com/pactus-project/pactus/crypto" + "github.com/pactus-project/pactus/store" + "github.com/pactus-project/pactus/execution" "github.com/pactus-project/pactus/sandbox" "github.com/pactus-project/pactus/sync/bundle/message" @@ -19,14 +22,16 @@ import ( type txPool struct { lk sync.RWMutex - config *Config - sandbox sandbox.Sandbox - pools map[payload.Type]pool - broadcastCh chan message.Message - logger *logger.SubLogger + config *Config + sandbox sandbox.Sandbox + pools map[payload.Type]pool + consumptionMap map[crypto.Address]uint32 + broadcastCh chan message.Message + strReader store.Reader + logger *logger.SubLogger } -func NewTxPool(conf *Config, broadcastCh chan message.Message) TxPool { +func NewTxPool(conf *Config, broadcastCh chan message.Message, storeReader store.Reader) TxPool { pools := make(map[payload.Type]pool) pools[payload.TypeTransfer] = newPool(conf.transferPoolSize(), conf.minFee()) pools[payload.TypeBond] = newPool(conf.bondPoolSize(), conf.minFee()) @@ -35,9 +40,11 @@ func NewTxPool(conf *Config, broadcastCh chan message.Message) TxPool { pools[payload.TypeSortition] = newPool(conf.sortitionPoolSize(), 0) pool := &txPool{ - config: conf, - pools: pools, - broadcastCh: broadcastCh, + config: conf, + pools: pools, + consumptionMap: make(map[crypto.Address]uint32), + strReader: storeReader, + broadcastCh: broadcastCh, } pool.logger = logger.NewSubLogger("_pool", pool) @@ -133,10 +140,57 @@ func (p *txPool) checkTx(trx *tx.Tx) error { return nil } -func (p *txPool) RemoveTx(id tx.ID) { +func (p *txPool) HandleCommittedBlock(blk *block.Block) error { p.lk.Lock() defer p.lk.Unlock() + for _, trx := range blk.Transactions() { + p.removeTx(trx.ID()) + + p.handleIncreaseConsumption(trx) + } + + return p.handleDecreaseConsumption(blk.Height()) +} + +func (p *txPool) handleIncreaseConsumption(trx *tx.Tx) { + if trx.IsTransferTx() || trx.IsBondTx() || trx.IsWithdrawTx() { + signer := trx.Payload().Signer() + + // retrieve existing consumption or start with 0 + p.consumptionMap[signer] = p.consumptionMap[signer] + uint32(trx.SerializeSize()) + } +} + +func (p *txPool) handleDecreaseConsumption(height uint32) error { + if height <= p.config.ConsumptionBlocks { + return nil + } + + oldConsumptionHeight := height - p.config.ConsumptionBlocks + committedBlock, err := p.strReader.Block(oldConsumptionHeight) + if err != nil { + return err + } + + blk, err := committedBlock.ToBlock() + if err != nil { + return err + } + + for _, trx := range blk.Transactions() { + if trx.IsTransferTx() || trx.IsBondTx() || trx.IsWithdrawTx() { + signer := trx.Payload().Signer() + if v, ok := p.consumptionMap[signer]; ok { + p.consumptionMap[signer] = v - uint32(trx.SerializeSize()) + } + } + } + + return nil +} + +func (p *txPool) removeTx(id tx.ID) { for _, pool := range p.pools { if pool.list.Remove(id) { break diff --git a/txpool/txpool_test.go b/txpool/txpool_test.go index f1e0570bb..05b9673c3 100644 --- a/txpool/txpool_test.go +++ b/txpool/txpool_test.go @@ -5,6 +5,8 @@ import ( "testing" "time" + "github.com/pactus-project/pactus/store" + "github.com/pactus-project/pactus/execution" "github.com/pactus-project/pactus/sandbox" "github.com/pactus-project/pactus/sync/bundle/message" @@ -22,12 +24,14 @@ type testData struct { pool *txPool sandbox *sandbox.MockSandbox + str *store.MockStore ch chan message.Message } func testConfig() *Config { return &Config{ - MaxSize: 10, + MaxSize: 10, + ConsumptionBlocks: 3, Fee: &FeeConfig{ FixedFee: 0.000001, DailyLimit: 280, @@ -44,7 +48,8 @@ func setup(t *testing.T) *testData { ch := make(chan message.Message, 10) sb := sandbox.MockingSandbox(ts) config := testConfig() - p := NewTxPool(config, ch) + mockStore := store.MockingStore(ts) + p := NewTxPool(config, ch, mockStore) p.SetNewSandboxAndRecheck(sb) pool := p.(*txPool) assert.NotNil(t, pool) @@ -53,6 +58,7 @@ func setup(t *testing.T) *testData { TestSuite: ts, pool: pool, sandbox: sb, + str: mockStore, ch: ch, } } @@ -96,11 +102,36 @@ func TestAppendAndRemove(t *testing.T) { // Appending the same transaction again, should not return any error assert.NoError(t, td.pool.AppendTx(testTrx)) - td.pool.RemoveTx(testTrx.ID()) + td.pool.removeTx(testTrx.ID()) assert.False(t, td.pool.HasTx(testTrx.ID()), "Transaction should be removed") assert.Nil(t, td.pool.PendingTx(testTrx.ID())) } +func TestCalculatingConsumption(t *testing.T) { + td := setup(t) + + _, prv := td.TestSuite.RandEd25519KeyPair() + signer := prv + + for i := uint32(1); i < 10; i++ { + txr := td.GenerateTestTransferTx(func(tm *testsuite.TransactionMaker) { + tm.Signer = signer + }) + + blk, cert := td.TestSuite.GenerateTestBlock(i, func(bm *testsuite.BlockMaker) { + bm.Txs = []*tx.Tx{txr} + }) + + td.str.SaveBlock(blk, cert) + + err := td.pool.HandleCommittedBlock(blk) + require.NoError(t, err) + + //t.Log(td.pool.consumptionMap[txr.Payload().Signer()]) + } + +} + func TestAppendInvalidTransaction(t *testing.T) { td := setup(t) From db12d972f03a0014c30c3cd23084f87d7824c445 Mon Sep 17 00:00:00 2001 From: Javad Date: Sat, 19 Oct 2024 10:38:13 +0330 Subject: [PATCH 4/5] feat: handle signer consumption in tx pool for new committed block txs --- state/state.go | 4 ++-- txpool/interface.go | 2 +- txpool/mock.go | 2 +- txpool/txpool.go | 19 +++++++++++----- txpool/txpool_test.go | 52 ++++++++++++++++++++++++++++++++----------- 5 files changed, 57 insertions(+), 22 deletions(-) diff --git a/state/state.go b/state/state.go index 2eebcfdd5..e002bb304 100644 --- a/state/state.go +++ b/state/state.go @@ -434,8 +434,8 @@ func (st *state) CommitBlock(blk *block.Block, cert *certificate.BlockCertificat st.store.SaveBlock(blk, cert) // Remove transactions from pool and update consumption - if err = st.txPool.HandleCommittedBlock(blk); err != nil { - return err + if errHandleCommittedBlk := st.txPool.HandleCommittedBlock(blk); errHandleCommittedBlk != nil { + return errHandleCommittedBlk } if err := st.store.WriteBatch(); err != nil { diff --git a/txpool/interface.go b/txpool/interface.go index 7c969c5bf..34ee3b4ac 100644 --- a/txpool/interface.go +++ b/txpool/interface.go @@ -23,5 +23,5 @@ type TxPool interface { SetNewSandboxAndRecheck(sb sandbox.Sandbox) AppendTxAndBroadcast(trx *tx.Tx) error AppendTx(trx *tx.Tx) error - HandleCommittedBlock(block *block.Block) error + HandleCommittedBlock(blk *block.Block) error } diff --git a/txpool/mock.go b/txpool/mock.go index 212963cfc..e305a6aca 100644 --- a/txpool/mock.go +++ b/txpool/mock.go @@ -79,7 +79,7 @@ func (m *MockTxPool) RemoveTx(id hash.Hash) { } } -func (m *MockTxPool) HandleCommittedBlock(_ *block.Block) error { +func (*MockTxPool) HandleCommittedBlock(_ *block.Block) error { return nil } diff --git a/txpool/txpool.go b/txpool/txpool.go index 839349e58..827fe0c52 100644 --- a/txpool/txpool.go +++ b/txpool/txpool.go @@ -5,10 +5,9 @@ import ( "sync" "github.com/pactus-project/pactus/crypto" - "github.com/pactus-project/pactus/store" - "github.com/pactus-project/pactus/execution" "github.com/pactus-project/pactus/sandbox" + "github.com/pactus-project/pactus/store" "github.com/pactus-project/pactus/sync/bundle/message" "github.com/pactus-project/pactus/types/amount" "github.com/pactus-project/pactus/types/block" @@ -157,16 +156,17 @@ func (p *txPool) handleIncreaseConsumption(trx *tx.Tx) { if trx.IsTransferTx() || trx.IsBondTx() || trx.IsWithdrawTx() { signer := trx.Payload().Signer() - // retrieve existing consumption or start with 0 - p.consumptionMap[signer] = p.consumptionMap[signer] + uint32(trx.SerializeSize()) + p.consumptionMap[signer] += uint32(trx.SerializeSize()) } } func (p *txPool) handleDecreaseConsumption(height uint32) error { + // If height is less than or equal to ConsumptionBlocks, nothing to do if height <= p.config.ConsumptionBlocks { return nil } + // Calculate the height of the old block based on ConsumptionBlocks oldConsumptionHeight := height - p.config.ConsumptionBlocks committedBlock, err := p.strReader.Block(oldConsumptionHeight) if err != nil { @@ -182,7 +182,16 @@ func (p *txPool) handleDecreaseConsumption(height uint32) error { if trx.IsTransferTx() || trx.IsBondTx() || trx.IsWithdrawTx() { signer := trx.Payload().Signer() if v, ok := p.consumptionMap[signer]; ok { - p.consumptionMap[signer] = v - uint32(trx.SerializeSize()) + // Decrease the consumption by the size of the transaction + v -= uint32(trx.SerializeSize()) + + // If the new value is zero, remove the signer from the consumptionMap + if v == 0 { + delete(p.consumptionMap, signer) + } else { + // Otherwise, update the map with the new value + p.consumptionMap[signer] = v + } } } } diff --git a/txpool/txpool_test.go b/txpool/txpool_test.go index 05b9673c3..e45b4768f 100644 --- a/txpool/txpool_test.go +++ b/txpool/txpool_test.go @@ -5,10 +5,10 @@ import ( "testing" "time" - "github.com/pactus-project/pactus/store" - + "github.com/pactus-project/pactus/crypto" "github.com/pactus-project/pactus/execution" "github.com/pactus-project/pactus/sandbox" + "github.com/pactus-project/pactus/store" "github.com/pactus-project/pactus/sync/bundle/message" "github.com/pactus-project/pactus/types/account" "github.com/pactus-project/pactus/types/tx" @@ -110,26 +110,52 @@ func TestAppendAndRemove(t *testing.T) { func TestCalculatingConsumption(t *testing.T) { td := setup(t) - _, prv := td.TestSuite.RandEd25519KeyPair() - signer := prv + // Generate keys for different transaction signers + _, prv1 := td.RandEd25519KeyPair() + _, prv2 := td.RandEd25519KeyPair() + _, prv3 := td.RandBLSKeyPair() + _, prv4 := td.RandBLSKeyPair() + + // Generate different types of transactions + trx11 := td.GenerateTestTransferTx(testsuite.TransactionWithEd25519Signer(prv1)) + trx12 := td.GenerateTestBondTx(testsuite.TransactionWithEd25519Signer(prv1)) + trx13 := td.GenerateTestWithdrawTx(testsuite.TransactionWithBLSSigner(prv3)) + trx14 := td.GenerateTestUnbondTx(testsuite.TransactionWithBLSSigner(prv4)) + trx21 := td.GenerateTestTransferTx(testsuite.TransactionWithEd25519Signer(prv2)) + trx31 := td.GenerateTestBondTx(testsuite.TransactionWithBLSSigner(prv4)) + trx41 := td.GenerateTestWithdrawTx(testsuite.TransactionWithBLSSigner(prv3)) + trx42 := td.GenerateTestTransferTx(testsuite.TransactionWithEd25519Signer(prv2)) + + // Expected consumption map after transactions + expected := map[crypto.Address]uint32{ + prv2.PublicKeyNative().AccountAddress(): uint32(trx21.SerializeSize()) + uint32(trx42.SerializeSize()), + prv4.PublicKeyNative().AccountAddress(): uint32(trx31.SerializeSize()), + prv3.PublicKeyNative().ValidatorAddress(): uint32(trx41.SerializeSize()), + } - for i := uint32(1); i < 10; i++ { - txr := td.GenerateTestTransferTx(func(tm *testsuite.TransactionMaker) { - tm.Signer = signer - }) + tests := []struct { + height uint32 + txs []*tx.Tx + }{ + {1, []*tx.Tx{trx11, trx12, trx13, trx14}}, + {2, []*tx.Tx{trx21}}, + {3, []*tx.Tx{trx31}}, + {4, []*tx.Tx{trx41, trx42}}, + } - blk, cert := td.TestSuite.GenerateTestBlock(i, func(bm *testsuite.BlockMaker) { - bm.Txs = []*tx.Tx{txr} + for _, tt := range tests { + // Generate a block with the transactions for the given height + blk, cert := td.TestSuite.GenerateTestBlock(tt.height, func(bm *testsuite.BlockMaker) { + bm.Txs = tt.txs }) - td.str.SaveBlock(blk, cert) + // Handle the block in the transaction pool err := td.pool.HandleCommittedBlock(blk) require.NoError(t, err) - - //t.Log(td.pool.consumptionMap[txr.Payload().Signer()]) } + require.Equal(t, expected, td.pool.consumptionMap) } func TestAppendInvalidTransaction(t *testing.T) { From c7dbcf1169f4a7926cbbe4e18e07d0e89af70b0f Mon Sep 17 00:00:00 2001 From: Javad Date: Sat, 19 Oct 2024 12:34:17 +0330 Subject: [PATCH 5/5] fix: change requests - put store reader before message channel - add comment private for ConsumptionBlocks - move ConsumptionBlocks to down of struct config - move handle committed block after write batch --- node/node.go | 2 +- state/state.go | 10 +++++----- txpool/config.go | 4 ++-- txpool/txpool.go | 4 ++-- txpool/txpool_test.go | 2 +- 5 files changed, 11 insertions(+), 11 deletions(-) diff --git a/node/node.go b/node/node.go index 6c909240a..86eac2c35 100644 --- a/node/node.go +++ b/node/node.go @@ -64,7 +64,7 @@ func NewNode(genDoc *genesis.Genesis, conf *config.Config, return nil, err } - txPool := txpool.NewTxPool(conf.TxPool, messageCh, str) + txPool := txpool.NewTxPool(conf.TxPool, str, messageCh) st, err := state.LoadOrNewState(genDoc, valKeys, str, txPool, eventCh) if err != nil { diff --git a/state/state.go b/state/state.go index e002bb304..bdb4046f6 100644 --- a/state/state.go +++ b/state/state.go @@ -433,15 +433,15 @@ func (st *state) CommitBlock(blk *block.Block, cert *certificate.BlockCertificat st.store.SaveBlock(blk, cert) - // Remove transactions from pool and update consumption - if errHandleCommittedBlk := st.txPool.HandleCommittedBlock(blk); errHandleCommittedBlk != nil { - return errHandleCommittedBlk - } - if err := st.store.WriteBatch(); err != nil { st.logger.Panic("unable to update state", "error", err) } + // Remove transactions from pool and update consumption + if err := st.txPool.HandleCommittedBlock(blk); err != nil { + return err + } + st.logger.Info("new block committed", "block", blk, "round", cert.Round()) st.evaluateSortition() diff --git a/txpool/config.go b/txpool/config.go index 69042a329..dc94f2fc8 100644 --- a/txpool/config.go +++ b/txpool/config.go @@ -6,8 +6,8 @@ import ( type Config struct { MaxSize int `toml:"max_size"` - ConsumptionBlocks uint32 `toml:"-"` Fee *FeeConfig `toml:"fee"` + ConsumptionBlocks uint32 `toml:"-"` // Private configs } type FeeConfig struct { @@ -19,8 +19,8 @@ type FeeConfig struct { func DefaultConfig() *Config { return &Config{ MaxSize: 1000, - ConsumptionBlocks: 8640, Fee: DefaultFeeConfig(), + ConsumptionBlocks: 8640, } } diff --git a/txpool/txpool.go b/txpool/txpool.go index 827fe0c52..687359e7b 100644 --- a/txpool/txpool.go +++ b/txpool/txpool.go @@ -30,7 +30,7 @@ type txPool struct { logger *logger.SubLogger } -func NewTxPool(conf *Config, broadcastCh chan message.Message, storeReader store.Reader) TxPool { +func NewTxPool(conf *Config, storeReader store.Reader, broadcastCh chan message.Message) TxPool { pools := make(map[payload.Type]pool) pools[payload.TypeTransfer] = newPool(conf.transferPoolSize(), conf.minFee()) pools[payload.TypeBond] = newPool(conf.bondPoolSize(), conf.minFee()) @@ -185,8 +185,8 @@ func (p *txPool) handleDecreaseConsumption(height uint32) error { // Decrease the consumption by the size of the transaction v -= uint32(trx.SerializeSize()) - // If the new value is zero, remove the signer from the consumptionMap if v == 0 { + // If the new value is zero, remove the signer from the consumptionMap delete(p.consumptionMap, signer) } else { // Otherwise, update the map with the new value diff --git a/txpool/txpool_test.go b/txpool/txpool_test.go index e45b4768f..9f734eef6 100644 --- a/txpool/txpool_test.go +++ b/txpool/txpool_test.go @@ -49,7 +49,7 @@ func setup(t *testing.T) *testData { sb := sandbox.MockingSandbox(ts) config := testConfig() mockStore := store.MockingStore(ts) - p := NewTxPool(config, ch, mockStore) + p := NewTxPool(config, mockStore, ch) p.SetNewSandboxAndRecheck(sb) pool := p.(*txPool) assert.NotNil(t, pool)