all: work around brokers that have inconsistent request versions
In some places, the client uses responses from requests to infer that certain
features are supported. In particular, if a broker returns a non-negative
leader epoch in a metadata response, the client assumes it can use that leader
epoch to validate whether a partition was truncated when it moves to a
different broker. That is not always the case, and the client would
permanently loop on failing epoch-loading requests.

Now, we only opt in to KIP-320 in certain scenarios if we know that at least
one broker supports it. This could be finer grained: if one broker in a
cluster does not support it while others do, we may have the same failure
semantics as before. But that should ideally only happen during rolling
upgrades.

This is essentially a patch that should never trigger, working around what is
ideally a temporary bug in brokers.
twmb committed Aug 12, 2021
1 parent f591593 commit fd889cc
Showing 6 changed files with 64 additions and 9 deletions.
21 changes: 21 additions & 0 deletions pkg/kgo/client.go
@@ -305,6 +305,27 @@ func (cl *Client) waitTries(ctx context.Context, tries int) bool {
 	}
 }
 
+// A broker may sometimes indicate it supports offset for leader epoch v2+ when
+// it does not. We need to catch that and avoid issuing offset for leader
+// epoch, because we will just loop continuously failing.
+//
+// We do not catch every case, such as when a person explicitly assigns offsets
+// with epochs, but we catch a few areas that would be returned from a broker
+// itself.
+//
+// This should only be used *after* at least one successful response.
+func (cl *Client) supportsOffsetForLeaderEpoch() bool {
+	cl.brokersMu.RLock()
+	defer cl.brokersMu.RUnlock()
+
+	for _, b := range cl.brokers {
+		if v := b.loadVersions(); v != nil && v.versions[23] >= 2 {
+			return true
+		}
+	}
+	return false
+}
+
 // fetchBrokerMetadata issues a metadata request solely for broker information.
 func (cl *Client) fetchBrokerMetadata(ctx context.Context) error {
 	cl.fetchingBrokersMu.Lock()
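
For illustration, a self-contained sketch of the same probe, separate from the commit itself. Kafka API key 23 is OffsetForLeaderEpoch, and the commit requires v2+ before opting in; the maxVersions type here is a hypothetical stand-in for the client's cached per-broker version data.

package main

import "fmt"

// maxVersions is a hypothetical stand-in for a broker's cached ApiVersions
// data: Kafka API key -> max supported request version.
type maxVersions map[int16]int16

// Kafka API key 23 is OffsetForLeaderEpoch; the commit requires v2+ before
// trusting broker-returned leader epochs.
const offsetForLeaderEpoch int16 = 23

// supportsKIP320 mirrors the commit's check: opt in only if at least one
// known broker advertises OffsetForLeaderEpoch v2 or newer.
func supportsKIP320(brokers []maxVersions) bool {
	for _, versions := range brokers {
		if versions[offsetForLeaderEpoch] >= 2 {
			return true
		}
	}
	return false
}

func main() {
	healthy := []maxVersions{{offsetForLeaderEpoch: 3}}
	inconsistent := []maxVersions{{offsetForLeaderEpoch: 0}} // buggy broker
	fmt.Println(supportsKIP320(healthy), supportsKIP320(inconsistent)) // true false
}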
5 changes: 5 additions & 0 deletions pkg/kgo/consumer.go
@@ -538,6 +538,11 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
 				// Otherwise, an epoch is specified without an exact
 				// request which is useless for us, or a request is
 				// specified without a known epoch.
+				//
+				// The client ensures the epoch is non-negative from
+				// fetch offsets only if the broker supports KIP-320,
+				// but we do not override the user manually specifying
+				// an epoch.
 				if offset.at >= 0 && offset.epoch >= 0 {
 					loadOffsets.addLoad(topic, partition, loadTypeEpoch, offsetLoad{
 						replica: -1,
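
The "user manually specifying an epoch" escape hatch mentioned in the comment above looks roughly like the following in franz-go's public API. This is a hedged sketch: it assumes kgo.ConsumePartitions and the Offset builder methods At and WithEpoch exist in the version in use.

package main

import "github.com/twmb/franz-go/pkg/kgo"

func main() {
	// Assumed API: ConsumePartitions, NewOffset, At, WithEpoch. A user-set
	// epoch (>= 0) still triggers epoch validation, even when the client
	// would not trust broker-returned epochs on an inconsistent cluster.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumePartitions(map[string]map[int32]kgo.Offset{
			"events": {0: kgo.NewOffset().At(1234).WithEpoch(5)},
		}),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()
}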
7 changes: 6 additions & 1 deletion pkg/kgo/consumer_group.go
@@ -1077,6 +1077,11 @@ start:
 		return err
 	}
 
+	// Even if a leader epoch is returned, if brokers do not support
+	// OffsetForLeaderEpoch for some reason (odd set of supported reqs), we
+	// cannot use the returned leader epoch.
+	kip320 := g.cl.supportsOffsetForLeaderEpoch()
+
 	offsets := make(map[string]map[int32]Offset)
 	for _, rTopic := range resp.Topics {
 		topicOffsets := make(map[int32]Offset)
@@ -1104,7 +1109,7 @@ start:
 				at:    rPartition.Offset,
 				epoch: -1,
 			}
-			if resp.Version >= 5 { // KIP-320
+			if resp.Version >= 5 && kip320 { // KIP-320
 				offset.epoch = rPartition.LeaderEpoch
 			}
 			if rPartition.Offset == -1 {
7 changes: 6 additions & 1 deletion pkg/kgo/metadata.go
@@ -337,6 +337,11 @@ func (cl *Client) fetchTopicMetadata(all bool, reqTopics []string) (map[string]*
 
 	topics := make(map[string]*topicPartitionsData, len(meta.Topics))
 
+	// Even if metadata returns a leader epoch, we do not use it unless we
+	// can validate it per OffsetForLeaderEpoch. Some brokers may have an
+	// odd set of support.
+	useLeaderEpoch := cl.supportsOffsetForLeaderEpoch()
+
 	for i := range meta.Topics {
 		topicMeta := &meta.Topics[i]
 
@@ -378,7 +383,7 @@ func (cl *Client) fetchTopicMetadata(all bool, reqTopics []string) (map[string]*
 		for i := range topicMeta.Partitions {
 			partMeta := &topicMeta.Partitions[i]
 			leaderEpoch := partMeta.LeaderEpoch
-			if meta.Version < 7 {
+			if meta.Version < 7 || !useLeaderEpoch {
 				leaderEpoch = -1
 			}
 
30 changes: 23 additions & 7 deletions pkg/kgo/source.go
@@ -737,6 +737,8 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchResponse
 		reloadOffsets listOrEpochLoads
 		preferreds    []cursorOffsetPreferred
 		updateMeta    bool
+
+		kip320 = s.cl.supportsOffsetForLeaderEpoch()
 	)
 
 	for _, rt := range resp.Topics {
@@ -836,13 +838,23 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchResponse
 						Offset:  s.cl.cfg.resetOffset,
 					})
 				} else { // partOffset.offset > fp.HighWatermark, KIP-392 case 4
-					reloadOffsets.addLoad(topic, partition, loadTypeEpoch, offsetLoad{
-						replica: -1,
-						Offset: Offset{
-							at:    partOffset.offset,
-							epoch: partOffset.lastConsumedEpoch,
-						},
-					})
+					if kip320 {
+						reloadOffsets.addLoad(topic, partition, loadTypeEpoch, offsetLoad{
+							replica: -1,
+							Offset: Offset{
+								at:    partOffset.offset,
+								epoch: partOffset.lastConsumedEpoch,
+							},
+						})
+					} else {
+						// If the broker does not support offset for leader epoch but
+						// does support follower fetching for some reason, we have to
+						// fall back to listing.
+						reloadOffsets.addLoad(topic, partition, loadTypeList, offsetLoad{
+							replica: -1,
+							Offset:  s.cl.cfg.resetOffset,
+						})
+					}
 				}
 
 			case kerr.FencedLeaderEpoch:
@@ -853,6 +865,10 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchResponse
 				// If we have consumed nothing, then we got unlucky
 				// by being fenced right after we grabbed metadata.
 				// We just refresh metadata and try again.
+				//
+				// It would be odd for a broker to reply we are fenced
+				// but not support offset for leader epoch, so we do
+				// not check KIP-320 support here.
 				if partOffset.lastConsumedEpoch >= 0 {
 					reloadOffsets.addLoad(topic, partition, loadTypeEpoch, offsetLoad{
 						replica: -1,
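
Distilled, the new KIP-392 case-4 branch makes the following decision. This is a simplified sketch with hypothetical names (loadKind, chooseReload), not the package's own types.

// loadKind stands in for the package's epoch/list load types.
type loadKind int

const (
	loadList  loadKind = iota // ListOffsets at the configured reset offset
	loadEpoch                 // OffsetForLeaderEpoch truncation validation
)

// chooseReload: validate the epoch only when the cluster is known to support
// OffsetForLeaderEpoch; otherwise a plain list load avoids the permanent
// retry loop the commit message describes.
func chooseReload(kip320 bool) loadKind {
	if kip320 {
		return loadEpoch
	}
	return loadList
}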
3 changes: 3 additions & 0 deletions pkg/kgo/topics_and_partitions.go
@@ -316,6 +316,9 @@ func (old *topicPartition) migrateCursorTo(
 	// KIP-320: if we had consumed some messages, we need to validate the
 	// leader epoch on the new broker to see if we experienced data loss
 	// before we can use this cursor.
+	//
+	// Metadata ensures that leaderEpoch is non-negative only if the broker
+	// supports KIP-320.
 	if new.leaderEpoch != -1 && old.cursor.lastConsumedEpoch >= 0 {
 		// Since the cursor consumed messages, it is definitely usable.
 		// We use it so that the epoch load can finish using it
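
The guard above reduces to a predicate over the two epoch sentinels; a hedged distillation with a hypothetical helper name:

// needsEpochValidation: a cursor migration validates only when metadata
// supplied a leader epoch (gated on KIP-320 support, else -1) and the cursor
// has actually consumed under some epoch (-1 until first consumption).
func needsEpochValidation(leaderEpoch, lastConsumedEpoch int32) bool {
	return leaderEpoch != -1 && lastConsumedEpoch >= 0
}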
