Skip to content

Commit

Permalink
fix: indexer etl breakage pushing all pairs every block and overwhelm…
Browse files Browse the repository at this point in the history
…ing collections (backport #8521) (#8522)

Co-authored-by: Roman <[email protected]>
  • Loading branch information
mergify[bot] and p0mvn authored Jul 19, 2024
1 parent aa5aaf8 commit ff89246
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
2 changes: 1 addition & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func NewOsmosisApp(

// Create the indexer streaming service.
poolExtractor := poolextractor.New(poolKeepers, poolTracker)
indexerStreamingService := indexerservice.New(writeListeners, blockProcessStrategyManager, indexerPublisher, poolExtractor, keepers, app.GetTxConfig().TxDecoder(), logger)
indexerStreamingService := indexerservice.New(writeListeners, blockProcessStrategyManager, indexerPublisher, poolExtractor, poolTracker, keepers, app.GetTxConfig().TxDecoder(), logger)

// Register the SQS streaming service with the app.
app.SetStreamingService(indexerStreamingService)
Expand Down
12 changes: 11 additions & 1 deletion ingest/indexer/service/indexer_streaming_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
commondomain "github.com/osmosis-labs/osmosis/v25/ingest/common/domain"
"github.com/osmosis-labs/osmosis/v25/ingest/indexer/domain"
"github.com/osmosis-labs/osmosis/v25/ingest/indexer/service/blockprocessor"
sqsdomain "github.com/osmosis-labs/osmosis/v25/ingest/sqs/domain"
)

var _ baseapp.StreamingService = (*indexerStreamingService)(nil)
Expand All @@ -39,6 +40,8 @@ type indexerStreamingService struct {
// extracts the pools from chain state
poolExtractor commondomain.PoolExtractor

poolTracker sqsdomain.BlockPoolUpdateTracker

logger log.Logger
}

Expand All @@ -47,14 +50,16 @@ type indexerStreamingService struct {
// sqsIngester is an ingester that ingests the block data into SQS.
// poolTracker is a tracker that tracks the pools that were changed in the block.
// nodeStatusChecker is a checker that checks if the node is syncing.
func New(writeListeners map[storetypes.StoreKey][]storetypes.WriteListener, blockProcessStrategyManager commondomain.BlockProcessStrategyManager, client domain.Publisher, poolExtractor commondomain.PoolExtractor, keepers domain.Keepers, txDecoder sdk.TxDecoder, logger log.Logger) baseapp.StreamingService {
func New(writeListeners map[storetypes.StoreKey][]storetypes.WriteListener, blockProcessStrategyManager commondomain.BlockProcessStrategyManager, client domain.Publisher, poolExtractor commondomain.PoolExtractor, poolTracker sqsdomain.BlockPoolUpdateTracker, keepers domain.Keepers, txDecoder sdk.TxDecoder, logger log.Logger) baseapp.StreamingService {
return &indexerStreamingService{
blockProcessStrategyManager: blockProcessStrategyManager,

writeListeners: writeListeners,

poolExtractor: poolExtractor,

poolTracker: poolTracker,

client: client,

keepers: keepers,
Expand Down Expand Up @@ -181,6 +186,11 @@ func (s *indexerStreamingService) ListenEndBlock(ctx context.Context, req types.

sdkCtx := sdk.UnwrapSDKContext(ctx)

defer func() {
// Reset the pool tracker after processing the block.
s.poolTracker.Reset()
}()

// Create block processor
blockProcessor := blockprocessor.NewBlockProcessor(s.blockProcessStrategyManager, s.client, s.poolExtractor, s.keepers)

Expand Down

0 comments on commit ff89246

Please sign in to comment.