From c633db6ace26f5af76e6216e0bf9d5fa8895a68d Mon Sep 17 00:00:00 2001 From: Roman Date: Wed, 4 Sep 2024 08:17:47 -0700 Subject: [PATCH] fix: ingester change set not propagated due to missing registration (#8668) * fix: ingester change set not propagated due to missing registration * updates * Update ingest/indexer/service/indexer_streaming_service.go Co-authored-by: PaddyMc * Update app/app.go Co-authored-by: PaddyMc --------- Co-authored-by: PaddyMc --- app/app.go | 22 ++++++++++++++++++- .../block_process_indexer_factory.go | 9 ++++---- .../block_process_indexer_factory_test.go | 2 +- ..._updates_indexer_block_process_strategy.go | 11 +++++++--- .../service/indexer_streaming_service.go | 8 ++++++- 5 files changed, 42 insertions(+), 10 deletions(-) diff --git a/app/app.go b/app/app.go index b5ee3d1c646..31a650a36a2 100644 --- a/app/app.go +++ b/app/app.go @@ -689,7 +689,12 @@ func NewOsmosisApp( // getSQSServiceWriteListeners returns the write listeners for the app that are specific to the SQS service. func getSQSServiceWriteListeners(app *OsmosisApp, appCodec codec.Codec, blockPoolUpdateTracker domain.BlockPoolUpdateTracker, wasmkeeper *wasmkeeper.Keeper) (map[storetypes.StoreKey][]commondomain.WriteListener, map[string]storetypes.StoreKey) { - return getPoolWriteListeners(app, appCodec, blockPoolUpdateTracker, wasmkeeper) + writeListeners, storeKeyMap := getPoolWriteListeners(app, appCodec, blockPoolUpdateTracker, wasmkeeper) + + // Register all applicable keys as listeners + registerStoreKeys(app, storeKeyMap) + + return writeListeners, storeKeyMap } // getIndexerServiceWriteListeners returns the write listeners for the app that are specific to the indexer service. @@ -703,6 +708,9 @@ func getIndexerServiceWriteListeners(ctx context.Context, app *OsmosisApp, appCo storeKeyMap[banktypes.ModuleName] = app.GetKey(banktypes.ModuleName) + // Register all applicable keys as listeners + registerStoreKeys(app, storeKeyMap) + return writeListeners, storeKeyMap } @@ -732,6 +740,18 @@ func getPoolWriteListeners(app *OsmosisApp, appCodec codec.Codec, blockPoolUpdat return writeListeners, storeKeyMap } +// registerStoreKeys register the store keys from the given store key map +// on the app's commit multi store so that the change sets from these stores are propagated +// in ListenCommit(). +func registerStoreKeys(app *OsmosisApp, storeKeyMap map[string]storetypes.StoreKey) { + // Register all applicable keys as listeners + storeKeys := make([]storetypes.StoreKey, 0) + for _, storeKey := range storeKeyMap { + storeKeys = append(storeKeys, storeKey) + } + app.CommitMultiStore().AddListeners(storeKeys) +} + // we cache the reflectionService to save us time within tests. var cachedReflectionService *runtimeservices.ReflectionService = nil diff --git a/ingest/indexer/service/blockprocessor/block_process_indexer_factory.go b/ingest/indexer/service/blockprocessor/block_process_indexer_factory.go index be4a9706276..b4baf100cec 100644 --- a/ingest/indexer/service/blockprocessor/block_process_indexer_factory.go +++ b/ingest/indexer/service/blockprocessor/block_process_indexer_factory.go @@ -6,7 +6,7 @@ import ( ) // NewBlockProcessor creates a new block process strategy. -func NewBlockProcessor(blockProcessStrategyManager commondomain.BlockProcessStrategyManager, client domain.Publisher, poolExtractor commondomain.PoolExtractor, keepers domain.Keepers) commondomain.BlockProcessor { +func NewBlockProcessor(blockProcessStrategyManager commondomain.BlockProcessStrategyManager, client domain.Publisher, poolExtractor commondomain.PoolExtractor, keepers domain.Keepers, blockUpdateProcessUtils commondomain.BlockUpdateProcessUtilsI) commondomain.BlockProcessor { // Initialize the pool pair publisher poolPairPublisher := NewPairPublisher(client, keepers.PoolManagerKeeper) @@ -24,8 +24,9 @@ func NewBlockProcessor(blockProcessStrategyManager commondomain.BlockProcessStra } return &blockUpdatesIndexerBlockProcessStrategy{ - client: client, - poolExtractor: poolExtractor, - poolPairPublisher: poolPairPublisher, + client: client, + poolExtractor: poolExtractor, + poolPairPublisher: poolPairPublisher, + blockUpdateProcessUtils: blockUpdateProcessUtils, } } diff --git a/ingest/indexer/service/blockprocessor/block_process_indexer_factory_test.go b/ingest/indexer/service/blockprocessor/block_process_indexer_factory_test.go index b1c46db8904..7fb3eba3c6b 100644 --- a/ingest/indexer/service/blockprocessor/block_process_indexer_factory_test.go +++ b/ingest/indexer/service/blockprocessor/block_process_indexer_factory_test.go @@ -56,7 +56,7 @@ func (suite *IndexerBlockProcessorTestSuite) TestNewBlockProcessor() { } // System under test - newBlockProcessor := blockprocessor.NewBlockProcessor(blockStrategyManager, publisherMock, poolsExtracter, domain.Keepers{}) + newBlockProcessor := blockprocessor.NewBlockProcessor(blockStrategyManager, publisherMock, poolsExtracter, domain.Keepers{}, nil) // Check if the block processor is a full block processor isFullBlockProcessor := newBlockProcessor.IsFullBlockProcessor() diff --git a/ingest/indexer/service/blockprocessor/block_updates_indexer_block_process_strategy.go b/ingest/indexer/service/blockprocessor/block_updates_indexer_block_process_strategy.go index 332f049eb30..d4de6bad4d0 100644 --- a/ingest/indexer/service/blockprocessor/block_updates_indexer_block_process_strategy.go +++ b/ingest/indexer/service/blockprocessor/block_updates_indexer_block_process_strategy.go @@ -8,9 +8,10 @@ import ( ) type blockUpdatesIndexerBlockProcessStrategy struct { - client domain.Publisher - poolExtractor commondomain.PoolExtractor - poolPairPublisher domain.PairPublisher + client domain.Publisher + poolExtractor commondomain.PoolExtractor + poolPairPublisher domain.PairPublisher + blockUpdateProcessUtils commondomain.BlockUpdateProcessUtilsI } var _ commondomain.BlockProcessor = &blockUpdatesIndexerBlockProcessStrategy{} @@ -32,6 +33,10 @@ func (f *blockUpdatesIndexerBlockProcessStrategy) ProcessBlock(ctx types.Context // publishChangedPools publishes the pools that were changed in the block. func (f *blockUpdatesIndexerBlockProcessStrategy) publishChangedPools(ctx types.Context) error { + err := f.blockUpdateProcessUtils.ProcessBlockChangeSet() + if err != nil { + return err + } // Extract the pools that were changed in the block blockPools, err := f.poolExtractor.ExtractChanged(ctx) if err != nil { diff --git a/ingest/indexer/service/indexer_streaming_service.go b/ingest/indexer/service/indexer_streaming_service.go index 0ef28eae952..aa114e93e9e 100644 --- a/ingest/indexer/service/indexer_streaming_service.go +++ b/ingest/indexer/service/indexer_streaming_service.go @@ -315,10 +315,16 @@ func (s *indexerStreamingService) ListenCommit(ctx context.Context, res abci.Res defer func() { // Reset the pool tracker after processing the block. s.poolTracker.Reset() + // Reset change set upon processing the block. + s.blockUpdatesProcessUtils.SetChangeSet(nil) }() + // Set change set on the block updates process utils. + // These are processed in ProcessBlock(...) assuming "block updates" strategy. + s.blockUpdatesProcessUtils.SetChangeSet(changeSet) + // Create block processor - blockProcessor := blockprocessor.NewBlockProcessor(s.blockProcessStrategyManager, s.client, s.poolExtractor, s.keepers) + blockProcessor := blockprocessor.NewBlockProcessor(s.blockProcessStrategyManager, s.client, s.poolExtractor, s.keepers, s.blockUpdatesProcessUtils) // Process block. if err := blockProcessor.ProcessBlock(sdkCtx); err != nil {