From 2c5d20db9ac6a45e31c7cc289d663e1103a7f875 Mon Sep 17 00:00:00 2001 From: Simon Date: Tue, 10 Oct 2023 17:22:27 -0500 Subject: [PATCH] Last indexed cleanup scribe/agents testing optimization (#1409) --- services/scribe/cmd/cmd.md | 2 +- services/scribe/config/config.go | 2 ++ services/scribe/grpc/server/server.go | 2 +- services/scribe/service/indexer/indexer.go | 14 ++++++++++---- services/scribe/service/scribe.go | 4 ++-- 5 files changed, 16 insertions(+), 8 deletions(-) diff --git a/services/scribe/cmd/cmd.md b/services/scribe/cmd/cmd.md index 890138bd43..6a896ac7af 100644 --- a/services/scribe/cmd/cmd.md +++ b/services/scribe/cmd/cmd.md @@ -2,7 +2,7 @@ [![Go Reference](https://pkg.go.dev/badge/github.com/synapsecns/sanguine/services/scribe.svg)](https://pkg.go.dev/github.com/synapsecns/sanguine/services/scribe) [![Go Report Card](https://goreportcard.com/badge/github.com/synapsecns/sanguine/services/scribe)](https://goreportcard.com/report/github.com/synapsecns/sanguine/services/scribe) -Scribe is a multi-chain indexing service. Scribe is designed to take a list of contracts specified by chain id and store logs, receipts, and txs for every event, past to present, in a mysql database. +Scribe is a multi chain indexing service. Scribe is designed to take a list of contracts specified by chain id and store logs, receipts, and txs for every event, past to present, in a mysql database. Use cases - Analytics for on chain events diff --git a/services/scribe/config/config.go b/services/scribe/config/config.go index 6d059fc501..e8412a96a6 100644 --- a/services/scribe/config/config.go +++ b/services/scribe/config/config.go @@ -16,6 +16,8 @@ type Config struct { Chains ChainConfigs `yaml:"chains"` // RPCURL is the url of the omnirpc. RPCURL string `yaml:"rpc_url"` + // Verbose is used to enable verbose logging. + Verbose bool `yaml:"verbose"` } // IsValid makes sure the config is valid. This is done by calling IsValid() on each diff --git a/services/scribe/grpc/server/server.go b/services/scribe/grpc/server/server.go index 73952d2f3c..f6a8d3a258 100644 --- a/services/scribe/grpc/server/server.go +++ b/services/scribe/grpc/server/server.go @@ -158,7 +158,7 @@ func (s *server) StreamLogs(req *pbscribe.StreamLogsRequest, res pbscribe.Scribe if latestScribeBlock > toBlock { nextFromBlock = toBlock + 1 - toBlock = latestScribeBlock - 1 + toBlock = latestScribeBlock wait = 0 span.AddEvent("New block. From: " + strconv.Itoa(int(nextFromBlock)) + " To: " + strconv.Itoa(int(toBlock))) diff --git a/services/scribe/service/indexer/indexer.go b/services/scribe/service/indexer/indexer.go index a121e6375b..9926a72f9a 100644 --- a/services/scribe/service/indexer/indexer.go +++ b/services/scribe/service/indexer/indexer.go @@ -180,6 +180,7 @@ func (x *Indexer) Index(parentCtx context.Context, startHeight uint64, endHeight // Reads from the local logsChan and stores the logs and associated receipts / txs. g.Go(func() error { concurrentCalls := 0 + lastBlockSeen := uint64(0) gS, storeCtx := errgroup.WithContext(ctx) // could change this to for - range for { @@ -228,10 +229,15 @@ func (x *Indexer) Index(parentCtx context.Context, startHeight uint64, endHeight gS, storeCtx = errgroup.WithContext(ctx) concurrentCalls = 0 - err = x.saveLastIndexed(storeCtx, log.BlockNumber) - if err != nil { - logger.ReportIndexerError(err, x.indexerConfig, logger.StoreError) - return fmt.Errorf("could not store last indexed: %w", err) + // Only update last indexed if all logs from the last block have been processed to prevent premature + // updates of last indexed. Prevents having to lag a block behind on downstream dependencies (agents). + if lastBlockSeen < log.BlockNumber { + err = x.saveLastIndexed(storeCtx, lastBlockSeen) + if err != nil { + logger.ReportIndexerError(err, x.indexerConfig, logger.StoreError) + return fmt.Errorf("could not store last indexed: %w", err) + } + lastBlockSeen = log.BlockNumber } x.blockMeter.Record(ctx, int64(log.BlockNumber), otelMetrics.WithAttributeSet( diff --git a/services/scribe/service/scribe.go b/services/scribe/service/scribe.go index c19b5ddc97..6a0f442d52 100644 --- a/services/scribe/service/scribe.go +++ b/services/scribe/service/scribe.go @@ -82,14 +82,14 @@ func (s Scribe) Start(ctx context.Context) error { cancelChain() // redundant, but clean. return fmt.Errorf("global scribe context cancel %w", groupCtx.Err()) case <-chainCtx.Done(): // Chain level context cancel, retry and recreate context. - logger.ReportScribeError(fmt.Errorf("chain level scribe context cancel"), chainID, logger.ContextCancelled) + logger.ReportScribeError(fmt.Errorf("chain level scribe context cancel, %w", chainCtx.Err()), chainID, logger.ContextCancelled) chainCtx, cancelChain = context.WithCancel(ctx) retryRate = b.Duration() continue case <-time.After(retryRate): err := s.chainIndexers[chainID].Index(groupCtx) if err != nil { - logger.ReportScribeError(fmt.Errorf("error running chain indexer"), chainID, logger.FatalScribeError) + logger.ReportScribeError(fmt.Errorf("error running chain indexer %w", err), chainID, logger.FatalScribeError) retryRate = b.Duration() continue }