Skip to content

Commit

Permalink
Restore mined transactions to mempool after blocks rollback (#768)
Browse files Browse the repository at this point in the history
  • Loading branch information
sidenaio authored Aug 26, 2021
1 parent 39ae2f1 commit dbe13d2
Show file tree
Hide file tree
Showing 12 changed files with 169 additions and 65 deletions.
22 changes: 14 additions & 8 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2095,10 +2095,10 @@ func (chain *Blockchain) ValidateSubChain(startHeight uint64, blocks []types.Blo
return nil
}

func (chain *Blockchain) ResetTo(height uint64) error {
func (chain *Blockchain) ResetTo(height uint64) (revertedTxs []*types.Transaction, err error) {
prevHead := chain.Head.Height()
if err := chain.appState.ResetTo(height); err != nil {
return errors.WithMessage(err, "state is corrupted, try to resync from scratch")
return nil, errors.WithMessage(err, "state is corrupted, try to resync from scratch")
}
chain.setHead(height, nil)

Expand All @@ -2107,11 +2107,17 @@ func (chain *Blockchain) ResetTo(height uint64) error {
if hash == (common.Hash{}) {
continue
}
block := chain.GetBlock(hash)
if block != nil {
for _, tx := range block.Body.Transactions {
revertedTxs = append(revertedTxs, tx)
}
}
chain.repo.RemoveHeader(hash)
chain.repo.RemoveCanonicalHash(h)
}

return nil
chain.bus.Publish(&events.BlockchainResetEvent{Header: chain.Head, RevertedTxs: revertedTxs})
return revertedTxs, nil
}

func (chain *Blockchain) EnsureIntegrity() error {
Expand All @@ -2120,16 +2126,16 @@ func (chain *Blockchain) EnsureIntegrity() error {
chain.Head.IdentityRoot() != chain.appState.IdentityState.Root() {
wasReset = true
resetTo := uint64(0)
for h, tryCnt := chain.Head.Height()-1, 0; h >= 1 && tryCnt < int(state.MaxSavedStatesCount)+1; h, tryCnt = h-1, tryCnt+1 {
if chain.appState.State.HasVersion(h) && chain.appState.IdentityState.HasVersion(h) {
for h, tryCnt := chain.Head.Height()-1, 0; h >= 1 && tryCnt < state.MaxSavedStatesCount+1; h, tryCnt = h-1, tryCnt+1 {
if chain.appState.State.HasVersion(h) && chain.appState.IdentityState.HasVersion(h) {
resetTo = h
break
}
}
if resetTo == 0 {
return errors.New("state db is corrupted, try to delete idenachain.db folder from your data directory and sync from scratch")
}
if err := chain.ResetTo(resetTo); err != nil {
if _, err := chain.ResetTo(resetTo); err != nil {
return err
}
}
Expand Down Expand Up @@ -2239,7 +2245,7 @@ func (chain *Blockchain) ReadSnapshotManifest() *snapshot.Manifest {
}
return &snapshot.Manifest{
Cid: cid,
CidV2: cidV2,
CidV2: cidV2,
Root: root,
Height: height,
}
Expand Down
34 changes: 28 additions & 6 deletions blockchain/blockchain_mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ import (
"github.com/idena-network/idena-go/secstore"
"github.com/idena-network/idena-go/stats/collector"
"github.com/idena-network/idena-go/subscriptions"
"github.com/shopspring/decimal"
"github.com/tendermint/tm-db"
"math/big"
)

func NewTestBlockchainWithConfig(withIdentity bool, conf *config.ConsensusConf, valConf *config.ValidationConfig, alloc map[common.Address]config.GenesisAllocation, queueSlots int, executableSlots int, executableLimit int, queueLimit int) (*TestBlockchain, *appstate.AppState, *mempool.TxPool, *ecdsa.PrivateKey) {
Expand Down Expand Up @@ -67,7 +69,7 @@ func NewTestBlockchainWithConfig(withIdentity bool, conf *config.ConsensusConf,
appState.Initialize(chain.Head.Height())
txPool.Initialize(chain.Head, secStore.GetAddress(), false)

return &TestBlockchain{db, chain}, appState, txPool, key
return &TestBlockchain{db, chain, 0}, appState, txPool, key
}

func NewTestBlockchain(withIdentity bool, alloc map[common.Address]config.GenesisAllocation) (*TestBlockchain, *appstate.AppState, *mempool.TxPool, *ecdsa.PrivateKey) {
Expand All @@ -89,7 +91,9 @@ func NewCustomTestBlockchain(blocksCount int, emptyBlocksCount int, key *ecdsa.P
Network: 0x99,
Consensus: consensusCfg,
GenesisConf: &config.GenesisConf{
Alloc: nil,
Alloc: map[common.Address]config.GenesisAllocation{
addr: {Balance: big.NewInt(0).Mul(big.NewInt(100), common.DnaBase)},
},
GodAddress: addr,
FirstCeremonyTime: 4070908800, //01.01.2099
},
Expand Down Expand Up @@ -120,15 +124,20 @@ func NewCustomTestBlockchainWithConfig(blocksCount int, emptyBlocksCount int, ke
chain.InitializeChain()
appState.Initialize(chain.Head.Height())

result := &TestBlockchain{db, chain}
result.GenerateBlocks(blocksCount).GenerateEmptyBlocks(emptyBlocksCount)
result := &TestBlockchain{db, chain, 0}
result.GenerateBlocks(blocksCount, 0).GenerateEmptyBlocks(emptyBlocksCount)
txPool.Initialize(chain.Head, secStore.GetAddress(), false)
return result, appState
}

type TestBlockchain struct {
db db.DB
*Blockchain
coinbaseTxNonce uint32
}

func (chain *TestBlockchain) AddTx(tx *types.Transaction) error {
return chain.txpool.AddExternalTxs(tx)
}

func (chain *TestBlockchain) Copy() (*TestBlockchain, *appstate.AppState) {
Expand Down Expand Up @@ -164,7 +173,8 @@ func (chain *TestBlockchain) Copy() (*TestBlockchain, *appstate.AppState) {
copy := NewBlockchain(cfg, db, txPool, appState, ipfs.NewMemoryIpfsProxy(), chain.secStore, bus, offline, keyStore, subManager, upgrader)
copy.InitializeChain()
appState.Initialize(copy.Head.Height())
return &TestBlockchain{db, copy}, appState
txPool.Initialize(chain.Head, chain.secStore.GetAddress(), false)
return &TestBlockchain{db, copy, appState.State.GetNonce(chain.secStore.GetAddress())}, appState
}

func (chain *TestBlockchain) addCert(block *types.Block) {
Expand All @@ -183,8 +193,20 @@ func (chain *TestBlockchain) addCert(block *types.Block) {
chain.WriteCertificate(block.Header.Hash(), cert.Compress(), true)
}

func (chain *TestBlockchain) GenerateBlocks(count int) *TestBlockchain {
func (chain *TestBlockchain) GenerateBlocks(count int, txsInBlock int) *TestBlockchain {
for i := 0; i < count; i++ {
for j := 0; j < txsInBlock; j++ {
tx := BuildTx(chain.appState, chain.coinBaseAddress, &chain.coinBaseAddress, types.SendTx, decimal.Zero,
decimal.New(20, 0), decimal.Zero, chain.coinbaseTxNonce, 0, nil)
tx, err := chain.secStore.SignTx(tx)
if err != nil {
panic(err)
}
if err = chain.txpool.AddExternalTxs(tx); err != nil {
panic(err)
}
}

block := chain.ProposeBlock([]byte{})
block.Block.Header.ProposedHeader.Time = chain.Head.Time() + 20
err := chain.AddBlock(block.Block, nil, collector.NewStatsCollector())
Expand Down
46 changes: 23 additions & 23 deletions blockchain/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,10 +391,10 @@ func Test_applyVrfProposerThreshold(t *testing.T) {
chain.GenerateEmptyBlocks(10)
require.Equal(t, 10, chain.appState.State.EmptyBlocksCount())

chain.GenerateBlocks(5)
chain.GenerateBlocks(5, 0)
require.Equal(t, 10, chain.appState.State.EmptyBlocksCount())

chain.GenerateBlocks(100)
chain.GenerateBlocks(100, 0)
require.Equal(t, 0.5, chain.appState.State.VrfProposerThreshold())
}

Expand Down Expand Up @@ -557,7 +557,7 @@ func Test_Blockchain_OnlineStatusSwitch(t *testing.T) {
require.NoError(err)

// apply pending status switch
chain.GenerateBlocks(1)
chain.GenerateBlocks(1, 0)
require.Equal(1, len(state.State.StatusSwitchAddresses()))
require.False(state.IdentityState.IsOnline(addr))

Expand All @@ -567,26 +567,26 @@ func Test_Blockchain_OnlineStatusSwitch(t *testing.T) {
require.Error(err, "should not validate tx if switch is already pending")

// switch status to online
chain.GenerateBlocks(3)
chain.GenerateBlocks(3, 0)
require.Equal(uint64(10), chain.Head.Height())
require.Zero(len(state.State.StatusSwitchAddresses()))
require.True(state.IdentityState.IsOnline(addr))
require.True(chain.Head.Flags().HasFlag(types.IdentityUpdate))

// fail to switch online again
chain.GenerateBlocks(5)
chain.GenerateBlocks(5, 0)
tx, _ = chain.secStore.SignTx(BuildTx(state, addr, nil, types.OnlineStatusTx, decimal.Zero, decimal.New(20, 0), decimal.Zero, 0, 0, attachments.CreateOnlineStatusAttachment(true)))
err = chain.txpool.AddInternalTx(tx)
require.Error(err, "should not validate tx if identity already has online status")

// add pending request to switch offline
chain.GenerateBlocks(4)
chain.GenerateBlocks(4, 0)
tx, _ = chain.secStore.SignTx(BuildTx(state, addr, nil, types.OnlineStatusTx, decimal.Zero, decimal.New(20, 0), decimal.Zero, 0, 0, attachments.CreateOnlineStatusAttachment(false)))
err = chain.txpool.AddInternalTx(tx)
require.NoError(err)

// switch status to offline
chain.GenerateBlocks(1)
chain.GenerateBlocks(1, 0)
require.Equal(uint64(20), chain.Head.Height())
require.Zero(len(state.State.StatusSwitchAddresses()))
require.False(state.IdentityState.IsOnline(addr))
Expand All @@ -596,33 +596,33 @@ func Test_Blockchain_OnlineStatusSwitch(t *testing.T) {
tx, _ = chain.secStore.SignTx(BuildTx(state, addr, nil, types.OnlineStatusTx, decimal.Zero, decimal.New(20, 0), decimal.Zero, 0, 0, attachments.CreateOnlineStatusAttachment(true)))
err = chain.txpool.AddInternalTx(tx)
require.NoError(err)
chain.GenerateBlocks(1)
chain.GenerateBlocks(1, 0)

require.Equal(1, len(state.State.StatusSwitchAddresses()))

// remove pending request to switch online
tx, _ = chain.secStore.SignTx(BuildTx(state, addr, nil, types.OnlineStatusTx, decimal.Zero, decimal.New(20, 0), decimal.Zero, 0, 0, attachments.CreateOnlineStatusAttachment(false)))
err = chain.txpool.AddInternalTx(tx)
require.NoError(err)
chain.GenerateBlocks(1)
chain.GenerateBlocks(1, 0)

require.Zero(len(state.State.StatusSwitchAddresses()))

// 30th block should not update identity statuses, no pending requests
chain.GenerateBlocks(8)
chain.GenerateBlocks(8, 0)
require.Equal(uint64(30), chain.Head.Height())
require.False(state.IdentityState.IsOnline(addr))
require.False(chain.Head.Flags().HasFlag(types.IdentityUpdate))

chain.GenerateBlocks(70)
chain.GenerateBlocks(70, 0)
require.Equal(uint64(100), chain.Head.Height())

tx, _ = chain.secStore.SignTx(BuildTx(state, addr, nil, types.OnlineStatusTx, decimal.Zero, decimal.New(20, 0), decimal.Zero, 0, 0, attachments.CreateOnlineStatusAttachment(true)))
err = chain.txpool.AddInternalTx(tx)
require.Nil(err)

// switch status to online
chain.GenerateBlocks(10)
chain.GenerateBlocks(10, 0)

require.True(state.IdentityState.IsOnline(addr))
state.State.AddDelayedPenalty(common.Address{0x2})
Expand All @@ -634,7 +634,7 @@ func Test_Blockchain_OnlineStatusSwitch(t *testing.T) {
tx, _ = chain.secStore.SignTx(BuildTx(state, addr, nil, types.OnlineStatusTx, decimal.Zero, decimal.New(20, 0), decimal.Zero, 0, 0, attachments.CreateOnlineStatusAttachment(true)))
err = chain.txpool.AddInternalTx(tx)
require.Nil(err)
chain.GenerateBlocks(1)
chain.GenerateBlocks(1, 0)

require.Equal([]common.Address{{0x2}, {0x3}}, state.State.DelayedOfflinePenalties())
require.True(state.IdentityState.IsOnline(addr))
Expand All @@ -643,7 +643,7 @@ func Test_Blockchain_OnlineStatusSwitch(t *testing.T) {
state.State.AddDelayedPenalty(addr)
state.Commit(nil)
chain.CommitState()
chain.GenerateBlocks(10)
chain.GenerateBlocks(10, 0)

require.False(state.IdentityState.IsOnline(addr))
require.True(state.State.GetPenalty(addr).Sign() > 0)
Expand Down Expand Up @@ -695,7 +695,7 @@ func Test_ApplySubmitCeremonyTxs(t *testing.T) {
err = chain.txpool.AddInternalTx(signed)
require.NoError(t, err)

chain.GenerateBlocks(3)
chain.GenerateBlocks(3, 0)

require.True(t, app.State.HasValidationTx(addr, types.SubmitAnswersHashTx))
require.False(t, app.State.HasValidationTx(addr, types.SubmitShortAnswersTx))
Expand All @@ -709,7 +709,7 @@ func Test_ApplySubmitCeremonyTxs(t *testing.T) {
err = chain.txpool.AddInternalTx(signed)
require.NoError(t, err)

chain.GenerateBlocks(1)
chain.GenerateBlocks(1, 0)
require.True(t, app.State.HasValidationTx(addr, types.SubmitAnswersHashTx))
require.True(t, app.State.HasValidationTx(addr, types.EvidenceTx))

Expand All @@ -736,7 +736,7 @@ func Test_Blockchain_GodAddressInvitesLimit(t *testing.T) {
receiver := crypto.PubkeyToAddress(keyReceiver.PublicKey)
tx, _ := chain.secStore.SignTx(BuildTx(state, addr, &receiver, types.InviteTx, decimal.Zero, decimal.New(2, 0), decimal.Zero, 0, 0, nil))
require.NoError(chain.txpool.AddInternalTx(tx))
chain.GenerateBlocks(1)
chain.GenerateBlocks(1, 0)
}

keyReceiver, _ := crypto.GenerateKey()
Expand Down Expand Up @@ -998,18 +998,18 @@ func Test_Delegation(t *testing.T) {
require.NoError(t, addTx(keys[2], types.DelegateTx, 1, 0, &pool2, nil))
require.NoError(t, addTx(keys[3], types.DelegateTx, 1, 0, &pool3, nil))

chain.GenerateBlocks(1)
chain.GenerateBlocks(1, 0)
require.ErrorIs(t, validation.InvalidRecipient, addTx(pool3Key, types.DelegateTx, 1, 0, &pool3, nil))
require.NoError(t, addTx(pool3Key, types.DelegateTx, 1, 0, &pool2, nil))

chain.GenerateBlocks(1)
chain.GenerateBlocks(1, 0)

require.Len(t, appState.State.Delegations(), 5)

require.NoError(t, addTx(keys[0], types.OnlineStatusTx, 2, 0, nil, attachments.CreateOnlineStatusAttachment(false)))
require.NoError(t, addTx(keys[3], types.OnlineStatusTx, 2, 0, nil, attachments.CreateOnlineStatusAttachment(true)))

chain.GenerateBlocks(1)
chain.GenerateBlocks(1, 0)
chain.GenerateEmptyBlocks(50)

require.False(t, appState.ValidatorsCache.IsOnlineIdentity(addrs[0]))
Expand All @@ -1027,7 +1027,7 @@ func Test_Delegation(t *testing.T) {

addr3 := crypto.PubkeyToAddress(keys[3].PublicKey)
require.NoError(t, addTx(pool3Key, types.KillDelegatorTx, 2, 0, &addr3, nil))
chain.GenerateBlocks(1)
chain.GenerateBlocks(1, 0)

require.Equal(t, 0, appState.ValidatorsCache.PoolSize(pool3))
require.False(t, appState.ValidatorsCache.IsPool(pool3))
Expand All @@ -1042,10 +1042,10 @@ func Test_Delegation(t *testing.T) {
require.NoError(t, addTx(keys[1], types.UndelegateTx, 1, 1, nil, nil))
require.NoError(t, addTx(keys[2], types.UndelegateTx, 1, 1, nil, nil))

chain.GenerateBlocks(1)
chain.GenerateBlocks(1, 0)
require.Len(t, appState.State.Delegations(), 2)

chain.GenerateBlocks(50)
chain.GenerateBlocks(50, 0)
require.Len(t, appState.State.Delegations(), 0)

require.Equal(t, 0, appState.ValidatorsCache.PoolSize(pool2))
Expand Down
14 changes: 12 additions & 2 deletions consensus/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,13 @@ func (engine *Engine) loop() {
if err := engine.downloader.SyncBlockchain(engine.forkResolver); err != nil {
engine.synced = false
if engine.forkResolver.HasLoadedFork() {
engine.forkResolver.ApplyFork()
if revertedTxs, err := engine.forkResolver.ApplyFork(); err == nil {
if len(revertedTxs) > 0 {
engine.txpool.AddExternalTxs(revertedTxs...)
}
} else {
engine.log.Warn("fork apply error", "err", err)
}
} else {
engine.log.Warn("syncing error", "err", err)
time.Sleep(time.Second * 5)
Expand Down Expand Up @@ -229,8 +235,12 @@ func (engine *Engine) loop() {
engine.log.Info("Binary Ba is failed", "err", err)

if err == ForkDetected {
if err = engine.forkResolver.ApplyFork(); err != nil {
if revertedTxs, err := engine.forkResolver.ApplyFork(); err != nil {
engine.log.Error("error occurred during applying of fork", "err", err)
} else {
if len(revertedTxs) > 0 {
engine.txpool.AddExternalTxs(revertedTxs...)
}
}
}
continue
Expand Down
15 changes: 8 additions & 7 deletions consensus/fork_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (resolver *ForkResolver) checkForkSize(fork []types.BlockBundle) error {
return errors.New("fork has worse seed")
}

func (resolver *ForkResolver) applyFork(commonHeight uint64, fork []types.BlockBundle) error {
func (resolver *ForkResolver) applyFork(commonHeight uint64, fork []types.BlockBundle) ([]*types.Transaction, error) {

defer func() {
resolver.applicableFork = nil
Expand All @@ -154,21 +154,22 @@ func (resolver *ForkResolver) applyFork(commonHeight uint64, fork []types.BlockB
d.ClearPotentialForks()
}
}()

if err := resolver.chain.ResetTo(commonHeight); err != nil {
return err
var revertedTxs []*types.Transaction
var err error
if revertedTxs, err = resolver.chain.ResetTo(commonHeight); err != nil {
return nil, err
}
for _, bundle := range fork {
if err := resolver.chain.AddBlock(bundle.Block, nil, resolver.statsCollector); err != nil {
return err
return nil, err
}
resolver.chain.WriteCertificate(bundle.Block.Hash(), bundle.Cert, false)
}

return nil
return revertedTxs, nil
}

func (resolver *ForkResolver) ApplyFork() error {
func (resolver *ForkResolver) ApplyFork() (revertedTxs []*types.Transaction, err error) {
if !resolver.HasLoadedFork() {
panic("resolver hasn't applicable fork")
}
Expand Down
Loading

0 comments on commit dbe13d2

Please sign in to comment.