diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index f961ff5b..7c638a0d 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -1546,6 +1546,8 @@ type fetchRequest struct { topic2id map[string][16]byte id2topic map[[16]byte]string + disableIDs bool // #295: using an old IBP on new Kafka results in ApiVersions advertising 13+ while the broker does not return IDs + // Session is a copy of the source session at the time a request is // built. If the source is reset, the session it has is reset at the // field level only. Our view of the original session is still valid. @@ -1565,6 +1567,10 @@ func (f *fetchRequest) addCursor(c *cursor) { f.usedOffsets[c.topic] = partitions f.id2topic[c.topicID] = c.topic f.topic2id[c.topic] = c.topicID + var noID [16]byte + if c.topicID == noID { + f.disableIDs = true + } f.torder = append(f.torder, c.topic) } partitions[c.partition] = c.use() @@ -1754,8 +1760,13 @@ func (f *fetchRequest) adjustPreferringLag() { } } -func (*fetchRequest) Key() int16 { return 1 } -func (*fetchRequest) MaxVersion() int16 { return 13 } +func (*fetchRequest) Key() int16 { return 1 } +func (f *fetchRequest) MaxVersion() int16 { + if f.disableIDs { + return 12 + } + return 13 +} func (f *fetchRequest) SetVersion(v int16) { f.version = v } func (f *fetchRequest) GetVersion() int16 { return f.version } func (f *fetchRequest) IsFlexible() bool { return f.version >= 12 } // version 12+ is flexible