Skip to content

Commit

Permalink
Merge 9efc47c into 55b5a58
Browse files Browse the repository at this point in the history
  • Loading branch information
envestcc authored Aug 16, 2023
2 parents 55b5a58 + 9efc47c commit 8a1df8e
Show file tree
Hide file tree
Showing 4 changed files with 359 additions and 8 deletions.
3 changes: 2 additions & 1 deletion blockindex/sgd_indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ import (

"github.com/iotexproject/go-pkgs/hash"
"github.com/iotexproject/iotex-address/address"
"github.com/stretchr/testify/require"

"github.com/iotexproject/iotex-core/action"
"github.com/iotexproject/iotex-core/blockchain/block"
"github.com/iotexproject/iotex-core/blockchain/genesis"
"github.com/iotexproject/iotex-core/db"
"github.com/iotexproject/iotex-core/state"
"github.com/iotexproject/iotex-core/test/identityset"
"github.com/iotexproject/iotex-core/testutil"
"github.com/stretchr/testify/require"
)

const (
Expand Down
125 changes: 125 additions & 0 deletions blockindex/sync_indexers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright (c) 2023 IoTeX Foundation
// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability
// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed.
// This source code is governed by Apache License 2.0 that can be found in the LICENSE file.

package blockindex

import (
"context"

"github.com/iotexproject/iotex-core/blockchain/block"
"github.com/iotexproject/iotex-core/blockchain/blockdao"
)

// SyncIndexers is a special index that includes multiple indexes,
// which stay in sync when blocks are added.
type SyncIndexers struct {
indexers []blockdao.BlockIndexer
startHeights []uint64 // start height of each indexer, which will be determined when the indexer is started
minStartHeight uint64 // minimum start height of all indexers
}

// NewSyncIndexers creates a new SyncIndexers
// each indexer will PutBlock one by one in the order of the indexers
func NewSyncIndexers(indexers ...blockdao.BlockIndexer) *SyncIndexers {
return &SyncIndexers{indexers: indexers}
}

// Start starts the indexer group
func (ig *SyncIndexers) Start(ctx context.Context) error {
for _, indexer := range ig.indexers {
if err := indexer.Start(ctx); err != nil {
return err
}
}
return ig.initStartHeight()
}

// Stop stops the indexer group
func (ig *SyncIndexers) Stop(ctx context.Context) error {
for _, indexer := range ig.indexers {
if err := indexer.Stop(ctx); err != nil {
return err
}
}
return nil
}

// PutBlock puts a block into the indexers in the group
func (ig *SyncIndexers) PutBlock(ctx context.Context, blk *block.Block) error {
for i, indexer := range ig.indexers {
// check if the block is higher than the indexer's start height
if blk.Height() < ig.startHeights[i] {
continue
}
// check if the block is higher than the indexer's height
height, err := indexer.Height()
if err != nil {
return err
}
if blk.Height() <= height {
continue
}
// put block
if err := indexer.PutBlock(ctx, blk); err != nil {
return err
}
}
return nil
}

// DeleteTipBlock deletes the tip block from the indexers in the group
func (ig *SyncIndexers) DeleteTipBlock(ctx context.Context, blk *block.Block) error {
for _, indexer := range ig.indexers {
if err := indexer.DeleteTipBlock(ctx, blk); err != nil {
return err
}
}
return nil
}

// StartHeight returns the minimum start height of the indexers in the group
func (ig *SyncIndexers) StartHeight() uint64 {
return ig.minStartHeight
}

// Height returns the minimum height of the indexers in the group
func (ig *SyncIndexers) Height() (uint64, error) {
var height uint64
for i, indexer := range ig.indexers {
h, err := indexer.Height()
if err != nil {
return 0, err
}
if i == 0 || h < height {
height = h
}
}
return height, nil
}

// initStartHeight initializes the start height of the indexers in the group
// for every indexer, the start height is the maximum of tipheight+1 and startheight
func (ig *SyncIndexers) initStartHeight() error {
ig.minStartHeight = 0
ig.startHeights = make([]uint64, len(ig.indexers))
for i, indexer := range ig.indexers {
tipHeight, err := indexer.Height()
if err != nil {
return err
}
indexStartHeight := tipHeight + 1
if indexerWithStart, ok := indexer.(blockdao.BlockIndexerWithStart); ok {
startHeight := indexerWithStart.StartHeight()
if startHeight > indexStartHeight {
indexStartHeight = startHeight
}
}
ig.startHeights[i] = indexStartHeight
if i == 0 || indexStartHeight < ig.minStartHeight {
ig.minStartHeight = indexStartHeight
}
}
return nil
}
218 changes: 218 additions & 0 deletions blockindex/sync_indexers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
// Copyright (c) 2023 IoTeX Foundation
// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability
// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed.
// This source code is governed by Apache License 2.0 that can be found in the LICENSE file.

package blockindex

import (
"context"
"strconv"
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"

"github.com/iotexproject/iotex-core/blockchain/block"
"github.com/iotexproject/iotex-core/blockchain/blockdao"
"github.com/iotexproject/iotex-core/test/identityset"
"github.com/iotexproject/iotex-core/test/mock/mock_blockdao"
)

func TestSyncIndexers_StartHeight(t *testing.T) {
require := require.New(t)
ctrl := gomock.NewController(t)

cases := []struct {
name string
indexers [][2]uint64 // [startHeight, height]
expect uint64
}{
{"no indexers", nil, 0},
{"one indexer without start height", [][2]uint64{{0, 100}}, 101},
{"one indexer with start height I", [][2]uint64{{100, 200}}, 201},
{"one indexer with start height II", [][2]uint64{{300, 200}}, 300},
{"two indexers with start height I", [][2]uint64{{100, 200}, {200, 300}}, 201},
{"two indexers with start height II", [][2]uint64{{100, 200}, {400, 300}}, 201},
{"two indexers with start height III", [][2]uint64{{100, 350}, {400, 300}}, 351},
{"two indexers one with start height I", [][2]uint64{{0, 1}, {150, 1}}, 2},
{"two indexers one with start height II", [][2]uint64{{0, 1}, {150, 200}}, 2},
{"two indexers one with start height III", [][2]uint64{{0, 200}, {250, 1}}, 201},
{"two indexers one with start height IV", [][2]uint64{{0, 200}, {150, 1}}, 150},
{"two indexers I", [][2]uint64{{0, 5}, {0, 1}}, 2},
{"two indexers II", [][2]uint64{{0, 5}, {0, 5}}, 6},
{"two indexers III", [][2]uint64{{0, 5}, {0, 6}}, 6},
{"multiple indexers I", [][2]uint64{{0, 5}, {0, 6}, {0, 7}}, 6},
{"multiple indexers II", [][2]uint64{{0, 5}, {10, 6}, {0, 7}}, 6},
{"multiple indexers III", [][2]uint64{{10, 5}, {0, 6}, {0, 7}}, 7},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
var indexers []blockdao.BlockIndexer
for _, indexer := range c.indexers {
if indexer[0] > 0 {
mockIndexerWithStart := mock_blockdao.NewMockBlockIndexerWithStart(ctrl)
mockIndexerWithStart.EXPECT().Start(gomock.Any()).Return(nil).Times(1)
mockIndexerWithStart.EXPECT().StartHeight().Return(indexer[0]).Times(1)
mockIndexerWithStart.EXPECT().Height().Return(indexer[1], nil).Times(1)
indexers = append(indexers, mockIndexerWithStart)
} else {
mockIndexer := mock_blockdao.NewMockBlockIndexer(ctrl)
mockIndexer.EXPECT().Start(gomock.Any()).Return(nil).Times(1)
mockIndexer.EXPECT().Height().Return(indexer[1], nil).Times(1)
indexers = append(indexers, mockIndexer)
}
}
ig := NewSyncIndexers(indexers...)
err := ig.Start(context.Background())
require.NoError(err)
height := ig.StartHeight()
require.Equal(c.expect, height)
})
}

}

