Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[blockindex] introduce indexergroup #3906

Merged
merged 14 commits into from
Aug 16, 2023
3 changes: 2 additions & 1 deletion blockindex/sgd_indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ import (

"github.com/iotexproject/go-pkgs/hash"
"github.com/iotexproject/iotex-address/address"
"github.com/stretchr/testify/require"

"github.com/iotexproject/iotex-core/action"
"github.com/iotexproject/iotex-core/blockchain/block"
"github.com/iotexproject/iotex-core/blockchain/genesis"
"github.com/iotexproject/iotex-core/db"
"github.com/iotexproject/iotex-core/state"
"github.com/iotexproject/iotex-core/test/identityset"
"github.com/iotexproject/iotex-core/testutil"
"github.com/stretchr/testify/require"
)

const (
Expand Down
125 changes: 125 additions & 0 deletions blockindex/sync_indexers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright (c) 2023 IoTeX Foundation
// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability
// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed.
// This source code is governed by Apache License 2.0 that can be found in the LICENSE file.

package blockindex

import (
"context"

"github.com/iotexproject/iotex-core/blockchain/block"
"github.com/iotexproject/iotex-core/blockchain/blockdao"
)

// SyncIndexers is a special index that includes multiple indexes,
// which stay in sync when blocks are added.
type SyncIndexers struct {
indexers []blockdao.BlockIndexer
startHeights []uint64 // start height of each indexer, which will be determined when the indexer is started
minStartHeight uint64 // minimum start height of all indexers
}

// NewSyncIndexers creates a new SyncIndexers
// each indexer will PutBlock one by one in the order of the indexers
func NewSyncIndexers(indexers ...blockdao.BlockIndexer) *SyncIndexers {
return &SyncIndexers{indexers: indexers}
}

// Start starts the indexer group
func (ig *SyncIndexers) Start(ctx context.Context) error {
for _, indexer := range ig.indexers {
if err := indexer.Start(ctx); err != nil {
return err
}
}
return ig.initStartHeight()
}

// Stop stops the indexer group
func (ig *SyncIndexers) Stop(ctx context.Context) error {
for _, indexer := range ig.indexers {
if err := indexer.Stop(ctx); err != nil {
return err
}
}
return nil
}

// PutBlock puts a block into the indexers in the group
func (ig *SyncIndexers) PutBlock(ctx context.Context, blk *block.Block) error {
for i, indexer := range ig.indexers {
// check if the block is higher than the indexer's start height
if blk.Height() < ig.startHeights[i] {
continue
}
// check if the block is higher than the indexer's height
height, err := indexer.Height()
if err != nil {
return err
}
if blk.Height() <= height {
continue
}
// put block
if err := indexer.PutBlock(ctx, blk); err != nil {
return err
}
}
return nil
}

// DeleteTipBlock deletes the tip block from the indexers in the group
func (ig *SyncIndexers) DeleteTipBlock(ctx context.Context, blk *block.Block) error {
for _, indexer := range ig.indexers {
if err := indexer.DeleteTipBlock(ctx, blk); err != nil {
return err
}
}
return nil
}

// StartHeight returns the minimum start height of the indexers in the group
func (ig *SyncIndexers) StartHeight() uint64 {
return ig.minStartHeight
}

// Height returns the minimum height of the indexers in the group
func (ig *SyncIndexers) Height() (uint64, error) {
var height uint64
for i, indexer := range ig.indexers {
h, err := indexer.Height()
if err != nil {
return 0, err
}
if i == 0 || h < height {
height = h
}
}
return height, nil
}

// initStartHeight initializes the start height of the indexers in the group
// for every indexer, the start height is the maximum of tipheight+1 and startheight
func (ig *SyncIndexers) initStartHeight() error {
ig.minStartHeight = 0
ig.startHeights = make([]uint64, len(ig.indexers))
for i, indexer := range ig.indexers {
tipHeight, err := indexer.Height()
if err != nil {
return err
}
indexStartHeight := tipHeight + 1
if indexerWithStart, ok := indexer.(blockdao.BlockIndexerWithStart); ok {
startHeight := indexerWithStart.StartHeight()
if startHeight > indexStartHeight {
indexStartHeight = startHeight
}
}
ig.startHeights[i] = indexStartHeight
if i == 0 || indexStartHeight < ig.minStartHeight {
ig.minStartHeight = indexStartHeight
}
}
return nil
}
218 changes: 218 additions & 0 deletions blockindex/sync_indexers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
// Copyright (c) 2023 IoTeX Foundation
// This source code is provided 'as is' and no warranties are given as to title or non-infringement, merchantability
// or fitness for purpose and, to the extent permitted by law, all liability for your use of the code is disclaimed.
// This source code is governed by Apache License 2.0 that can be found in the LICENSE file.

package blockindex

import (
"context"
"strconv"
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"

"github.com/iotexproject/iotex-core/blockchain/block"
"github.com/iotexproject/iotex-core/blockchain/blockdao"
"github.com/iotexproject/iotex-core/test/identityset"
"github.com/iotexproject/iotex-core/test/mock/mock_blockdao"
)

func TestSyncIndexers_StartHeight(t *testing.T) {
require := require.New(t)
ctrl := gomock.NewController(t)

cases := []struct {
name string
indexers [][2]uint64 // [startHeight, height]
expect uint64
}{
{"no indexers", nil, 0},
{"one indexer without start height", [][2]uint64{{0, 100}}, 101},
{"one indexer with start height I", [][2]uint64{{100, 200}}, 201},
{"one indexer with start height II", [][2]uint64{{300, 200}}, 300},
{"two indexers with start height I", [][2]uint64{{100, 200}, {200, 300}}, 201},
{"two indexers with start height II", [][2]uint64{{100, 200}, {400, 300}}, 201},
{"two indexers with start height III", [][2]uint64{{100, 350}, {400, 300}}, 351},
{"two indexers one with start height I", [][2]uint64{{0, 1}, {150, 1}}, 2},
{"two indexers one with start height II", [][2]uint64{{0, 1}, {150, 200}}, 2},
{"two indexers one with start height III", [][2]uint64{{0, 200}, {250, 1}}, 201},
{"two indexers one with start height IV", [][2]uint64{{0, 200}, {150, 1}}, 150},
{"two indexers I", [][2]uint64{{0, 5}, {0, 1}}, 2},
{"two indexers II", [][2]uint64{{0, 5}, {0, 5}}, 6},
{"two indexers III", [][2]uint64{{0, 5}, {0, 6}}, 6},
{"multiple indexers I", [][2]uint64{{0, 5}, {0, 6}, {0, 7}}, 6},
{"multiple indexers II", [][2]uint64{{0, 5}, {10, 6}, {0, 7}}, 6},
{"multiple indexers III", [][2]uint64{{10, 5}, {0, 6}, {0, 7}}, 7},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
var indexers []blockdao.BlockIndexer
for _, indexer := range c.indexers {
if indexer[0] > 0 {
mockIndexerWithStart := mock_blockdao.NewMockBlockIndexerWithStart(ctrl)
mockIndexerWithStart.EXPECT().Start(gomock.Any()).Return(nil).Times(1)
mockIndexerWithStart.EXPECT().StartHeight().Return(indexer[0]).Times(1)
mockIndexerWithStart.EXPECT().Height().Return(indexer[1], nil).Times(1)
indexers = append(indexers, mockIndexerWithStart)
} else {
mockIndexer := mock_blockdao.NewMockBlockIndexer(ctrl)
mockIndexer.EXPECT().Start(gomock.Any()).Return(nil).Times(1)
mockIndexer.EXPECT().Height().Return(indexer[1], nil).Times(1)
indexers = append(indexers, mockIndexer)
}
}
ig := NewSyncIndexers(indexers...)
err := ig.Start(context.Background())
require.NoError(err)
height := ig.StartHeight()
require.Equal(c.expect, height)
})
}

}

