Skip to content

Commit

Permalink
Merge pull request #120 from michaelwilner/master
Browse files Browse the repository at this point in the history
Support delegation of offset management before consumer group topic consumption begins
  • Loading branch information
twmb authored Jan 6, 2022
2 parents 8585924 + f5414b2 commit 3cb68bf
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 0 deletions.
18 changes: 18 additions & 0 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ type cfg struct {
onRevoked func(context.Context, *Client, map[string][]int32)
onLost func(context.Context, *Client, map[string][]int32)

adjustOffsetsBeforeAssign func(ctx context.Context, offsets map[string]map[int32]Offset) (map[string]map[int32]Offset, error)

setAssigned bool
setRevoked bool
setLost bool
Expand Down Expand Up @@ -1317,6 +1319,22 @@ func RequireStableFetchOffsets() GroupOpt {
return groupOpt{func(cfg *cfg) { cfg.requireStable = true }}
}

// AdjustFetchOffsetsFn sets the function to be called when a group is joined
// after offsets are fetched for those partitions so that a user can adjust them
// before consumption begins.
//
// This function should not exceed the rebalance interval. It is possible
// for the group, immediately after finishing a balance, to re-enter a new balancing
// session. This function is passed a context that is canceled if the current group
// session finishes (i.e., after revoking).
//
// If you are resetting the position of the offset, you may want to clear any existing
// "epoch" with WithEpoch(-1). If the epoch is non-negative, the client performs
// data loss detection, which may result in errors and unexpected behavior.
func AdjustFetchOffsetsFn(adjustOffsetsBeforeAssign func(context.Context, map[string]map[int32]Offset) (map[string]map[int32]Offset, error)) GroupOpt {
return groupOpt{func(cfg *cfg) { cfg.adjustOffsetsBeforeAssign = adjustOffsetsBeforeAssign }}
}

// OnPartitionsAssigned sets the function to be called when a group is joined
// after partitions are assigned before fetches for those partitions begin.
//
Expand Down
5 changes: 5 additions & 0 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -1246,6 +1246,11 @@ start:
g.cfg.logger.Log(LogLevelWarn, "member was assigned topic that we did not ask for in ConsumeTopics! skipping assigning this topic!", "group", g.cfg.group, "topic", fetchedTopic)
}
}
if g.cfg.adjustOffsetsBeforeAssign != nil {
if offsets, err = g.cfg.adjustOffsetsBeforeAssign(ctx, offsets); err != nil {
return err
}
}

// Lock for assign and then updating uncommitted.
g.c.mu.Lock()
Expand Down

0 comments on commit 3cb68bf

Please sign in to comment.