Skip to content

Commit

Permalink
[R4R] parity warp-like state sync (#568)
Browse files Browse the repository at this point in the history
* parity warp-like state sync

* fix unit-test

* revise a bit to support sync from any breathe block and code clean up

* update dependency
  • Loading branch information
ackratos authored and forcodedancing committed May 19, 2022
1 parent 8bd89ff commit f5932f7
Show file tree
Hide file tree
Showing 15 changed files with 199 additions and 376 deletions.
20 changes: 11 additions & 9 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 9 additions & 3 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,27 @@
[[override]]
name = "github.com/tendermint/iavl"
source = "github.com/binance-chain/bnc-tendermint-iavl"
version = "=v0.12.0-binance.0"
branch = "release/v0.12.0-binance.1"
# version = "=v0.12.0-binance.0"

[[override]]
name = "github.com/tendermint/tendermint"
source = "github.com/binance-chain/bnc-tendermint"
branch = "develop"
branch = "release/v0.31.5-binance.0"
# version = "=v0.30.1-binance.3"

[[constraint]]
name = "github.com/cosmos/cosmos-sdk"
source = "github.com/binance-chain/bnc-cosmos-sdk"
branch = "develop"
branch = "release/v0.25.0-binance.18"
[[prune.project]]
name = "github.com/zondax/hid"
unused-packages = false

[[constraint]]
name = "github.com/btcsuite/btcd"
revision = "ed77733ec07dfc8a513741138419b8d9d3de9d2d"

[[constraint]]
name = "github.com/pkg/errors"
version = "0.8.0"
Expand Down
69 changes: 36 additions & 33 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/store"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/x/auth"
"github.com/cosmos/cosmos-sdk/x/bank"
Expand Down Expand Up @@ -81,8 +82,6 @@ type BinanceChain struct {
govKeeper gov.Keeper
// keeper to process param store and update
ParamHub *param.ParamHub
// manage state sync related status
StateSyncManager

baseConfig *config.BaseConfig
upgradeConfig *config.UpgradeConfig
Expand All @@ -94,7 +93,7 @@ type BinanceChain struct {
// TODO(#246): make it an aggregated wrapper of all component metrics (i.e. DexKeeper, StakeKeeper)
metrics *pub.Metrics

blockStore *blockchain.BlockStore
takeSnapshotHeight int64 // whether to take snapshot of current height, set at endblock(), reset at commit()
}

// NewBinanceChain creates a new instance of the BinanceChain.
Expand All @@ -115,7 +114,6 @@ func NewBinanceChain(logger log.Logger, db dbm.DB, traceStore io.Writer, baseApp
}
// set upgrade config
app.setUpgradeConfig()
app.SetPruning(viper.GetString("pruning"))
app.initRunningMode()
app.SetCommitMultiStoreTracer(traceStore)

Expand Down Expand Up @@ -226,7 +224,11 @@ func NewBinanceChain(logger log.Logger, db dbm.DB, traceStore io.Writer, baseApp
app.initGovHooks()
app.initPlugins()
app.initParams()
app.initStateSyncManager(ServerContext.Config.StateSyncReactor)
if ServerContext.Config.StateSyncReactor {
lastBreatheBlockHeight := app.getLastBreatheBlockHeight()
app.StateSyncHelper = store.NewStateSyncHelper(app.Logger.With("module", "statesync"), db, app.GetCommitMultiStore(), app.Codec)
app.StateSyncHelper.Init(lastBreatheBlockHeight)
}
return app
}

Expand All @@ -253,12 +255,18 @@ func (app *BinanceChain) initDex(pairMapper dex.TradingPairMapper) {
return
}
// count back to days in config.
blockDB := baseapp.LoadBlockDB()
defer blockDB.Close()
blockStore := blockchain.NewBlockStore(blockDB)
txDB := baseapp.LoadTxDB()
defer txDB.Close()

app.DexKeeper.Init(
app.CheckState.Ctx,
app.baseConfig.BreatheBlockInterval,
app.baseConfig.BreatheBlockDaysCountBack,
baseapp.LoadBlockDB(),
baseapp.LoadTxDB(),
blockStore,
txDB,
app.LastBlockHeight(),
app.TxDecoder)
}
Expand Down Expand Up @@ -464,13 +472,12 @@ func (app *BinanceChain) EndBlocker(ctx sdk.Context, req abci.RequestEndBlock) a
// breathe block
bnclog.Info("Start Breathe Block Handling",
"height", height, "lastBlockTime", lastBlockTime, "newBlockTime", blockTime)
app.takeSnapshotHeight = height
icoDone := ico.EndBlockAsync(ctx)
dex.EndBreatheBlock(ctx, app.DexKeeper, app.govKeeper, height, blockTime)
param.EndBreatheBlock(ctx, app.ParamHub)
// other end blockers
<-icoDone
// fire and forget reload snapshot
go app.reloadSnapshot(height, true)
}

