Skip to content

Commit

Permalink
Merge pull request #496 from binance-chain/state_sync_cache
Browse files Browse the repository at this point in the history
[R4R] state sync cache
  • Loading branch information
ackratos authored Apr 1, 2019
2 parents 23f8226 + 32c608c commit e4553bd
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 66 deletions.
6 changes: 3 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
[[override]]
name = "github.com/tendermint/tendermint"
source = "github.com/binance-chain/bnc-tendermint"
version = "=v0.30.1-binance.1"
version = "=v0.30.1-binance.2"

[[constraint]]
name = "github.com/cosmos/cosmos-sdk"
Expand Down
3 changes: 3 additions & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func NewBinanceChain(logger log.Logger, db dbm.DB, traceStore io.Writer, baseApp
app.initDex(tradingPairMapper)
app.initPlugins()
app.initParams()
app.initStateSyncManager(ServerContext.Config.StateSyncReactor)
return app
}

Expand Down Expand Up @@ -458,6 +459,8 @@ func (app *BinanceChain) EndBlocker(ctx sdk.Context, req abci.RequestEndBlock) a
param.EndBreatheBlock(ctx, app.ParamHub)
// other end blockers
<-icoDone
// fire and forget reload snapshot
go app.reloadSnapshot(height, true)
}

app.DexKeeper.StoreTradePrices(ctx)
Expand Down
2 changes: 2 additions & 0 deletions app/app_pub_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// +build !race

package app