func TestSyncIndexers_Height(t *testing.T) {
require := require.New(t)
ctrl := gomock.NewController(t)
defer ctrl.Finish()

cases := []struct {
heights []uint64
expect uint64
}{
{[]uint64{}, 0},
{[]uint64{100}, 100},
{[]uint64{100, 100}, 100},
{[]uint64{90, 100}, 90},
{[]uint64{100, 90}, 90},
{[]uint64{100, 100, 100}, 100},
{[]uint64{90, 100, 100}, 90},
{[]uint64{90, 80, 100}, 80},
{[]uint64{90, 80, 70}, 70},
}

for i := range cases {
name := strconv.FormatUint(uint64(i), 10)
t.Run(name, func(t *testing.T) {
var indexers []blockdao.BlockIndexer
for _, height := range cases[i].heights {
mockIndexer := mock_blockdao.NewMockBlockIndexer(ctrl)
mockIndexer.EXPECT().Height().Return(height, nil).Times(1)
indexers = append(indexers, mockIndexer)
}
ig := NewSyncIndexers(indexers...)
height, err := ig.Height()
require.NoError(err)
require.Equal(cases[i].expect, height)
})
}
}

func TestSyncIndexers_PutBlock(t *testing.T) {
require := require.New(t)
ctrl := gomock.NewController(t)
defer ctrl.Finish()

cases := []struct {
indexers [][2]uint64 // [startHeight, height]
blocks []uint64 // blocks to put
expectBlocks [][]uint64 // expect blocks to put on every indexer
}{
{
[][2]uint64{},
[]uint64{100},
[][]uint64{},
},
{
[][2]uint64{{100, 10}},
[]uint64{10, 20, 90, 100, 101},
[][]uint64{{100, 101}},
},
{
[][2]uint64{{100, 210}},
[]uint64{10, 20, 90, 100, 101, 210, 211},
[][]uint64{{211}},
},
{
[][2]uint64{{0, 200}, {250, 1}},
[]uint64{1, 2, 201, 249, 250, 251},
[][]uint64{{201, 249, 250, 251}, {250, 251}},
},
{
[][2]uint64{{0, 250}, {250, 250}},
[]uint64{1, 2, 201, 249, 250, 251, 252},
[][]uint64{{251, 252}, {251, 252}},
},
{
[][2]uint64{{0, 200}, {250, 1}, {300, 1}},
[]uint64{1, 2, 201, 249, 250, 251, 300, 301},
[][]uint64{{201, 249, 250, 251, 300, 301}, {250, 251, 300, 301}, {300, 301}},
},
{
[][2]uint64{{0, 250}, {250, 250}, {300, 250}},
[]uint64{1, 2, 201, 249, 250, 251, 300, 301},
[][]uint64{{251, 300, 301}, {251, 300, 301}, {300, 301}},
},
{
[][2]uint64{{0, 300}, {250, 300}, {300, 300}},
[]uint64{1, 2, 201, 249, 250, 251, 300, 301},
[][]uint64{{301}, {301}, {301}},
},
{
[][2]uint64{{0, 400}, {250, 400}, {300, 400}},
[]uint64{1, 2, 201, 249, 250, 251, 300, 301, 400, 401},
[][]uint64{{401}, {401}, {401}},
},
}

for _, c := range cases {
t.Run("", func(t *testing.T) {
var indexers []blockdao.BlockIndexer
putBlocks := make([][]uint64, len(c.indexers))
indexersHeight := make([]uint64, len(c.indexers))
for id, indexer := range c.indexers {
idx := id
indexersHeight[idx] = indexer[1]
if indexer[0] > 0 {
mockIndexerWithStart := mock_blockdao.NewMockBlockIndexerWithStart(ctrl)
mockIndexerWithStart.EXPECT().Start(gomock.Any()).Return(nil).Times(1)
mockIndexerWithStart.EXPECT().StartHeight().Return(indexer[0]).Times(1)
mockIndexerWithStart.EXPECT().Height().DoAndReturn(func() (uint64, error) {
return indexersHeight[idx], nil
}).AnyTimes()
mockIndexerWithStart.EXPECT().PutBlock(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, blk *block.Block) error {
putBlocks[idx] = append(putBlocks[idx], blk.Height())
indexersHeight[idx] = blk.Height()
return nil
}).Times(len(c.expectBlocks[idx]))
indexers = append(indexers, mockIndexerWithStart)
} else {
mockIndexer := mock_blockdao.NewMockBlockIndexer(ctrl)
mockIndexer.EXPECT().Start(gomock.Any()).Return(nil).Times(1)
mockIndexer.EXPECT().Height().DoAndReturn(func() (uint64, error) {
return indexersHeight[idx], nil
}).AnyTimes()
mockIndexer.EXPECT().PutBlock(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, blk *block.Block) error {
putBlocks[idx] = append(putBlocks[idx], blk.Height())
indexersHeight[idx] = blk.Height()
return nil
}).Times(len(c.expectBlocks[idx]))
indexers = append(indexers, mockIndexer)
}
}
ig := NewSyncIndexers(indexers...)
err := ig.Start(context.Background())
require.NoError(err)
for _, blkHeight := range c.blocks {
blk, err := block.NewBuilder(block.RunnableActions{}).SetHeight(blkHeight).SignAndBuild(identityset.PrivateKey(0))
require.NoError(err)
err = ig.PutBlock(context.Background(), &blk)
require.NoError(err)
}
require.Equal(c.expectBlocks, putBlocks)
})
}
}
21 changes: 14 additions & 7 deletions chainservice/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,19 +257,26 @@ func (builder *Builder) buildBlockDAO(forTest bool) error {
}

