Skip to content

Commit

Permalink
Last indexed cleanup scribe/agents testing optimization (#1409)
Browse files Browse the repository at this point in the history
  • Loading branch information
nautsimon authored Oct 10, 2023
1 parent cd4aaef commit 2c5d20d
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 8 deletions.
2 changes: 1 addition & 1 deletion services/scribe/cmd/cmd.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
[![Go Reference](https://pkg.go.dev/badge/github.com/synapsecns/sanguine/services/scribe.svg)](https://pkg.go.dev/github.com/synapsecns/sanguine/services/scribe)
[![Go Report Card](https://goreportcard.com/badge/github.com/synapsecns/sanguine/services/scribe)](https://goreportcard.com/report/github.com/synapsecns/sanguine/services/scribe)

Scribe is a multi-chain indexing service. Scribe is designed to take a list of contracts specified by chain id and store logs, receipts, and txs for every event, past to present, in a mysql database.
Scribe is a multi chain indexing service. Scribe is designed to take a list of contracts specified by chain id and store logs, receipts, and txs for every event, past to present, in a mysql database.

Use cases
- Analytics for on chain events
Expand Down
2 changes: 2 additions & 0 deletions services/scribe/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ type Config struct {
Chains ChainConfigs `yaml:"chains"`
// RPCURL is the url of the omnirpc.
RPCURL string `yaml:"rpc_url"`
// Verbose is used to enable verbose logging.
Verbose bool `yaml:"verbose"`
}

// IsValid makes sure the config is valid. This is done by calling IsValid() on each
Expand Down
2 changes: 1 addition & 1 deletion services/scribe/grpc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (s *server) StreamLogs(req *pbscribe.StreamLogsRequest, res pbscribe.Scribe

if latestScribeBlock > toBlock {
nextFromBlock = toBlock + 1
toBlock = latestScribeBlock - 1
toBlock = latestScribeBlock
wait = 0

span.AddEvent("New block. From: " + strconv.Itoa(int(nextFromBlock)) + " To: " + strconv.Itoa(int(toBlock)))
Expand Down
14 changes: 10 additions & 4 deletions services/scribe/service/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func (x *Indexer) Index(parentCtx context.Context, startHeight uint64, endHeight
// Reads from the local logsChan and stores the logs and associated receipts / txs.
g.Go(func() error {
concurrentCalls := 0
lastBlockSeen := uint64(0)
gS, storeCtx := errgroup.WithContext(ctx)
// could change this to for - range
for {
Expand Down Expand Up @@ -228,10 +229,15 @@ func (x *Indexer) Index(parentCtx context.Context, startHeight uint64, endHeight
gS, storeCtx = errgroup.WithContext(ctx)
concurrentCalls = 0

err = x.saveLastIndexed(storeCtx, log.BlockNumber)
if err != nil {
logger.ReportIndexerError(err, x.indexerConfig, logger.StoreError)
return fmt.Errorf("could not store last indexed: %w", err)
// Only update last indexed if all logs from the last block have been processed to prevent premature
// updates of last indexed. Prevents having to lag a block behind on downstream dependencies (agents).
if lastBlockSeen < log.BlockNumber {
err = x.saveLastIndexed(storeCtx, lastBlockSeen)
if err != nil {
logger.ReportIndexerError(err, x.indexerConfig, logger.StoreError)
return fmt.Errorf("could not store last indexed: %w", err)
}
lastBlockSeen = log.BlockNumber
}

x.blockMeter.Record(ctx, int64(log.BlockNumber), otelMetrics.WithAttributeSet(
Expand Down
4 changes: 2 additions & 2 deletions services/scribe/service/scribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,14 @@ func (s Scribe) Start(ctx context.Context) error {
cancelChain() // redundant, but clean.
return fmt.Errorf("global scribe context cancel %w", groupCtx.Err())
case <-chainCtx.Done(): // Chain level context cancel, retry and recreate context.
logger.ReportScribeError(fmt.Errorf("chain level scribe context cancel"), chainID, logger.ContextCancelled)
logger.ReportScribeError(fmt.Errorf("chain level scribe context cancel, %w", chainCtx.Err()), chainID, logger.ContextCancelled)
chainCtx, cancelChain = context.WithCancel(ctx)
retryRate = b.Duration()
continue
case <-time.After(retryRate):
err := s.chainIndexers[chainID].Index(groupCtx)
if err != nil {
logger.ReportScribeError(fmt.Errorf("error running chain indexer"), chainID, logger.FatalScribeError)
logger.ReportScribeError(fmt.Errorf("error running chain indexer %w", err), chainID, logger.FatalScribeError)
retryRate = b.Duration()
continue
}
Expand Down

0 comments on commit 2c5d20d

Please sign in to comment.