Skip to content

Commit

Permalink
parity warp-like state sync
Browse files Browse the repository at this point in the history
  • Loading branch information
ackratos committed May 8, 2019
1 parent 91044e2 commit 65d99bd
Show file tree
Hide file tree
Showing 10 changed files with 203 additions and 367 deletions.
20 changes: 11 additions & 9 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,19 @@
[[override]]
name = "github.com/tendermint/iavl"
source = "github.com/binance-chain/bnc-tendermint-iavl"
version = "=v0.12.0-binance.0"
branch = "state_sync_warp"
# version = "=v0.12.0-binance.0"

[[override]]
name = "github.com/tendermint/tendermint"
source = "github.com/binance-chain/bnc-tendermint"
version = "=v0.30.1-binance.6"
branch = "state_sync_warp"
# version = "=v0.30.1-binance.3"

[[constraint]]
name = "github.com/cosmos/cosmos-sdk"
source = "github.com/binance-chain/bnc-cosmos-sdk"
version = "=v0.25.0-binance.17"
branch = "state_sync_warp"

[[constraint]]
name = "github.com/btcsuite/btcd"
Expand Down
71 changes: 36 additions & 35 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ import (
"sort"
"time"

"github.com/spf13/viper"

"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/store"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/x/auth"
"github.com/cosmos/cosmos-sdk/x/bank"
Expand Down Expand Up @@ -85,8 +84,6 @@ type BinanceChain struct {
govKeeper gov.Keeper
// keeper to process param store and update
ParamHub *param.ParamHub
// manage state sync related status
StateSyncManager

baseConfig *config.BaseConfig
upgradeConfig *config.UpgradeConfig
Expand All @@ -98,7 +95,7 @@ type BinanceChain struct {
// TODO(#246): make it an aggregated wrapper of all component metrics (i.e. DexKeeper, StakeKeeper)
metrics *pub.Metrics

blockStore *blockchain.BlockStore
takeSnapshotHeight int64 // whether to take snapshot of current height, set at endblock(), reset at commit()
}

// NewBinanceChain creates a new instance of the BinanceChain.
Expand All @@ -119,7 +116,6 @@ func NewBinanceChain(logger log.Logger, db dbm.DB, traceStore io.Writer, baseApp
}
// set upgrade config
app.setUpgradeConfig()
app.SetPruning(viper.GetString("pruning"))
app.initRunningMode()
app.SetCommitMultiStoreTracer(traceStore)

Expand Down Expand Up @@ -232,7 +228,11 @@ func NewBinanceChain(logger log.Logger, db dbm.DB, traceStore io.Writer, baseApp
app.initDex(tradingPairMapper)
app.initPlugins()
app.initParams()
app.initStateSyncManager(ServerContext.Config.StateSyncReactor)
if ServerContext.Config.StateSyncReactor {
lastBreatheBlockHeight := app.getLastBreatheBlockHeight()
app.StateSyncHelper = store.NewStateSyncHelper(app.Logger.With("module", "statesync"), db, app.GetCommitMultiStore(), app.Codec)
app.StateSyncHelper.Init(lastBreatheBlockHeight)
}
return app
}

Expand All @@ -259,12 +259,18 @@ func (app *BinanceChain) initDex(pairMapper dex.TradingPairMapper) {
return
}
// count back to days in config.
blockDB := baseapp.LoadBlockDB()
defer blockDB.Close()
blockStore := blockchain.NewBlockStore(blockDB)
txDB := baseapp.LoadTxDB()
defer txDB.Close()

app.DexKeeper.Init(
app.CheckState.Ctx,
app.baseConfig.BreatheBlockInterval,
app.baseConfig.BreatheBlockDaysCountBack,
baseapp.LoadBlockDB(),
baseapp.LoadTxDB(),
blockStore,
txDB,
app.LastBlockHeight(),
app.TxDecoder)
}
Expand Down Expand Up @@ -461,13 +467,12 @@ func (app *BinanceChain) EndBlocker(ctx sdk.Context, req abci.RequestEndBlock) a
// breathe block
bnclog.Info("Start Breathe Block Handling",
"height", height, "lastBlockTime", lastBlockTime, "newBlockTime", blockTime)
app.takeSnapshotHeight = height
icoDone := ico.EndBlockAsync(ctx)
dex.EndBreatheBlock(ctx, app.DexKeeper, height, blockTime)
param.EndBreatheBlock(ctx, app.ParamHub)
// other end blockers
<-icoDone
// fire and forget reload snapshot
go app.reloadSnapshot(height, true)
}