import (
Expand Down
2 changes: 2 additions & 0 deletions app/app_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// +build !race

package app

import (
Expand Down
2 changes: 2 additions & 0 deletions app/ordertx_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// +build !race

package app

import (
Expand Down
183 changes: 121 additions & 62 deletions app/statesync_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package app

import (
"fmt"
"runtime/debug"
"sync"
"time"

"github.com/cosmos/cosmos-sdk/server"
"github.com/cosmos/cosmos-sdk/baseapp"
storePkg "github.com/cosmos/cosmos-sdk/store"

"github.com/tendermint/iavl"
abci "github.com/tendermint/tendermint/abci/types"
bc "github.com/tendermint/tendermint/blockchain"
cmn "github.com/tendermint/tendermint/libs/common"
dbm "github.com/tendermint/tendermint/libs/db"

Expand All @@ -17,78 +20,37 @@ import (
)

// StateSyncManager groups the state-sync bookkeeping of the app: the fields
// describing a recovery in progress (stateSync*) and the cached snapshot that
// is served to peers (the fields below reloadingMtx).
type StateSyncManager struct {
	enabledStateSyncReactor bool // whether the state sync reactor is enabled; set by initStateSyncManager

	// NOTE(review): presumably the height, per-store key counts and store infos
	// of the snapshot currently being recovered; WriteRecoveryChunk copies the
	// first two into the cache below. Confirm against StartRecovery.
	stateSyncHeight int64
	stateSyncNumKeys []int64
	stateSyncStoreInfos []storePkg.StoreInfo

	// reloadingMtx guards the fields below so that loading a snapshot and
	// answering a snapshot request never run concurrently; the guarded fields
	// should be updated atomically (all together under the write lock).
	reloadingMtx sync.RWMutex

	stateCachedHeight int64 // height the cached snapshot was taken at
	numKeysCache []int64 // number of nodes collected per store, in common.StoreKeyNames order
	totalKeyCache int64 // sum of numKeysCache; also used as the capacity hint for the next reload
	chunkCache [][]byte // serialized nodes of all stores, concatenated in store order
}

// Implement state sync related ABCI interfaces
func (app *BinanceChain) LatestSnapshot() (height int64, numKeys []int64, err error) {
app.Logger.Info("query latest snapshot")
numKeys = make([]int64, 0, len(common.StoreKeyNames))

// we should only sync to breathe block height
latestBlockHeight := app.LastBlockHeight()
var timeOfLatestBlock time.Time
if latestBlockHeight == 0 {
timeOfLatestBlock = utils.Now()
} else {
block := server.BlockStore.LoadBlock(latestBlockHeight)
timeOfLatestBlock = block.Time
}

height = app.DexKeeper.GetLastBreatheBlockHeight(
app.CheckState.Ctx,
latestBlockHeight,
timeOfLatestBlock,
app.baseConfig.BreatheBlockInterval,
app.baseConfig.BreatheBlockDaysCountBack)

for _, key := range common.StoreKeyNames {
var storeKeys int64
store := app.GetCommitMultiStore().GetKVStore(common.StoreKeyNameMap[key])
mutableTree := store.(*storePkg.IavlStore).Tree
if tree, err := mutableTree.GetImmutable(height); err == nil {
tree.IterateFirst(func(nodeBytes []byte) {
storeKeys++
})
} else {
app.Logger.Error("failed to load immutable tree", "err", err)
}
numKeys = append(numKeys, storeKeys)
}
app.reloadingMtx.RLock()
defer app.reloadingMtx.RUnlock()

return
return app.stateCachedHeight, app.numKeysCache, nil
}

func (app *BinanceChain) ReadSnapshotChunk(height int64, startIndex, endIndex int64) (chunk [][]byte, err error) {
app.Logger.Info("read snapshot chunk", "height", height, "startIndex", startIndex, "endIndex", endIndex)
chunk = make([][]byte, 0, endIndex-startIndex)

// TODO: can be optimized - direct jump to expected store
iterated := int64(0)
for _, key := range common.StoreKeyNames {
store := app.GetCommitMultiStore().GetKVStore(common.StoreKeyNameMap[key])

mutableTree := store.(*storePkg.IavlStore).Tree
if tree, err := mutableTree.GetImmutable(height); err == nil {
tree.IterateFirst(func(nodeBytes []byte) {
if iterated >= startIndex && iterated < endIndex {
chunk = append(chunk, nodeBytes)
}
iterated += 1
})
} else {
app.Logger.Error("failed to load immutable tree", "err", err)
}
}
app.reloadingMtx.RLock()
defer app.reloadingMtx.RUnlock()

if int64(len(chunk)) != (endIndex - startIndex) {
app.Logger.Error("failed to load enough chunk", "expected", endIndex-startIndex, "got", len(chunk))
app.Logger.Info("read snapshot chunk", "height", height, "startIndex", startIndex, "endIndex", endIndex)
if height != app.stateCachedHeight {
return nil, fmt.Errorf("peer requested a stale height we do not have, cacheHeight: %d", app.stateCachedHeight)
}

app.Logger.Info("finish read snapshot chunk", "height", height, "startIndex", startIndex, "endIndex", endIndex)
return
return app.chunkCache[startIndex:endIndex], nil
}

func (app *BinanceChain) StartRecovery(height int64, numKeys []int64) error {
Expand All @@ -102,7 +64,7 @@ func (app *BinanceChain) StartRecovery(height int64, numKeys []int64) error {

func (app *BinanceChain) WriteRecoveryChunk(chunk [][]byte) error {
//store := app.GetCommitMultiStore().GetKVStore(common.StoreKeyNameMap[storeName])
app.Logger.Info("start write recovery chunk")
app.Logger.Info("start write recovery chunk", "totalKeys", len(chunk))
nodes := make([]*iavl.Node, 0)
for idx := 0; idx < len(chunk); idx++ {
node, _ := iavl.MakeNode(chunk[idx])
Expand Down Expand Up @@ -139,11 +101,18 @@ func (app *BinanceChain) WriteRecoveryChunk(chunk [][]byte) error {
},
})

app.Logger.Debug("commit store: %s, root hash: %X\n", storeName, nodeHash)
app.Logger.Debug("commit store", "store", storeName, "hash", nodeHash)
nodeDB.Commit()
iterated += storeKeys
}

// start serve other's state sync request
app.reloadingMtx.Lock()
defer app.reloadingMtx.Unlock()
app.stateCachedHeight = app.stateSyncHeight
app.numKeysCache = app.stateSyncNumKeys
app.chunkCache = chunk

app.Logger.Info("finished write recovery chunk")
return nil
}
Expand Down Expand Up @@ -203,3 +172,93 @@ func (app BinanceChain) resetDexKeeper(height int64) {
app.DexKeeper.InitRecentPrices(app.CheckState.Ctx)

}

// initStateSyncManager records whether the state sync reactor is enabled and,
// if it is, kicks off a background load of the snapshot at the last breathe
// block height (without retrying on failure).
func (app *BinanceChain) initStateSyncManager(enabled bool) {
	app.enabledStateSyncReactor = enabled
	if !enabled {
		return
	}

	// The height is resolved synchronously here; only the (potentially slow)
	// snapshot load runs in its own goroutine.
	go app.reloadSnapshot(app.getLastBreatheBlockHeight(), false)
}

// reloadSnapshot rebuilds the cached snapshot for the given height while
// holding reloadingMtx for writing. It does nothing when the state sync
// reactor is disabled.
//
// The rebuild might take quite a while (> 5 seconds), so it is BETTER called
// concurrently; for that reason it is only done once a day, after the breathe
// block. Planned refactoring: split the chunks into a snapshot file.
func (app *BinanceChain) reloadSnapshot(height int64, retry bool) {
	if !app.enabledStateSyncReactor {
		return
	}

	app.reloadingMtx.Lock()
	defer app.reloadingMtx.Unlock()

	app.latestSnapshotImpl(height, retry)
}

// getLastBreatheBlockHeight returns the height reported by
// DexKeeper.GetLastBreatheBlockHeight for the latest committed block; state
// sync should only target a breathe block height.
//
// The lookup needs the time of the latest block. When no block has been
// committed yet (height 0), or when the latest block cannot be loaded from the
// block store, the current time is used instead.
func (app *BinanceChain) getLastBreatheBlockHeight() int64 {
	latestBlockHeight := app.LastBlockHeight()

	var timeOfLatestBlock time.Time
	if latestBlockHeight == 0 {
		timeOfLatestBlock = utils.Now()
	} else {
		blockDB := baseapp.LoadBlockDB()
		defer blockDB.Close()

		block := bc.NewBlockStore(blockDB).LoadBlock(latestBlockHeight)
		if block == nil {
			// LoadBlock yields nil when the block is not in the store;
			// dereferencing it would panic on the caller's goroutine.
			app.Logger.Error("failed to load latest block, fall back to current time", "height", latestBlockHeight)
			timeOfLatestBlock = utils.Now()
		} else {
			timeOfLatestBlock = block.Time
		}
	}

	height := app.DexKeeper.GetLastBreatheBlockHeight(
		app.CheckState.Ctx,
		latestBlockHeight,
		timeOfLatestBlock,
		app.baseConfig.BreatheBlockInterval,
		app.baseConfig.BreatheBlockDaysCountBack)
	app.Logger.Info("get last breathe block height", "height", height)
	return height
}

// latestSnapshotImpl walks every store named in common.StoreKeyNames at the
// given height, collects the serialized nodes and per-store node counts, and
// publishes them as the cached snapshot (stateCachedHeight, numKeysCache,
// totalKeyCache, chunkCache).
//
// reloadSnapshot calls this with reloadingMtx held for writing.
//
// If a store's immutable tree cannot be loaded at height, the attempt is
// abandoned: with retry set, a new attempt starts after one second (and keeps
// retrying until it succeeds); otherwise the function returns. Panics are
// recovered and logged.
//
// The snapshot is assembled in local variables and assigned to the cache only
// after every store was read successfully, so a failed attempt or a recovered
// panic leaves the previously cached snapshot intact and consistent with
// stateCachedHeight. The price is that the old and the new snapshot are both
// in memory until the swap.
func (app *BinanceChain) latestSnapshotImpl(height int64, retry bool) {
	defer func() {
		if r := recover(); r != nil {
			msg := fmt.Sprintf("recovered: %v\nstack:\n%v", r, string(debug.Stack()))
			app.Logger.Error("failed loading latest snapshot", "err", msg)
		}
	}()
	app.Logger.Info("reload latest snapshot", "height", height)

	for {
		numKeys := make([]int64, 0, len(common.StoreKeyNames))
		// Capacity hint from the previous snapshot, assuming we didn't
		// increase too many accounts in a day.
		chunks := make([][]byte, 0, app.totalKeyCache)
		totalKeys := int64(0)
		var loadErr error

		for _, key := range common.StoreKeyNames {
			store := app.GetCommitMultiStore().GetKVStore(common.StoreKeyNameMap[key])
			mutableTree := store.(*storePkg.IavlStore).Tree
			tree, err := mutableTree.GetImmutable(height)
			if err != nil {
				loadErr = err
				break
			}

			var storeKeys int64
			tree.IterateFirst(func(nodeBytes []byte) {
				storeKeys++
				chunks = append(chunks, nodeBytes)
			})
			totalKeys += storeKeys
			numKeys = append(numKeys, storeKeys)
		}

		if loadErr == nil {
			// Publish all cache fields together, only for a complete snapshot.
			app.stateCachedHeight = height
			app.numKeysCache = numKeys
			app.totalKeyCache = totalKeys
			app.chunkCache = chunks
			app.Logger.Info("finish read snapshot chunk", "height", height, "keys", totalKeys)
			return
		}

		app.Logger.Error("failed to load immutable tree", "err", loadErr)
		if !retry {
			return
		}
		// Endblocker has notified this reload snapshot; wait for 1 sec so the
		// commit can finish before trying again.
		time.Sleep(1 * time.Second)
	}
}

0 comments on commit e4553bd

Please sign in to comment.