Skip to content

Commit

Permalink
Query txPool if transaction not found inside the cache
Browse files Browse the repository at this point in the history
  • Loading branch information
b00f committed Mar 11, 2021
1 parent 0ded88b commit 135a697
Show file tree
Hide file tree
Showing 10 changed files with 57 additions and 75 deletions.
2 changes: 1 addition & 1 deletion state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func (st *state) executeBlock(block block.Block) ([]tx.CommittedTx, error) {
twrs := make([]tx.CommittedTx, len(ids))
var mintbaseTrx *tx.Tx
for i := 0; i < len(ids); i++ {
trx := st.txPool.PendingTx(ids[i])
trx := st.txPool.QueryTx(ids[i])
if trx == nil {
return nil, errors.Errorf(errors.ErrInvalidBlock, "Transaction not found: %s", ids[i])
}
Expand Down
5 changes: 1 addition & 4 deletions state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,15 +322,12 @@ func (st *state) ProposeBlock(round int) (*block.Block, error) {
st.logger.Error("Probably the node is shutting down.")
return nil, errors.Errorf(errors.ErrInvalidBlock, "No subsidy transaction")
}
if err := st.txPool.AppendTx(subsidyTx); err != nil {
if err := st.txPool.AppendTxAndBroadcast(subsidyTx); err != nil {
st.logger.Error("Our subsidy transaction is invalid. Why?", "err", err)
return nil, err
}
txIDs.Prepend(subsidyTx.ID())

// Broadcast all transaction
st.txPool.BroadcastTxs(txIDs.IDs())

stateHash := st.stateHash()
committeeHash := st.committee.CommitteeHash()
timestamp := st.proposeNextBlockTime()
Expand Down
23 changes: 14 additions & 9 deletions sync/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/zarbchain/zarb-go/proposal"
"github.com/zarbchain/zarb-go/store"
"github.com/zarbchain/zarb-go/tx"
"github.com/zarbchain/zarb-go/txpool"
"github.com/zarbchain/zarb-go/util"
)

Expand Down Expand Up @@ -46,18 +47,20 @@ func proposalKey(height, round int) key {
}

type Cache struct {
cache *lru.ARCCache // it's thread safe
store store.StoreReader
cache *lru.ARCCache // it's thread safe
store store.StoreReader
txPool txpool.TxPoolReader
}

func NewCache(size int, store store.StoreReader) (*Cache, error) {
func NewCache(size int, store store.StoreReader, txPool txpool.TxPoolReader) (*Cache, error) {
c, err := lru.NewARC(size)
if err != nil {
return nil, err
}
return &Cache{
cache: c,
store: store,
cache: c,
store: store,
txPool: txPool,
}, nil
}

Expand Down Expand Up @@ -101,16 +104,18 @@ func (c *Cache) GetTransaction(id tx.ID) *tx.Tx {
return i.(*tx.Tx)
}

trx := c.txPool.PendingTx(id)
if trx != nil {
c.cache.Add(txKey(id), trx)
return trx
}

ct, err := c.store.Transaction(id)
if err == nil {
c.cache.Add(txKey(id), ct.Tx)
return ct.Tx
}

// Should we check txpool?
// No, because transaction in txpool should be in cache.
// TODO: write tests for me

return nil
}
func (c *Cache) AddTransaction(trx *tx.Tx) {
Expand Down
12 changes: 10 additions & 2 deletions sync/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@ import (
"github.com/zarbchain/zarb-go/proposal"
"github.com/zarbchain/zarb-go/store"
"github.com/zarbchain/zarb-go/tx"
"github.com/zarbchain/zarb-go/txpool"
)

var tCache *Cache
var tStore *store.MockStore
var tTxPool *txpool.MockTxPool

func setup(t *testing.T) {
var err error
tStore = store.MockingStore()
tCache, err = NewCache(10, tStore)
tTxPool = txpool.MockingTxPool()
tCache, err = NewCache(10, tStore, tTxPool)
assert.NoError(t, err)
}

Expand Down Expand Up @@ -58,19 +61,24 @@ func TestCacheCommit(t *testing.T) {
assert.Nil(t, tCache.GetCertificate(b3.Header().LastBlockHash()))
}

func TestCacheTx(t *testing.T) {
func TestGetTx(t *testing.T) {
setup(t)

trx1, _ := tx.GenerateTestSendTx()
trx2, _ := tx.GenerateTestSendTx()
trx3, _ := tx.GenerateTestSendTx()
trx4, _ := tx.GenerateTestSendTx()

tStore.Transactions[trx1.ID()] = &tx.CommittedTx{Tx: trx1}
tTxPool.AppendTx(trx4)
tCache.AddTransaction(trx2)

assert.Equal(t, tCache.GetTransaction(trx1.ID()).ID(), trx1.ID())
assert.Equal(t, tCache.GetTransaction(trx2.ID()).ID(), trx2.ID())
assert.Equal(t, tCache.GetTransaction(trx4.ID()).ID(), trx4.ID())
assert.Nil(t, tCache.GetTransaction(trx3.ID()))
assert.NotNil(t, tCache.GetTransaction(trx1.ID()))
assert.NotNil(t, tCache.GetTransaction(trx4.ID()))
}

func TestCacheProposal(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func NewSynchronizer(
return nil, err
}

cache, err := cache.NewCache(conf.CacheSize, state.StoreReader())
cache, err := cache.NewCache(conf.CacheSize, state.StoreReader(), txPool)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ func setup(t *testing.T) {
tBobBroadcastCh = make(chan *message.Message, 100)
tAliceNetAPI = network_api.MockingNetworkAPI(tAlicePeerID)
tBobNetAPI = network_api.MockingNetworkAPI(tBobPeerID)
aliceCache, _ := cache.NewCache(tAliceConfig.CacheSize, tAliceState.StoreReader())
bobCache, _ := cache.NewCache(tBobConfig.CacheSize, tBobState.StoreReader())
aliceCache, _ := cache.NewCache(tAliceConfig.CacheSize, tAliceState.StoreReader(), tTxPool)
bobCache, _ := cache.NewCache(tBobConfig.CacheSize, tBobState.StoreReader(), tTxPool)

tBobState.GenHash = tAliceState.GenHash

Expand Down
2 changes: 1 addition & 1 deletion txpool/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
type TxPoolReader interface {
AllTransactions() []*tx.Tx
PendingTx(id tx.ID) *tx.Tx
BroadcastTxs(ids []tx.ID)
HasTx(id tx.ID) bool
Size() int

Expand All @@ -21,6 +20,7 @@ type TxPool interface {
SetSandbox(sandbox sandbox.Sandbox)
AppendTx(tx *tx.Tx) error
AppendTxAndBroadcast(trx *tx.Tx) error
QueryTx(id tx.ID) *tx.Tx
RemoveTx(id tx.ID)
Recheck()
}
7 changes: 4 additions & 3 deletions txpool/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ func (m *MockTxPool) PendingTx(id tx.ID) *tx.Tx {
return nil
}

func (m *MockTxPool) QueryTx(id tx.ID) *tx.Tx {
return m.PendingTx(id)
}

func (m *MockTxPool) HasTx(id tx.ID) bool {
for _, t := range m.Txs {
if t.ID().EqualsTo(id) {
Expand Down Expand Up @@ -66,6 +70,3 @@ func (m *MockTxPool) RemoveTx(hash crypto.Hash) {
func (m *MockTxPool) AllTransactions() []*tx.Tx {
return m.Txs
}

func (m *MockTxPool) BroadcastTxs([]tx.ID) {
}
47 changes: 20 additions & 27 deletions txpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,28 +123,42 @@ func (pool *txPool) RemoveTx(id tx.ID) {
pool.pendings.Remove(id)
}

// QueryTx returns immediately a transaction if we have, otherwise nil
func (pool *txPool) PendingTx(id tx.ID) *tx.Tx {
pool.lk.Lock()
defer pool.lk.Unlock()

val, found := pool.pendings.Get(id)
if found {
trx := val.(*tx.Tx)
pool.lk.Unlock()
return trx
}

pool.logger.Debug("Request transaction from peers", "id", id)
pool.lk.Unlock()
return nil
}

// QueryTx returns immediately a transaction if we have,
// it queries from other nodes
func (pool *txPool) QueryTx(id tx.ID) *tx.Tx {
trx := pool.PendingTx(id)
if trx != nil {
return trx
}

pool.logger.Debug("Query transaction from nodes", "id", id)

msg := message.NewOpaqueQueryTransactionsMessage([]tx.ID{id})
pool.broadcastCh <- msg

pool.appendTxCh = make(chan *tx.Tx, 100)
defer func() {
close(pool.appendTxCh)
pool.appendTxCh = nil
if pool.appendTxCh != nil {
close(pool.appendTxCh)
pool.appendTxCh = nil
}
}()

pool.appendTxCh = make(chan *tx.Tx, 100)

timeout := time.NewTimer(pool.config.WaitingTimeout)

for {
Expand Down Expand Up @@ -203,27 +217,6 @@ func (pool *txPool) Recheck() {
}
}

func (pool *txPool) BroadcastTxs(ids []tx.ID) {
pool.lk.Lock()
defer pool.lk.Unlock()

trxs := make([]*tx.Tx, len(ids))
for i, id := range ids {
val, found := pool.pendings.Get(id)
if !found {
pool.logger.Error("Try to broadcast a transaction which is not in the pool", "id", id)
return
}

trxs[i] = val.(*tx.Tx)
}

go func(_trxs []*tx.Tx) {
msg := message.NewTransactionsMessage(_trxs)
pool.broadcastCh <- msg
}(trxs)
}

func (pool *txPool) Fingerprint() string {
return fmt.Sprintf("{%v}", pool.pendings.Size())
}
28 changes: 3 additions & 25 deletions txpool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var tAcc1Signer crypto.Signer
var tCh chan *message.Message

func setup(t *testing.T) {
logger.InitLogger(logger.DefaultConfig())
logger.InitLogger(logger.TestConfig())
tCh = make(chan *message.Message, 10)
p, _ := NewTxPool(TestConfig(), tCh)
tSandbox = sandbox.MockingSandbox()
Expand Down Expand Up @@ -93,12 +93,10 @@ func TestPending(t *testing.T) {
}
}()

assert.NotNil(t, tPool.PendingTx(trx.ID()))
assert.Nil(t, tPool.PendingTx(trx.ID()))
assert.NotNil(t, tPool.QueryTx(trx.ID()))
assert.True(t, tPool.pendings.Has(trx.ID()))

// For second time it should response immediately
assert.NotNil(t, tPool.PendingTx(trx.ID()))

invID := crypto.GenerateTestHash()
assert.Nil(t, tPool.PendingTx(invID))
}
Expand Down Expand Up @@ -185,23 +183,3 @@ func TestAddSubsidyTransactions(t *testing.T) {
tPool.Recheck()
assert.Zero(t, tPool.Size())
}

func TestBroadcastTxs(t *testing.T) {
setup(t)

stamp := crypto.GenerateTestHash()
tSandbox.AppendStampAndUpdateHeight(88, stamp)
ids := make([]tx.ID, 5)

for i := 0; i < 5; i++ {
a, _, _ := crypto.GenerateTestKeyPair()
trx := tx.NewSendTx(stamp, tSandbox.AccSeq(tAcc1Addr)+1, tAcc1Addr, a, 1000, 1000, "ok")
tAcc1Signer.SignMsg(trx)
assert.NoError(t, tPool.AppendTx(trx))
ids[i] = trx.ID()
}

tPool.BroadcastTxs(ids)

shouldPublishTransaction(t, ids[0])
}

0 comments on commit 135a697

Please sign in to comment.