Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Last indexed cleanup scribe/agents testing optimization #1409

Merged
merged 6 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion services/scribe/cmd/cmd.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
[![Go Reference](https://pkg.go.dev/badge/github.com/synapsecns/sanguine/services/scribe.svg)](https://pkg.go.dev/github.com/synapsecns/sanguine/services/scribe)
[![Go Report Card](https://goreportcard.com/badge/github.com/synapsecns/sanguine/services/scribe)](https://goreportcard.com/report/github.com/synapsecns/sanguine/services/scribe)

Scribe is a multi-chain indexing service. Scribe is designed to take a list of contracts specified by chain id and store logs, receipts, and txs for every event, past to present, in a mysql database.
Scribe is a multi chain indexing service. Scribe is designed to take a list of contracts specified by chain id and store logs, receipts, and txs for every event, past to present, in a mysql database.

Use cases
- Analytics for on chain events
Expand Down
2 changes: 2 additions & 0 deletions services/scribe/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ type Config struct {
Chains ChainConfigs `yaml:"chains"`
// RPCURL is the url of the omnirpc.
RPCURL string `yaml:"rpc_url"`
// Verbose is used to enable verbose logging.
Verbose bool `yaml:"verbose"`
}

// IsValid makes sure the config is valid. This is done by calling IsValid() on each
Expand Down
2 changes: 1 addition & 1 deletion services/scribe/grpc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (s *server) StreamLogs(req *pbscribe.StreamLogsRequest, res pbscribe.Scribe

if latestScribeBlock > toBlock {
nextFromBlock = toBlock + 1
toBlock = latestScribeBlock - 1
toBlock = latestScribeBlock
wait = 0

span.AddEvent("New block. From: " + strconv.Itoa(int(nextFromBlock)) + " To: " + strconv.Itoa(int(toBlock)))
Expand Down
14 changes: 10 additions & 4 deletions services/scribe/service/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@
// Reads from the local logsChan and stores the logs and associated receipts / txs.
g.Go(func() error {
concurrentCalls := 0
lastBlockSeen := uint64(0)
gS, storeCtx := errgroup.WithContext(ctx)
// could change this to for - range
for {
nautsimon marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -228,10 +229,15 @@
gS, storeCtx = errgroup.WithContext(ctx)
concurrentCalls = 0

err = x.saveLastIndexed(storeCtx, log.BlockNumber)
if err != nil {
logger.ReportIndexerError(err, x.indexerConfig, logger.StoreError)
return fmt.Errorf("could not store last indexed: %w", err)
// Only update last indexed if all logs from the last block have been processed to prevent premature
// updates of last indexed. Prevents having to lag a block behind on downstream dependencies (agents).
if lastBlockSeen < log.BlockNumber {
err = x.saveLastIndexed(storeCtx, lastBlockSeen)
if err != nil {
logger.ReportIndexerError(err, x.indexerConfig, logger.StoreError)
return fmt.Errorf("could not store last indexed: %w", err)
}

Check warning on line 239 in services/scribe/service/indexer/indexer.go

View check run for this annotation

Codecov / codecov/patch

services/scribe/service/indexer/indexer.go#L237-L239

Added lines #L237 - L239 were not covered by tests
lastBlockSeen = log.BlockNumber
}

x.blockMeter.Record(ctx, int64(log.BlockNumber), otelMetrics.WithAttributeSet(
Expand Down
4 changes: 2 additions & 2 deletions services/scribe/service/scribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,14 @@
cancelChain() // redundant, but clean.
return fmt.Errorf("global scribe context cancel %w", groupCtx.Err())
case <-chainCtx.Done(): // Chain level context cancel, retry and recreate context.
logger.ReportScribeError(fmt.Errorf("chain level scribe context cancel"), chainID, logger.ContextCancelled)
logger.ReportScribeError(fmt.Errorf("chain level scribe context cancel, %w", chainCtx.Err()), chainID, logger.ContextCancelled)

Check warning on line 85 in services/scribe/service/scribe.go

View check run for this annotation

Codecov / codecov/patch

services/scribe/service/scribe.go#L85

Added line #L85 was not covered by tests
chainCtx, cancelChain = context.WithCancel(ctx)
retryRate = b.Duration()
continue
case <-time.After(retryRate):
err := s.chainIndexers[chainID].Index(groupCtx)
if err != nil {
logger.ReportScribeError(fmt.Errorf("error running chain indexer"), chainID, logger.FatalScribeError)
logger.ReportScribeError(fmt.Errorf("error running chain indexer %w", err), chainID, logger.FatalScribeError)

Check warning on line 92 in services/scribe/service/scribe.go

View check run for this annotation

Codecov / codecov/patch

services/scribe/service/scribe.go#L92

Added line #L92 was not covered by tests
retryRate = b.Duration()
continue
}
Expand Down
Loading