diff --git a/generate/definitions/04_leader_and_isr b/generate/definitions/04_leader_and_isr index 2204a0f3..9741d337 100644 --- a/generate/definitions/04_leader_and_isr +++ b/generate/definitions/04_leader_and_isr @@ -31,8 +31,11 @@ LeaderAndISRResponseTopicPartition => not top level, no encoding, flexible v4+ // Kafka 1.0 introduced version 1. Kafka 2.2 introduced version 2, proposed // in KIP-380, which changed the layout of the struct to be more memory // efficient. Kafka 2.4.0 introduced version 3 with KIP-455. -LeaderAndISRRequest => key 4, max version 6, flexible v4+ +// Kafka 3.4 introduced version 7 with KIP-866. +LeaderAndISRRequest => key 4, max version 7, flexible v4+ ControllerID: int32 + // If KRaft controller id is used during migration. See KIP-866. + IsKRaftController: bool // v7+ ControllerEpoch: int32 BrokerEpoch: int64(-1) // v2+ Type: int8 // v5+ diff --git a/generate/definitions/05_stop_replica b/generate/definitions/05_stop_replica index 9d5379ed..f0f66cb8 100644 --- a/generate/definitions/05_stop_replica +++ b/generate/definitions/05_stop_replica @@ -8,9 +8,12 @@ // // Kafka 2.6 introduced version 3, proposed in KIP-570, reorganizes partitions // to be stored and adds the leader epoch and delete partition fields per partition. -StopReplicaRequest => key 5, max version 3, flexible v2+ +// Kafka 3.4 introduced version 4 with KIP-866. +StopReplicaRequest => key 5, max version 4, flexible v2+ ControllerID: int32 ControllerEpoch: int32 + // If KRaft controller id is used during migration. See KIP-866. + IsKRaftController: bool // v4+ BrokerEpoch: int64(-1) // v1+ DeletePartitions: bool // v0-v2 Topics: [=>] diff --git a/generate/definitions/06_update_metadata b/generate/definitions/06_update_metadata index a65b7ab4..cf6ea1d3 100644 --- a/generate/definitions/06_update_metadata +++ b/generate/definitions/06_update_metadata @@ -19,8 +19,11 @@ UpdateMetadataRequestTopicPartition => not top level, no encoding, flexible v6+ // // Kafka 2.2 introduced version 5, proposed in KIP-380, which changed the // layout of the struct to be more memory efficient. -UpdateMetadataRequest => key 6, max version 7, flexible v6+ +// Kafka 3.4 introduced version 8 with KIP-866. +UpdateMetadataRequest => key 6, max version 8, flexible v6+ ControllerID: int32 + // If KRaft controller id is used during migration. See KIP-866. + IsKRaftController: bool // v8+ ControllerEpoch: int32 BrokerEpoch: int64(-1) // v5+ PartitionStates: [UpdateMetadataRequestTopicPartition] // v0-v4 diff --git a/generate/definitions/18_api_versions b/generate/definitions/18_api_versions index 49d1c9f0..7da9869d 100644 --- a/generate/definitions/18_api_versions +++ b/generate/definitions/18_api_versions @@ -65,3 +65,6 @@ ApiVersionsResponse => MaxVersionLevel: int16 // The cluster-wide finalized min version level for the feature. MinVersionLevel: int16 + // Set by a KRaft controller if the required configurations for ZK migration + // are present + ZkMigrationReady: bool // tag 3 diff --git a/generate/definitions/62_broker_registration b/generate/definitions/62_broker_registration index b6ac6a9e..6da6a3b3 100644 --- a/generate/definitions/62_broker_registration +++ b/generate/definitions/62_broker_registration @@ -1,6 +1,6 @@ // For KIP-500 / KIP-631, BrokerRegistrationRequest is an internal // broker-to-broker only request. -BrokerRegistrationRequest => key 62, max version 0, flexible v0+ +BrokerRegistrationRequest => key 62, max version 1, flexible v0+ // The broker ID. BrokerID: int32 // The cluster ID of the broker process. @@ -27,6 +27,9 @@ BrokerRegistrationRequest => key 62, max version 0, flexible v0+ MaxSupportedVersion: int16 // The rack that this broker is in, if any. Rack: nullable-string + // If the required configurations for ZK migration are present, this value is + // set to true. + IsMigratingZkBroker: bool // v1+ // BrokerRegistrationResponse is a response to a BrokerRegistrationRequest. BrokerRegistrationResponse => diff --git a/pkg/kmsg/generated.go b/pkg/kmsg/generated.go index bc71c280..bfa247c2 100644 --- a/pkg/kmsg/generated.go +++ b/pkg/kmsg/generated.go @@ -6893,12 +6893,16 @@ func NewLeaderAndISRRequestLiveLeader() LeaderAndISRRequestLiveLeader { // Kafka 1.0 introduced version 1. Kafka 2.2 introduced version 2, proposed // in KIP-380, which changed the layout of the struct to be more memory // efficient. Kafka 2.4.0 introduced version 3 with KIP-455. +// Kafka 3.4 introduced version 7 with KIP-866. type LeaderAndISRRequest struct { // Version is the version of this message used with a Kafka broker. Version int16 ControllerID int32 + // If KRaft controller id is used during migration. See KIP-866. + IsKRaftController bool // v7+ + ControllerEpoch int32 // This field has a default of -1. @@ -6917,7 +6921,7 @@ type LeaderAndISRRequest struct { } func (*LeaderAndISRRequest) Key() int16 { return 4 } -func (*LeaderAndISRRequest) MaxVersion() int16 { return 6 } +func (*LeaderAndISRRequest) MaxVersion() int16 { return 7 } func (v *LeaderAndISRRequest) SetVersion(version int16) { v.Version = version } func (v *LeaderAndISRRequest) GetVersion() int16 { return v.Version } func (v *LeaderAndISRRequest) IsFlexible() bool { return v.Version >= 4 } @@ -6945,6 +6949,10 @@ func (v *LeaderAndISRRequest) AppendTo(dst []byte) []byte { v := v.ControllerID dst = kbin.AppendInt32(dst, v) } + if version >= 7 { + v := v.IsKRaftController + dst = kbin.AppendBool(dst, v) + } { v := v.ControllerEpoch dst = kbin.AppendInt32(dst, v) @@ -7240,6 +7248,10 @@ func (v *LeaderAndISRRequest) readFrom(src []byte, unsafe bool) error { v := b.Int32() s.ControllerID = v } + if version >= 7 { + v := b.Bool() + s.IsKRaftController = v + } { v := b.Int32() s.ControllerEpoch = v @@ -7746,7 +7758,7 @@ type LeaderAndISRResponse struct { } func (*LeaderAndISRResponse) Key() int16 { return 4 } -func (*LeaderAndISRResponse) MaxVersion() int16 { return 6 } +func (*LeaderAndISRResponse) MaxVersion() int16 { return 7 } func (v *LeaderAndISRResponse) SetVersion(version int16) { v.Version = version } func (v *LeaderAndISRResponse) GetVersion() int16 { return v.Version } func (v *LeaderAndISRResponse) IsFlexible() bool { return v.Version >= 4 } @@ -8093,6 +8105,7 @@ func NewStopReplicaRequestTopic() StopReplicaRequestTopic { // // Kafka 2.6 introduced version 3, proposed in KIP-570, reorganizes partitions // to be stored and adds the leader epoch and delete partition fields per partition. +// Kafka 3.4 introduced version 4 with KIP-866. type StopReplicaRequest struct { // Version is the version of this message used with a Kafka broker. Version int16 @@ -8101,6 +8114,9 @@ type StopReplicaRequest struct { ControllerEpoch int32 + // If KRaft controller id is used during migration. See KIP-866. + IsKRaftController bool // v4+ + // This field has a default of -1. BrokerEpoch int64 // v1+ @@ -8113,7 +8129,7 @@ type StopReplicaRequest struct { } func (*StopReplicaRequest) Key() int16 { return 5 } -func (*StopReplicaRequest) MaxVersion() int16 { return 3 } +func (*StopReplicaRequest) MaxVersion() int16 { return 4 } func (v *StopReplicaRequest) SetVersion(version int16) { v.Version = version } func (v *StopReplicaRequest) GetVersion() int16 { return v.Version } func (v *StopReplicaRequest) IsFlexible() bool { return v.Version >= 2 } @@ -8145,6 +8161,10 @@ func (v *StopReplicaRequest) AppendTo(dst []byte) []byte { v := v.ControllerEpoch dst = kbin.AppendInt32(dst, v) } + if version >= 4 { + v := v.IsKRaftController + dst = kbin.AppendBool(dst, v) + } if version >= 1 { v := v.BrokerEpoch dst = kbin.AppendInt64(dst, v) @@ -8250,6 +8270,10 @@ func (v *StopReplicaRequest) readFrom(src []byte, unsafe bool) error { v := b.Int32() s.ControllerEpoch = v } + if version >= 4 { + v := b.Bool() + s.IsKRaftController = v + } if version >= 1 { v := b.Int64() s.BrokerEpoch = v @@ -8435,7 +8459,7 @@ type StopReplicaResponse struct { } func (*StopReplicaResponse) Key() int16 { return 5 } -func (*StopReplicaResponse) MaxVersion() int16 { return 3 } +func (*StopReplicaResponse) MaxVersion() int16 { return 4 } func (v *StopReplicaResponse) SetVersion(version int16) { v.Version = version } func (v *StopReplicaResponse) GetVersion() int16 { return v.Version } func (v *StopReplicaResponse) IsFlexible() bool { return v.Version >= 2 } @@ -8711,12 +8735,16 @@ func NewUpdateMetadataRequestLiveBroker() UpdateMetadataRequestLiveBroker { // // Kafka 2.2 introduced version 5, proposed in KIP-380, which changed the // layout of the struct to be more memory efficient. +// Kafka 3.4 introduced version 8 with KIP-866. type UpdateMetadataRequest struct { // Version is the version of this message used with a Kafka broker. Version int16 ControllerID int32 + // If KRaft controller id is used during migration. See KIP-866. + IsKRaftController bool // v8+ + ControllerEpoch int32 // This field has a default of -1. @@ -8733,7 +8761,7 @@ type UpdateMetadataRequest struct { } func (*UpdateMetadataRequest) Key() int16 { return 6 } -func (*UpdateMetadataRequest) MaxVersion() int16 { return 7 } +func (*UpdateMetadataRequest) MaxVersion() int16 { return 8 } func (v *UpdateMetadataRequest) SetVersion(version int16) { v.Version = version } func (v *UpdateMetadataRequest) GetVersion() int16 { return v.Version } func (v *UpdateMetadataRequest) IsFlexible() bool { return v.Version >= 6 } @@ -8761,6 +8789,10 @@ func (v *UpdateMetadataRequest) AppendTo(dst []byte) []byte { v := v.ControllerID dst = kbin.AppendInt32(dst, v) } + if version >= 8 { + v := v.IsKRaftController + dst = kbin.AppendBool(dst, v) + } { v := v.ControllerEpoch dst = kbin.AppendInt32(dst, v) @@ -9059,6 +9091,10 @@ func (v *UpdateMetadataRequest) readFrom(src []byte, unsafe bool) error { v := b.Int32() s.ControllerID = v } + if version >= 8 { + v := b.Bool() + s.IsKRaftController = v + } { v := b.Int32() s.ControllerEpoch = v @@ -9559,7 +9595,7 @@ type UpdateMetadataResponse struct { } func (*UpdateMetadataResponse) Key() int16 { return 6 } -func (*UpdateMetadataResponse) MaxVersion() int16 { return 7 } +func (*UpdateMetadataResponse) MaxVersion() int16 { return 8 } func (v *UpdateMetadataResponse) SetVersion(version int16) { v.Version = version } func (v *UpdateMetadataResponse) GetVersion() int16 { return v.Version } func (v *UpdateMetadataResponse) IsFlexible() bool { return v.Version >= 6 } @@ -16191,6 +16227,10 @@ type ApiVersionsResponse struct { // FinalizedFeaturesEpoch is >= 0). FinalizedFeatures []ApiVersionsResponseFinalizedFeature // tag 2 + // Set by a KRaft controller if the required configurations for ZK migration + // are present + ZkMigrationReady bool // tag 3 + // UnknownTags are tags Kafka sent that we do not know the purpose of. UnknownTags Tags // v3+ } @@ -16254,6 +16294,9 @@ func (v *ApiVersionsResponse) AppendTo(dst []byte) []byte { if len(v.FinalizedFeatures) > 0 { toEncode = append(toEncode, 2) } + if v.ZkMigrationReady != false { + toEncode = append(toEncode, 3) + } dst = kbin.AppendUvarint(dst, uint32(len(toEncode)+v.UnknownTags.Len())) for _, tag := range toEncode { switch tag { @@ -16346,6 +16389,13 @@ func (v *ApiVersionsResponse) AppendTo(dst []byte) []byte { goto fFinalizedFeatures } } + case 3: + { + v := v.ZkMigrationReady + dst = kbin.AppendUvarint(dst, 3) + dst = kbin.AppendUvarint(dst, 1) + dst = kbin.AppendBool(dst, v) + } } } dst = v.UnknownTags.AppendEach(dst) @@ -16538,6 +16588,13 @@ func (v *ApiVersionsResponse) readFrom(src []byte, unsafe bool) error { if err := b.Complete(); err != nil { return err } + case 3: + b := kbin.Reader{Src: b.Span(int(b.Uvarint()))} + v := b.Bool() + s.ZkMigrationReady = v + if err := b.Complete(); err != nil { + return err + } } } } @@ -40894,12 +40951,16 @@ type BrokerRegistrationRequest struct { // The rack that this broker is in, if any. Rack *string + // If the required configurations for ZK migration are present, this value is + // set to true. + IsMigratingZkBroker bool // v1+ + // UnknownTags are tags Kafka sent that we do not know the purpose of. UnknownTags Tags } func (*BrokerRegistrationRequest) Key() int16 { return 62 } -func (*BrokerRegistrationRequest) MaxVersion() int16 { return 0 } +func (*BrokerRegistrationRequest) MaxVersion() int16 { return 1 } func (v *BrokerRegistrationRequest) SetVersion(version int16) { v.Version = version } func (v *BrokerRegistrationRequest) GetVersion() int16 { return v.Version } func (v *BrokerRegistrationRequest) IsFlexible() bool { return v.Version >= 0 } @@ -41017,6 +41078,10 @@ func (v *BrokerRegistrationRequest) AppendTo(dst []byte) []byte { dst = kbin.AppendNullableString(dst, v) } } + if version >= 1 { + v := v.IsMigratingZkBroker + dst = kbin.AppendBool(dst, v) + } if isFlexible { dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) dst = v.UnknownTags.AppendEach(dst) @@ -41203,6 +41268,10 @@ func (v *BrokerRegistrationRequest) readFrom(src []byte, unsafe bool) error { } s.Rack = v } + if version >= 1 { + v := b.Bool() + s.IsMigratingZkBroker = v + } if isFlexible { s.UnknownTags = internalReadTags(&b) } @@ -41252,7 +41321,7 @@ type BrokerRegistrationResponse struct { } func (*BrokerRegistrationResponse) Key() int16 { return 62 } -func (*BrokerRegistrationResponse) MaxVersion() int16 { return 0 } +func (*BrokerRegistrationResponse) MaxVersion() int16 { return 1 } func (v *BrokerRegistrationResponse) SetVersion(version int16) { v.Version = version } func (v *BrokerRegistrationResponse) GetVersion() int16 { return v.Version } func (v *BrokerRegistrationResponse) IsFlexible() bool { return v.Version >= 0 } diff --git a/pkg/kversion/kversion.go b/pkg/kversion/kversion.go index ef8108d6..26be8ef1 100644 --- a/pkg/kversion/kversion.go +++ b/pkg/kversion/kversion.go @@ -788,7 +788,7 @@ var max270 = nextMax(max260, func(v listenerKeys) listenerKeys { var max280 = nextMax(max270, func(v listenerKeys) listenerKeys { // KAFKA-10181 KAFKA-10181 KIP-590 v = append(v, - k(rController), // 58 envelope + k(zkBroker, rController), // 58 envelope, controller first, zk in KAFKA-14446 8b045dcbf6b89e1a9594ff95642d4882765e4b0d KIP-866 Kafka 3.4 ) // KAFKA-10729 85f94d50271c952c3e9ee49c4fc814c0da411618 KIP-482 @@ -939,6 +939,9 @@ var ( maxTip = nextMax(maxStable, func(v listenerKeys) listenerKeys { // KAFKA-14304 7b7e40a536a79cebf35cc278b9375c8352d342b9 KIP-866 // KAFKA-14448 67c72596afe58363eceeb32084c5c04637a33831 added BrokerRegistration + // KAFKA-14493 db490707606855c265bc938e1b236070e0e2eba5 changed BrokerRegistration + // KAFKA-14304 0bb05d8679b684ad8fbb2eb40dfc00066186a75a changed BrokerRegistration back to a bool... + // 5b521031edea8ea7cbcca7dc24a58429423740ff added tag to ApiVersions v[4].inc() // 7 leader and isr v[5].inc() // 4 stop replica v[6].inc() // 8 update metadata