Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Liuhaai committed Jul 31, 2023
1 parent 913a162 commit d6331c6
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 55 deletions.
26 changes: 11 additions & 15 deletions blockchain/blockdao/blockdao.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,23 +94,19 @@ func (dao *blockDAO) Start(ctx context.Context) error {

func (dao *blockDAO) checkIndexers(ctx context.Context) error {
checker := NewBlockIndexerChecker(dao)
for i, indexer := range dao.indexers {
if err := checker.CheckIndexer(ctx, indexer, 0, func(height uint64) {
if height%5000 == 0 {
log.L().Info(
"indexer is catching up.",
zap.Int("indexer", i),
zap.Uint64("height", height),
)
}
}); err != nil {
return err
if err := checker.CheckIndexers(ctx, dao.indexers, 0, func(height uint64) {
if height%5000 == 0 {
log.L().Info(
"indexer is catching up.",
zap.Uint64("height", height),
)
}
log.L().Info(
"indexer is up to date.",
zap.Int("indexer", i),
)
}); err != nil {
return err
}
log.L().Info(
"indexers is up to date.",
)
return nil
}

Expand Down
101 changes: 64 additions & 37 deletions blockchain/blockdao/blockindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package blockdao

import (
"context"
"fmt"
"math"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -46,8 +48,11 @@ func NewBlockIndexerChecker(dao BlockDAO) *BlockIndexerChecker {
return &BlockIndexerChecker{dao: dao}
}

// CheckIndexer checks a block indexer against block dao
func (bic *BlockIndexerChecker) CheckIndexer(ctx context.Context, indexer BlockIndexer, targetHeight uint64, progressReporter func(uint64)) error {
// CheckIndexer checks block indexers against block dao
func (bic *BlockIndexerChecker) CheckIndexers(ctx context.Context, indexers []BlockIndexer, targetHeight uint64, progressReporter func(uint64)) error {
if len(indexers) == 0 {
return nil
}
bcCtx, ok := protocol.GetBlockchainCtx(ctx)
if !ok {
return errors.New("failed to find blockchain ctx")
Expand All @@ -56,32 +61,48 @@ func (bic *BlockIndexerChecker) CheckIndexer(ctx context.Context, indexer BlockI
if !ok {
return errors.New("failed to find genesis ctx")
}
tipHeight, err := indexer.Height()
if err != nil {
return err
}
daoTip, err := bic.dao.Height()
if err != nil {
return err
}
if tipHeight > daoTip {
return errors.New("indexer tip height cannot by higher than dao tip height")
}
tipBlk, err := bic.dao.GetBlockByHeight(tipHeight)
if err != nil {
return err
}
if targetHeight == 0 || targetHeight > daoTip {
targetHeight = daoTip
}
startHeight := tipHeight + 1
if indexerWS, ok := indexer.(BlockIndexerWithStart); ok {
indexStartHeight := indexerWS.StartHeight()
if indexStartHeight > startHeight {
startHeight = indexStartHeight
var (
startHeights []uint64
minStartHeight uint64 = math.MaxUint64
)
for i, idx := range indexers {
tipHeight, err := idx.Height()
if err != nil {
return err
}
if tipHeight > daoTip {
return errors.New(fmt.Sprintf("indexer %d tip height cannot by higher than dao tip height", i))
}
startHeight := tipHeight + 1
if indexerWS, ok := idx.(BlockIndexerWithStart); ok {
indexStartHeight := indexerWS.StartHeight()
if indexStartHeight > startHeight {
startHeight = indexStartHeight
}
}
startHeights = append(startHeights, startHeight)
if startHeight < minStartHeight {
minStartHeight = startHeight
}
}
for i := startHeight; i <= targetHeight; i++ {

if minStartHeight == 0 {
panic("minStartHeight is 0")
}

tipBlk, err := bic.dao.GetBlockByHeight(minStartHeight - 1)
if err != nil {
return err
}

for i := minStartHeight; i <= targetHeight; i++ {
blk, err := bic.dao.GetBlockByHeight(i)
if err != nil {
return err
Expand All @@ -104,26 +125,32 @@ func (bic *BlockIndexerChecker) CheckIndexer(ctx context.Context, indexer BlockI
bcCtx.Tip.Hash = g.Hash()
bcCtx.Tip.Timestamp = time.Unix(g.Timestamp, 0)
}
for {
if err = indexer.PutBlock(protocol.WithBlockCtx(
protocol.WithBlockchainCtx(ctx, bcCtx),
protocol.BlockCtx{
BlockHeight: i,
BlockTimeStamp: blk.Timestamp(),
Producer: producer,
GasLimit: g.BlockGasLimit,
},
), blk); err == nil {
break
}
if i < g.HawaiiBlockHeight && errors.Cause(err) == block.ErrDeltaStateMismatch {
log.L().Info("delta state mismatch", zap.Uint64("block", i))

for j, indexer := range indexers {
if i < startHeights[j] {
continue
}
return err
}
if progressReporter != nil {
progressReporter(i)
for {
if err = indexer.PutBlock(protocol.WithBlockCtx(
protocol.WithBlockchainCtx(ctx, bcCtx),
protocol.BlockCtx{
BlockHeight: i,
BlockTimeStamp: blk.Timestamp(),
Producer: producer,
GasLimit: g.BlockGasLimit,
},
), blk); err == nil {
break
}
if i < g.HawaiiBlockHeight && errors.Cause(err) == block.ErrDeltaStateMismatch {
log.L().Info("delta state mismatch", zap.Uint64("block", i))
continue
}
return err
}
if progressReporter != nil {
progressReporter(i)
}
}
tipBlk = blk
}
Expand Down
4 changes: 2 additions & 2 deletions blockchain/blockdao/blockindexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestCheckIndexer(t *testing.T) {

ctx := protocol.WithBlockchainCtx(context.Background(), protocol.BlockchainCtx{})
ctx = genesis.WithGenesisContext(ctx, genesis.Default)
err := checker.CheckIndexer(ctx, indexer, 0, func(u uint64) {})
err := checker.CheckIndexers(ctx, []BlockIndexer{indexer}, 0, func(u uint64) {})
require.Equalf(c.noErr, err == nil, "error: %v", err)
require.Len(putBlocks, len(c.expectedPutBlocks))
for k, h := range c.expectedPutBlocks {
Expand Down Expand Up @@ -133,7 +133,7 @@ func TestCheckIndexerWithStart(t *testing.T) {

ctx := protocol.WithBlockchainCtx(context.Background(), protocol.BlockchainCtx{})
ctx = genesis.WithGenesisContext(ctx, genesis.Default)
err := checker.CheckIndexer(ctx, indexer, 0, func(u uint64) {})
err := checker.CheckIndexers(ctx, []BlockIndexer{indexer}, 0, func(u uint64) {})
require.Equalf(c.noErr, err == nil, "error: %v", err)
require.Len(putBlocks, len(c.expectedPutBlocks))
for k, h := range c.expectedPutBlocks {
Expand Down
2 changes: 1 addition & 1 deletion pkg/recovery/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (
"runtime/pprof"
"time"

"github.com/shirou/gopsutil/v3/load"
"github.com/shirou/gopsutil/v3/cpu"
"github.com/shirou/gopsutil/v3/disk"
"github.com/shirou/gopsutil/v3/load"
"github.com/shirou/gopsutil/v3/mem"

"github.com/iotexproject/iotex-core/pkg/log"
Expand Down

0 comments on commit d6331c6

Please sign in to comment.