Skip to content

Commit

Permalink
pkg/kgo: add Client.CommitMarkedOffsets
Browse files Browse the repository at this point in the history
  • Loading branch information
celrenheit committed Jan 24, 2023
1 parent 9d4480e commit 6751589
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (s *splitConsume) assigned(_ context.Context, cl *kgo.Client, assigned map[

func (s *splitConsume) revoked(ctx context.Context, cl *kgo.Client, revoked map[string][]int32) {
s.killConsumers(revoked)
if err := cl.CommitUncommittedOffsets(ctx); err != nil {
if err := cl.CommitMarkedOffsets(ctx); err != nil {
fmt.Printf("Revoke commit failed: %v\n", err)
}
}
Expand Down
36 changes: 35 additions & 1 deletion pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -2352,8 +2352,42 @@ func (cl *Client) MarkCommitRecords(rs ...*Record) {
// processing records, you can call this function in a goroutine.
func (cl *Client) CommitUncommittedOffsets(ctx context.Context) error {
// This function is just the tail end of CommitRecords just above.
return cl.commitOffsets(ctx, cl.UncommittedOffsets())
}

// CommitMarkedOffsets issues a synchronous offset commit for any
// partition that has been consumed from that has marked offsets.
// Retriable errors are retried up to the configured retry limit, and any
// unretriable error is returned.
//
// This function is useful as a simple way to commit offsets if you have
// marked offsets with MarkCommitRecords when using AutoCommitMarks. As an
// alternative if you want to commit specific records, see CommitRecords.
//
// Simple usage of this function may lead to duplicate records if a consumer
// group rebalance occurs before or while this function is being executed. You
// can avoid this scenario by calling CommitRecords in a custom
// OnPartitionsRevoked, but for most workloads, a small bit of potential
// duplicate processing is fine. See the documentation on DisableAutoCommit
// for more details. You can also avoid this problem by using
// BlockRebalanceOnCommit, but that option comes with its own tradeoffs (refer
// to its documentation).
//
// The recommended pattern for using this function is to have a poll / process
// / commit loop. First PollFetches, then process every record,
// call MarkCommitRecords for the records you wish the commit and then call
// CommitMarkedOffsets.
//
// If you do not want to wait for this function to complete before continuing
// processing records, you can call this function in a goroutine.
func (cl *Client) CommitMarkedOffsets(ctx context.Context) error {
// This function is just the tail end of CommitRecords just above.
return cl.commitOffsets(ctx, cl.MarkedOffsets())
}

func (cl *Client) commitOffsets(ctx context.Context, offsets map[string]map[int32]EpochOffset) error {
var rerr error
cl.CommitOffsetsSync(ctx, cl.UncommittedOffsets(), func(_ *Client, _ *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
cl.CommitOffsetsSync(ctx, offsets, func(_ *Client, _ *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
if err != nil {
rerr = err
return
Expand Down

0 comments on commit 6751589

Please sign in to comment.