func TestSyncIndexers_Height(t *testing.T) {
require := require.New(t)
ctrl := gomock.NewController(t)
defer ctrl.Finish()

cases := []struct {
heights []uint64
expect uint64
}{
{[]uint64{}, 0},
{[]uint64{100}, 100},
{[]uint64{100, 100}, 100},
{[]uint64{90, 100}, 90},
{[]uint64{100, 90}, 90},
{[]uint64{100, 100, 100}, 100},
{[]uint64{90, 100, 100}, 90},
{[]uint64{90, 80, 100}, 80},
{[]uint64{90, 80, 70}, 70},
}

for i := range cases {
name := strconv.FormatUint(uint64(i), 10)
t.Run(name, func(t *testing.T) {
var indexers []blockdao.BlockIndexer
for _, height := range cases[i].heights {
mockIndexer := mock_blockdao.NewMockBlockIndexer(ctrl)
mockIndexer.EXPECT().Height().Return(height, nil).Times(1)
indexers = append(indexers, mockIndexer)
}
ig := NewSyncIndexers(indexers...)
height, err := ig.Height()
require.NoError(err)
require.Equal(cases[i].expect, height)
})
}
}

func TestSyncIndexers_PutBlock(t *testing.T) {
require := require.New(t)
ctrl := gomock.NewController(t)
defer ctrl.Finish()

cases := []struct {
indexers [][2]uint64 // [startHeight, height]
blocks []uint64 // blocks to put
expectBlocks [][]uint64 // expect blocks to put on every indexer
}{
{
[][2]uint64{},
[]uint64{100},
[][]uint64{},
},
{
[][2]uint64{{100, 10}},
[]uint64{10, 20, 90, 100, 101},
[][]uint64{{100, 101}},
},
{
[][2]uint64{{100, 210}},
[]uint64{10, 20, 90, 100, 101, 210, 211},
[][]uint64{{211}},
},
{
[][2]uint64{{0, 200}, {250, 1}},
[]uint64{1, 2, 201, 249, 250, 251},
[][]uint64{{201, 249, 250, 251}, {250, 251}},
},
{
[][2]uint64{{0, 250}, {250, 250}},
[]uint64{1, 2, 201, 249, 250, 251, 252},
[][]uint64{{251, 252}, {251, 252}},
},
{
[][2]uint64{{0, 200}, {250, 1}, {300, 1}},
[]uint64{1, 2, 201, 249, 250, 251, 300, 301},
[][]uint64{{201, 249, 250, 251, 300, 301}, {250, 251, 300, 301}, {300, 301}},
},
{
[][2]uint64{{0, 250}, {250, 250}, {300, 250}},
[]uint64{1, 2, 201, 249, 250, 251, 300, 301},
[][]uint64{{251, 300, 301}, {251, 300, 301}, {300, 301}},
},
{
[][2]uint64{{0, 300}, {250, 300}, {300, 300}},
[]uint64{1, 2, 201, 249, 250, 251, 300, 301},
[][]uint64{{301}, {301}, {301}},
},
{
[][2]uint64{{0, 400}, {250, 400}, {300, 400}},
[]uint64{1, 2, 201, 249, 250, 251, 300, 301, 400, 401},
[][]uint64{{401}, {401}, {401}},
},
}

for _, c := range cases {
t.Run("", func(t *testing.T) {
var indexers []blockdao.BlockIndexer
putBlocks := make([][]uint64, len(c.indexers))
indexersHeight := make([]uint64, len(c.indexers))
for id, indexer := range c.indexers {
idx := id
indexersHeight[idx] = indexer[1]
if indexer[0] > 0 {
mockIndexerWithStart := mock_blockdao.NewMockBlockIndexerWithStart(ctrl)
mockIndexerWithStart.EXPECT().Start(gomock.Any()).Return(nil).Times(1)
mockIndexerWithStart.EXPECT().StartHeight().Return(indexer[0]).Times(1)
mockIndexerWithStart.EXPECT().Height().DoAndReturn(func() (uint64, error) {
return indexersHeight[idx], nil
}).AnyTimes()
mockIndexerWithStart.EXPECT().PutBlock(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, blk *block.Block) error {
putBlocks[idx] = append(putBlocks[idx], blk.Height())
indexersHeight[idx] = blk.Height()
return nil
}).Times(len(c.expectBlocks[idx]))
indexers = append(indexers, mockIndexerWithStart)
} else {
mockIndexer := mock_blockdao.NewMockBlockIndexer(ctrl)
mockIndexer.EXPECT().Start(gomock.Any()).Return(nil).Times(1)
mockIndexer.EXPECT().Height().DoAndReturn(func() (uint64, error) {
return indexersHeight[idx], nil
}).AnyTimes()
mockIndexer.EXPECT().PutBlock(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, blk *block.Block) error {
putBlocks[idx] = append(putBlocks[idx], blk.Height())
indexersHeight[idx] = blk.Height()
return nil
}).Times(len(c.expectBlocks[idx]))
indexers = append(indexers, mockIndexer)
}
}
ig := NewSyncIndexers(indexers...)
err := ig.Start(context.Background())
require.NoError(err)
for _, blkHeight := range c.blocks {
blk, err := block.NewBuilder(block.RunnableActions{}).SetHeight(blkHeight).SignAndBuild(identityset.PrivateKey(0))
require.NoError(err)
err = ig.PutBlock(context.Background(), &blk)
require.NoError(err)
}
require.Equal(c.expectBlocks, putBlocks)
})
}
}
20 changes: 13 additions & 7 deletions chainservice/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,19 +257,25 @@ func (builder *Builder) buildBlockDAO(forTest bool) error {
}

