Skip to content

Commit

Permalink
kversion: cut Kafka 3.4
Browse files Browse the repository at this point in the history
Tested with kcl, we can now probe 3.4 successfully.

This required a bit of a change to version guessing (and I really hope
KIP-885 is in by the next release...). Previously, we would skip the few
internal APIs by default. Now, if the ApiVersion response includes an
API we are skipping, we check it.
  • Loading branch information
twmb committed Feb 8, 2023
1 parent ede1adc commit fe727f8
Showing 1 changed file with 31 additions and 14 deletions.
45 changes: 31 additions & 14 deletions pkg/kversion/kversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func (vs *Versions) VersionGuess(opts ...VersionGuessOpt) string {

var last string
cmp := make(map[int16]int16, len(maxTip))
cmpskip := make(map[int16]int16)
for _, comparison := range []struct {
cmp listenerKeys
name string
Expand Down Expand Up @@ -222,10 +223,17 @@ func (vs *Versions) VersionGuess(opts ...VersionGuessOpt) string {
{max310, "v3.1"},
{max320, "v3.2"},
{max330, "v3.3"},
{max340, "v3.4"},
} {
for k, v := range comparison.cmp.filter(cfg.listener) {
if !skip[int16(k)] && v != -1 {
cmp[int16(k)] = v
if v == -1 {
continue
}
k16 := int16(k)
if skip[k16] {
cmpskip[k16] = v
} else {
cmp[k16] = v
}
}

Expand All @@ -234,7 +242,11 @@ func (vs *Versions) VersionGuess(opts ...VersionGuessOpt) string {
for k, v := range vs.k2v {
k16 := int16(k)
if skip[k16] {
continue
skipv, ok := cmpskip[k16]
if v == -1 || !ok {
continue
}
cmp[k16] = skipv
}
cmpv, has := cmp[k16]
if has {
Expand Down Expand Up @@ -350,6 +362,7 @@ func V3_0_0() *Versions { return zkBrokerOf(max300) }
func V3_1_0() *Versions { return zkBrokerOf(max310) }
func V3_2_0() *Versions { return zkBrokerOf(max320) }
func V3_3_0() *Versions { return zkBrokerOf(max330) }
func V3_4_0() *Versions { return zkBrokerOf(max340) }

func zkBrokerOf(lks listenerKeys) *Versions {
return &Versions{lks.filter(zkBroker)}
Expand Down Expand Up @@ -934,18 +947,22 @@ var max330 = nextMax(max320, func(v listenerKeys) listenerKeys {
return v
})

var max340 = nextMax(max330, func(v listenerKeys) listenerKeys {
// KAFKA-14304 7b7e40a536a79cebf35cc278b9375c8352d342b9 KIP-866
// KAFKA-14448 67c72596afe58363eceeb32084c5c04637a33831 added BrokerRegistration
// KAFKA-14493 db490707606855c265bc938e1b236070e0e2eba5 changed BrokerRegistration
// KAFKA-14304 0bb05d8679b684ad8fbb2eb40dfc00066186a75a changed BrokerRegistration back to a bool...
// 5b521031edea8ea7cbcca7dc24a58429423740ff added tag to ApiVersions
v[4].inc() // 7 leader and isr
v[5].inc() // 4 stop replica
v[6].inc() // 8 update metadata
v[62].inc() // 1 broker registration
return v
})

var (
maxStable = max330
maxStable = max340
maxTip = nextMax(maxStable, func(v listenerKeys) listenerKeys {
// KAFKA-14304 7b7e40a536a79cebf35cc278b9375c8352d342b9 KIP-866
// KAFKA-14448 67c72596afe58363eceeb32084c5c04637a33831 added BrokerRegistration
// KAFKA-14493 db490707606855c265bc938e1b236070e0e2eba5 changed BrokerRegistration
// KAFKA-14304 0bb05d8679b684ad8fbb2eb40dfc00066186a75a changed BrokerRegistration back to a bool...
// 5b521031edea8ea7cbcca7dc24a58429423740ff added tag to ApiVersions
v[4].inc() // 7 leader and isr
v[5].inc() // 4 stop replica
v[6].inc() // 8 update metadata
v[62].inc() // 1 broker registration
return v
return maxStable
})
)

0 comments on commit fe727f8

Please sign in to comment.