From d6331c6c6a8a3e5e571157f70b6ef9fbafbe0898 Mon Sep 17 00:00:00 2001 From: Liuhaai Date: Mon, 31 Jul 2023 11:50:13 -0700 Subject: [PATCH] fix --- blockchain/blockdao/blockdao.go | 26 +++--- blockchain/blockdao/blockindexer.go | 101 ++++++++++++++--------- blockchain/blockdao/blockindexer_test.go | 4 +- pkg/recovery/recovery.go | 2 +- 4 files changed, 78 insertions(+), 55 deletions(-) diff --git a/blockchain/blockdao/blockdao.go b/blockchain/blockdao/blockdao.go index f59d5471bf..351f4b2416 100644 --- a/blockchain/blockdao/blockdao.go +++ b/blockchain/blockdao/blockdao.go @@ -94,23 +94,19 @@ func (dao *blockDAO) Start(ctx context.Context) error { func (dao *blockDAO) checkIndexers(ctx context.Context) error { checker := NewBlockIndexerChecker(dao) - for i, indexer := range dao.indexers { - if err := checker.CheckIndexer(ctx, indexer, 0, func(height uint64) { - if height%5000 == 0 { - log.L().Info( - "indexer is catching up.", - zap.Int("indexer", i), - zap.Uint64("height", height), - ) - } - }); err != nil { - return err + if err := checker.CheckIndexers(ctx, dao.indexers, 0, func(height uint64) { + if height%5000 == 0 { + log.L().Info( + "indexer is catching up.", + zap.Uint64("height", height), + ) } - log.L().Info( - "indexer is up to date.", - zap.Int("indexer", i), - ) + }); err != nil { + return err } + log.L().Info( + "indexers is up to date.", + ) return nil } diff --git a/blockchain/blockdao/blockindexer.go b/blockchain/blockdao/blockindexer.go index dfd70241ae..9ecbfd3153 100644 --- a/blockchain/blockdao/blockindexer.go +++ b/blockchain/blockdao/blockindexer.go @@ -7,6 +7,8 @@ package blockdao import ( "context" + "fmt" + "math" "time" "github.com/pkg/errors" @@ -46,8 +48,11 @@ func NewBlockIndexerChecker(dao BlockDAO) *BlockIndexerChecker { return &BlockIndexerChecker{dao: dao} } -// CheckIndexer checks a block indexer against block dao -func (bic *BlockIndexerChecker) CheckIndexer(ctx context.Context, indexer BlockIndexer, targetHeight uint64, progressReporter func(uint64)) error { +// CheckIndexer checks block indexers against block dao +func (bic *BlockIndexerChecker) CheckIndexers(ctx context.Context, indexers []BlockIndexer, targetHeight uint64, progressReporter func(uint64)) error { + if len(indexers) == 0 { + return nil + } bcCtx, ok := protocol.GetBlockchainCtx(ctx) if !ok { return errors.New("failed to find blockchain ctx") @@ -56,32 +61,48 @@ func (bic *BlockIndexerChecker) CheckIndexer(ctx context.Context, indexer BlockI if !ok { return errors.New("failed to find genesis ctx") } - tipHeight, err := indexer.Height() - if err != nil { - return err - } daoTip, err := bic.dao.Height() if err != nil { return err } - if tipHeight > daoTip { - return errors.New("indexer tip height cannot by higher than dao tip height") - } - tipBlk, err := bic.dao.GetBlockByHeight(tipHeight) - if err != nil { - return err - } if targetHeight == 0 || targetHeight > daoTip { targetHeight = daoTip } - startHeight := tipHeight + 1 - if indexerWS, ok := indexer.(BlockIndexerWithStart); ok { - indexStartHeight := indexerWS.StartHeight() - if indexStartHeight > startHeight { - startHeight = indexStartHeight + var ( + startHeights []uint64 + minStartHeight uint64 = math.MaxUint64 + ) + for i, idx := range indexers { + tipHeight, err := idx.Height() + if err != nil { + return err + } + if tipHeight > daoTip { + return errors.New(fmt.Sprintf("indexer %d tip height cannot by higher than dao tip height", i)) + } + startHeight := tipHeight + 1 + if indexerWS, ok := idx.(BlockIndexerWithStart); ok { + indexStartHeight := indexerWS.StartHeight() + if indexStartHeight > startHeight { + startHeight = indexStartHeight + } + } + startHeights = append(startHeights, startHeight) + if startHeight < minStartHeight { + minStartHeight = startHeight } } - for i := startHeight; i <= targetHeight; i++ { + + if minStartHeight == 0 { + panic("minStartHeight is 0") + } + + tipBlk, err := bic.dao.GetBlockByHeight(minStartHeight - 1) + if err != nil { + return err + } + + for i := minStartHeight; i <= targetHeight; i++ { blk, err := bic.dao.GetBlockByHeight(i) if err != nil { return err @@ -104,26 +125,32 @@ func (bic *BlockIndexerChecker) CheckIndexer(ctx context.Context, indexer BlockI bcCtx.Tip.Hash = g.Hash() bcCtx.Tip.Timestamp = time.Unix(g.Timestamp, 0) } - for { - if err = indexer.PutBlock(protocol.WithBlockCtx( - protocol.WithBlockchainCtx(ctx, bcCtx), - protocol.BlockCtx{ - BlockHeight: i, - BlockTimeStamp: blk.Timestamp(), - Producer: producer, - GasLimit: g.BlockGasLimit, - }, - ), blk); err == nil { - break - } - if i < g.HawaiiBlockHeight && errors.Cause(err) == block.ErrDeltaStateMismatch { - log.L().Info("delta state mismatch", zap.Uint64("block", i)) + + for j, indexer := range indexers { + if i < startHeights[j] { continue } - return err - } - if progressReporter != nil { - progressReporter(i) + for { + if err = indexer.PutBlock(protocol.WithBlockCtx( + protocol.WithBlockchainCtx(ctx, bcCtx), + protocol.BlockCtx{ + BlockHeight: i, + BlockTimeStamp: blk.Timestamp(), + Producer: producer, + GasLimit: g.BlockGasLimit, + }, + ), blk); err == nil { + break + } + if i < g.HawaiiBlockHeight && errors.Cause(err) == block.ErrDeltaStateMismatch { + log.L().Info("delta state mismatch", zap.Uint64("block", i)) + continue + } + return err + } + if progressReporter != nil { + progressReporter(i) + } } tipBlk = blk } diff --git a/blockchain/blockdao/blockindexer_test.go b/blockchain/blockdao/blockindexer_test.go index d80ae9788e..4c37ab0a3c 100644 --- a/blockchain/blockdao/blockindexer_test.go +++ b/blockchain/blockdao/blockindexer_test.go @@ -72,7 +72,7 @@ func TestCheckIndexer(t *testing.T) { ctx := protocol.WithBlockchainCtx(context.Background(), protocol.BlockchainCtx{}) ctx = genesis.WithGenesisContext(ctx, genesis.Default) - err := checker.CheckIndexer(ctx, indexer, 0, func(u uint64) {}) + err := checker.CheckIndexers(ctx, []BlockIndexer{indexer}, 0, func(u uint64) {}) require.Equalf(c.noErr, err == nil, "error: %v", err) require.Len(putBlocks, len(c.expectedPutBlocks)) for k, h := range c.expectedPutBlocks { @@ -133,7 +133,7 @@ func TestCheckIndexerWithStart(t *testing.T) { ctx := protocol.WithBlockchainCtx(context.Background(), protocol.BlockchainCtx{}) ctx = genesis.WithGenesisContext(ctx, genesis.Default) - err := checker.CheckIndexer(ctx, indexer, 0, func(u uint64) {}) + err := checker.CheckIndexers(ctx, []BlockIndexer{indexer}, 0, func(u uint64) {}) require.Equalf(c.noErr, err == nil, "error: %v", err) require.Len(putBlocks, len(c.expectedPutBlocks)) for k, h := range c.expectedPutBlocks { diff --git a/pkg/recovery/recovery.go b/pkg/recovery/recovery.go index e618e973d4..263ad60da6 100644 --- a/pkg/recovery/recovery.go +++ b/pkg/recovery/recovery.go @@ -12,9 +12,9 @@ import ( "runtime/pprof" "time" - "github.com/shirou/gopsutil/v3/load" "github.com/shirou/gopsutil/v3/cpu" "github.com/shirou/gopsutil/v3/disk" + "github.com/shirou/gopsutil/v3/load" "github.com/shirou/gopsutil/v3/mem" "github.com/iotexproject/iotex-core/pkg/log"