var indexers []blockdao.BlockIndexer
indexers = append(indexers, builder.cs.factory)
// indexers in synchronizedIndexers will need to run PutBlock() one by one
synchronizedIndexers := []blockdao.BlockIndexer{builder.cs.factory}
if builder.cs.contractStakingIndexer != nil {
synchronizedIndexers = append(synchronizedIndexers, builder.cs.contractStakingIndexer)
}
if builder.cs.sgdIndexer != nil {
synchronizedIndexers = append(synchronizedIndexers, builder.cs.sgdIndexer)
}
if len(synchronizedIndexers) > 1 {
indexers = append(indexers, blockindex.NewSyncIndexers(synchronizedIndexers...))
} else {
indexers = append(indexers, builder.cs.factory)
}
if !builder.cfg.Chain.EnableAsyncIndexWrite && builder.cs.indexer != nil {
indexers = append(indexers, builder.cs.indexer)
}
if builder.cs.bfIndexer != nil {
indexers = append(indexers, builder.cs.bfIndexer)
}
if builder.cs.sgdIndexer != nil {
indexers = append(indexers, builder.cs.sgdIndexer)
}
if builder.cs.contractStakingIndexer != nil {
indexers = append(indexers, builder.cs.contractStakingIndexer)
}
if forTest {
builder.cs.blockdao = blockdao.NewBlockDAOInMemForTest(indexers)
} else {
Expand Down