Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kversion: detect 3.5 and kraft #497

Merged
merged 2 commits into from
Jul 8, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 93 additions & 16 deletions pkg/kversion/kversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,78 @@ type guessCfg struct {
// Options can be specified to change how version guessing is performed, for
// example, certain keys can be skipped, or the guessing can try evaluating the
// versions as Raft broker based versions.
//
// Internally, this function tries guessing the version against both KRaft and
// Kafka APIs. The more exact match is returned.
func (vs *Versions) VersionGuess(opts ...VersionGuessOpt) string {
standard := vs.versionGuess(opts...)
raftBroker := vs.versionGuess(append(opts, TryRaftBroker())...)
raftController := vs.versionGuess(append(opts, TryRaftController())...)

// If any of these are exact, return the exact guess.
for _, g := range []guess{
standard,
raftBroker,
raftController,
} {
if g.how == guessExact {
return g.String()
}
}

// If any are atLeast, that means it is newer than we can guess and we
// return the highest version.
for _, g := range []guess{
standard,
raftBroker,
raftController,
} {
if g.how == guessAtLeast {
return g.String()
}
}

// This is a custom version. We could do some advanced logic to try to
// return highest of all three guesses, but that may be inaccurate:
// KRaft may detect a higher guess because not all requests exist in
// KRaft. Instead, we just return our standard guess.
return standard.String()
}

type guess struct {
v1 string
v2 string // for between
how int8
}

const (
guessExact = iota
guessAtLeast
guessCustomUnknown
guessCustomAtLeast
guessBetween
guessNotEven
)

func (g guess) String() string {
switch g.how {
case guessExact:
return g.v1
case guessAtLeast:
return "at least " + g.v1
case guessCustomUnknown:
return "unknown custom version"
case guessCustomAtLeast:
return "unknown custom version at least " + g.v1
case guessBetween:
return "between " + g.v1 + " and " + g.v2
case guessNotEven:
return "not even " + g.v1
}
return g.v1
}

func (vs *Versions) versionGuess(opts ...VersionGuessOpt) guess {
cfg := guessCfg{
listener: zkBroker,
skipKeys: []int16{4, 5, 6, 7, 27},
Expand Down Expand Up @@ -224,6 +295,7 @@ func (vs *Versions) VersionGuess(opts ...VersionGuessOpt) string {
{max320, "v3.2"},
{max330, "v3.3"},
{max340, "v3.4"},
{max350, "v3.5"},
} {
for k, v := range comparison.cmp.filter(cfg.listener) {
if v == -1 {
Expand Down Expand Up @@ -286,30 +358,30 @@ func (vs *Versions) VersionGuess(opts ...VersionGuessOpt) string {
case under && over:
// Regardless of equal being true or not, this is a custom version.
if last != "" {
return "unknown custom version at least " + last
return guess{v1: last, how: guessCustomAtLeast}
}
return "unknown custom version"
return guess{v1: last, how: guessCustomUnknown}

case under:
// Regardless of equal being true or not, we have not yet hit
// this version.
if last != "" {
return "between " + last + " and " + current
return guess{v1: last, v2: current, how: guessBetween}
}
return "not even " + current
return guess{v1: current, how: guessNotEven}

case over:
// Regardless of equal being true or not, we try again.
last = current

case equal:
return current
return guess{v1: current, how: guessExact}
}
// At least one of under, equal, or over must be true, so there
// is no default case.
}

return "at least " + last
return guess{v1: last, how: guessAtLeast}
}

// String returns a string representation of the versions; the format may
Expand Down Expand Up @@ -363,6 +435,7 @@ func V3_1_0() *Versions { return zkBrokerOf(max310) }
func V3_2_0() *Versions { return zkBrokerOf(max320) }
func V3_3_0() *Versions { return zkBrokerOf(max330) }
func V3_4_0() *Versions { return zkBrokerOf(max340) }
func V3_5_0() *Versions { return zkBrokerOf(max350) }

func zkBrokerOf(lks listenerKeys) *Versions {
return &Versions{lks.filter(zkBroker)}
Expand Down Expand Up @@ -506,7 +579,7 @@ var max0110 = nextMax(max0102, func(v listenerKeys) listenerKeys {
k(zkBroker, rBroker), // 23 offset for leader epoch KAFKA-1211 0baea2ac13 KIP-101

k(zkBroker, rBroker), // 24 add partitions to txn KAFKA-4990 865d82af2c KIP-98 (raft 3.0 6e857c531f14d07d5b05f174e6063a124c917324)
k(zkBroker), // 25 add offsets to txn (same, same raft)
k(zkBroker, rBroker), // 25 add offsets to txn (same, same raft)
k(zkBroker, rBroker), // 26 end txn (same, same raft)
k(zkBroker, rBroker), // 27 write txn markers (same)
k(zkBroker, rBroker), // 28 txn offset commit (same, same raft)
Expand Down Expand Up @@ -767,8 +840,8 @@ var max270 = nextMax(max260, func(v listenerKeys) listenerKeys {
v[26].inc() // 2 end txn

v = append(v,
k(zkBroker), // 50 describe user scram creds, KAFKA-10259 e8524ccd8fca0caac79b844d87e98e9c055f76fb KIP-554
k(zkBroker), // 51 alter user scram creds, same
k(zkBroker, rBroker, rController), // 50 describe user scram creds, KAFKA-10259 e8524ccd8fca0caac79b844d87e98e9c055f76fb KIP-554; 38c409cf33c kraft
k(zkBroker, rBroker, rController), // 51 alter user scram creds, same
)

// KAFKA-10435 634c9175054cc69d10b6da22ea1e95edff6a4747 KIP-595
Expand Down Expand Up @@ -960,15 +1033,19 @@ var max340 = nextMax(max330, func(v listenerKeys) listenerKeys {
return v
})

var max350 = nextMax(max340, func(v listenerKeys) listenerKeys {
// KAFKA-13369 7146ac57ba9ddd035dac992b9f188a8e7677c08d KIP-405
v[1].inc() // 14 fetch
v[2].inc() // 8 list offsets

v[1].inc() // 15 fetch // KAFKA-14617 79b5f7f1ce2 KIP-903
v[56].inc() // 3 alter partition // KAFKA-14617 8c88cdb7186b1d594f991eb324356dcfcabdf18a KIP-903
return v
})

var (
maxStable = max340
maxStable = max350
maxTip = nextMax(maxStable, func(v listenerKeys) listenerKeys {
// KAFKA-13369 7146ac57ba9ddd035dac992b9f188a8e7677c08d KIP-405
v[1].inc() // 14 fetch // KAFKA-13369 7146ac57ba9 KIP-405
v[2].inc() // 8 list offsets // same

v[1].inc() // 15 fetch // KAFKA-14617 79b5f7f1ce2 KIP-903
v[56].inc() // 3 alter partition // KAFKA-14617 8c88cdb7186b1d594f991eb324356dcfcabdf18a KIP-903
return v
})
)