var indexers []blockdao.BlockIndexer
indexers = append(indexers, builder.cs.factory)
// indexers in synchronizedIndexers will need to run PutBlock() one by one
// factory is dependent on sgdIndexer and contractStakingIndexer, so it should be put in the first place
synchronizedIndexers := []blockdao.BlockIndexer{builder.cs.factory}
if builder.cs.contractStakingIndexer != nil {
synchronizedIndexers = append(synchronizedIndexers, builder.cs.contractStakingIndexer)
}
if builder.cs.sgdIndexer != nil {
synchronizedIndexers = append(synchronizedIndexers, builder.cs.sgdIndexer)
}
if len(synchronizedIndexers) > 1 {
indexers = append(indexers, blockindex.NewSyncIndexers(synchronizedIndexers...))
} else {
indexers = append(indexers, builder.cs.factory)
}
if !builder.cfg.Chain.EnableAsyncIndexWrite && builder.cs.indexer != nil {
indexers = append(indexers, builder.cs.indexer)
}
if builder.cs.bfIndexer != nil {
indexers = append(indexers, builder.cs.bfIndexer)
}
if builder.cs.sgdIndexer != nil {
indexers = append(indexers, builder.cs.sgdIndexer)
}
if builder.cs.contractStakingIndexer != nil {
indexers = append(indexers, builder.cs.contractStakingIndexer)
}
if forTest {
builder.cs.blockdao = blockdao.NewBlockDAOInMemForTest(indexers)
} else {
Expand Down

0 comments on commit 8a1df8e

Please sign in to comment.