diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index dd733ec2..f672a714 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -159,6 +159,8 @@ type cfg struct { onRevoked func(context.Context, *Client, map[string][]int32) onLost func(context.Context, *Client, map[string][]int32) + adjustOffsetsBeforeAssign func(ctx context.Context, offsets map[string]map[int32]Offset) (map[string]map[int32]Offset, error) + setAssigned bool setRevoked bool setLost bool @@ -1317,6 +1319,22 @@ func RequireStableFetchOffsets() GroupOpt { return groupOpt{func(cfg *cfg) { cfg.requireStable = true }} } +// AdjustFetchOffsetsFn sets the function to be called when a group is joined +// after offsets are fetched for those partitions so that a user can adjust them +// before consumption begins. +// +// This function should not exceed the rebalance interval. It is possible +// for the group, immediately after finishing a balance, to re-enter a new balancing +// session. This function is passed a context that is canceled if the current group +// session finishes (i.e., after revoking). +// +// If you are resetting the position of the offset, you may want to clear any existing +// "epoch" with WithEpoch(-1). If the epoch is non-negative, the client performs +// data loss detection, which may result in errors and unexpected behavior. +func AdjustFetchOffsetsFn(adjustOffsetsBeforeAssign func(context.Context, map[string]map[int32]Offset) (map[string]map[int32]Offset, error)) GroupOpt { + return groupOpt{func(cfg *cfg) { cfg.adjustOffsetsBeforeAssign = adjustOffsetsBeforeAssign }} +} + // OnPartitionsAssigned sets the function to be called when a group is joined // after partitions are assigned before fetches for those partitions begin. // diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index 506eaab3..277012cd 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -1246,6 +1246,11 @@ start: g.cfg.logger.Log(LogLevelWarn, "member was assigned topic that we did not ask for in ConsumeTopics! skipping assigning this topic!", "group", g.cfg.group, "topic", fetchedTopic) } } + if g.cfg.adjustOffsetsBeforeAssign != nil { + if offsets, err = g.cfg.adjustOffsetsBeforeAssign(ctx, offsets); err != nil { + return err + } + } // Lock for assign and then updating uncommitted. g.c.mu.Lock()