diff --git a/pkg/kversion/kversion.go b/pkg/kversion/kversion.go index 2e8747e8..a70d1cbb 100644 --- a/pkg/kversion/kversion.go +++ b/pkg/kversion/kversion.go @@ -179,7 +179,78 @@ type guessCfg struct { // Options can be specified to change how version guessing is performed, for // example, certain keys can be skipped, or the guessing can try evaluating the // versions as Raft broker based versions. +// +// Internally, this function tries guessing the version against both KRaft and +// Kafka APIs. The more exact match is returned. func (vs *Versions) VersionGuess(opts ...VersionGuessOpt) string { + standard := vs.versionGuess(opts...) + raftBroker := vs.versionGuess(append(opts, TryRaftBroker())...) + raftController := vs.versionGuess(append(opts, TryRaftController())...) + + // If any of these are exact, return the exact guess. + for _, g := range []guess{ + standard, + raftBroker, + raftController, + } { + if g.how == guessExact { + return g.String() + } + } + + // If any are atLeast, that means it is newer than we can guess and we + // return the highest version. + for _, g := range []guess{ + standard, + raftBroker, + raftController, + } { + if g.how == guessAtLeast { + return g.String() + } + } + + // This is a custom version. We could do some advanced logic to try to + // return highest of all three guesses, but that may be inaccurate: + // KRaft may detect a higher guess because not all requests exist in + // KRaft. Instead, we just return our standard guess. + return standard.String() +} + +type guess struct { + v1 string + v2 string // for between + how int8 +} + +const ( + guessExact = iota + guessAtLeast + guessCustomUnknown + guessCustomAtLeast + guessBetween + guessNotEven +) + +func (g guess) String() string { + switch g.how { + case guessExact: + return g.v1 + case guessAtLeast: + return "at least " + g.v1 + case guessCustomUnknown: + return "unknown custom version" + case guessCustomAtLeast: + return "unknown custom version at least " + g.v1 + case guessBetween: + return "between " + g.v1 + " and " + g.v2 + case guessNotEven: + return "not even " + g.v1 + } + return g.v1 +} + +func (vs *Versions) versionGuess(opts ...VersionGuessOpt) guess { cfg := guessCfg{ listener: zkBroker, skipKeys: []int16{4, 5, 6, 7, 27}, @@ -224,6 +295,7 @@ func (vs *Versions) VersionGuess(opts ...VersionGuessOpt) string { {max320, "v3.2"}, {max330, "v3.3"}, {max340, "v3.4"}, + {max350, "v3.5"}, } { for k, v := range comparison.cmp.filter(cfg.listener) { if v == -1 { @@ -286,30 +358,30 @@ func (vs *Versions) VersionGuess(opts ...VersionGuessOpt) string { case under && over: // Regardless of equal being true or not, this is a custom version. if last != "" { - return "unknown custom version at least " + last + return guess{v1: last, how: guessCustomAtLeast} } - return "unknown custom version" + return guess{v1: last, how: guessCustomUnknown} case under: // Regardless of equal being true or not, we have not yet hit // this version. if last != "" { - return "between " + last + " and " + current + return guess{v1: last, v2: current, how: guessBetween} } - return "not even " + current + return guess{v1: current, how: guessNotEven} case over: // Regardless of equal being true or not, we try again. last = current case equal: - return current + return guess{v1: current, how: guessExact} } // At least one of under, equal, or over must be true, so there // is no default case. } - return "at least " + last + return guess{v1: last, how: guessAtLeast} } // String returns a string representation of the versions; the format may @@ -363,6 +435,7 @@ func V3_1_0() *Versions { return zkBrokerOf(max310) } func V3_2_0() *Versions { return zkBrokerOf(max320) } func V3_3_0() *Versions { return zkBrokerOf(max330) } func V3_4_0() *Versions { return zkBrokerOf(max340) } +func V3_5_0() *Versions { return zkBrokerOf(max350) } func zkBrokerOf(lks listenerKeys) *Versions { return &Versions{lks.filter(zkBroker)} @@ -506,7 +579,7 @@ var max0110 = nextMax(max0102, func(v listenerKeys) listenerKeys { k(zkBroker, rBroker), // 23 offset for leader epoch KAFKA-1211 0baea2ac13 KIP-101 k(zkBroker, rBroker), // 24 add partitions to txn KAFKA-4990 865d82af2c KIP-98 (raft 3.0 6e857c531f14d07d5b05f174e6063a124c917324) - k(zkBroker), // 25 add offsets to txn (same, same raft) + k(zkBroker, rBroker), // 25 add offsets to txn (same, same raft) k(zkBroker, rBroker), // 26 end txn (same, same raft) k(zkBroker, rBroker), // 27 write txn markers (same) k(zkBroker, rBroker), // 28 txn offset commit (same, same raft) @@ -767,8 +840,8 @@ var max270 = nextMax(max260, func(v listenerKeys) listenerKeys { v[26].inc() // 2 end txn v = append(v, - k(zkBroker), // 50 describe user scram creds, KAFKA-10259 e8524ccd8fca0caac79b844d87e98e9c055f76fb KIP-554 - k(zkBroker), // 51 alter user scram creds, same + k(zkBroker, rBroker, rController), // 50 describe user scram creds, KAFKA-10259 e8524ccd8fca0caac79b844d87e98e9c055f76fb KIP-554; 38c409cf33c kraft + k(zkBroker, rBroker, rController), // 51 alter user scram creds, same ) // KAFKA-10435 634c9175054cc69d10b6da22ea1e95edff6a4747 KIP-595 @@ -960,15 +1033,19 @@ var max340 = nextMax(max330, func(v listenerKeys) listenerKeys { return v }) +var max350 = nextMax(max340, func(v listenerKeys) listenerKeys { + // KAFKA-13369 7146ac57ba9ddd035dac992b9f188a8e7677c08d KIP-405 + v[1].inc() // 14 fetch + v[2].inc() // 8 list offsets + + v[1].inc() // 15 fetch // KAFKA-14617 79b5f7f1ce2 KIP-903 + v[56].inc() // 3 alter partition // KAFKA-14617 8c88cdb7186b1d594f991eb324356dcfcabdf18a KIP-903 + return v +}) + var ( - maxStable = max340 + maxStable = max350 maxTip = nextMax(maxStable, func(v listenerKeys) listenerKeys { - // KAFKA-13369 7146ac57ba9ddd035dac992b9f188a8e7677c08d KIP-405 - v[1].inc() // 14 fetch // KAFKA-13369 7146ac57ba9 KIP-405 - v[2].inc() // 8 list offsets // same - - v[1].inc() // 15 fetch // KAFKA-14617 79b5f7f1ce2 KIP-903 - v[56].inc() // 3 alter partition // KAFKA-14617 8c88cdb7186b1d594f991eb324356dcfcabdf18a KIP-903 return v }) )