diff --git a/pkg/kversion/kversion.go b/pkg/kversion/kversion.go index 26be8ef1..6582db44 100644 --- a/pkg/kversion/kversion.go +++ b/pkg/kversion/kversion.go @@ -195,6 +195,7 @@ func (vs *Versions) VersionGuess(opts ...VersionGuessOpt) string { var last string cmp := make(map[int16]int16, len(maxTip)) + cmpskip := make(map[int16]int16) for _, comparison := range []struct { cmp listenerKeys name string @@ -222,10 +223,17 @@ func (vs *Versions) VersionGuess(opts ...VersionGuessOpt) string { {max310, "v3.1"}, {max320, "v3.2"}, {max330, "v3.3"}, + {max340, "v3.4"}, } { for k, v := range comparison.cmp.filter(cfg.listener) { - if !skip[int16(k)] && v != -1 { - cmp[int16(k)] = v + if v == -1 { + continue + } + k16 := int16(k) + if skip[k16] { + cmpskip[k16] = v + } else { + cmp[k16] = v } } @@ -234,7 +242,11 @@ func (vs *Versions) VersionGuess(opts ...VersionGuessOpt) string { for k, v := range vs.k2v { k16 := int16(k) if skip[k16] { - continue + skipv, ok := cmpskip[k16] + if v == -1 || !ok { + continue + } + cmp[k16] = skipv } cmpv, has := cmp[k16] if has { @@ -350,6 +362,7 @@ func V3_0_0() *Versions { return zkBrokerOf(max300) } func V3_1_0() *Versions { return zkBrokerOf(max310) } func V3_2_0() *Versions { return zkBrokerOf(max320) } func V3_3_0() *Versions { return zkBrokerOf(max330) } +func V3_4_0() *Versions { return zkBrokerOf(max340) } func zkBrokerOf(lks listenerKeys) *Versions { return &Versions{lks.filter(zkBroker)} @@ -934,18 +947,22 @@ var max330 = nextMax(max320, func(v listenerKeys) listenerKeys { return v }) +var max340 = nextMax(max330, func(v listenerKeys) listenerKeys { + // KAFKA-14304 7b7e40a536a79cebf35cc278b9375c8352d342b9 KIP-866 + // KAFKA-14448 67c72596afe58363eceeb32084c5c04637a33831 added BrokerRegistration + // KAFKA-14493 db490707606855c265bc938e1b236070e0e2eba5 changed BrokerRegistration + // KAFKA-14304 0bb05d8679b684ad8fbb2eb40dfc00066186a75a changed BrokerRegistration back to a bool... + // 5b521031edea8ea7cbcca7dc24a58429423740ff added tag to ApiVersions + v[4].inc() // 7 leader and isr + v[5].inc() // 4 stop replica + v[6].inc() // 8 update metadata + v[62].inc() // 1 broker registration + return v +}) + var ( - maxStable = max330 + maxStable = max340 maxTip = nextMax(maxStable, func(v listenerKeys) listenerKeys { - // KAFKA-14304 7b7e40a536a79cebf35cc278b9375c8352d342b9 KIP-866 - // KAFKA-14448 67c72596afe58363eceeb32084c5c04637a33831 added BrokerRegistration - // KAFKA-14493 db490707606855c265bc938e1b236070e0e2eba5 changed BrokerRegistration - // KAFKA-14304 0bb05d8679b684ad8fbb2eb40dfc00066186a75a changed BrokerRegistration back to a bool... - // 5b521031edea8ea7cbcca7dc24a58429423740ff added tag to ApiVersions - v[4].inc() // 7 leader and isr - v[5].inc() // 4 stop replica - v[6].inc() // 8 update metadata - v[62].inc() // 1 broker registration - return v + return maxStable }) )