app.DexKeeper.StoreTradePrices(ctx)
Expand Down Expand Up @@ -513,6 +520,26 @@ func (app *BinanceChain) EndBlocker(ctx sdk.Context, req abci.RequestEndBlock) a
}
}

func (app *BinanceChain) Commit() (res abci.ResponseCommit) {
res = app.BaseApp.Commit()
if ServerContext.Config.StateSyncReactor && app.takeSnapshotHeight > 0 {
app.StateSyncHelper.SnapshotHeights <- app.takeSnapshotHeight
app.takeSnapshotHeight = 0
}
return
}

func (app *BinanceChain) WriteRecoveryChunk(hash abci.SHA256Sum, chunk *abci.AppStateChunk, isComplete bool) (err error) {
err = app.BaseApp.WriteRecoveryChunk(hash, chunk, isComplete)
if err != nil {
return err
}
if isComplete {
err = app.reInitChain()
}
return err
}

// ExportAppStateAndValidators exports blockchain world state to json.
func (app *BinanceChain) ExportAppStateAndValidators() (appState json.RawMessage, validators []tmtypes.GenesisValidator, err error) {
ctx := app.NewContext(sdk.RunTxModeCheck, abci.Header{})
Expand Down Expand Up @@ -719,27 +746,3 @@ func (app *BinanceChain) publish(tradesToPublish []*pub.Trade, proposalsToPublis

pub.Logger.Debug("finish publish", "height", height)
}

// SetPruning sets a pruning option on the multistore associated with the app
func (app *BinanceChain) SetPruning(pruning string) {
var pruningStrategy sdk.PruningStrategy
switch pruning {
case "nothing":
pruningStrategy = sdk.PruneNothing{}
case "everything":
pruningStrategy = sdk.PruneEverything{}
case "syncable":
// TODO: make these parameters configurable
pruningStrategy = sdk.PruneSyncable{NumRecent: 100, StoreEvery: 10000}
case "breathe":
pruningStrategy = NewKeepRecentAndBreatheBlock(int64(app.baseConfig.BreatheBlockInterval), 10000, ServerContext.Config)
default:
pruningStrategy = NewKeepRecentAndBreatheBlock(int64(app.baseConfig.BreatheBlockInterval), 10000, ServerContext.Config)
app.Logger.Error("failed to load pruning, set to breathe", "strategy", pruning)
}
app.BaseApp.SetPruning(pruningStrategy)
}

func (app *BinanceChain) SetBlockStore(store *blockchain.BlockStore) {
app.blockStore = store
}
2 changes: 0 additions & 2 deletions app/app_pub_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// +build !race

package app

import (
Expand Down
2 changes: 0 additions & 2 deletions app/app_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// +build !race

package app

import (
Expand Down
91 changes: 40 additions & 51 deletions app/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
"path"
"path/filepath"
"runtime/debug"
"sync"
"time"

"github.com/spf13/cobra"
"github.com/spf13/viper"

cosmossrv "github.com/cosmos/cosmos-sdk/server"
"github.com/cosmos/cosmos-sdk/baseapp"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/version"

Expand All @@ -23,9 +23,10 @@ import (
tmflags "github.com/tendermint/tendermint/libs/cli/flags"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tmlibs/cli"
"github.com/tendermint/tendermint/snapshot"

"github.com/binance-chain/node/app/config"
"github.com/binance-chain/node/common"
bnclog "github.com/binance-chain/node/common/log"
"github.com/binance-chain/node/common/utils"
"github.com/binance-chain/node/plugins/dex/order"
Expand Down Expand Up @@ -159,56 +160,44 @@ func (app *BinanceChain) processErrAbciResponseForPub(txBytes []byte) {
}
}

// binance-chain implementation of PruningStrategy
type KeepRecentAndBreatheBlock struct {
breatheBlockInterval int64

// Keep recent number blocks in case of rollback
numRecent int64

blockStore *blockchain.BlockStore

blockStoreInitializer sync.Once
}

func NewKeepRecentAndBreatheBlock(breatheBlockInterval, numRecent int64, config *tmcfg.Config) *KeepRecentAndBreatheBlock {
return &KeepRecentAndBreatheBlock{
breatheBlockInterval: breatheBlockInterval,
numRecent: numRecent,
}
}

// TODO: must enhance performance!
func (strategy KeepRecentAndBreatheBlock) ShouldPrune(version, latestVersion int64) bool {
// we are replay the possible 1 block diff between state and blockstore db
// save this block anyway and don't init strategy's blockStore
if cosmossrv.BlockStore == nil {
return false
func (app *BinanceChain) getLastBreatheBlockHeight() int64 {
// we should only sync to breathe block height
latestBlockHeight := app.LastBlockHeight()
var timeOfLatestBlock time.Time
if latestBlockHeight == 0 {
timeOfLatestBlock = utils.Now()
} else {
blockDB := baseapp.LoadBlockDB()
defer blockDB.Close()
blockStore := blockchain.NewBlockStore(blockDB)
block := blockStore.LoadBlock(latestBlockHeight)
timeOfLatestBlock = block.Time
}

// only at this time block store is initialized!
// block store has been opened after the start of tendermint node, we have to share same instance of block store
strategy.blockStoreInitializer.Do(func() {
strategy.blockStore = cosmossrv.BlockStore
})
height := app.DexKeeper.GetLastBreatheBlockHeight(
app.CheckState.Ctx,
latestBlockHeight,
timeOfLatestBlock,
app.baseConfig.BreatheBlockInterval,
app.baseConfig.BreatheBlockDaysCountBack)
app.Logger.Info("get last breathe block height", "height", height)
return height
}

if version == 1 {
return false
} else if latestVersion-version < strategy.numRecent {
return false
} else {
if strategy.breatheBlockInterval > 0 {
return version%strategy.breatheBlockInterval != 0
} else {
lastBlock := strategy.blockStore.LoadBlock(version - 1)
block := strategy.blockStore.LoadBlock(version)
func (app *BinanceChain) reInitChain() error {
app.DexKeeper.Init(
app.CheckState.Ctx,
app.baseConfig.BreatheBlockInterval,
app.baseConfig.BreatheBlockDaysCountBack,
snapshot.Manager().GetBlockStore(),
snapshot.Manager().GetTxDB(),
app.LastBlockHeight(),
app.TxDecoder)

// init app cache
stores := app.GetCommitMultiStore()
accountStore := stores.GetKVStore(common.AccountStoreKey)
app.SetAccountStoreCache(app.Codec, accountStore, app.baseConfig.AccountCacheSize)

if lastBlock == nil {
// this node is a state_synced node, previously block is not synced
// so we cannot tell whether this (first) block is breathe block or not
return false
}
return utils.SameDayInUTC(lastBlock.Time, block.Time)
}
}
return nil
}
2 changes: 0 additions & 2 deletions app/ordertx_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// +build !race

package app

import (
Expand Down
Loading

0 comments on commit f5932f7

Please sign in to comment.