diff --git a/pkg/indexer/blockchain/rpcLogStreamer.go b/pkg/indexer/blockchain/rpcLogStreamer.go index 560d24af..a01dcdcd 100644 --- a/pkg/indexer/blockchain/rpcLogStreamer.go +++ b/pkg/indexer/blockchain/rpcLogStreamer.go @@ -18,6 +18,7 @@ const ( // Setting to 0 since we are talking about L2s with low reorg risk LAG_FROM_HIGHEST_BLOCK = 0 ERROR_SLEEP_TIME = 100 * time.Millisecond + NO_LOGS_SLEEP_TIME = 1 * time.Second ) // The builder that allows you to configure contract events to listen for @@ -32,9 +33,16 @@ func NewRpcLogStreamBuilder(rpcUrl string, logger *zap.Logger) *RpcLogStreamBuil return &RpcLogStreamBuilder{rpcUrl: rpcUrl, logger: logger} } -func (c *RpcLogStreamBuilder) ListenForContractEvent(fromBlock int, contractAddress common.Address, topics []common.Hash) <-chan types.Log { +func (c *RpcLogStreamBuilder) ListenForContractEvent( + fromBlock int, + contractAddress common.Address, + topics []common.Hash, +) <-chan types.Log { eventChannel := make(chan types.Log, 100) - c.contractConfigs = append(c.contractConfigs, contractConfig{fromBlock, contractAddress, topics, eventChannel}) + c.contractConfigs = append( + c.contractConfigs, + contractConfig{fromBlock, contractAddress, topics, eventChannel}, + ) return eventChannel } @@ -68,7 +76,11 @@ type RpcLogStreamer struct { logger *zap.Logger } -func NewRpcLogStreamer(client ChainClient, logger *zap.Logger, watchers []contractConfig) *RpcLogStreamer { +func NewRpcLogStreamer( + client ChainClient, + logger *zap.Logger, + watchers []contractConfig, +) *RpcLogStreamer { return &RpcLogStreamer{ client: client, watchers: watchers, @@ -97,12 +109,19 @@ func (r *RpcLogStreamer) watchContract(watcher contractConfig) { default: logs, nextBlock, err := r.getNextPage(watcher, fromBlock) if err != nil { - logger.Error("Error getting next page", zap.Int("fromBlock", fromBlock), zap.Error(err)) + logger.Error( + "Error getting next page", + zap.Int("fromBlock", fromBlock), + zap.Error(err), + ) time.Sleep(ERROR_SLEEP_TIME) continue } logger.Info("Got logs", zap.Int("numLogs", len(logs)), zap.Int("fromBlock", fromBlock)) + if len(logs) == 0 { + time.Sleep(NO_LOGS_SLEEP_TIME) + } for _, log := range logs { watcher.channel <- log } @@ -113,7 +132,10 @@ func (r *RpcLogStreamer) watchContract(watcher contractConfig) { } } -func (r *RpcLogStreamer) getNextPage(config contractConfig, fromBlock int) (logs []types.Log, nextBlock *int, err error) { +func (r *RpcLogStreamer) getNextPage( + config contractConfig, + fromBlock int, +) (logs []types.Log, nextBlock *int, err error) { highestBlock, err := r.client.BlockNumber(r.ctx) if err != nil { return nil, nil, err @@ -144,7 +166,11 @@ func (r *RpcLogStreamer) getNextPage(config contractConfig, fromBlock int) (logs return logs, &nextBlockNumber, nil } -func buildFilterQuery(contractConfig contractConfig, fromBlock int64, toBlock int64) ethereum.FilterQuery { +func buildFilterQuery( + contractConfig contractConfig, + fromBlock int64, + toBlock int64, +) ethereum.FilterQuery { addresses := []common.Address{contractConfig.contractAddress} topics := [][]common.Hash{} for _, topic := range contractConfig.topics {