diff --git a/generate/DEFINITIONS b/generate/DEFINITIONS index ec5b29c5..e609178c 100644 --- a/generate/DEFINITIONS +++ b/generate/DEFINITIONS @@ -3649,3 +3649,128 @@ AlterUserSCRAMCredentialsResponse => ErrorCode: int16 // The user-level error message, if any. ErrorMessage: nullable-string + +// Part of KIP-595 to replace Kafka's dependence on Zookeeper with a +// Kafka-only raft protocol, +// VoteRequest is used by voters to hold a leader election. +// +// Since this is relatively Kafka internal, most fields are left undocumented. +VoteRequest => key 52, max version 0, flexible v0+, admin + ClusterID: nullable-string + Topics: [=>] + Topic: string + Partitions: [=>] + Partition: int32 + // The bumped epoch of the candidate sending the request. + CandidateEpoch: int32 + // The ID of the voter sending the request. + CandidateID: int32 + // The epoch of the last record written to the metadata log. + LastOffsetEpoch: int32 + // The offset of the last record written to the metadata log. + LastOffset: int64 + +VoteResponse => + ErrorCode: int16 + Topics: [=>] + Topic: string + Partitions: [=>] + Partition: int32 + ErrorCode: int16 + // The ID of the current leader, or -1 if the leader is unknown. + LeaderID: int32 + // Whether the vote was granted. + VoteGranted: bool + +// Part of KIP-595 to replace Kafka's dependence on Zookeeper with a +// Kafka-only raft protocol, +// BeginQuorumEpochRequest is sent by a leader (once it has enough votes) +// to all voters in the election. +// +// Since this is relatively Kafka internal, most fields are left undocumented. +BeginQuorumEpochRequest => key 53, max version 0, admin + ClusterID: nullable-string + Topics: [=>] + Topic: string + Partitions: [=>] + Partition: int32 + // The ID of the newly elected leader. + LeaderID: int32 + // The epoch of the newly elected leader. + LeaderEpoch: int32 + +BeginQuorumEpochResponse => + ErrorCode: int16 + Topics: [=>] + Topic: string + Partitions: [=>] + Partition: int32 + ErrorCode: int16 + // The ID of the current leader, or -1 if the leader is unknown. + LeaderID: int32 + // The latest known leader epoch. + LeaderEpoch: int32 + +// Part of KIP-595 to replace Kafka's dependence on Zookeeper with a +// Kafka-only raft protocol, +// EndQuorumEpochRequest is sent by a leader to gracefully step down as leader +// (i.e. on shutdown). Stepping down begins a new election. +// +// Since this is relatively Kafka internal, most fields are left undocumented. +EndQuorumEpochRequest => key 54, max version 0, admin + ClusterID: nullable-string + Topics: [=>] + Topic: string + Partitions: [=>] + Partition: int32 + // The ID of the replica sending this request. + ReplicaID: int32 + // The current leader ID, or -1 if there is a vote in progress. + LeaderID: int32 + // The current epoch. + LeaderEpoch: int32 + // A sorted list of preferred successors to start the election. + PreferredSuccessors: [int32] + +EndQuorumEpochResponse => + ErrorCode: int16 + Topics: [=>] + Topic: string + Partitions: [=>] + Partition: int32 + ErrorCode: int16 + // The ID of the current leader, or -1 if the leader is unknown. + LeaderID: int32 + // The latest known leader epoch. + LeaderEpoch: int32 + +// A common struct used in DescribeQuorumResponse. +DescribeQuorumResponseTopicPartitionReplicaState => not top level, no encoding + ReplicaID: int32 + // The last known log end offset of the follower, or -1 if it is unknown. + LogEndOffset: int64 + +// Part of KIP-642 (and KIP-595) to replace Kafka's dependence on Zookeeper with a +// Kafka-only raft protocol, +// DescribeQuorumRequest is sent by a leader to describe the quorum. +DescribeQuorumRequest => key 55, max version 0, flexible v0+, admin + Topics: [=>] + Topic: string + Partitions: [=>] + Partition: int32 + +DescribeQuorumResponse => + ErrorCode: int16 + Topics: [=>] + Topic: string + Partitions: [=>] + Partition: int32 + ErrorCode: int16 + // The ID of the current leader, or -1 if the leader is unknown. + LeaderID: int32 + // The latest known leader epoch. + LeaderEpoch: int32 + HighWatermark: int64 + CurrentVoters: [DescribeQuorumResponseTopicPartitionReplicaState] + TargetVoters: [DescribeQuorumResponseTopicPartitionReplicaState] + Observers: [DescribeQuorumResponseTopicPartitionReplicaState] diff --git a/pkg/kmsg/generated.go b/pkg/kmsg/generated.go index 93f841c4..fba4fe6b 100644 --- a/pkg/kmsg/generated.go +++ b/pkg/kmsg/generated.go @@ -6,7 +6,7 @@ import "github.com/twmb/kafka-go/pkg/kbin" // MaxKey is the maximum key used for any messages in this package. // Note that this value will change as Kafka adds more messages. -const MaxKey = 51 +const MaxKey = 55 // MessageV0 is the message format Kafka used prior to 0.10. // @@ -10429,7 +10429,7 @@ func (v *ApiVersionsResponse) AppendTo(dst []byte) []byte { dst = kbin.AppendUvarint(dst, 0) sized := false lenAt := len(dst) - l9271: + l9273: if isFlexible { dst = kbin.AppendCompactArrayLen(dst, len(v)) } else { @@ -10460,7 +10460,7 @@ func (v *ApiVersionsResponse) AppendTo(dst []byte) []byte { if !sized { dst = kbin.AppendUvarint(dst[:lenAt], uint32(len(dst[lenAt:]))) sized = true - goto l9271 + goto l9273 } } case 1: @@ -10476,7 +10476,7 @@ func (v *ApiVersionsResponse) AppendTo(dst []byte) []byte { dst = kbin.AppendUvarint(dst, 2) sized := false lenAt := len(dst) - l9318: + l9320: if isFlexible { dst = kbin.AppendCompactArrayLen(dst, len(v)) } else { @@ -10507,7 +10507,7 @@ func (v *ApiVersionsResponse) AppendTo(dst []byte) []byte { if !sized { dst = kbin.AppendUvarint(dst[:lenAt], uint32(len(dst[lenAt:]))) sized = true - goto l9318 + goto l9320 } } } @@ -22657,6 +22657,1672 @@ func (v *AlterUserSCRAMCredentialsResponse) ReadFrom(src []byte) error { func (v *AlterUserSCRAMCredentialsResponse) Default() { } +type VoteRequestTopicPartition struct { + Partition int32 + + // The bumped epoch of the candidate sending the request. + CandidateEpoch int32 + + // The ID of the voter sending the request. + CandidateID int32 + + // The epoch of the last record written to the metadata log. + LastOffsetEpoch int32 + + // The offset of the last record written to the metadata log. + LastOffset int64 +} + +func (v *VoteRequestTopicPartition) Default() { +} + +type VoteRequestTopic struct { + Topic string + + Partitions []VoteRequestTopicPartition +} + +func (v *VoteRequestTopic) Default() { +} + +// Part of KIP-595 to replace Kafka's dependence on Zookeeper with a +// Kafka-only raft protocol, +// VoteRequest is used by voters to hold a leader election. +// +// Since this is relatively Kafka internal, most fields are left undocumented. +type VoteRequest struct { + // Version is the version of this message used with a Kafka broker. + Version int16 + + ClusterID *string + + Topics []VoteRequestTopic +} + +func (*VoteRequest) Key() int16 { return 52 } +func (*VoteRequest) MaxVersion() int16 { return 0 } +func (v *VoteRequest) SetVersion(version int16) { v.Version = version } +func (v *VoteRequest) GetVersion() int16 { return v.Version } +func (v *VoteRequest) IsFlexible() bool { return v.Version >= 0 } +func (v *VoteRequest) IsAdminRequest() {} +func (v *VoteRequest) ResponseKind() Response { return &VoteResponse{Version: v.Version} } + +func (v *VoteRequest) AppendTo(dst []byte) []byte { + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + { + v := v.ClusterID + if isFlexible { + dst = kbin.AppendCompactNullableString(dst, v) + } else { + dst = kbin.AppendNullableString(dst, v) + } + } + { + v := v.Topics + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.Topic + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.Partitions + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.Partition + dst = kbin.AppendInt32(dst, v) + } + { + v := v.CandidateEpoch + dst = kbin.AppendInt32(dst, v) + } + { + v := v.CandidateID + dst = kbin.AppendInt32(dst, v) + } + { + v := v.LastOffsetEpoch + dst = kbin.AppendInt32(dst, v) + } + { + v := v.LastOffset + dst = kbin.AppendInt64(dst, v) + } + if isFlexible { + dst = append(dst, 0) + } + } + } + if isFlexible { + dst = append(dst, 0) + } + } + } + if isFlexible { + dst = append(dst, 0) + } + return dst +} +func (v *VoteRequest) ReadFrom(src []byte) error { + v.Default() + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + b := kbin.Reader{Src: src} + s := v + { + var v *string + if isFlexible { + v = b.CompactNullableString() + } else { + v = b.NullableString() + } + s.ClusterID = v + } + { + v := s.Topics + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]VoteRequestTopic, l) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + var v string + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + s.Topic = v + } + { + v := s.Partitions + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]VoteRequestTopicPartition, l) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.Int32() + s.Partition = v + } + { + v := b.Int32() + s.CandidateEpoch = v + } + { + v := b.Int32() + s.CandidateID = v + } + { + v := b.Int32() + s.LastOffsetEpoch = v + } + { + v := b.Int64() + s.LastOffset = v + } + if isFlexible { + SkipTags(&b) + } + } + v = a + s.Partitions = v + } + if isFlexible { + SkipTags(&b) + } + } + v = a + s.Topics = v + } + if isFlexible { + SkipTags(&b) + } + return b.Complete() +} +func (v *VoteRequest) Default() { +} + +type VoteResponseTopicPartition struct { + Partition int32 + + ErrorCode int16 + + // The ID of the current leader, or -1 if the leader is unknown. + LeaderID int32 + + // Whether the vote was granted. + VoteGranted bool +} + +func (v *VoteResponseTopicPartition) Default() { +} + +type VoteResponseTopic struct { + Topic string + + Partitions []VoteResponseTopicPartition +} + +func (v *VoteResponseTopic) Default() { +} + +type VoteResponse struct { + // Version is the version of this message used with a Kafka broker. + Version int16 + + ErrorCode int16 + + Topics []VoteResponseTopic +} + +func (*VoteResponse) Key() int16 { return 52 } +func (*VoteResponse) MaxVersion() int16 { return 0 } +func (v *VoteResponse) SetVersion(version int16) { v.Version = version } +func (v *VoteResponse) GetVersion() int16 { return v.Version } +func (v *VoteResponse) IsFlexible() bool { return v.Version >= 0 } +func (v *VoteResponse) RequestKind() Request { return &VoteRequest{Version: v.Version} } + +func (v *VoteResponse) AppendTo(dst []byte) []byte { + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + { + v := v.ErrorCode + dst = kbin.AppendInt16(dst, v) + } + { + v := v.Topics + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.Topic + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.Partitions + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.Partition + dst = kbin.AppendInt32(dst, v) + } + { + v := v.ErrorCode + dst = kbin.AppendInt16(dst, v) + } + { + v := v.LeaderID + dst = kbin.AppendInt32(dst, v) + } + { + v := v.VoteGranted + dst = kbin.AppendBool(dst, v) + } + if isFlexible { + dst = append(dst, 0) + } + } + } + if isFlexible { + dst = append(dst, 0) + } + } + } + if isFlexible { + dst = append(dst, 0) + } + return dst +} +func (v *VoteResponse) ReadFrom(src []byte) error { + v.Default() + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + b := kbin.Reader{Src: src} + s := v + { + v := b.Int16() + s.ErrorCode = v + } + { + v := s.Topics + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]VoteResponseTopic, l) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + var v string + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + s.Topic = v + } + { + v := s.Partitions + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]VoteResponseTopicPartition, l) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.Int32() + s.Partition = v + } + { + v := b.Int16() + s.ErrorCode = v + } + { + v := b.Int32() + s.LeaderID = v + } + { + v := b.Bool() + s.VoteGranted = v + } + if isFlexible { + SkipTags(&b) + } + } + v = a + s.Partitions = v + } + if isFlexible { + SkipTags(&b) + } + } + v = a + s.Topics = v + } + if isFlexible { + SkipTags(&b) + } + return b.Complete() +} +func (v *VoteResponse) Default() { +} + +type BeginQuorumEpochRequestTopicPartition struct { + Partition int32 + + // The ID of the newly elected leader. + LeaderID int32 + + // The epoch of the newly elected leader. + LeaderEpoch int32 +} + +func (v *BeginQuorumEpochRequestTopicPartition) Default() { +} + +type BeginQuorumEpochRequestTopic struct { + Topic string + + Partitions []BeginQuorumEpochRequestTopicPartition +} + +func (v *BeginQuorumEpochRequestTopic) Default() { +} + +// Part of KIP-595 to replace Kafka's dependence on Zookeeper with a +// Kafka-only raft protocol, +// BeginQuorumEpochRequest is sent by a leader (once it has enough votes) +// to all voters in the election. +// +// Since this is relatively Kafka internal, most fields are left undocumented. +type BeginQuorumEpochRequest struct { + // Version is the version of this message used with a Kafka broker. + Version int16 + + ClusterID *string + + Topics []BeginQuorumEpochRequestTopic +} + +func (*BeginQuorumEpochRequest) Key() int16 { return 53 } +func (*BeginQuorumEpochRequest) MaxVersion() int16 { return 0 } +func (v *BeginQuorumEpochRequest) SetVersion(version int16) { v.Version = version } +func (v *BeginQuorumEpochRequest) GetVersion() int16 { return v.Version } +func (v *BeginQuorumEpochRequest) IsFlexible() bool { return false } +func (v *BeginQuorumEpochRequest) IsAdminRequest() {} +func (v *BeginQuorumEpochRequest) ResponseKind() Response { + return &BeginQuorumEpochResponse{Version: v.Version} +} + +func (v *BeginQuorumEpochRequest) AppendTo(dst []byte) []byte { + version := v.Version + _ = version + { + v := v.ClusterID + dst = kbin.AppendNullableString(dst, v) + } + { + v := v.Topics + dst = kbin.AppendArrayLen(dst, len(v)) + for i := range v { + v := &v[i] + { + v := v.Topic + dst = kbin.AppendString(dst, v) + } + { + v := v.Partitions + dst = kbin.AppendArrayLen(dst, len(v)) + for i := range v { + v := &v[i] + { + v := v.Partition + dst = kbin.AppendInt32(dst, v) + } + { + v := v.LeaderID + dst = kbin.AppendInt32(dst, v) + } + { + v := v.LeaderEpoch + dst = kbin.AppendInt32(dst, v) + } + } + } + } + } + return dst +} +func (v *BeginQuorumEpochRequest) ReadFrom(src []byte) error { + v.Default() + version := v.Version + _ = version + b := kbin.Reader{Src: src} + s := v + { + v := b.NullableString() + s.ClusterID = v + } + { + v := s.Topics + a := v + var l int32 + l = b.ArrayLen() + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]BeginQuorumEpochRequestTopic, l) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.String() + s.Topic = v + } + { + v := s.Partitions + a := v + var l int32 + l = b.ArrayLen() + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]BeginQuorumEpochRequestTopicPartition, l) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.Int32() + s.Partition = v + } + { + v := b.Int32() + s.LeaderID = v + } + { + v := b.Int32() + s.LeaderEpoch = v + } + } + v = a + s.Partitions = v + } + } + v = a + s.Topics = v + } + return b.Complete() +} +func (v *BeginQuorumEpochRequest) Default() { +} + +type BeginQuorumEpochResponseTopicPartition struct { + Partition int32 + + ErrorCode int16 + + // The ID of the current leader, or -1 if the leader is unknown. + LeaderID int32 + + // The latest known leader epoch. + LeaderEpoch int32 +} + +func (v *BeginQuorumEpochResponseTopicPartition) Default() { +} + +type BeginQuorumEpochResponseTopic struct { + Topic string + + Partitions []BeginQuorumEpochResponseTopicPartition +} + +func (v *BeginQuorumEpochResponseTopic) Default() { +} + +type BeginQuorumEpochResponse struct { + // Version is the version of this message used with a Kafka broker. + Version int16 + + ErrorCode int16 + + Topics []BeginQuorumEpochResponseTopic +} + +func (*BeginQuorumEpochResponse) Key() int16 { return 53 } +func (*BeginQuorumEpochResponse) MaxVersion() int16 { return 0 } +func (v *BeginQuorumEpochResponse) SetVersion(version int16) { v.Version = version } +func (v *BeginQuorumEpochResponse) GetVersion() int16 { return v.Version } +func (v *BeginQuorumEpochResponse) IsFlexible() bool { return false } +func (v *BeginQuorumEpochResponse) RequestKind() Request { + return &BeginQuorumEpochRequest{Version: v.Version} +} + +func (v *BeginQuorumEpochResponse) AppendTo(dst []byte) []byte { + version := v.Version + _ = version + { + v := v.ErrorCode + dst = kbin.AppendInt16(dst, v) + } + { + v := v.Topics + dst = kbin.AppendArrayLen(dst, len(v)) + for i := range v { + v := &v[i] + { + v := v.Topic + dst = kbin.AppendString(dst, v) + } + { + v := v.Partitions + dst = kbin.AppendArrayLen(dst, len(v)) + for i := range v { + v := &v[i] + { + v := v.Partition + dst = kbin.AppendInt32(dst, v) + } + { + v := v.ErrorCode + dst = kbin.AppendInt16(dst, v) + } + { + v := v.LeaderID + dst = kbin.AppendInt32(dst, v) + } + { + v := v.LeaderEpoch + dst = kbin.AppendInt32(dst, v) + } + } + } + } + } + return dst +} +func (v *BeginQuorumEpochResponse) ReadFrom(src []byte) error { + v.Default() + version := v.Version + _ = version + b := kbin.Reader{Src: src} + s := v + { + v := b.Int16() + s.ErrorCode = v + } + { + v := s.Topics + a := v + var l int32 + l = b.ArrayLen() + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]BeginQuorumEpochResponseTopic, l) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.String() + s.Topic = v + } + { + v := s.Partitions + a := v + var l int32 + l = b.ArrayLen() + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]BeginQuorumEpochResponseTopicPartition, l) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.Int32() + s.Partition = v + } + { + v := b.Int16() + s.ErrorCode = v + } + { + v := b.Int32() + s.LeaderID = v + } + { + v := b.Int32() + s.LeaderEpoch = v + } + } + v = a + s.Partitions = v + } + } + v = a + s.Topics = v + } + return b.Complete() +} +func (v *BeginQuorumEpochResponse) Default() { +} + +type EndQuorumEpochRequestTopicPartition struct { + Partition int32 + + // The ID of the replica sending this request. + ReplicaID int32 + + // The current leader ID, or -1 if there is a vote in progress. + LeaderID int32 + + // The current epoch. + LeaderEpoch int32 + + // A sorted list of preferred successors to start the election. + PreferredSuccessors []int32 +} + +func (v *EndQuorumEpochRequestTopicPartition) Default() { +} + +type EndQuorumEpochRequestTopic struct { + Topic string + + Partitions []EndQuorumEpochRequestTopicPartition +} + +func (v *EndQuorumEpochRequestTopic) Default() { +} + +// Part of KIP-595 to replace Kafka's dependence on Zookeeper with a +// Kafka-only raft protocol, +// EndQuorumEpochRequest is sent by a leader to gracefully step down as leader +// (i.e. on shutdown). Stepping down begins a new election. +// +// Since this is relatively Kafka internal, most fields are left undocumented. +type EndQuorumEpochRequest struct { + // Version is the version of this message used with a Kafka broker. + Version int16 + + ClusterID *string + + Topics []EndQuorumEpochRequestTopic +} + +func (*EndQuorumEpochRequest) Key() int16 { return 54 } +func (*EndQuorumEpochRequest) MaxVersion() int16 { return 0 } +func (v *EndQuorumEpochRequest) SetVersion(version int16) { v.Version = version } +func (v *EndQuorumEpochRequest) GetVersion() int16 { return v.Version } +func (v *EndQuorumEpochRequest) IsFlexible() bool { return false } +func (v *EndQuorumEpochRequest) IsAdminRequest() {} +func (v *EndQuorumEpochRequest) ResponseKind() Response { + return &EndQuorumEpochResponse{Version: v.Version} +} + +func (v *EndQuorumEpochRequest) AppendTo(dst []byte) []byte { + version := v.Version + _ = version + { + v := v.ClusterID + dst = kbin.AppendNullableString(dst, v) + } + { + v := v.Topics + dst = kbin.AppendArrayLen(dst, len(v)) + for i := range v { + v := &v[i] + { + v := v.Topic + dst = kbin.AppendString(dst, v) + } + { + v := v.Partitions + dst = kbin.AppendArrayLen(dst, len(v)) + for i := range v { + v := &v[i] + { + v := v.Partition + dst = kbin.AppendInt32(dst, v) + } + { + v := v.ReplicaID + dst = kbin.AppendInt32(dst, v) + } + { + v := v.LeaderID + dst = kbin.AppendInt32(dst, v) + } + { + v := v.LeaderEpoch + dst = kbin.AppendInt32(dst, v) + } + { + v := v.PreferredSuccessors + dst = kbin.AppendArrayLen(dst, len(v)) + for i := range v { + v := v[i] + dst = kbin.AppendInt32(dst, v) + } + } + } + } + } + } + return dst +} +func (v *EndQuorumEpochRequest) ReadFrom(src []byte) error { + v.Default() + version := v.Version + _ = version + b := kbin.Reader{Src: src} + s := v + { + v := b.NullableString() + s.ClusterID = v + } + { + v := s.Topics + a := v + var l int32 + l = b.ArrayLen() + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]EndQuorumEpochRequestTopic, l) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.String() + s.Topic = v + } + { + v := s.Partitions + a := v + var l int32 + l = b.ArrayLen() + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]EndQuorumEpochRequestTopicPartition, l) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.Int32() + s.Partition = v + } + { + v := b.Int32() + s.ReplicaID = v + } + { + v := b.Int32() + s.LeaderID = v + } + { + v := b.Int32() + s.LeaderEpoch = v + } + { + v := s.PreferredSuccessors + a := v + var l int32 + l = b.ArrayLen() + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]int32, l) + } + for i := int32(0); i < l; i++ { + v := b.Int32() + a[i] = v + } + v = a + s.PreferredSuccessors = v + } + } + v = a + s.Partitions = v + } + } + v = a + s.Topics = v + } + return b.Complete() +} +func (v *EndQuorumEpochRequest) Default() { +} + +type EndQuorumEpochResponseTopicPartition struct { + Partition int32 + + ErrorCode int16 + + // The ID of the current leader, or -1 if the leader is unknown. + LeaderID int32 + + // The latest known leader epoch. + LeaderEpoch int32 +} + +func (v *EndQuorumEpochResponseTopicPartition) Default() { +} + +type EndQuorumEpochResponseTopic struct { + Topic string + + Partitions []EndQuorumEpochResponseTopicPartition +} + +func (v *EndQuorumEpochResponseTopic) Default() { +} + +type EndQuorumEpochResponse struct { + // Version is the version of this message used with a Kafka broker. + Version int16 + + ErrorCode int16 + + Topics []EndQuorumEpochResponseTopic +} + +func (*EndQuorumEpochResponse) Key() int16 { return 54 } +func (*EndQuorumEpochResponse) MaxVersion() int16 { return 0 } +func (v *EndQuorumEpochResponse) SetVersion(version int16) { v.Version = version } +func (v *EndQuorumEpochResponse) GetVersion() int16 { return v.Version } +func (v *EndQuorumEpochResponse) IsFlexible() bool { return false } +func (v *EndQuorumEpochResponse) RequestKind() Request { + return &EndQuorumEpochRequest{Version: v.Version} +} + +func (v *EndQuorumEpochResponse) AppendTo(dst []byte) []byte { + version := v.Version + _ = version + { + v := v.ErrorCode + dst = kbin.AppendInt16(dst, v) + } + { + v := v.Topics + dst = kbin.AppendArrayLen(dst, len(v)) + for i := range v { + v := &v[i] + { + v := v.Topic + dst = kbin.AppendString(dst, v) + } + { + v := v.Partitions + dst = kbin.AppendArrayLen(dst, len(v)) + for i := range v { + v := &v[i] + { + v := v.Partition + dst = kbin.AppendInt32(dst, v) + } + { + v := v.ErrorCode + dst = kbin.AppendInt16(dst, v) + } + { + v := v.LeaderID + dst = kbin.AppendInt32(dst, v) + } + { + v := v.LeaderEpoch + dst = kbin.AppendInt32(dst, v) + } + } + } + } + } + return dst +} +func (v *EndQuorumEpochResponse) ReadFrom(src []byte) error { + v.Default() + version := v.Version + _ = version + b := kbin.Reader{Src: src} + s := v + { + v := b.Int16() + s.ErrorCode = v + } + { + v := s.Topics + a := v + var l int32 + l = b.ArrayLen() + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]EndQuorumEpochResponseTopic, l) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.String() + s.Topic = v + } + { + v := s.Partitions + a := v + var l int32 + l = b.ArrayLen() + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]EndQuorumEpochResponseTopicPartition, l) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.Int32() + s.Partition = v + } + { + v := b.Int16() + s.ErrorCode = v + } + { + v := b.Int32() + s.LeaderID = v + } + { + v := b.Int32() + s.LeaderEpoch = v + } + } + v = a + s.Partitions = v + } + } + v = a + s.Topics = v + } + return b.Complete() +} +func (v *EndQuorumEpochResponse) Default() { +} + +// A common struct used in DescribeQuorumResponse. +type DescribeQuorumResponseTopicPartitionReplicaState struct { + ReplicaID int32 + + // The last known log end offset of the follower, or -1 if it is unknown. + LogEndOffset int64 +} + +func (v *DescribeQuorumResponseTopicPartitionReplicaState) Default() { +} + +type DescribeQuorumRequestTopicPartition struct { + Partition int32 +} + +func (v *DescribeQuorumRequestTopicPartition) Default() { +} + +type DescribeQuorumRequestTopic struct { + Topic string + + Partitions []DescribeQuorumRequestTopicPartition +} + +func (v *DescribeQuorumRequestTopic) Default() { +} + +// Part of KIP-642 (and KIP-595) to replace Kafka's dependence on Zookeeper with a +// Kafka-only raft protocol, +// DescribeQuorumRequest is sent by a leader to describe the quorum. +type DescribeQuorumRequest struct { + // Version is the version of this message used with a Kafka broker. + Version int16 + + Topics []DescribeQuorumRequestTopic +} + +func (*DescribeQuorumRequest) Key() int16 { return 55 } +func (*DescribeQuorumRequest) MaxVersion() int16 { return 0 } +func (v *DescribeQuorumRequest) SetVersion(version int16) { v.Version = version } +func (v *DescribeQuorumRequest) GetVersion() int16 { return v.Version } +func (v *DescribeQuorumRequest) IsFlexible() bool { return v.Version >= 0 } +func (v *DescribeQuorumRequest) IsAdminRequest() {} +func (v *DescribeQuorumRequest) ResponseKind() Response { + return &DescribeQuorumResponse{Version: v.Version} +} + +func (v *DescribeQuorumRequest) AppendTo(dst []byte) []byte { + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + { + v := v.Topics + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.Topic + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.Partitions + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.Partition + dst = kbin.AppendInt32(dst, v) + } + if isFlexible { + dst = append(dst, 0) + } + } + } + if isFlexible { + dst = append(dst, 0) + } + } + } + if isFlexible { + dst = append(dst, 0) + } + return dst +} +func (v *DescribeQuorumRequest) ReadFrom(src []byte) error { + v.Default() + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + b := kbin.Reader{Src: src} + s := v + { + v := s.Topics + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]DescribeQuorumRequestTopic, l) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + var v string + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + s.Topic = v + } + { + v := s.Partitions + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]DescribeQuorumRequestTopicPartition, l) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.Int32() + s.Partition = v + } + if isFlexible { + SkipTags(&b) + } + } + v = a + s.Partitions = v + } + if isFlexible { + SkipTags(&b) + } + } + v = a + s.Topics = v + } + if isFlexible { + SkipTags(&b) + } + return b.Complete() +} +func (v *DescribeQuorumRequest) Default() { +} + +type DescribeQuorumResponseTopicPartition struct { + Partition int32 + + ErrorCode int16 + + // The ID of the current leader, or -1 if the leader is unknown. + LeaderID int32 + + // The latest known leader epoch. + LeaderEpoch int32 + + HighWatermark int64 + + CurrentVoters []DescribeQuorumResponseTopicPartitionReplicaState + + TargetVoters []DescribeQuorumResponseTopicPartitionReplicaState + + Observers []DescribeQuorumResponseTopicPartitionReplicaState +} + +func (v *DescribeQuorumResponseTopicPartition) Default() { +} + +type DescribeQuorumResponseTopic struct { + Topic string + + Partitions []DescribeQuorumResponseTopicPartition +} + +func (v *DescribeQuorumResponseTopic) Default() { +} + +type DescribeQuorumResponse struct { + // Version is the version of this message used with a Kafka broker. + Version int16 + + ErrorCode int16 + + Topics []DescribeQuorumResponseTopic +} + +func (*DescribeQuorumResponse) Key() int16 { return 55 } +func (*DescribeQuorumResponse) MaxVersion() int16 { return 0 } +func (v *DescribeQuorumResponse) SetVersion(version int16) { v.Version = version } +func (v *DescribeQuorumResponse) GetVersion() int16 { return v.Version } +func (v *DescribeQuorumResponse) IsFlexible() bool { return v.Version >= 0 } +func (v *DescribeQuorumResponse) RequestKind() Request { + return &DescribeQuorumRequest{Version: v.Version} +} + +func (v *DescribeQuorumResponse) AppendTo(dst []byte) []byte { + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + { + v := v.ErrorCode + dst = kbin.AppendInt16(dst, v) + } + { + v := v.Topics + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.Topic + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.Partitions + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.Partition + dst = kbin.AppendInt32(dst, v) + } + { + v := v.ErrorCode + dst = kbin.AppendInt16(dst, v) + } + { + v := v.LeaderID + dst = kbin.AppendInt32(dst, v) + } + { + v := v.LeaderEpoch + dst = kbin.AppendInt32(dst, v) + } + { + v := v.HighWatermark + dst = kbin.AppendInt64(dst, v) + } + { + v := v.CurrentVoters + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.ReplicaID + dst = kbin.AppendInt32(dst, v) + } + { + v := v.LogEndOffset + dst = kbin.AppendInt64(dst, v) + } + if isFlexible { + dst = append(dst, 0) + } + } + } + { + v := v.TargetVoters + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.ReplicaID + dst = kbin.AppendInt32(dst, v) + } + { + v := v.LogEndOffset + dst = kbin.AppendInt64(dst, v) + } + if isFlexible { + dst = append(dst, 0) + } + } + } + { + v := v.Observers + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.ReplicaID + dst = kbin.AppendInt32(dst, v) + } + { + v := v.LogEndOffset + dst = kbin.AppendInt64(dst, v) + } + if isFlexible { + dst = append(dst, 0) + } + } + } + if isFlexible { + dst = append(dst, 0) + } + } + } + if isFlexible { + dst = append(dst, 0) + } + } + } + if isFlexible { + dst = append(dst, 0) + } + return dst +} +func (v *DescribeQuorumResponse) ReadFrom(src []byte) error { + v.Default() + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + b := kbin.Reader{Src: src} + s := v + { + v := b.Int16() + s.ErrorCode = v + } + { + v := s.Topics + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]DescribeQuorumResponseTopic, l) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + var v string + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + s.Topic = v + } + { + v := s.Partitions + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]DescribeQuorumResponseTopicPartition, l) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.Int32() + s.Partition = v + } + { + v := b.Int16() + s.ErrorCode = v + } + { + v := b.Int32() + s.LeaderID = v + } + { + v := b.Int32() + s.LeaderEpoch = v + } + { + v := b.Int64() + s.HighWatermark = v + } + { + v := s.CurrentVoters + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]DescribeQuorumResponseTopicPartitionReplicaState, l) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.Int32() + s.ReplicaID = v + } + { + v := b.Int64() + s.LogEndOffset = v + } + if isFlexible { + SkipTags(&b) + } + } + v = a + s.CurrentVoters = v + } + { + v := s.TargetVoters + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]DescribeQuorumResponseTopicPartitionReplicaState, l) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.Int32() + s.ReplicaID = v + } + { + v := b.Int64() + s.LogEndOffset = v + } + if isFlexible { + SkipTags(&b) + } + } + v = a + s.TargetVoters = v + } + { + v := s.Observers + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]DescribeQuorumResponseTopicPartitionReplicaState, l) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.Int32() + s.ReplicaID = v + } + { + v := b.Int64() + s.LogEndOffset = v + } + if isFlexible { + SkipTags(&b) + } + } + v = a + s.Observers = v + } + if isFlexible { + SkipTags(&b) + } + } + v = a + s.Partitions = v + } + if isFlexible { + SkipTags(&b) + } + } + v = a + s.Topics = v + } + if isFlexible { + SkipTags(&b) + } + return b.Complete() +} +func (v *DescribeQuorumResponse) Default() { +} + // RequestForKey returns the request corresponding to the given request key // or nil if the key is unknown. func RequestForKey(key int16) Request { @@ -22767,6 +24433,14 @@ func RequestForKey(key int16) Request { return new(DescribeUserSCRAMCredentialsRequest) case 51: return new(AlterUserSCRAMCredentialsRequest) + case 52: + return new(VoteRequest) + case 53: + return new(BeginQuorumEpochRequest) + case 54: + return new(EndQuorumEpochRequest) + case 55: + return new(DescribeQuorumRequest) } } @@ -22880,6 +24554,14 @@ func ResponseForKey(key int16) Response { return new(DescribeUserSCRAMCredentialsResponse) case 51: return new(AlterUserSCRAMCredentialsResponse) + case 52: + return new(VoteResponse) + case 53: + return new(BeginQuorumEpochResponse) + case 54: + return new(EndQuorumEpochResponse) + case 55: + return new(DescribeQuorumResponse) } } @@ -22993,5 +24675,13 @@ func NameForKey(key int16) string { return "DescribeUserSCRAMCredentials" case 51: return "AlterUserSCRAMCredentials" + case 52: + return "Vote" + case 53: + return "BeginQuorumEpoch" + case 54: + return "EndQuorumEpoch" + case 55: + return "DescribeQuorum" } } diff --git a/pkg/kversion/kversion.go b/pkg/kversion/kversion.go index 63efb80f..4d5b1bfd 100644 --- a/pkg/kversion/kversion.go +++ b/pkg/kversion/kversion.go @@ -409,5 +409,13 @@ func Tip() Versions { // KAFKA-10487: further change in aa5263fba903c85812c0c31443f7d49ee371e9db v[1]++ // 12 fetch + // KAFKA-10492 b7c8490cf47b0c18253d6a776b2b35c76c71c65d KIP-595 + v = append(v, + 0, // 52 vote + 0, // 53 begin quorum epoch + 0, // 54 end quorum epoch + 0, // 55 describe quorum + ) + return v }