diff --git a/Gopkg.lock b/Gopkg.lock index 42986503e..e2fd2f3da 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -600,7 +600,7 @@ version = "v0.12.0-binance.0" [[projects]] - digest = "1:31ef3fb8e56574e272cba6c65706ada00b6ee408d54287dbd0efb8ba2a3458c3" + digest = "1:ecb0b4a897ea8a52195e6df0aa3bc02bd1ce12a2d7bce13c10b506a70969395d" name = "github.com/tendermint/tendermint" packages = [ "abci/client", @@ -666,9 +666,9 @@ "version", ] pruneopts = "UT" - revision = "65504720616b801366974329c7b564719b0a1d19" + revision = "14ae0f7facd020c391548dfa226b30cb9fecdfdb" source = "github.com/binance-chain/bnc-tendermint" - version = "v0.30.1-binance.1" + version = "v0.30.1-binance.2" [[projects]] digest = "1:7886f86064faff6f8d08a3eb0e8c773648ff5a2e27730831e2bfbf07467f6666" diff --git a/Gopkg.toml b/Gopkg.toml index a7dc0e1ae..29beb9b80 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -37,7 +37,7 @@ [[override]] name = "github.com/tendermint/tendermint" source = "github.com/binance-chain/bnc-tendermint" - version = "=v0.30.1-binance.1" + version = "=v0.30.1-binance.2" [[constraint]] name = "github.com/cosmos/cosmos-sdk" diff --git a/app/app.go b/app/app.go index b401afb32..166ec3399 100644 --- a/app/app.go +++ b/app/app.go @@ -236,6 +236,7 @@ func NewBinanceChain(logger log.Logger, db dbm.DB, traceStore io.Writer, baseApp app.initDex(tradingPairMapper) app.initPlugins() app.initParams() + app.initStateSyncManager(ServerContext.Config.StateSyncReactor) return app } @@ -458,6 +459,8 @@ func (app *BinanceChain) EndBlocker(ctx sdk.Context, req abci.RequestEndBlock) a param.EndBreatheBlock(ctx, app.ParamHub) // other end blockers <-icoDone + // fire and forget reload snapshot + go app.reloadSnapshot(height, true) } app.DexKeeper.StoreTradePrices(ctx) diff --git a/app/app_pub_test.go b/app/app_pub_test.go index d03e905b6..92e22fd84 100644 --- a/app/app_pub_test.go +++ b/app/app_pub_test.go @@ -1,3 +1,5 @@ +// +build !race + package app import ( diff --git a/app/app_test.go b/app/app_test.go index f7d4911ca..370f3e16f 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -1,3 +1,5 @@ +// +build !race + package app import ( diff --git a/app/ordertx_test.go b/app/ordertx_test.go index 55c8f9f83..7ab383dda 100644 --- a/app/ordertx_test.go +++ b/app/ordertx_test.go @@ -1,3 +1,5 @@ +// +build !race + package app import ( diff --git a/app/statesync_helper.go b/app/statesync_helper.go index b37eb9def..f7219afae 100644 --- a/app/statesync_helper.go +++ b/app/statesync_helper.go @@ -2,13 +2,16 @@ package app import ( "fmt" + "runtime/debug" + "sync" "time" - "github.com/cosmos/cosmos-sdk/server" + "github.com/cosmos/cosmos-sdk/baseapp" storePkg "github.com/cosmos/cosmos-sdk/store" "github.com/tendermint/iavl" abci "github.com/tendermint/tendermint/abci/types" + bc "github.com/tendermint/tendermint/blockchain" cmn "github.com/tendermint/tendermint/libs/common" dbm "github.com/tendermint/tendermint/libs/db" @@ -17,78 +20,37 @@ import ( ) type StateSyncManager struct { + enabledStateSyncReactor bool // whether we enabledStateSyncReactor state sync reactor + stateSyncHeight int64 stateSyncNumKeys []int64 stateSyncStoreInfos []storePkg.StoreInfo + + reloadingMtx sync.RWMutex // guard below fields to make sure no concurrent load snapshot and response snapshot, and they should be updated atomically + + stateCachedHeight int64 + numKeysCache []int64 + totalKeyCache int64 // cache sum of numKeysCache + chunkCache [][]byte } // Implement state sync related ABCI interfaces func (app *BinanceChain) LatestSnapshot() (height int64, numKeys []int64, err error) { - app.Logger.Info("query latest snapshot") - numKeys = make([]int64, 0, len(common.StoreKeyNames)) - - // we should only sync to breathe block height - latestBlockHeight := app.LastBlockHeight() - var timeOfLatestBlock time.Time - if latestBlockHeight == 0 { - timeOfLatestBlock = utils.Now() - } else { - block := server.BlockStore.LoadBlock(latestBlockHeight) - timeOfLatestBlock = block.Time - } - - height = app.DexKeeper.GetLastBreatheBlockHeight( - app.CheckState.Ctx, - latestBlockHeight, - timeOfLatestBlock, - app.baseConfig.BreatheBlockInterval, - app.baseConfig.BreatheBlockDaysCountBack) - - for _, key := range common.StoreKeyNames { - var storeKeys int64 - store := app.GetCommitMultiStore().GetKVStore(common.StoreKeyNameMap[key]) - mutableTree := store.(*storePkg.IavlStore).Tree - if tree, err := mutableTree.GetImmutable(height); err == nil { - tree.IterateFirst(func(nodeBytes []byte) { - storeKeys++ - }) - } else { - app.Logger.Error("failed to load immutable tree", "err", err) - } - numKeys = append(numKeys, storeKeys) - } + app.reloadingMtx.RLock() + defer app.reloadingMtx.RUnlock() - return + return app.stateCachedHeight, app.numKeysCache, nil } func (app *BinanceChain) ReadSnapshotChunk(height int64, startIndex, endIndex int64) (chunk [][]byte, err error) { - app.Logger.Info("read snapshot chunk", "height", height, "startIndex", startIndex, "endIndex", endIndex) - chunk = make([][]byte, 0, endIndex-startIndex) - - // TODO: can be optimized - direct jump to expected store - iterated := int64(0) - for _, key := range common.StoreKeyNames { - store := app.GetCommitMultiStore().GetKVStore(common.StoreKeyNameMap[key]) - - mutableTree := store.(*storePkg.IavlStore).Tree - if tree, err := mutableTree.GetImmutable(height); err == nil { - tree.IterateFirst(func(nodeBytes []byte) { - if iterated >= startIndex && iterated < endIndex { - chunk = append(chunk, nodeBytes) - } - iterated += 1 - }) - } else { - app.Logger.Error("failed to load immutable tree", "err", err) - } - } + app.reloadingMtx.RLock() + defer app.reloadingMtx.RUnlock() - if int64(len(chunk)) != (endIndex - startIndex) { - app.Logger.Error("failed to load enough chunk", "expected", endIndex-startIndex, "got", len(chunk)) + app.Logger.Info("read snapshot chunk", "height", height, "startIndex", startIndex, "endIndex", endIndex) + if height != app.stateCachedHeight { + return nil, fmt.Errorf("peer requested a stale height we do not have, cacheHeight: %d", app.stateCachedHeight) } - - app.Logger.Info("finish read snapshot chunk", "height", height, "startIndex", startIndex, "endIndex", endIndex) - return + return app.chunkCache[startIndex:endIndex], nil } func (app *BinanceChain) StartRecovery(height int64, numKeys []int64) error { @@ -102,7 +64,7 @@ func (app *BinanceChain) StartRecovery(height int64, numKeys []int64) error { func (app *BinanceChain) WriteRecoveryChunk(chunk [][]byte) error { //store := app.GetCommitMultiStore().GetKVStore(common.StoreKeyNameMap[storeName]) - app.Logger.Info("start write recovery chunk") + app.Logger.Info("start write recovery chunk", "totalKeys", len(chunk)) nodes := make([]*iavl.Node, 0) for idx := 0; idx < len(chunk); idx++ { node, _ := iavl.MakeNode(chunk[idx]) @@ -139,11 +101,18 @@ func (app *BinanceChain) WriteRecoveryChunk(chunk [][]byte) error { }, }) - app.Logger.Debug("commit store: %s, root hash: %X\n", storeName, nodeHash) + app.Logger.Debug("commit store", "store", storeName, "hash", nodeHash) nodeDB.Commit() iterated += storeKeys } + // start serve other's state sync request + app.reloadingMtx.Lock() + defer app.reloadingMtx.Unlock() + app.stateCachedHeight = app.stateSyncHeight + app.numKeysCache = app.stateSyncNumKeys + app.chunkCache = chunk + app.Logger.Info("finished write recovery chunk") return nil } @@ -203,3 +172,93 @@ func (app BinanceChain) resetDexKeeper(height int64) { app.DexKeeper.InitRecentPrices(app.CheckState.Ctx) } + +func (app *BinanceChain) initStateSyncManager(enabled bool) { + app.enabledStateSyncReactor = enabled + if enabled { + height := app.getLastBreatheBlockHeight() + go app.reloadSnapshot(height, false) + } +} + +// the method might take quite a while (> 5 seconds), BETTER to be called concurrently +// so we only do it once a day after breathe block +// we will refactor it into split chunks into snapshot file soon +func (app *BinanceChain) reloadSnapshot(height int64, retry bool) { + if app.enabledStateSyncReactor { + app.reloadingMtx.Lock() + defer app.reloadingMtx.Unlock() + + app.latestSnapshotImpl(height, retry) + } +} + +func (app *BinanceChain) getLastBreatheBlockHeight() int64 { + // we should only sync to breathe block height + latestBlockHeight := app.LastBlockHeight() + var timeOfLatestBlock time.Time + if latestBlockHeight == 0 { + timeOfLatestBlock = utils.Now() + } else { + blockDB := baseapp.LoadBlockDB() + defer blockDB.Close() + blockStore := bc.NewBlockStore(blockDB) + block := blockStore.LoadBlock(latestBlockHeight) + timeOfLatestBlock = block.Time + } + + height := app.DexKeeper.GetLastBreatheBlockHeight( + app.CheckState.Ctx, + latestBlockHeight, + timeOfLatestBlock, + app.baseConfig.BreatheBlockInterval, + app.baseConfig.BreatheBlockDaysCountBack) + app.Logger.Info("get last breathe block height", "height", height) + return height +} + +func (app *BinanceChain) latestSnapshotImpl(height int64, retry bool) { + defer func() { + if r := recover(); r != nil { + log := fmt.Sprintf("recovered: %v\nstack:\n%v", r, string(debug.Stack())) + app.Logger.Error("failed loading latest snapshot", "err", log) + } + }() + app.Logger.Info("reload latest snapshot", "height", height) + + failed := true + for failed { + failed = false + totalKeys := int64(0) + app.numKeysCache = make([]int64, 0, len(common.StoreKeyNames)) + app.chunkCache = make([][]byte, 0, app.totalKeyCache) // assuming we didn't increase too many account in a day + + for _, key := range common.StoreKeyNames { + var storeKeys int64 + store := app.GetCommitMultiStore().GetKVStore(common.StoreKeyNameMap[key]) + mutableTree := store.(*storePkg.IavlStore).Tree + if tree, err := mutableTree.GetImmutable(height); err == nil { + tree.IterateFirst(func(nodeBytes []byte) { + storeKeys++ + app.chunkCache = append(app.chunkCache, nodeBytes) + }) + } else { + app.Logger.Error("failed to load immutable tree", "err", err) + failed = true + time.Sleep(1 * time.Second) // Endblocker has notified this reload snapshot, + // wait for 1 sec after commit finish + if retry { + break + } else { + return + } + } + totalKeys += storeKeys + app.numKeysCache = append(app.numKeysCache, storeKeys) + } + + app.stateCachedHeight = height + app.totalKeyCache = totalKeys + app.Logger.Info("finish read snapshot chunk", "height", height, "keys", totalKeys) + } +}