Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Last indexed cleanup scribe/agents testing optimization #1409

Merged
merged 6 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion services/scribe/grpc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (s *server) StreamLogs(req *pbscribe.StreamLogsRequest, res pbscribe.Scribe

if latestScribeBlock > toBlock {
nextFromBlock = toBlock + 1
toBlock = latestScribeBlock - 1
toBlock = latestScribeBlock
wait = 0

span.AddEvent("New block. From: " + strconv.Itoa(int(nextFromBlock)) + " To: " + strconv.Itoa(int(toBlock)))
Expand Down
14 changes: 10 additions & 4 deletions services/scribe/service/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@
// Reads from the local logsChan and stores the logs and associated receipts / txs.
g.Go(func() error {
concurrentCalls := 0
lastBlockSeen := uint64(0)
gS, storeCtx := errgroup.WithContext(ctx)
// could change this to for - range
for {
nautsimon marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -228,10 +229,15 @@
gS, storeCtx = errgroup.WithContext(ctx)
concurrentCalls = 0

err = x.saveLastIndexed(storeCtx, log.BlockNumber)
if err != nil {
logger.ReportIndexerError(err, x.indexerConfig, logger.StoreError)
return fmt.Errorf("could not store last indexed: %w", err)
// Only update last indexed if all logs from the last block have been processed to prevent premature
// updates of last indexed. Prevents having to lag a block behind on downstream dependencies (agents).
if lastBlockSeen < log.BlockNumber {
err = x.saveLastIndexed(storeCtx, lastBlockSeen)
if err != nil {
logger.ReportIndexerError(err, x.indexerConfig, logger.StoreError)
return fmt.Errorf("could not store last indexed: %w", err)
}

Check warning on line 239 in services/scribe/service/indexer/indexer.go

View check run for this annotation

Codecov / codecov/patch

services/scribe/service/indexer/indexer.go#L237-L239

Added lines #L237 - L239 were not covered by tests
lastBlockSeen = log.BlockNumber
}

x.blockMeter.Record(ctx, int64(log.BlockNumber), otelMetrics.WithAttributeSet(
Expand Down
Loading