Skip to content

Commit

Permalink
kgo source: do not use fetch topic IDs if the broker returns no ID
Browse files Browse the repository at this point in the history
Closes #295.
  • Loading branch information
twmb committed Jan 3, 2023
1 parent a2c4bad commit d0c42ad
Showing 1 changed file with 13 additions and 2 deletions.
15 changes: 13 additions & 2 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -1546,6 +1546,8 @@ type fetchRequest struct {
topic2id map[string][16]byte
id2topic map[[16]byte]string

disableIDs bool // #295: using an old IBP on new Kafka results in ApiVersions advertising 13+ while the broker does not return IDs

// Session is a copy of the source session at the time a request is
// built. If the source is reset, the session it has is reset at the
// field level only. Our view of the original session is still valid.
Expand All @@ -1565,6 +1567,10 @@ func (f *fetchRequest) addCursor(c *cursor) {
f.usedOffsets[c.topic] = partitions
f.id2topic[c.topicID] = c.topic
f.topic2id[c.topic] = c.topicID
var noID [16]byte
if c.topicID == noID {
f.disableIDs = true
}
f.torder = append(f.torder, c.topic)
}
partitions[c.partition] = c.use()
Expand Down Expand Up @@ -1754,8 +1760,13 @@ func (f *fetchRequest) adjustPreferringLag() {
}
}

func (*fetchRequest) Key() int16 { return 1 }
func (*fetchRequest) MaxVersion() int16 { return 13 }
func (*fetchRequest) Key() int16 { return 1 }
func (f *fetchRequest) MaxVersion() int16 {
if f.disableIDs {
return 12
}
return 13
}
func (f *fetchRequest) SetVersion(v int16) { f.version = v }
func (f *fetchRequest) GetVersion() int16 { return f.version }
func (f *fetchRequest) IsFlexible() bool { return f.version >= 12 } // version 12+ is flexible
Expand Down

0 comments on commit d0c42ad

Please sign in to comment.