app.DexKeeper.StoreTradePrices(ctx)
Expand Down Expand Up @@ -510,6 +515,26 @@ func (app *BinanceChain) EndBlocker(ctx sdk.Context, req abci.RequestEndBlock) a
}
}

func (app *BinanceChain) Commit() (res abci.ResponseCommit) {
res = app.BaseApp.Commit()
if ServerContext.Config.StateSyncReactor && app.takeSnapshotHeight > 0 {
app.StateSyncHelper.SnapshotHeights <- app.takeSnapshotHeight
app.takeSnapshotHeight = 0
}
return
}

func (app *BinanceChain) WriteRecoveryChunk(hash abci.SHA256Sum, chunk *abci.AppStateChunk, isComplete bool) (err error) {
err = app.BaseApp.WriteRecoveryChunk(hash, chunk, isComplete)
if err != nil {
return err
}
if isComplete {
err = app.reInitChain()
}
return err
}

// ExportAppStateAndValidators exports blockchain world state to json.
func (app *BinanceChain) ExportAppStateAndValidators() (appState json.RawMessage, validators []tmtypes.GenesisValidator, err error) {
ctx := app.NewContext(sdk.RunTxModeCheck, abci.Header{})
Expand Down Expand Up @@ -714,27 +739,3 @@ func (app *BinanceChain) publish(tradesToPublish []*pub.Trade, proposalsToPublis

pub.Logger.Debug("finish publish", "height", height)
}

// SetPruning sets a pruning option on the multistore associated with the app
func (app *BinanceChain) SetPruning(pruning string) {
var pruningStrategy sdk.PruningStrategy
switch pruning {
case "nothing":
pruningStrategy = sdk.PruneNothing{}
case "everything":
pruningStrategy = sdk.PruneEverything{}
case "syncable":
// TODO: make these parameters configurable
pruningStrategy = sdk.PruneSyncable{NumRecent: 100, StoreEvery: 10000}
case "breathe":
pruningStrategy = NewKeepRecentAndBreatheBlock(int64(app.baseConfig.BreatheBlockInterval), 10000, ServerContext.Config)
default:
pruningStrategy = NewKeepRecentAndBreatheBlock(int64(app.baseConfig.BreatheBlockInterval), 10000, ServerContext.Config)
app.Logger.Error("failed to load pruning, set to breathe", "strategy", pruning)
}
app.BaseApp.SetPruning(pruningStrategy)
}

func (app *BinanceChain) SetBlockStore(store *blockchain.BlockStore) {
app.blockStore = store
}
99 changes: 51 additions & 48 deletions app/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
"path"
"path/filepath"
"runtime/debug"
"sync"
"time"

"github.com/spf13/cobra"
"github.com/spf13/viper"

cosmossrv "github.com/cosmos/cosmos-sdk/server"
"github.com/cosmos/cosmos-sdk/baseapp"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/version"

Expand All @@ -25,8 +25,10 @@ import (
tmflags "github.com/tendermint/tendermint/libs/cli/flags"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/snapshot"

"github.com/binance-chain/node/app/config"
"github.com/binance-chain/node/common"
bnclog "github.com/binance-chain/node/common/log"
"github.com/binance-chain/node/common/utils"
"github.com/binance-chain/node/plugins/dex/order"
Expand Down Expand Up @@ -187,56 +189,57 @@ func (app *BinanceChain) processErrAbciResponseForPub(txBytes []byte) {
}
}

// binance-chain implementation of PruningStrategy
type KeepRecentAndBreatheBlock struct {
breatheBlockInterval int64

// Keep recent number blocks in case of rollback
numRecent int64

blockStore *blockchain.BlockStore
func (app *BinanceChain) getLastBreatheBlockHeight() int64 {
// we should only sync to breathe block height
latestBlockHeight := app.LastBlockHeight()
var timeOfLatestBlock time.Time
if latestBlockHeight == 0 {
timeOfLatestBlock = utils.Now()
} else {
blockDB := baseapp.LoadBlockDB()
defer blockDB.Close()
blockStore := blockchain.NewBlockStore(blockDB)
block := blockStore.LoadBlock(latestBlockHeight)
timeOfLatestBlock = block.Time
}

blockStoreInitializer sync.Once
height := app.DexKeeper.GetLastBreatheBlockHeight(
app.CheckState.Ctx,
latestBlockHeight,
timeOfLatestBlock,
app.baseConfig.BreatheBlockInterval,
app.baseConfig.BreatheBlockDaysCountBack)
app.Logger.Info("get last breathe block height", "height", height)
return height
}

func NewKeepRecentAndBreatheBlock(breatheBlockInterval, numRecent int64, config *tmcfg.Config) *KeepRecentAndBreatheBlock {
return &KeepRecentAndBreatheBlock{
breatheBlockInterval: breatheBlockInterval,
numRecent: numRecent,
}
}
func (app *BinanceChain) reInitChain() error {

// TODO: must enhance performance!
func (strategy KeepRecentAndBreatheBlock) ShouldPrune(version, latestVersion int64) bool {
// we are replay the possible 1 block diff between state and blockstore db
// save this block anyway and don't init strategy's blockStore
if cosmossrv.BlockStore == nil {
return false
// load into memory from db
if err := app.LoadCMSLatestVersion(); err != nil {
return err
}
stores := app.GetCommitMultiStore()
commitId := stores.LastCommitID()
hashHex := fmt.Sprintf("%X", commitId.Hash)
app.Logger.Info("commit by state reactor", "version", commitId.Version, "hash", hashHex)

// simulate we just "Commit()" :P
app.SetCheckState(abci.Header{Height: snapshot.Manager().RestorationManifest.Height})
app.DeliverState = nil

app.DexKeeper.Init(
app.CheckState.Ctx,
app.baseConfig.BreatheBlockInterval,
app.baseConfig.BreatheBlockDaysCountBack,
snapshot.Manager().GetBlockStore(),
snapshot.Manager().GetTxDB(),
app.LastBlockHeight(),
app.TxDecoder)

// init app cache
accountStore := stores.GetKVStore(common.AccountStoreKey)
app.SetAccountStoreCache(app.Codec, accountStore, app.baseConfig.AccountCacheSize)

// only at this time block store is initialized!
// block store has been opened after the start of tendermint node, we have to share same instance of block store
strategy.blockStoreInitializer.Do(func() {
strategy.blockStore = cosmossrv.BlockStore
})

if version == 1 {
return false
} else if latestVersion-version < strategy.numRecent {
return false
} else {
if strategy.breatheBlockInterval > 0 {
return version%strategy.breatheBlockInterval != 0
} else {
lastBlock := strategy.blockStore.LoadBlock(version - 1)
block := strategy.blockStore.LoadBlock(version)

if lastBlock == nil {
// this node is a state_synced node, previously block is not synced
// so we cannot tell whether this (first) block is breathe block or not
return false
}
return utils.SameDayInUTC(lastBlock.Time, block.Time)
}
}
return nil
}
Loading

0 comments on commit 65d99bd

Please sign in to comment.