diff --git a/miner/proposal_builder_test.go b/miner/proposal_builder_test.go index 5b78a40f7d..7f4804fc01 100644 --- a/miner/proposal_builder_test.go +++ b/miner/proposal_builder_test.go @@ -18,7 +18,8 @@ import ( "github.com/spacemeshos/go-spacemesh/sql" "github.com/spacemeshos/go-spacemesh/sql/ballots" smocks "github.com/spacemeshos/go-spacemesh/system/mocks" - "github.com/spacemeshos/go-spacemesh/vm/transaction" + "github.com/spacemeshos/go-spacemesh/vm/sdk" + "github.com/spacemeshos/go-spacemesh/vm/sdk/wallet" ) const ( @@ -69,9 +70,20 @@ func createBuilder(tb testing.TB) *testBuilder { func genTX(tb testing.TB, nonce uint64, recipient types.Address, signer *signing.EdSigner) *types.Transaction { tb.Helper() - tx, err := transaction.GenerateCallTransaction(signer, recipient, nonce, 1, defaultGasLimit, defaultFee) - require.NoError(tb, err) - return tx + + raw := wallet.Spend(signer.PrivateKey(), recipient, defaultFee, + sdk.WithNonce(types.Nonce{Counter: nonce}), + ) + tx := types.Transaction{ + RawTx: types.NewRawTx(raw), + TxHeader: &types.TxHeader{}, + } + tx.MaxGas = defaultGasLimit + tx.MaxSpend = defaultFee + tx.GasPrice = 1 + tx.Nonce = types.Nonce{Counter: nonce} + tx.Principal = types.BytesToAddress(signer.PublicKey().Bytes()) + return &tx } func genActiveSet(tb testing.TB) []types.ATXID { @@ -127,7 +139,7 @@ func TestBuilder_HandleLayer_MultipleProposals(t *testing.T) { b.mOracle.EXPECT().GetProposalEligibility(layerID, beacon).Return(atxID, activeSet, proofs, nil).Times(1) // for 1st proposal, containing the ref ballot of this epoch - b.mCState.EXPECT().SelectProposalTXs(len(proofs)).Return([]types.TransactionID{tx1.ID()}).Times(1) + b.mCState.EXPECT().SelectProposalTXs(len(proofs)).Return([]types.TransactionID{tx1.ID}).Times(1) b.mBaseBP.EXPECT().EncodeVotes(gomock.Any(), gomock.Any()).Return(&types.Votes{Base: base}, nil).Times(1) meshHash := types.RandomHash() b.mMsh.EXPECT().GetAggregatedLayerHash(layerID.Sub(1)).Return(meshHash, nil).Times(1) @@ -143,7 +155,7 @@ func TestBuilder_HandleLayer_MultipleProposals(t *testing.T) { require.NotNil(t, p.EpochData) require.Equal(t, activeSet, p.EpochData.ActiveSet) require.Equal(t, beacon, p.EpochData.Beacon) - require.Equal(t, []types.TransactionID{tx1.ID()}, p.TxIDs) + require.Equal(t, []types.TransactionID{tx1.ID}, p.TxIDs) require.Equal(t, proofs, p.EligibilityProofs) require.Equal(t, meshHash, p.MeshHash) return nil @@ -173,7 +185,7 @@ func TestBuilder_HandleLayer_OneProposal(t *testing.T) { b.mOracle.EXPECT().GetProposalEligibility(layerID, beacon).Return(atxID, activeSet, proofs, nil).Times(1) // for 1st proposal, containing the ref ballot of this epoch - b.mCState.EXPECT().SelectProposalTXs(len(proofs)).Return([]types.TransactionID{tx.ID()}).Times(1) + b.mCState.EXPECT().SelectProposalTXs(len(proofs)).Return([]types.TransactionID{tx.ID}).Times(1) b.mBaseBP.EXPECT().EncodeVotes(gomock.Any(), gomock.Any()).Return(&types.Votes{Base: bb}, nil).Times(1) meshHash := types.RandomHash() b.mMsh.EXPECT().GetAggregatedLayerHash(layerID.Sub(1)).Return(meshHash, nil).Times(1) @@ -189,7 +201,7 @@ func TestBuilder_HandleLayer_OneProposal(t *testing.T) { require.NotNil(t, p.EpochData) require.Equal(t, activeSet, p.EpochData.ActiveSet) require.Equal(t, beacon, p.EpochData.Beacon) - require.Equal(t, []types.TransactionID{tx.ID()}, p.TxIDs) + require.Equal(t, []types.TransactionID{tx.ID}, p.TxIDs) require.Equal(t, meshHash, p.MeshHash) return nil }).Times(1) @@ -276,7 +288,7 @@ func TestBuilder_HandleLayer_NoRefBallot(t *testing.T) { b.mSync.EXPECT().IsSynced(gomock.Any()).Return(true).Times(1) b.mBeacon.EXPECT().GetBeacon(gomock.Any()).Return(beacon, nil).Times(1) b.mOracle.EXPECT().GetProposalEligibility(layerID, beacon).Return(types.RandomATXID(), activeSet, genProofs(t, 1), nil).Times(1) - b.mCState.EXPECT().SelectProposalTXs(1).Return([]types.TransactionID{tx.ID()}).Times(1) + b.mCState.EXPECT().SelectProposalTXs(1).Return([]types.TransactionID{tx.ID}).Times(1) b.mBaseBP.EXPECT().EncodeVotes(gomock.Any(), gomock.Any()).Return(&types.Votes{Base: types.RandomBallotID()}, nil).Times(1) b.mMsh.EXPECT().GetAggregatedLayerHash(layerID.Sub(1)).Return(types.RandomHash(), nil).Times(1) b.mPubSub.EXPECT().Publish(gomock.Any(), proposals.NewProposalProtocol, gomock.Any()).DoAndReturn( @@ -305,7 +317,7 @@ func TestBuilder_HandleLayer_RefBallot(t *testing.T) { b.mSync.EXPECT().IsSynced(gomock.Any()).Return(true).Times(1) b.mBeacon.EXPECT().GetBeacon(gomock.Any()).Return(beacon, nil).Times(1) b.mOracle.EXPECT().GetProposalEligibility(layerID, beacon).Return(types.RandomATXID(), genActiveSet(t), genProofs(t, 1), nil).Times(1) - b.mCState.EXPECT().SelectProposalTXs(1).Return([]types.TransactionID{tx.ID()}).Times(1) + b.mCState.EXPECT().SelectProposalTXs(1).Return([]types.TransactionID{tx.ID}).Times(1) b.mBaseBP.EXPECT().EncodeVotes(gomock.Any(), gomock.Any()).Return(&types.Votes{Base: types.RandomBallotID()}, nil).Times(1) b.mMsh.EXPECT().GetAggregatedLayerHash(layerID.Sub(1)).Return(types.RandomHash(), nil).Times(1) b.mPubSub.EXPECT().Publish(gomock.Any(), proposals.NewProposalProtocol, gomock.Any()).DoAndReturn( @@ -333,7 +345,7 @@ func TestBuilder_HandleLayer_CanceledDuringBuilding(t *testing.T) { b.mSync.EXPECT().IsSynced(gomock.Any()).Return(true).Times(1) b.mBeacon.EXPECT().GetBeacon(gomock.Any()).Return(beacon, nil).Times(1) b.mOracle.EXPECT().GetProposalEligibility(layerID, beacon).Return(types.RandomATXID(), genActiveSet(t), genProofs(t, 1), nil).Times(1) - b.mCState.EXPECT().SelectProposalTXs(1).Return([]types.TransactionID{tx.ID()}).Times(1) + b.mCState.EXPECT().SelectProposalTXs(1).Return([]types.TransactionID{tx.ID}).Times(1) b.mBaseBP.EXPECT().EncodeVotes(gomock.Any(), gomock.Any()).Return(&types.Votes{Base: types.RandomBallotID()}, nil).Times(1) b.mMsh.EXPECT().GetAggregatedLayerHash(layerID.Sub(1)).Return(types.RandomHash(), nil).Times(1) @@ -353,7 +365,7 @@ func TestBuilder_HandleLayer_PublishError(t *testing.T) { b.mSync.EXPECT().IsSynced(gomock.Any()).Return(true).Times(1) b.mBeacon.EXPECT().GetBeacon(gomock.Any()).Return(beacon, nil).Times(1) b.mOracle.EXPECT().GetProposalEligibility(layerID, beacon).Return(types.RandomATXID(), genActiveSet(t), genProofs(t, 1), nil).Times(1) - b.mCState.EXPECT().SelectProposalTXs(1).Return([]types.TransactionID{tx.ID()}).Times(1) + b.mCState.EXPECT().SelectProposalTXs(1).Return([]types.TransactionID{tx.ID}).Times(1) b.mBaseBP.EXPECT().EncodeVotes(gomock.Any(), gomock.Any()).Return(&types.Votes{Base: types.RandomBallotID()}, nil).Times(1) b.mMsh.EXPECT().GetAggregatedLayerHash(layerID.Sub(1)).Return(types.RandomHash(), nil).Times(1) b.mPubSub.EXPECT().Publish(gomock.Any(), proposals.NewProposalProtocol, gomock.Any()).Return(errors.New("unknown")).Times(1) @@ -375,7 +387,7 @@ func TestBuilder_HandleLayer_StateRootErrorOK(t *testing.T) { b.mSync.EXPECT().IsSynced(gomock.Any()).Return(true).Times(1) b.mBeacon.EXPECT().GetBeacon(gomock.Any()).Return(beacon, nil).Times(1) b.mOracle.EXPECT().GetProposalEligibility(layerID, beacon).Return(types.RandomATXID(), genActiveSet(t), genProofs(t, 1), nil).Times(1) - b.mCState.EXPECT().SelectProposalTXs(1).Return([]types.TransactionID{tx.ID()}).Times(1) + b.mCState.EXPECT().SelectProposalTXs(1).Return([]types.TransactionID{tx.ID}).Times(1) b.mBaseBP.EXPECT().EncodeVotes(gomock.Any(), gomock.Any()).Return(&types.Votes{Base: types.RandomBallotID()}, nil).Times(1) b.mMsh.EXPECT().GetAggregatedLayerHash(layerID.Sub(1)).Return(types.RandomHash(), nil).Times(1) b.mPubSub.EXPECT().Publish(gomock.Any(), proposals.NewProposalProtocol, gomock.Any()).Return(errors.New("unknown")).Times(1) @@ -396,7 +408,7 @@ func TestBuilder_HandleLayer_MeshHashErrorOK(t *testing.T) { b.mSync.EXPECT().IsSynced(gomock.Any()).Return(true).Times(1) b.mBeacon.EXPECT().GetBeacon(gomock.Any()).Return(beacon, nil).Times(1) b.mOracle.EXPECT().GetProposalEligibility(layerID, beacon).Return(types.RandomATXID(), genActiveSet(t), genProofs(t, 1), nil).Times(1) - b.mCState.EXPECT().SelectProposalTXs(1).Return([]types.TransactionID{tx.ID()}).Times(1) + b.mCState.EXPECT().SelectProposalTXs(1).Return([]types.TransactionID{tx.ID}).Times(1) b.mBaseBP.EXPECT().EncodeVotes(gomock.Any(), gomock.Any()).Return(&types.Votes{Base: types.RandomBallotID()}, nil).Times(1) b.mMsh.EXPECT().GetAggregatedLayerHash(layerID.Sub(1)).Return(types.EmptyLayerHash, errors.New("unknown")).Times(1) b.mPubSub.EXPECT().Publish(gomock.Any(), proposals.NewProposalProtocol, gomock.Any()).Return(errors.New("unknown")).Times(1) diff --git a/sql/transactions/transactions.go b/sql/transactions/transactions.go index 1968defd74..b26881579e 100644 --- a/sql/transactions/transactions.go +++ b/sql/transactions/transactions.go @@ -50,6 +50,30 @@ func Add(db sql.Executor, tx *types.Transaction, received time.Time) error { return nil } +// AddHeader and derived fields to the existing transaction. +func AddHeader(db sql.Executor, tid types.TransactionID, header *types.TxHeader) error { + buf, err := codec.Encode(header) + if err != nil { + return fmt.Errorf("encode %+v: %w", header, err) + } + rows, err := db.Exec(`update transactions + set header = ?1, principal = ?2, nonce = ?3 + where id = ?4 returning id;`, + func(stmt *sql.Statement) { + stmt.BindBytes(1, buf) + stmt.BindBytes(2, header.Principal[:]) + stmt.BindInt64(3, int64(header.Nonce.Counter)) + stmt.BindBytes(4, tid.Bytes()) + }, nil) + if rows == 0 { + return fmt.Errorf("%w: %s", sql.ErrNotFound, err) + } + if err != nil { + return fmt.Errorf("add header %s: %w", tid, err) + } + return nil +} + // AddToProposal associates a transaction with a proposal. func AddToProposal(db sql.Executor, tid types.TransactionID, lid types.LayerID, pid types.ProposalID) error { if _, err := db.Exec(`insert into proposal_transactions (pid, tid, layer) values (?1, ?2, ?3)`, diff --git a/txs/conservative_state.go b/txs/conservative_state.go index 3a29a385d2..8f79e2b2c7 100644 --- a/txs/conservative_state.go +++ b/txs/conservative_state.go @@ -130,7 +130,6 @@ func (cs *ConservativeState) Validation(raw types.RawTx) system.ValidationReques func (cs *ConservativeState) AddToCache(tx *types.Transaction) error { received := time.Now() // save all new transactions as long as they are syntactically correct - if err := cs.cache.AddToDB(tx, received); err != nil { return err } @@ -159,16 +158,11 @@ func (cs *ConservativeState) ApplyLayer(toApply *types.Block) ([]types.Transacti return nil, err } - txs, err := cs.getTXsToApply(toApply) + txs, raw, err := cs.getTXsToApply(toApply) if err != nil { return nil, err } - // vm parses fields sequentially, so it can't use Transaction - raw := make([]types.RawTx, 0, len(txs)) - for _, tx := range txs { - raw = append(raw, tx.RawTx) - } skipped, err := cs.vmState.Apply(toApply.LayerIndex, raw, toApply.Rewards) if err != nil { logger.With().Error("failed to apply layer txs", @@ -203,20 +197,43 @@ func (cs *ConservativeState) ApplyLayer(toApply *types.Block) ([]types.Transacti return skipped, nil } -func (cs *ConservativeState) getTXsToApply(toApply *types.Block) ([]*types.Transaction, error) { +func (cs *ConservativeState) getTXsToApply(toApply *types.Block) ([]*types.Transaction, []types.RawTx, error) { mtxs, missing := cs.GetMeshTransactions(toApply.TxIDs) if len(missing) > 0 { - return nil, fmt.Errorf("find txs %v for applying layer %v", missing, toApply.LayerIndex) + return nil, nil, fmt.Errorf("find txs %v for applying layer %v", missing, toApply.LayerIndex) } txs := make([]*types.Transaction, 0, len(mtxs)) + raw := make([]types.RawTx, 0, len(mtxs)) for _, mtx := range mtxs { // some TXs in the block may be already applied previously if mtx.State == types.APPLIED { continue } + // txs without header were saved by syncer without validation + if mtx.TxHeader == nil { + req := cs.vmState.Validation(mtx.RawTx) + header, err := req.Parse() + if err != nil { + return nil, nil, fmt.Errorf("parsing %s: %w", mtx.ID, err) + } + if !req.Verify() { + return nil, nil, fmt.Errorf("applying block %s with invalid tx %s", toApply.ID(), mtx.ID) + } + mtx.TxHeader = header + // updating header also updates principal/nonce indexes + if err := cs.tp.AddHeader(mtx.ID, header); err != nil { + return nil, nil, err + } + // restore cache consistency (e.g nonce/balance) so that gossiped + // transactions can be added succesfully + if err := cs.cache.Add(&mtx.Transaction, mtx.Received, nil); err != nil { + return nil, nil, err + } + } txs = append(txs, &mtx.Transaction) + raw = append(raw, mtx.RawTx) } - return txs, nil + return txs, raw, nil } // Transactions exports the transactions DB. diff --git a/txs/conservative_state_test.go b/txs/conservative_state_test.go index 2b4bdb4c53..dad21cfc3d 100644 --- a/txs/conservative_state_test.go +++ b/txs/conservative_state_test.go @@ -688,6 +688,50 @@ func TestApplyLayer_VMError(t *testing.T) { } } +func TestConsistentConservativeState(t *testing.T) { + // we have two different workflows for transactions + // 1. receive gossiped transaction and verify it immediatly + // 2. receive synced transaction and delay verification + // this test is meant to ensure that both of them will result in a consistent + // conservative cache state + + tcs1 := createConservativeState(t) + tcs2 := createConservativeState(t) + _ = tcs2 + + rng := rand.New(rand.NewSource(101)) + signers := make([]*signing.EdSigner, 30) + nonces := make([]uint64, len(signers)) + for i := range signers { + signers[i] = signing.NewEdSignerFromRand(rng) + } + tcs1.mvm.EXPECT().GetBalance(gomock.Any()).Return(defaultBalance, nil).AnyTimes() + tcs1.mvm.EXPECT().GetNonce(gomock.Any()).Return(types.Nonce{}, nil).AnyTimes() + + for lid := 1; lid < 10; lid++ { + txs := make([]*types.Transaction, 100) + ids := make([]types.TransactionID, len(txs)) + raw := make([]types.RawTx, len(txs)) + for i := range txs { + signer := rng.Intn(len(signers)) + txs[i] = newTx(t, nonces[signer], 1, 1, signers[signer]) + nonces[signer]++ + ids[i] = txs[i].ID + raw[i] = txs[i].RawTx + require.NoError(t, tcs1.AddToCache(txs[i])) + } + block := types.NewExistingBlock(types.BlockID{byte(lid)}, + types.InnerBlock{ + LayerIndex: types.NewLayerID(uint32(lid)), + TxIDs: ids, + }, + ) + tcs1.mvm.EXPECT().Apply(block.LayerIndex, raw, block.Rewards).Return(nil, nil).Times(1) + _, err := tcs1.ApplyLayer(block) + require.NoError(t, err) + } +} + func TestTXFetcher(t *testing.T) { tcs := createConservativeState(t) ids, txs := addBatch(t, tcs, numTXs) diff --git a/txs/handler.go b/txs/handler.go index a10a186b00..aa3ed1c42d 100644 --- a/txs/handler.go +++ b/txs/handler.go @@ -83,14 +83,12 @@ func (th *TxHandler) HandleSyncTransaction(ctx context.Context, data []byte) err raw := types.NewRawTx(data) exists, err := th.state.HasTx(raw.ID) if err != nil { - th.logger.WithContext(ctx).With().Warning("failed to check sync tx exists", log.Err(err)) return fmt.Errorf("has sync tx: %w", err) } else if exists { return nil } err = th.state.Add(&types.Transaction{RawTx: raw}, time.Now()) if err != nil { - th.logger.WithContext(ctx).With().Warning("failed to add transaction", log.Err(err)) return fmt.Errorf("add tx %w", err) } return nil diff --git a/txs/interface.go b/txs/interface.go index 000cce175d..3cc7b4dfa6 100644 --- a/txs/interface.go +++ b/txs/interface.go @@ -35,6 +35,7 @@ type conStateCache interface { type txProvider interface { Add(*types.Transaction, time.Time) error + AddHeader(types.TransactionID, *types.TxHeader) error Has(types.TransactionID) (bool, error) Get(types.TransactionID) (*types.MeshTransaction, error) GetBlob(types.TransactionID) ([]byte, error) diff --git a/txs/mocks/mocks.go b/txs/mocks/mocks.go index a0791886e3..d3f11c8516 100644 --- a/txs/mocks/mocks.go +++ b/txs/mocks/mocks.go @@ -325,6 +325,20 @@ func (mr *MocktxProviderMockRecorder) Add(arg0, arg1 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Add", reflect.TypeOf((*MocktxProvider)(nil).Add), arg0, arg1) } +// AddHeader mocks base method. +func (m *MocktxProvider) AddHeader(arg0 types.TransactionID, arg1 *types.TxHeader) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddHeader", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddHeader indicates an expected call of AddHeader. +func (mr *MocktxProviderMockRecorder) AddHeader(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddHeader", reflect.TypeOf((*MocktxProvider)(nil).AddHeader), arg0, arg1) +} + // AddToBlock mocks base method. func (m *MocktxProvider) AddToBlock(arg0 types.LayerID, arg1 types.BlockID, arg2 []types.TransactionID) error { m.ctrl.T.Helper() diff --git a/txs/noop_tx_provider.go b/txs/noop_tx_provider.go index a6e8b3ff5e..1c7e8b7ccd 100644 --- a/txs/noop_tx_provider.go +++ b/txs/noop_tx_provider.go @@ -10,6 +10,7 @@ import ( type nopTP struct{} func (ntp *nopTP) Add(*types.Transaction, time.Time) error { return nil } +func (ntp *nopTP) AddHeader(types.TransactionID, *types.TxHeader) error { return nil } func (ntp *nopTP) Has(types.TransactionID) (bool, error) { return false, nil } func (ntp *nopTP) Get(types.TransactionID) (*types.MeshTransaction, error) { return nil, nil } func (ntp *nopTP) GetBlob(types.TransactionID) ([]byte, error) { return nil, nil } diff --git a/txs/store.go b/txs/store.go index 9dd1868819..b38345631f 100644 --- a/txs/store.go +++ b/txs/store.go @@ -176,3 +176,8 @@ func undoLayers(dbtx *sql.Tx, from types.LayerID) error { } return nil } + +// AddHeader to previously stored tx. +func (s *store) AddHeader(tid types.TransactionID, header *types.TxHeader) error { + return transactions.AddHeader(s.db, tid, header) +}