autocommitting: only commit previously polled fetches
Before this commit, autocommitting could lead to message loss. If
fetches were polled, and then an autocommit happened before any messages
were processed, and then the process crashed, all polled messages would
be lost.

Now, we only allow *previously* polled fetches to be committed. The
assumption is that PollFetches is only called application-side when all
previously polled fetches are finalized within the application
(processed / ready to be committed). This new behavior ensures
at-least-once message processing at all layers, but does have a risk of
duplicates during rebalances.
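
For illustration (not part of this commit), a minimal sketch of a consume
loop under this model, assuming a client configured with kgo.ConsumerGroup
and kgo.ConsumeTopics (placeholder names): each PollFetches call marks the
previous poll as committable, so we only poll again once the prior batch is
processed, and we issue one final synchronous commit for the last batch,
which no further poll will promote.

    package main

    import (
        "context"

        "github.com/twmb/franz-go/pkg/kgo"
    )

    func main() {
        cl, err := kgo.NewClient(
            kgo.ConsumerGroup("my-group"), // placeholder group
            kgo.ConsumeTopics("my-topic"), // placeholder topic
        )
        if err != nil {
            panic(err)
        }
        defer cl.Close()

        ctx := context.Background()
        for {
            fetches := cl.PollFetches(ctx)
            if fetches.Errors() != nil {
                break // e.g. the client was closed; handle errors as appropriate
            }
            iter := fetches.RecordIter()
            for !iter.Done() {
                process(iter.Next()) // finish this batch before polling again
            }
            // The next PollFetches marks this batch as committable.
        }

        // We will not poll again, so nothing promotes the last batch for
        // autocommit: commit it explicitly before leaving the group.
        if err := cl.CommitUncommittedOffsets(context.Background()); err != nil {
            // handle the failed commit
        }
    }

    func process(r *kgo.Record) { /* application logic */ }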

This new behavior also showed that passing the group context to OnRevoke
was insufficient: if we used the group context to commit when a group is
being left (which triggers OnRevoke before actually leaving), then the
commit would fail. The previous recommendation was to commit before
leaving. We may as well just use the client context. This *does* mean
that users will not be able to detect when a group has exited within the
callback, meaning their callback can take so long that they are booted
from the group, but that's essentially the previous behavior anyway: the
user could do stuff, take so long they're booted, and then fail whatever
they're doing *and* be booted from the group. Now they won't fail
whatever they're doing, but they'll still be booted from the group.
Worst case, we can add the group context back later if the need arises.
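
As a hedged illustration (not code from this commit), an application that
wants to commit in its own OnRevoked can now simply use the context it is
handed, since that context is the client's and is not canceled by leaving
the group; the helper and names below are hypothetical.

    package example

    import (
        "context"

        "github.com/twmb/franz-go/pkg/kgo"
    )

    // newGroupClient wires a manual commit into OnRevoked.
    func newGroupClient() (*kgo.Client, error) {
        return kgo.NewClient(
            kgo.ConsumerGroup("my-group"), // placeholder group
            kgo.ConsumeTopics("my-topic"), // placeholder topic
            kgo.DisableAutoCommit(),
            kgo.OnRevoked(func(ctx context.Context, cl *kgo.Client, revoked map[string][]int32) {
                // ctx is the client context: it is canceled only when the
                // client is closed, not when the group is being left, so
                // this commit still works during a final leave.
                if err := cl.CommitUncommittedOffsets(ctx); err != nil {
                    // log the error; the rebalance proceeds regardless
                }
            }),
        )
    }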

We also add a few extra logs: in any OnXyz, we log what the callback was
entered with (and whether the context is already canceled) before
potentially calling into the user's callback.

Closes #57
twmb committed Aug 25, 2021
1 parent 2bc8c4d commit 28bba43
Showing 3 changed files with 147 additions and 90 deletions.
58 changes: 27 additions & 31 deletions pkg/kgo/config.go
@@ -163,6 +163,7 @@ type cfg struct {
setCommitCallback bool

autocommitDisable bool // true if autocommit was disabled or we are transactional
autocommitGreedy bool
autocommitInterval time.Duration
commitCallback func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error)
}
@@ -1132,24 +1133,14 @@ func ConsumeRegex() ConsumerOpt {
// ConsumerGroup sets the consumer group for the client to join and consume in.
// This option is required if using any other group options.
//
// Note that when group consuming, the default is to autocommit every 5s.
// Autocommitting risks losing data if your applications crashes after
// autocommitting but before you have processed polled records. To ensure
// that you lose absolutely no data, you can disable autocommitting and
// manually commit, like so:
//
// if err := cl.CommitUncommittedOffsets(context.Background()) {
// // handle err; unable to commit
// }
//
// The main downside with disabling autocommitting is that you run the risk of
// some duplicate processing of records than necessary. See the documentation
// on DisableAutoCommit for more details.
//
// If you can tolerate a little bit of data loss from crashes because you do
// not expect to ever crash, then relying on autocommitting is a fine option.
// However, if you can tolerate a little bit of duplicate processing, manually
// committing is very easy.
// Note that when group consuming, the default is to autocommit every 5s. To be
// safe, autocommitting only commits what is *previously* polled. If you poll
// once, nothing will be committed. If you poll again, the first poll is
// available to be committed. This ensures at-least-once processing, but does
// mean there is likely some duplicate processing during rebalances. When your
// client shuts down, you should issue one final synchronous commit before
// leaving the group (because you will not be polling again, and you are not
// waiting for an autocommit).
func ConsumerGroup(group string) GroupOpt {
return groupOpt{func(cfg *cfg) { cfg.group = group }}
}
@@ -1237,8 +1228,8 @@ func RequireStableFetchOffsets() GroupOpt {
// interval. It is possible for the group, immediately after finishing a
// balance, to re-enter a new balancing session.
//
// The OnAssigned function is passed the group's context, which is only
// canceled if the group is left or the client is closed.
// The OnAssigned function is passed the client's context, which is only
// canceled if the client is closed.
func OnAssigned(onAssigned func(context.Context, *Client, map[string][]int32)) GroupOpt {
return groupOpt{func(cfg *cfg) { cfg.onAssigned, cfg.setAssigned = onAssigned, true }}
}
@@ -1251,17 +1242,14 @@ func OnAssigned(onAssigned func(context.Context, *Client, map[string][]int32)) G
// balance, to re-enter a new balancing session.
//
// If autocommit is enabled, the default OnRevoked is a blocking commit all
// offsets. The reason for a blocking commit is so that no later commit cancels
// the blocking commit. If the commit in OnRevoked were canceled, then the
// rebalance would proceed immediately, the commit that canceled the blocking
// commit would fail, and duplicates could be consumed after the rebalance
// completes.
//
// The OnRevoked function is passed the group's context, which is only canceled
// if the group is left or the client is closed. Since OnRevoked is called when
// leaving a group, you likely want to commit before leaving, and to ignore
// context.Canceled / return early if your handling in OnRevoked fails due to
// the context being canceled.
// non-dirty offsets (where dirty is the most recent poll). The reason for a
// blocking commit is so that no later commit cancels the blocking commit. If
// the commit in OnRevoked were canceled, then the rebalance would proceed
// immediately, the commit that canceled the blocking commit would fail, and
// duplicates could be consumed after the rebalance completes.
//
// The OnRevoked function is passed the client's context, which is only
// canceled if the client is closed.
//
// OnRevoked function is called at the end of a group session even if there are
// no partitions being revoked.
@@ -1308,6 +1296,14 @@ func DisableAutoCommit() GroupOpt {
return groupOpt{func(cfg *cfg) { cfg.autocommitDisable = true }}
}

// GreedyAutoCommit opts in to committing everything that has been polled when
// autocommitting (the dirty offsets), rather than committing what has
// previously been polled. This option may result in message loss if your
// application crashes.
func GreedyAutoCommit() GroupOpt {
return groupOpt{func(cfg *cfg) { cfg.autocommitGreedy = true }}
}

// AutoCommitInterval sets how long to go between autocommits, overriding the
// default 5s.
func AutoCommitInterval(interval time.Duration) GroupOpt {
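
As a hedged summary of the autocommit modes this file now exposes (option
names as in the diff above; group/topic strings and the helper are
placeholders):

    package example

    import (
        "time"

        "github.com/twmb/franz-go/pkg/kgo"
    )

    // newClients builds one client per autocommit mode, for illustration.
    func newClients() (defaultCl, greedyCl, manualCl *kgo.Client, err error) {
        // Default: autocommit every 5s, committing only previously polled fetches.
        if defaultCl, err = kgo.NewClient(
            kgo.ConsumerGroup("g"), kgo.ConsumeTopics("t"),
        ); err != nil {
            return
        }

        // Greedy: commit everything polled so far (the dirty offsets); this
        // restores the old behavior and risks loss if the application crashes.
        if greedyCl, err = kgo.NewClient(
            kgo.ConsumerGroup("g"), kgo.ConsumeTopics("t"),
            kgo.GreedyAutoCommit(),
            kgo.AutoCommitInterval(10*time.Second),
        ); err != nil {
            return
        }

        // Manual: disable autocommit and commit explicitly (for example via
        // CommitUncommittedOffsets) once records are fully processed.
        manualCl, err = kgo.NewClient(
            kgo.ConsumerGroup("g"), kgo.ConsumeTopics("t"),
            kgo.DisableAutoCommit(),
        )
        return
    }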
131 changes: 96 additions & 35 deletions pkg/kgo/consumer_group.go
@@ -167,6 +167,30 @@ func (c *consumer) initGroup() {
g.cfg.autocommitDisable = true
}

for _, logOn := range []struct {
name string
set *func(context.Context, *Client, map[string][]int32)
}{
{"OnAssigned", &g.cfg.onAssigned},
{"OnRevoked", &g.cfg.onRevoked},
{"OnLost", &g.cfg.onLost},
} {
user := *logOn.set
name := logOn.name
*logOn.set = func(ctx context.Context, cl *Client, m map[string][]int32) {
var ctxExpired bool
select {
case <-ctx.Done():
ctxExpired = true
default:
}
cl.cfg.logger.Log(LogLevelDebug, "entering "+name, "with", m, "context_expired", ctxExpired)
if user != nil {
user(ctx, cl, m)
}
}
}

// For non-regex topics, we explicitly ensure they exist for loading
// metadata. This is of no impact if we are *also* consuming via regex,
// but that is no problem.
@@ -231,16 +255,16 @@ func (g *groupConsumer) manage() {
// onRevoked, but since we are handling this case for
// the cooperative consumer we may as well just also
// include the eager consumer.
g.cfg.onRevoked(g.ctx, g.cl, g.nowAssigned)
g.cfg.onRevoked(g.cl.ctx, g.cl, g.nowAssigned)
} else if g.cfg.onLost != nil {
// Any other error is perceived as a fatal error,
// and we go into OnLost as appropriate.
g.cfg.onLost(g.ctx, g.cl, g.nowAssigned)
g.cfg.onLost(g.cl.ctx, g.cl, g.nowAssigned)
hook()

} else if g.cfg.onRevoked != nil {
// If OnLost is not specified, we fallback to OnRevoked.
g.cfg.onRevoked(g.ctx, g.cl, g.nowAssigned)
g.cfg.onRevoked(g.cl.ctx, g.cl, g.nowAssigned)
hook()
}

@@ -424,7 +448,7 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi
g.cfg.logger.Log(LogLevelInfo, "cooperative consumer revoking prior assigned partitions because leaving group", "group", g.cfg.group, "revoking", g.nowAssigned)
}
if g.cfg.onRevoked != nil {
g.cfg.onRevoked(g.ctx, g.cl, g.nowAssigned)
g.cfg.onRevoked(g.cl.ctx, g.cl, g.nowAssigned)
}
g.nowAssigned = nil

@@ -486,7 +510,7 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi
g.cfg.logger.Log(LogLevelInfo, "cooperative consumer calling onRevoke", "group", g.cfg.group, "lost", lost, "stage", stage)
}
if g.cfg.onRevoked != nil {
g.cfg.onRevoked(g.ctx, g.cl, lost)
g.cfg.onRevoked(g.cl.ctx, g.cl, lost)
}
}

@@ -558,7 +582,7 @@ func (s *assignRevokeSession) assign(g *groupConsumer, newAssigned map[string][]
// We always call on assigned, even if nothing new is
// assigned. This allows consumers to know that
// assignment is done and do setup logic.
g.cfg.onAssigned(g.ctx, g.cl, newAssigned)
g.cfg.onAssigned(g.cl.ctx, g.cl, newAssigned)
}
}()
return s.assignDone
@@ -1157,6 +1181,7 @@ start:
Offset: offset.at,
}
topicUncommitted[partition] = uncommit{
dirty: committed,
head: committed,
committed: committed,
}
@@ -1265,8 +1290,9 @@ func (g *groupConsumer) findNewAssignments() {
// The reason head is just past the latest offset is because we want
// to commit TO an offset, not BEFORE an offset.
type uncommit struct {
head EpochOffset
committed EpochOffset
dirty EpochOffset // if autocommitting, what will move to head on next Poll
head EpochOffset // ready to commit
committed EpochOffset // what is committed
}

// EpochOffset combines a record offset with the leader epoch the broker
@@ -1283,6 +1309,8 @@ func (g *groupConsumer) updateUncommitted(fetches Fetches) {
var b bytes.Buffer
debug := g.cfg.logger.Level() >= LogLevelDebug

setHead := g.cfg.autocommitDisable || g.cfg.autocommitGreedy

g.mu.Lock()
defer g.mu.Unlock()

@@ -1311,19 +1339,28 @@ func (g *groupConsumer) updateUncommitted(fetches Fetches) {
}
}

uncommit := topicOffsets[partition.Partition]

// Our new head points just past the final consumed offset,
// that is, if we rejoin, this is the offset to begin at.
newOffset := final.Offset + 1
set := EpochOffset{
final.LeaderEpoch, // -1 if old message / unknown
final.Offset + 1,
}
prior := topicOffsets[partition.Partition]

if debug {
fmt.Fprintf(&b, "%d{%d=>%d}, ", partition.Partition, uncommit.head.Offset, newOffset)
if setHead {
fmt.Fprintf(&b, "%d{%d=>%d}, ", partition.Partition, prior.head.Offset, set.Offset)
} else {
fmt.Fprintf(&b, "%d{%d=>%d=>%d}, ", partition.Partition, prior.head.Offset, prior.dirty.Offset, set.Offset)
}
}
uncommit.head = EpochOffset{
final.LeaderEpoch, // -1 if old message / unknown
newOffset,

prior.head = prior.dirty
prior.dirty = set
if setHead {
prior.head = set
}
topicOffsets[partition.Partition] = uncommit
topicOffsets[partition.Partition] = prior
}

if debug {
@@ -1412,10 +1449,28 @@ func (g *groupConsumer) updateCommitted(
fmt.Fprintf(&b, "%d{%d=>%d}, ", reqPart.Partition, uncommit.committed.Offset, reqPart.Offset)
}

uncommit.committed = EpochOffset{
set := EpochOffset{
reqPart.LeaderEpoch,
reqPart.Offset,
}
uncommit.committed = set

// We always commit either dirty offsets or head
// offsets. For sanity, we bump both dirty/head to the
// commit if they are before the commit. We only expect
// head to be before the commit, if committing manually
// through UncommittedOffsets in an OnRevoke with
// autocommitting enabled.
for _, next := range []*EpochOffset{
&uncommit.head,
&uncommit.committed,
} {
if set.Epoch > next.Epoch ||
set.Epoch == next.Epoch &&
set.Offset > next.Offset {
*next = set
}
}
topic[respPart.Partition] = uncommit
}

@@ -1473,10 +1528,14 @@ func (g *groupConsumer) loopCommit() {
// after the group context is canceled (which is the first
// thing that happens so as to quit the manage loop before
// leaving a group).
//
// We always commit only the head. If we are autocommitting
// dirty, then updateUncommitted updates the head to dirty
// offsets.
g.mu.Lock()
if !g.blockAuto {
g.cfg.logger.Log(LogLevelDebug, "autocommitting", "group", g.cfg.group)
g.commit(g.ctx, g.getUncommittedLocked(true), g.cfg.commitCallback)
g.commit(g.ctx, g.getUncommittedLocked(true, false), g.cfg.commitCallback)
}
g.mu.Unlock()
}
@@ -1536,22 +1595,20 @@ func (cl *Client) SetOffsets(setOffsets map[string]map[int32]EpochOffset) {
var topicAssigns map[int32]Offset
for partition, epochOffset := range partitions {
current, exists := topicUncommitted[partition]
if exists && current.head == epochOffset {
current.committed = epochOffset
topicUncommitted[partition] = current
continue
topicUncommitted[partition] = uncommit{
dirty: epochOffset,
head: epochOffset,
committed: epochOffset,
}
if topicAssigns == nil {
if exists && current.dirty == epochOffset {
continue
} else if topicAssigns == nil {
topicAssigns = make(map[int32]Offset, len(partitions))
}
topicAssigns[partition] = Offset{
at: epochOffset.Offset,
epoch: epochOffset.Epoch,
}
topicUncommitted[partition] = uncommit{
head: epochOffset,
committed: epochOffset,
}
}
if len(topicAssigns) > 0 {
if assigns == nil {
@@ -1582,7 +1639,7 @@ func (cl *Client) SetOffsets(setOffsets map[string]map[int32]EpochOffset) {
// may fail with REBALANCE_IN_PROGRESS.
func (cl *Client) UncommittedOffsets() map[string]map[int32]EpochOffset {
if g := cl.consumer.g; g != nil {
return g.getUncommitted()
return g.getUncommitted(true)
}
return nil
}
@@ -1599,16 +1656,16 @@ func (cl *Client) CommittedOffsets() map[string]map[int32]EpochOffset {
g.mu.Lock()
defer g.mu.Unlock()

return g.getUncommittedLocked(false)
return g.getUncommittedLocked(false, false)
}

func (g *groupConsumer) getUncommitted() map[string]map[int32]EpochOffset {
func (g *groupConsumer) getUncommitted(dirty bool) map[string]map[int32]EpochOffset {
g.mu.Lock()
defer g.mu.Unlock()
return g.getUncommittedLocked(true)
return g.getUncommittedLocked(true, dirty)
}

func (g *groupConsumer) getUncommittedLocked(head bool) map[string]map[int32]EpochOffset {
func (g *groupConsumer) getUncommittedLocked(head, dirty bool) map[string]map[int32]EpochOffset {
if g.uncommitted == nil {
return nil
}
@@ -1617,7 +1674,7 @@ func (g *groupConsumer) getUncommittedLocked(head bool) map[string]map[int32]Epo
for topic, partitions := range g.uncommitted {
var topicUncommitted map[int32]EpochOffset
for partition, uncommit := range partitions {
if head && uncommit.head == uncommit.committed {
if head && uncommit.dirty == uncommit.committed {
continue
}
if topicUncommitted == nil {
Expand All @@ -1631,7 +1688,11 @@ func (g *groupConsumer) getUncommittedLocked(head bool) map[string]map[int32]Epo
}
}
if head {
topicUncommitted[partition] = uncommit.head
if dirty {
topicUncommitted[partition] = uncommit.dirty
} else {
topicUncommitted[partition] = uncommit.head
}
} else {
topicUncommitted[partition] = uncommit.committed
}
@@ -1917,7 +1978,7 @@ func (g *groupConsumer) defaultRevoke(context.Context, *Client, map[string][]int
// We use the client's context rather than the group context,
// because this could come from the group being left. The group
// context will already be canceled.
g.commitOffsetsSync(g.cl.ctx, g.getUncommitted(), g.cfg.commitCallback)
g.commitOffsetsSync(g.cl.ctx, g.getUncommitted(false), g.cfg.commitCallback)
}
}

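
As an editorial illustration of the dirty/head/committed bookkeeping in the
diff above (a standalone model, not library code): polling promotes the
previous poll's offsets from dirty to head, and a default autocommit only
ever commits head.

    package main

    import "fmt"

    type epochOffset struct {
        epoch  int32
        offset int64
    }

    // uncommit mirrors the three-field struct in the diff: dirty is the most
    // recent poll, head is ready to commit, committed is what is committed.
    type uncommit struct {
        dirty, head, committed epochOffset
    }

    // poll models updateUncommitted: the prior poll becomes committable.
    func (u *uncommit) poll(next epochOffset) {
        u.head = u.dirty
        u.dirty = next
    }

    // autocommit models the default (non-greedy) loopCommit: head only.
    func (u *uncommit) autocommit() {
        u.committed = u.head
    }

    func main() {
        var u uncommit
        u.poll(epochOffset{0, 100}) // first poll
        u.autocommit()
        fmt.Println(u.committed.offset) // 0: the first poll is not yet committable

        u.poll(epochOffset{0, 200}) // second poll promotes the first
        u.autocommit()
        fmt.Println(u.committed.offset) // 100: only previously polled fetches commit
    }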