Skip to content

Commit

Permalink
fix: ingester change set not propagated due to missing registration (#…
Browse files Browse the repository at this point in the history
…8668)

* fix: ingester change set not propagated due to missing registration

* updates

* Update ingest/indexer/service/indexer_streaming_service.go

Co-authored-by: PaddyMc <[email protected]>

* Update app/app.go

Co-authored-by: PaddyMc <[email protected]>

---------

Co-authored-by: PaddyMc <[email protected]>
  • Loading branch information
p0mvn and PaddyMc authored Sep 4, 2024
1 parent 5503e47 commit c633db6
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 10 deletions.
22 changes: 21 additions & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,12 @@ func NewOsmosisApp(

// getSQSServiceWriteListeners returns the write listeners for the app that are specific to the SQS service.
func getSQSServiceWriteListeners(app *OsmosisApp, appCodec codec.Codec, blockPoolUpdateTracker domain.BlockPoolUpdateTracker, wasmkeeper *wasmkeeper.Keeper) (map[storetypes.StoreKey][]commondomain.WriteListener, map[string]storetypes.StoreKey) {
return getPoolWriteListeners(app, appCodec, blockPoolUpdateTracker, wasmkeeper)
writeListeners, storeKeyMap := getPoolWriteListeners(app, appCodec, blockPoolUpdateTracker, wasmkeeper)

// Register all applicable keys as listeners
registerStoreKeys(app, storeKeyMap)

return writeListeners, storeKeyMap
}

// getIndexerServiceWriteListeners returns the write listeners for the app that are specific to the indexer service.
Expand All @@ -703,6 +708,9 @@ func getIndexerServiceWriteListeners(ctx context.Context, app *OsmosisApp, appCo

storeKeyMap[banktypes.ModuleName] = app.GetKey(banktypes.ModuleName)

// Register all applicable keys as listeners
registerStoreKeys(app, storeKeyMap)

return writeListeners, storeKeyMap
}

Expand Down Expand Up @@ -732,6 +740,18 @@ func getPoolWriteListeners(app *OsmosisApp, appCodec codec.Codec, blockPoolUpdat
return writeListeners, storeKeyMap
}

// registerStoreKeys register the store keys from the given store key map
// on the app's commit multi store so that the change sets from these stores are propagated
// in ListenCommit().
func registerStoreKeys(app *OsmosisApp, storeKeyMap map[string]storetypes.StoreKey) {
// Register all applicable keys as listeners
storeKeys := make([]storetypes.StoreKey, 0)
for _, storeKey := range storeKeyMap {
storeKeys = append(storeKeys, storeKey)
}
app.CommitMultiStore().AddListeners(storeKeys)
}

// we cache the reflectionService to save us time within tests.
var cachedReflectionService *runtimeservices.ReflectionService = nil

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

// NewBlockProcessor creates a new block process strategy.
func NewBlockProcessor(blockProcessStrategyManager commondomain.BlockProcessStrategyManager, client domain.Publisher, poolExtractor commondomain.PoolExtractor, keepers domain.Keepers) commondomain.BlockProcessor {
func NewBlockProcessor(blockProcessStrategyManager commondomain.BlockProcessStrategyManager, client domain.Publisher, poolExtractor commondomain.PoolExtractor, keepers domain.Keepers, blockUpdateProcessUtils commondomain.BlockUpdateProcessUtilsI) commondomain.BlockProcessor {
// Initialize the pool pair publisher
poolPairPublisher := NewPairPublisher(client, keepers.PoolManagerKeeper)

Expand All @@ -24,8 +24,9 @@ func NewBlockProcessor(blockProcessStrategyManager commondomain.BlockProcessStra
}

return &blockUpdatesIndexerBlockProcessStrategy{
client: client,
poolExtractor: poolExtractor,
poolPairPublisher: poolPairPublisher,
client: client,
poolExtractor: poolExtractor,
poolPairPublisher: poolPairPublisher,
blockUpdateProcessUtils: blockUpdateProcessUtils,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (suite *IndexerBlockProcessorTestSuite) TestNewBlockProcessor() {
}

// System under test
newBlockProcessor := blockprocessor.NewBlockProcessor(blockStrategyManager, publisherMock, poolsExtracter, domain.Keepers{})
newBlockProcessor := blockprocessor.NewBlockProcessor(blockStrategyManager, publisherMock, poolsExtracter, domain.Keepers{}, nil)

// Check if the block processor is a full block processor
isFullBlockProcessor := newBlockProcessor.IsFullBlockProcessor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (
)

type blockUpdatesIndexerBlockProcessStrategy struct {
client domain.Publisher
poolExtractor commondomain.PoolExtractor
poolPairPublisher domain.PairPublisher
client domain.Publisher
poolExtractor commondomain.PoolExtractor
poolPairPublisher domain.PairPublisher
blockUpdateProcessUtils commondomain.BlockUpdateProcessUtilsI
}

var _ commondomain.BlockProcessor = &blockUpdatesIndexerBlockProcessStrategy{}
Expand All @@ -32,6 +33,10 @@ func (f *blockUpdatesIndexerBlockProcessStrategy) ProcessBlock(ctx types.Context

// publishChangedPools publishes the pools that were changed in the block.
func (f *blockUpdatesIndexerBlockProcessStrategy) publishChangedPools(ctx types.Context) error {
err := f.blockUpdateProcessUtils.ProcessBlockChangeSet()
if err != nil {
return err
}
// Extract the pools that were changed in the block
blockPools, err := f.poolExtractor.ExtractChanged(ctx)
if err != nil {
Expand Down
8 changes: 7 additions & 1 deletion ingest/indexer/service/indexer_streaming_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,10 +315,16 @@ func (s *indexerStreamingService) ListenCommit(ctx context.Context, res abci.Res
defer func() {
// Reset the pool tracker after processing the block.
s.poolTracker.Reset()
// Reset change set upon processing the block.
s.blockUpdatesProcessUtils.SetChangeSet(nil)
}()

// Set change set on the block updates process utils.
// These are processed in ProcessBlock(...) assuming "block updates" strategy.
s.blockUpdatesProcessUtils.SetChangeSet(changeSet)

// Create block processor
blockProcessor := blockprocessor.NewBlockProcessor(s.blockProcessStrategyManager, s.client, s.poolExtractor, s.keepers)
blockProcessor := blockprocessor.NewBlockProcessor(s.blockProcessStrategyManager, s.client, s.poolExtractor, s.keepers, s.blockUpdatesProcessUtils)

// Process block.
if err := blockProcessor.ProcessBlock(sdkCtx); err != nil {
Expand Down

0 comments on commit c633db6

Please sign in to comment.