Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

analyzer/block: Preemptively terminate batch before timeout #605

Merged
merged 1 commit into from
Jan 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion analyzer/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ const (
// this time can be picked again. Keep strictly > 1; the analyzer stops processing
// blocks before the lock expires, by a safety margin of 1 minute.
lockExpiryMinutes = 5
// The expected time between blocks being newly created on the blockchain.
// The analyzer will check for new blocks at least this often.
blockGenerationInterval = 6 * time.Second
)

// BlockProcessor is the interface that block-based processors should implement to use them with the
Expand Down Expand Up @@ -286,6 +289,15 @@ func (b *blockBasedAnalyzer) nodeHeight(ctx context.Context) (int, error) {
}
}

// Returns true if `ctx` is about to expire within `margin`, or has already expired.
func expiresWithin(ctx context.Context, margin time.Duration) bool {
deadline, hasDeadline := ctx.Deadline()
if !hasDeadline {
return false
}
return time.Until(deadline) < margin
}

// Start starts the block analyzer.
func (b *blockBasedAnalyzer) Start(ctx context.Context) {
// Run prework.
Expand Down Expand Up @@ -316,7 +328,7 @@ func (b *blockBasedAnalyzer) Start(ctx context.Context) {
// Start processing blocks.
backoff, err := util.NewBackoff(
100*time.Millisecond,
6*time.Second, // cap the timeout at the expected consensus block time
blockGenerationInterval, // cap the timeout at the expected consensus block time
)
if err != nil {
b.logger.Error("error configuring backoff policy",
Expand Down Expand Up @@ -416,13 +428,21 @@ func (b *blockBasedAnalyzer) Start(ctx context.Context) {
b.unlockBlocks(ctx, []uint64{height})
continue
}

// If we successfully fetched the node height earlier, update the estimated queue length.
if nodeHeight != -1 {
b.sendQueueLengthMetric(uint64(nodeHeight) - height)
}

cancel()
backoff.Success()
b.logger.Info("processed block", "height", height)

// If the batch context is close to expiring, do not attempt more blocks.
if expiresWithin(batchCtx, processBlockTimeout) {
b.unlockBlocks(ctx, heights)
break
}
}

if len(heights) == 0 {
Expand Down
Loading