Skip to content

Commit

Permalink
verify txs if they are missing a header
Browse files Browse the repository at this point in the history
  • Loading branch information
dshulyak committed Jun 14, 2022
1 parent d5963c4 commit b7ee989
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 26 deletions.
40 changes: 26 additions & 14 deletions miner/proposal_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import (
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/ballots"
smocks "github.com/spacemeshos/go-spacemesh/system/mocks"
"github.com/spacemeshos/go-spacemesh/vm/transaction"
"github.com/spacemeshos/go-spacemesh/vm/sdk"
"github.com/spacemeshos/go-spacemesh/vm/sdk/wallet"
)

const (
Expand Down Expand Up @@ -69,9 +70,20 @@ func createBuilder(tb testing.TB) *testBuilder {

func genTX(tb testing.TB, nonce uint64, recipient types.Address, signer *signing.EdSigner) *types.Transaction {
tb.Helper()
tx, err := transaction.GenerateCallTransaction(signer, recipient, nonce, 1, defaultGasLimit, defaultFee)
require.NoError(tb, err)
return tx

raw := wallet.Spend(signer.PrivateKey(), recipient, defaultFee,
sdk.WithNonce(types.Nonce{Counter: nonce}),
)
tx := types.Transaction{
RawTx: types.NewRawTx(raw),
TxHeader: &types.TxHeader{},
}
tx.MaxGas = defaultGasLimit
tx.MaxSpend = defaultFee
tx.GasPrice = 1
tx.Nonce = types.Nonce{Counter: nonce}
tx.Principal = types.BytesToAddress(signer.PublicKey().Bytes())
return &tx
}

func genActiveSet(tb testing.TB) []types.ATXID {
Expand Down Expand Up @@ -127,7 +139,7 @@ func TestBuilder_HandleLayer_MultipleProposals(t *testing.T) {
b.mOracle.EXPECT().GetProposalEligibility(layerID, beacon).Return(atxID, activeSet, proofs, nil).Times(1)

// for 1st proposal, containing the ref ballot of this epoch
b.mCState.EXPECT().SelectProposalTXs(len(proofs)).Return([]types.TransactionID{tx1.ID()}).Times(1)
b.mCState.EXPECT().SelectProposalTXs(len(proofs)).Return([]types.TransactionID{tx1.ID}).Times(1)
b.mBaseBP.EXPECT().EncodeVotes(gomock.Any(), gomock.Any()).Return(&types.Votes{Base: base}, nil).Times(1)
meshHash := types.RandomHash()
b.mMsh.EXPECT().GetAggregatedLayerHash(layerID.Sub(1)).Return(meshHash, nil).Times(1)
Expand All @@ -143,7 +155,7 @@ func TestBuilder_HandleLayer_MultipleProposals(t *testing.T) {
require.NotNil(t, p.EpochData)
require.Equal(t, activeSet, p.EpochData.ActiveSet)
require.Equal(t, beacon, p.EpochData.Beacon)
require.Equal(t, []types.TransactionID{tx1.ID()}, p.TxIDs)
require.Equal(t, []types.TransactionID{tx1.ID}, p.TxIDs)
require.Equal(t, proofs, p.EligibilityProofs)
require.Equal(t, meshHash, p.MeshHash)
return nil
Expand Down Expand Up @@ -173,7 +185,7 @@ func TestBuilder_HandleLayer_OneProposal(t *testing.T) {
b.mOracle.EXPECT().GetProposalEligibility(layerID, beacon).Return(atxID, activeSet, proofs, nil).Times(1)

// for 1st proposal, containing the ref ballot of this epoch
b.mCState.EXPECT().SelectProposalTXs(len(proofs)).Return([]types.TransactionID{tx.ID()}).Times(1)
b.mCState.EXPECT().SelectProposalTXs(len(proofs)).Return([]types.TransactionID{tx.ID}).Times(1)
b.mBaseBP.EXPECT().EncodeVotes(gomock.Any(), gomock.Any()).Return(&types.Votes{Base: bb}, nil).Times(1)
meshHash := types.RandomHash()
b.mMsh.EXPECT().GetAggregatedLayerHash(layerID.Sub(1)).Return(meshHash, nil).Times(1)
Expand All @@ -189,7 +201,7 @@ func TestBuilder_HandleLayer_OneProposal(t *testing.T) {
require.NotNil(t, p.EpochData)
require.Equal(t, activeSet, p.EpochData.ActiveSet)
require.Equal(t, beacon, p.EpochData.Beacon)
require.Equal(t, []types.TransactionID{tx.ID()}, p.TxIDs)
require.Equal(t, []types.TransactionID{tx.ID}, p.TxIDs)
require.Equal(t, meshHash, p.MeshHash)
return nil
}).Times(1)
Expand Down Expand Up @@ -276,7 +288,7 @@ func TestBuilder_HandleLayer_NoRefBallot(t *testing.T) {
b.mSync.EXPECT().IsSynced(gomock.Any()).Return(true).Times(1)
b.mBeacon.EXPECT().GetBeacon(gomock.Any()).Return(beacon, nil).Times(1)
b.mOracle.EXPECT().GetProposalEligibility(layerID, beacon).Return(types.RandomATXID(), activeSet, genProofs(t, 1), nil).Times(1)
b.mCState.EXPECT().SelectProposalTXs(1).Return([]types.TransactionID{tx.ID()}).Times(1)
b.mCState.EXPECT().SelectProposalTXs(1).Return([]types.TransactionID{tx.ID}).Times(1)
b.mBaseBP.EXPECT().EncodeVotes(gomock.Any(), gomock.Any()).Return(&types.Votes{Base: types.RandomBallotID()}, nil).Times(1)
b.mMsh.EXPECT().GetAggregatedLayerHash(layerID.Sub(1)).Return(types.RandomHash(), nil).Times(1)
b.mPubSub.EXPECT().Publish(gomock.Any(), proposals.NewProposalProtocol, gomock.Any()).DoAndReturn(
Expand Down Expand Up @@ -305,7 +317,7 @@ func TestBuilder_HandleLayer_RefBallot(t *testing.T) {
b.mSync.EXPECT().IsSynced(gomock.Any()).Return(true).Times(1)
b.mBeacon.EXPECT().GetBeacon(gomock.Any()).Return(beacon, nil).Times(1)
b.mOracle.EXPECT().GetProposalEligibility(layerID, beacon).Return(types.RandomATXID(), genActiveSet(t), genProofs(t, 1), nil).Times(1)
b.mCState.EXPECT().SelectProposalTXs(1).Return([]types.TransactionID{tx.ID()}).Times(1)
b.mCState.EXPECT().SelectProposalTXs(1).Return([]types.TransactionID{tx.ID}).Times(1)
b.mBaseBP.EXPECT().EncodeVotes(gomock.Any(), gomock.Any()).Return(&types.Votes{Base: types.RandomBallotID()}, nil).Times(1)
b.mMsh.EXPECT().GetAggregatedLayerHash(layerID.Sub(1)).Return(types.RandomHash(), nil).Times(1)
b.mPubSub.EXPECT().Publish(gomock.Any(), proposals.NewProposalProtocol, gomock.Any()).DoAndReturn(
Expand Down Expand Up @@ -333,7 +345,7 @@ func TestBuilder_HandleLayer_CanceledDuringBuilding(t *testing.T) {
b.mSync.EXPECT().IsSynced(gomock.Any()).Return(true).Times(1)
b.mBeacon.EXPECT().GetBeacon(gomock.Any()).Return(beacon, nil).Times(1)
b.mOracle.EXPECT().GetProposalEligibility(layerID, beacon).Return(types.RandomATXID(), genActiveSet(t), genProofs(t, 1), nil).Times(1)
b.mCState.EXPECT().SelectProposalTXs(1).Return([]types.TransactionID{tx.ID()}).Times(1)
b.mCState.EXPECT().SelectProposalTXs(1).Return([]types.TransactionID{tx.ID}).Times(1)
b.mBaseBP.EXPECT().EncodeVotes(gomock.Any(), gomock.Any()).Return(&types.Votes{Base: types.RandomBallotID()}, nil).Times(1)
b.mMsh.EXPECT().GetAggregatedLayerHash(layerID.Sub(1)).Return(types.RandomHash(), nil).Times(1)

Expand All @@ -353,7 +365,7 @@ func TestBuilder_HandleLayer_PublishError(t *testing.T) {
b.mSync.EXPECT().IsSynced(gomock.Any()).Return(true).Times(1)
b.mBeacon.EXPECT().GetBeacon(gomock.Any()).Return(beacon, nil).Times(1)
b.mOracle.EXPECT().GetProposalEligibility(layerID, beacon).Return(types.RandomATXID(), genActiveSet(t), genProofs(t, 1), nil).Times(1)
b.mCState.EXPECT().SelectProposalTXs(1).Return([]types.TransactionID{tx.ID()}).Times(1)
b.mCState.EXPECT().SelectProposalTXs(1).Return([]types.TransactionID{tx.ID}).Times(1)
b.mBaseBP.EXPECT().EncodeVotes(gomock.Any(), gomock.Any()).Return(&types.Votes{Base: types.RandomBallotID()}, nil).Times(1)
b.mMsh.EXPECT().GetAggregatedLayerHash(layerID.Sub(1)).Return(types.RandomHash(), nil).Times(1)
b.mPubSub.EXPECT().Publish(gomock.Any(), proposals.NewProposalProtocol, gomock.Any()).Return(errors.New("unknown")).Times(1)
Expand All @@ -375,7 +387,7 @@ func TestBuilder_HandleLayer_StateRootErrorOK(t *testing.T) {
b.mSync.EXPECT().IsSynced(gomock.Any()).Return(true).Times(1)
b.mBeacon.EXPECT().GetBeacon(gomock.Any()).Return(beacon, nil).Times(1)
b.mOracle.EXPECT().GetProposalEligibility(layerID, beacon).Return(types.RandomATXID(), genActiveSet(t), genProofs(t, 1), nil).Times(1)
b.mCState.EXPECT().SelectProposalTXs(1).Return([]types.TransactionID{tx.ID()}).Times(1)
b.mCState.EXPECT().SelectProposalTXs(1).Return([]types.TransactionID{tx.ID}).Times(1)
b.mBaseBP.EXPECT().EncodeVotes(gomock.Any(), gomock.Any()).Return(&types.Votes{Base: types.RandomBallotID()}, nil).Times(1)
b.mMsh.EXPECT().GetAggregatedLayerHash(layerID.Sub(1)).Return(types.RandomHash(), nil).Times(1)
b.mPubSub.EXPECT().Publish(gomock.Any(), proposals.NewProposalProtocol, gomock.Any()).Return(errors.New("unknown")).Times(1)
Expand All @@ -396,7 +408,7 @@ func TestBuilder_HandleLayer_MeshHashErrorOK(t *testing.T) {
b.mSync.EXPECT().IsSynced(gomock.Any()).Return(true).Times(1)
b.mBeacon.EXPECT().GetBeacon(gomock.Any()).Return(beacon, nil).Times(1)
b.mOracle.EXPECT().GetProposalEligibility(layerID, beacon).Return(types.RandomATXID(), genActiveSet(t), genProofs(t, 1), nil).Times(1)
b.mCState.EXPECT().SelectProposalTXs(1).Return([]types.TransactionID{tx.ID()}).Times(1)
b.mCState.EXPECT().SelectProposalTXs(1).Return([]types.TransactionID{tx.ID}).Times(1)
b.mBaseBP.EXPECT().EncodeVotes(gomock.Any(), gomock.Any()).Return(&types.Votes{Base: types.RandomBallotID()}, nil).Times(1)
b.mMsh.EXPECT().GetAggregatedLayerHash(layerID.Sub(1)).Return(types.EmptyLayerHash, errors.New("unknown")).Times(1)
b.mPubSub.EXPECT().Publish(gomock.Any(), proposals.NewProposalProtocol, gomock.Any()).Return(errors.New("unknown")).Times(1)
Expand Down
24 changes: 24 additions & 0 deletions sql/transactions/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,30 @@ func Add(db sql.Executor, tx *types.Transaction, received time.Time) error {
return nil
}

// AddHeader and derived fields to the existing transaction.
func AddHeader(db sql.Executor, tid types.TransactionID, header *types.TxHeader) error {
buf, err := codec.Encode(header)
if err != nil {
return fmt.Errorf("encode %+v: %w", header, err)
}
rows, err := db.Exec(`update transactions
set header = ?1, principal = ?2, nonce = ?3
where id = ?4 returning id;`,
func(stmt *sql.Statement) {
stmt.BindBytes(1, buf)
stmt.BindBytes(2, header.Principal[:])
stmt.BindInt64(3, int64(header.Nonce.Counter))
stmt.BindBytes(4, tid.Bytes())
}, nil)
if rows == 0 {
return fmt.Errorf("%w: %s", sql.ErrNotFound, err)
}
if err != nil {
return fmt.Errorf("add header %s: %w", tid, err)
}
return nil
}

// AddToProposal associates a transaction with a proposal.
func AddToProposal(db sql.Executor, tid types.TransactionID, lid types.LayerID, pid types.ProposalID) error {
if _, err := db.Exec(`insert into proposal_transactions (pid, tid, layer) values (?1, ?2, ?3)`,
Expand Down
37 changes: 27 additions & 10 deletions txs/conservative_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ func (cs *ConservativeState) Validation(raw types.RawTx) system.ValidationReques
func (cs *ConservativeState) AddToCache(tx *types.Transaction) error {
received := time.Now()
// save all new transactions as long as they are syntactically correct

if err := cs.cache.AddToDB(tx, received); err != nil {
return err
}
Expand Down Expand Up @@ -159,16 +158,11 @@ func (cs *ConservativeState) ApplyLayer(toApply *types.Block) ([]types.Transacti
return nil, err
}

txs, err := cs.getTXsToApply(toApply)
txs, raw, err := cs.getTXsToApply(toApply)
if err != nil {
return nil, err
}

// vm parses fields sequentially, so it can't use Transaction
raw := make([]types.RawTx, 0, len(txs))
for _, tx := range txs {
raw = append(raw, tx.RawTx)
}
skipped, err := cs.vmState.Apply(toApply.LayerIndex, raw, toApply.Rewards)
if err != nil {
logger.With().Error("failed to apply layer txs",
Expand Down Expand Up @@ -203,20 +197,43 @@ func (cs *ConservativeState) ApplyLayer(toApply *types.Block) ([]types.Transacti
return skipped, nil
}

func (cs *ConservativeState) getTXsToApply(toApply *types.Block) ([]*types.Transaction, error) {
func (cs *ConservativeState) getTXsToApply(toApply *types.Block) ([]*types.Transaction, []types.RawTx, error) {
mtxs, missing := cs.GetMeshTransactions(toApply.TxIDs)
if len(missing) > 0 {
return nil, fmt.Errorf("find txs %v for applying layer %v", missing, toApply.LayerIndex)
return nil, nil, fmt.Errorf("find txs %v for applying layer %v", missing, toApply.LayerIndex)
}
txs := make([]*types.Transaction, 0, len(mtxs))
raw := make([]types.RawTx, 0, len(mtxs))
for _, mtx := range mtxs {
// some TXs in the block may be already applied previously
if mtx.State == types.APPLIED {
continue
}
// txs without header were saved by syncer without validation
if mtx.TxHeader == nil {
req := cs.vmState.Validation(mtx.RawTx)
header, err := req.Parse()
if err != nil {
return nil, nil, fmt.Errorf("parsing %s: %w", mtx.ID, err)
}
if !req.Verify() {
return nil, nil, fmt.Errorf("applying block %s with invalid tx %s", toApply.ID(), mtx.ID)
}
mtx.TxHeader = header
// updating header also updates principal/nonce indexes
if err := cs.tp.AddHeader(mtx.ID, header); err != nil {
return nil, nil, err
}
// restore cache consistency (e.g nonce/balance) so that gossiped
// transactions can be added succesfully
if err := cs.cache.Add(&mtx.Transaction, mtx.Received, nil); err != nil {
return nil, nil, err
}
}
txs = append(txs, &mtx.Transaction)
raw = append(raw, mtx.RawTx)
}
return txs, nil
return txs, raw, nil
}

// Transactions exports the transactions DB.
Expand Down
44 changes: 44 additions & 0 deletions txs/conservative_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,50 @@ func TestApplyLayer_VMError(t *testing.T) {
}
}

func TestConsistentConservativeState(t *testing.T) {
// we have two different workflows for transactions
// 1. receive gossiped transaction and verify it immediatly
// 2. receive synced transaction and delay verification
// this test is meant to ensure that both of them will result in a consistent
// conservative cache state

tcs1 := createConservativeState(t)
tcs2 := createConservativeState(t)
_ = tcs2

rng := rand.New(rand.NewSource(101))
signers := make([]*signing.EdSigner, 30)
nonces := make([]uint64, len(signers))
for i := range signers {
signers[i] = signing.NewEdSignerFromRand(rng)
}
tcs1.mvm.EXPECT().GetBalance(gomock.Any()).Return(defaultBalance, nil).AnyTimes()
tcs1.mvm.EXPECT().GetNonce(gomock.Any()).Return(types.Nonce{}, nil).AnyTimes()

for lid := 1; lid < 10; lid++ {
txs := make([]*types.Transaction, 100)
ids := make([]types.TransactionID, len(txs))
raw := make([]types.RawTx, len(txs))
for i := range txs {
signer := rng.Intn(len(signers))
txs[i] = newTx(t, nonces[signer], 1, 1, signers[signer])
nonces[signer]++
ids[i] = txs[i].ID
raw[i] = txs[i].RawTx
require.NoError(t, tcs1.AddToCache(txs[i]))
}
block := types.NewExistingBlock(types.BlockID{byte(lid)},
types.InnerBlock{
LayerIndex: types.NewLayerID(uint32(lid)),
TxIDs: ids,
},
)
tcs1.mvm.EXPECT().Apply(block.LayerIndex, raw, block.Rewards).Return(nil, nil).Times(1)
_, err := tcs1.ApplyLayer(block)
require.NoError(t, err)
}
}

func TestTXFetcher(t *testing.T) {
tcs := createConservativeState(t)
ids, txs := addBatch(t, tcs, numTXs)
Expand Down
2 changes: 0 additions & 2 deletions txs/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,12 @@ func (th *TxHandler) HandleSyncTransaction(ctx context.Context, data []byte) err
raw := types.NewRawTx(data)
exists, err := th.state.HasTx(raw.ID)
if err != nil {
th.logger.WithContext(ctx).With().Warning("failed to check sync tx exists", log.Err(err))
return fmt.Errorf("has sync tx: %w", err)
} else if exists {
return nil
}
err = th.state.Add(&types.Transaction{RawTx: raw}, time.Now())
if err != nil {
th.logger.WithContext(ctx).With().Warning("failed to add transaction", log.Err(err))
return fmt.Errorf("add tx %w", err)
}
return nil
Expand Down
1 change: 1 addition & 0 deletions txs/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type conStateCache interface {

type txProvider interface {
Add(*types.Transaction, time.Time) error
AddHeader(types.TransactionID, *types.TxHeader) error
Has(types.TransactionID) (bool, error)
Get(types.TransactionID) (*types.MeshTransaction, error)
GetBlob(types.TransactionID) ([]byte, error)
Expand Down
14 changes: 14 additions & 0 deletions txs/mocks/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions txs/noop_tx_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
type nopTP struct{}

func (ntp *nopTP) Add(*types.Transaction, time.Time) error { return nil }
func (ntp *nopTP) AddHeader(types.TransactionID, *types.TxHeader) error { return nil }
func (ntp *nopTP) Has(types.TransactionID) (bool, error) { return false, nil }
func (ntp *nopTP) Get(types.TransactionID) (*types.MeshTransaction, error) { return nil, nil }
func (ntp *nopTP) GetBlob(types.TransactionID) ([]byte, error) { return nil, nil }
Expand Down
5 changes: 5 additions & 0 deletions txs/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,8 @@ func undoLayers(dbtx *sql.Tx, from types.LayerID) error {
}
return nil
}

// AddHeader to previously stored tx.
func (s *store) AddHeader(tid types.TransactionID, header *types.TxHeader) error {
return transactions.AddHeader(s.db, tid, header)
}

0 comments on commit b7ee989

Please sign in to comment.