
feat(kafka): enqueue commit offset only once per batch process #14278

Merged

Conversation

cyriltovena
Contributor

What this PR does / why we need it:

This ensures we enqueue the commit in the ingester only once the batch has actually been processed, not when it was sent to the channel for processing, which could cause data loss if the ingester crashed in between.
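The ordering change the PR description refers to can be sketched as follows. This is a minimal, self-contained stand-in, not the PR's actual code: `record`, `processAndCommit`, and the callbacks are hypothetical names, and the real ingester works with Kafka records and an offset-commit queue rather than plain callbacks.

```go
package main

import "fmt"

// record is a stand-in for a decoded Kafka record (hypothetical type).
type record struct{ offset int64 }

// processAndCommit illustrates the ordering the PR enforces: the offset is
// enqueued for commit only AFTER the whole batch has been processed, not when
// the batch is handed off for processing. Under the old ordering, a crash
// mid-processing could leave the offset already committed, skipping
// unprocessed records (data loss).
func processAndCommit(batch []record, process func(record) error, enqueueCommit func(offset int64)) error {
	if len(batch) == 0 {
		return nil
	}
	for _, r := range batch {
		if err := process(r); err != nil {
			return err // do not commit: the batch was not fully processed
		}
	}
	// Commit the highest offset only once the whole batch succeeded.
	enqueueCommit(batch[len(batch)-1].offset)
	return nil
}

func main() {
	committed := int64(-1)
	batch := []record{{10}, {11}, {12}}
	_ = processAndCommit(batch,
		func(record) error { return nil },
		func(o int64) { committed = o })
	fmt.Println(committed) // 12
}
```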

Which issue(s) this PR fixes:
Fixes https://github.com/grafana/loki-private/issues/1123

Special notes for your reviewer:

Checklist

  • Reviewed the CONTRIBUTING.md guide (required)
  • Documentation added
  • Tests updated
  • Title matches the required conventional commits format, see here
    • Note that Promtail is considered to be feature complete, and future development for logs collection will be in Grafana Alloy. As such, feat PRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.
  • Changes that require user attention or interaction to upgrade are documented in docs/sources/setup/upgrade/_index.md
  • For Helm chart changes bump the Helm chart version in production/helm/loki/Chart.yaml and update production/helm/loki/CHANGELOG.md and production/helm/loki/README.md. Example PR
  • If the change is deprecating or removing a configuration option, update the deprecated-config.yaml and deleted-config.yaml files respectively in the tools/deprecated-config-checker directory. Example PR

@cyriltovena cyriltovena requested a review from a team as a code owner September 26, 2024 08:35
pkg/ingester/kafka_consumer.go (resolved review thread)

```go
	MaxBackoff: 5 * time.Second,
	MaxRetries: 0, // Retry infinitely
})
backoff.Wait()
```
Contributor
Wouldn't this be the same (we don't need the manual step at the start before the backoff loop)?

```go
func retryWithBackoff(ctx context.Context, fn func(attempts int) error) error {
	backoff := backoff.New(ctx, backoff.Config{
		MinBackoff: 100 * time.Millisecond,
		MaxBackoff: 5 * time.Second,
		MaxRetries: 0, // Retry infinitely
	})
	for backoff.Ongoing() {
		err := fn(backoff.NumRetries())
		if err == nil {
			return nil
		}
		if !canRetry(err) {
			return err
		}
		backoff.Wait()
	}
	return backoff.Err()
}
```

Contributor Author

I'm actually avoiding allocating the backoff for every record in the case where everything succeeds.

Contributor Author
Might be over-optimization?

@cyriltovena cyriltovena merged commit beca6f3 into grafana:main Sep 26, 2024
61 checks passed
jeschkies pushed a commit to jeschkies/loki that referenced this pull request Oct 1, 2024