From aca99bcf19e741850378adbfe64c62b009340d7d Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Thu, 24 Sep 2020 20:02:30 -0600 Subject: [PATCH] support KIP-497 also notes KIP-595 support in README, since all APIs in KIP-595 are now defined --- README.md | 2 + generate/DEFINITIONS | 38 +++ pkg/kerr/kerr.go | 2 + pkg/kmsg/generated.go | 517 ++++++++++++++++++++++++++++++++++++++- pkg/kversion/kversion.go | 5 + 5 files changed, 563 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index b3b4ad0e..b652d75b 100644 --- a/README.md +++ b/README.md @@ -320,6 +320,7 @@ a protocol is supported by code generation. - [KIP-480](https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner) (sticky partition producing; 2.4.0) - [KIP-482](https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields) (tagged fields; KAFKA-8885; 2.4.0) - [KIP-496](https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets) (offset delete admin command; 2.4.0) +- [KIP-497](https://cwiki.apache.org/confluence/display/KAFKA/KIP-497%3A+Add+inter-broker+API+to+alter+ISR) (new API to alter ISR; 2.7.0) - [KIP-511](https://cwiki.apache.org/confluence/display/KAFKA/KIP-511%3A+Collect+and+Expose+Client%27s+Name+and+Version+in+the+Brokers) (add client name / version in apiversions req; 2.4.0) - [KIP-518](https://cwiki.apache.org/confluence/display/KAFKA/KIP-518%3A+Allow+listing+consumer+groups+per+state) (list groups by state; 2.6.0) - [KIP-525](https://cwiki.apache.org/confluence/display/KAFKA/KIP-525+-+Return+topic+metadata+and+configs+in+CreateTopics+response) (create topics v5 returns configs; 2.4.0) @@ -331,4 +332,5 @@ a protocol is supported by code generation. - [KIP-570](https://cwiki.apache.org/confluence/display/KAFKA/KIP-570%3A+Add+leader+epoch+in+StopReplicaRequest) (leader epoch in stop replica; 2.6.0) - [KIP-580](https://cwiki.apache.org/confluence/display/KAFKA/KIP-580%3A+Exponential+Backoff+for+Kafka+Clients) (exponential backoff; 2.6.0) - [KIP-588](https://cwiki.apache.org/confluence/display/KAFKA/KIP-588%3A+Allow+producers+to+recover+gracefully+from+transaction+timeouts) (producer recovery from txn timeout; 2.7.0) +- [KIP-595](https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum) (new APIs for raft protocol; 2.7.0) - [KIP-599](https://cwiki.apache.org/confluence/display/KAFKA/KIP-599%3A+Throttle+Create+Topic%2C+Create+Partition+and+Delete+Topic+Operations) (throttle create/delete topic/partition; 2.7.0) diff --git a/generate/DEFINITIONS b/generate/DEFINITIONS index e609178c..9cad3cf3 100644 --- a/generate/DEFINITIONS +++ b/generate/DEFINITIONS @@ -3774,3 +3774,41 @@ DescribeQuorumResponse => CurrentVoters: [DescribeQuorumResponseTopicPartitionReplicaState] TargetVoters: [DescribeQuorumResponseTopicPartitionReplicaState] Observers: [DescribeQuorumResponseTopicPartitionReplicaState] + + +// AlterISRRequest, proposed in KIP-497 and introduced in Kafka 2.7.0, +// is an admin request to modify ISR. +AlterISRRequest => key 56, max version 0, flexible v0+, admin + // The ID of the requesting broker. + BrokerID: int32 + // The epoch of the requesting broker. + BrokerEpoch: int64(-1) + Topics: [=>] + Topic: string + Partitions: [=>] + Partition: int32 + // The leader epoch of this partition. + LeaderEpoch: int32 + // The ISR for this partition. + NewISR: [int32] + // The expected version of ISR which is being updated. + CurrentISRVersion: int32 + +AlterISRResponse => + // ThrottleMillis is how long of a throttle Kafka will apply to the client + // after responding to this request. + ThrottleMillis: int32 + ErrorCode: int16 + Topics: [=>] + Topic: string + Partitions: [=>] + Partition: int32 + ErrorCode: int16 + // The broker ID of the leader. + LeaderID: int32 + // The leader epoch of this partition. + LeaderEpoch: int32 + // The in-sync replica ids. + ISR: [int32] + // The current ISR version. + CurrentISRVersion: int32 diff --git a/pkg/kerr/kerr.go b/pkg/kerr/kerr.go index 6b277b29..a46e3da9 100644 --- a/pkg/kerr/kerr.go +++ b/pkg/kerr/kerr.go @@ -138,6 +138,7 @@ var ( DuplicateResource = &Error{"DUPLICATE_RESOURCE", 92, false, "A request illegally referred to the same resource twice."} UnacceptableCredential = &Error{"UNACCEPTABLE_CREDENTIAL", 93, false, "Requested credential would not meet criteria for acceptability."} InconsistentVoterSet = &Error{"INCONSISTENT_VOTER_SET", 94, false, "Indicates that either the sender or recipient of a voter-only request is not one of the expected voters."} + InvalidUpdateVersion = &Error{"INVALID_UPDATE_VERSION", 95, false, "The given update version was invalid."} ) var code2err = map[int16]error{ @@ -237,4 +238,5 @@ var code2err = map[int16]error{ 92: DuplicateResource, 93: UnacceptableCredential, 94: InconsistentVoterSet, + 95: InvalidUpdateVersion, } diff --git a/pkg/kmsg/generated.go b/pkg/kmsg/generated.go index fba4fe6b..3795cba7 100644 --- a/pkg/kmsg/generated.go +++ b/pkg/kmsg/generated.go @@ -6,7 +6,7 @@ import "github.com/twmb/kafka-go/pkg/kbin" // MaxKey is the maximum key used for any messages in this package. // Note that this value will change as Kafka adds more messages. -const MaxKey = 55 +const MaxKey = 56 // MessageV0 is the message format Kafka used prior to 0.10. // @@ -24323,6 +24323,515 @@ func (v *DescribeQuorumResponse) ReadFrom(src []byte) error { func (v *DescribeQuorumResponse) Default() { } +type AlterISRRequestTopicPartition struct { + Partition int32 + + // The leader epoch of this partition. + LeaderEpoch int32 + + // The ISR for this partition. + NewISR []int32 + + // The expected version of ISR which is being updated. + CurrentISRVersion int32 +} + +func (v *AlterISRRequestTopicPartition) Default() { +} + +type AlterISRRequestTopic struct { + Topic string + + Partitions []AlterISRRequestTopicPartition +} + +func (v *AlterISRRequestTopic) Default() { +} + +// AlterISRRequest, proposed in KIP-497 and introduced in Kafka 2.7.0, +// is an admin request to modify ISR. +type AlterISRRequest struct { + // Version is the version of this message used with a Kafka broker. + Version int16 + + // The ID of the requesting broker. + BrokerID int32 + + // The epoch of the requesting broker. + BrokerEpoch int64 + + Topics []AlterISRRequestTopic +} + +func (*AlterISRRequest) Key() int16 { return 56 } +func (*AlterISRRequest) MaxVersion() int16 { return 0 } +func (v *AlterISRRequest) SetVersion(version int16) { v.Version = version } +func (v *AlterISRRequest) GetVersion() int16 { return v.Version } +func (v *AlterISRRequest) IsFlexible() bool { return v.Version >= 0 } +func (v *AlterISRRequest) IsAdminRequest() {} +func (v *AlterISRRequest) ResponseKind() Response { return &AlterISRResponse{Version: v.Version} } + +func (v *AlterISRRequest) AppendTo(dst []byte) []byte { + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + { + v := v.BrokerID + dst = kbin.AppendInt32(dst, v) + } + { + v := v.BrokerEpoch + dst = kbin.AppendInt64(dst, v) + } + { + v := v.Topics + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.Topic + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.Partitions + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.Partition + dst = kbin.AppendInt32(dst, v) + } + { + v := v.LeaderEpoch + dst = kbin.AppendInt32(dst, v) + } + { + v := v.NewISR + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := v[i] + dst = kbin.AppendInt32(dst, v) + } + } + { + v := v.CurrentISRVersion + dst = kbin.AppendInt32(dst, v) + } + if isFlexible { + dst = append(dst, 0) + } + } + } + if isFlexible { + dst = append(dst, 0) + } + } + } + if isFlexible { + dst = append(dst, 0) + } + return dst +} +func (v *AlterISRRequest) ReadFrom(src []byte) error { + v.Default() + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + b := kbin.Reader{Src: src} + s := v + { + v := b.Int32() + s.BrokerID = v + } + { + v := b.Int64() + s.BrokerEpoch = v + } + { + v := s.Topics + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]AlterISRRequestTopic, l) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + var v string + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + s.Topic = v + } + { + v := s.Partitions + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]AlterISRRequestTopicPartition, l) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.Int32() + s.Partition = v + } + { + v := b.Int32() + s.LeaderEpoch = v + } + { + v := s.NewISR + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]int32, l) + } + for i := int32(0); i < l; i++ { + v := b.Int32() + a[i] = v + } + v = a + s.NewISR = v + } + { + v := b.Int32() + s.CurrentISRVersion = v + } + if isFlexible { + SkipTags(&b) + } + } + v = a + s.Partitions = v + } + if isFlexible { + SkipTags(&b) + } + } + v = a + s.Topics = v + } + if isFlexible { + SkipTags(&b) + } + return b.Complete() +} +func (v *AlterISRRequest) Default() { + v.BrokerEpoch = -1 +} + +type AlterISRResponseTopicPartition struct { + Partition int32 + + ErrorCode int16 + + // The broker ID of the leader. + LeaderID int32 + + // The leader epoch of this partition. + LeaderEpoch int32 + + // The in-sync replica ids. + ISR []int32 + + // The current ISR version. + CurrentISRVersion int32 +} + +func (v *AlterISRResponseTopicPartition) Default() { +} + +type AlterISRResponseTopic struct { + Topic string + + Partitions []AlterISRResponseTopicPartition +} + +func (v *AlterISRResponseTopic) Default() { +} + +type AlterISRResponse struct { + // Version is the version of this message used with a Kafka broker. + Version int16 + + // ThrottleMillis is how long of a throttle Kafka will apply to the client + // after responding to this request. + ThrottleMillis int32 + + ErrorCode int16 + + Topics []AlterISRResponseTopic +} + +func (*AlterISRResponse) Key() int16 { return 56 } +func (*AlterISRResponse) MaxVersion() int16 { return 0 } +func (v *AlterISRResponse) SetVersion(version int16) { v.Version = version } +func (v *AlterISRResponse) GetVersion() int16 { return v.Version } +func (v *AlterISRResponse) IsFlexible() bool { return v.Version >= 0 } +func (v *AlterISRResponse) RequestKind() Request { return &AlterISRRequest{Version: v.Version} } + +func (v *AlterISRResponse) AppendTo(dst []byte) []byte { + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + { + v := v.ThrottleMillis + dst = kbin.AppendInt32(dst, v) + } + { + v := v.ErrorCode + dst = kbin.AppendInt16(dst, v) + } + { + v := v.Topics + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.Topic + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.Partitions + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.Partition + dst = kbin.AppendInt32(dst, v) + } + { + v := v.ErrorCode + dst = kbin.AppendInt16(dst, v) + } + { + v := v.LeaderID + dst = kbin.AppendInt32(dst, v) + } + { + v := v.LeaderEpoch + dst = kbin.AppendInt32(dst, v) + } + { + v := v.ISR + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := v[i] + dst = kbin.AppendInt32(dst, v) + } + } + { + v := v.CurrentISRVersion + dst = kbin.AppendInt32(dst, v) + } + if isFlexible { + dst = append(dst, 0) + } + } + } + if isFlexible { + dst = append(dst, 0) + } + } + } + if isFlexible { + dst = append(dst, 0) + } + return dst +} +func (v *AlterISRResponse) ReadFrom(src []byte) error { + v.Default() + version := v.Version + _ = version + isFlexible := version >= 0 + _ = isFlexible + b := kbin.Reader{Src: src} + s := v + { + v := b.Int32() + s.ThrottleMillis = v + } + { + v := b.Int16() + s.ErrorCode = v + } + { + v := s.Topics + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]AlterISRResponseTopic, l) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + var v string + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + s.Topic = v + } + { + v := s.Partitions + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]AlterISRResponseTopicPartition, l) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.Int32() + s.Partition = v + } + { + v := b.Int16() + s.ErrorCode = v + } + { + v := b.Int32() + s.LeaderID = v + } + { + v := b.Int32() + s.LeaderEpoch = v + } + { + v := s.ISR + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + if l > 0 { + a = make([]int32, l) + } + for i := int32(0); i < l; i++ { + v := b.Int32() + a[i] = v + } + v = a + s.ISR = v + } + { + v := b.Int32() + s.CurrentISRVersion = v + } + if isFlexible { + SkipTags(&b) + } + } + v = a + s.Partitions = v + } + if isFlexible { + SkipTags(&b) + } + } + v = a + s.Topics = v + } + if isFlexible { + SkipTags(&b) + } + return b.Complete() +} +func (v *AlterISRResponse) Default() { +} + // RequestForKey returns the request corresponding to the given request key // or nil if the key is unknown. func RequestForKey(key int16) Request { @@ -24441,6 +24950,8 @@ func RequestForKey(key int16) Request { return new(EndQuorumEpochRequest) case 55: return new(DescribeQuorumRequest) + case 56: + return new(AlterISRRequest) } } @@ -24562,6 +25073,8 @@ func ResponseForKey(key int16) Response { return new(EndQuorumEpochResponse) case 55: return new(DescribeQuorumResponse) + case 56: + return new(AlterISRResponse) } } @@ -24683,5 +25196,7 @@ func NameForKey(key int16) string { return "EndQuorumEpoch" case 55: return "DescribeQuorum" + case 56: + return "AlterISR" } } diff --git a/pkg/kversion/kversion.go b/pkg/kversion/kversion.go index 4d5b1bfd..1ff38397 100644 --- a/pkg/kversion/kversion.go +++ b/pkg/kversion/kversion.go @@ -417,5 +417,10 @@ func Tip() Versions { 0, // 55 describe quorum ) + // KAFKA-8836 57de67db22eb373f92ec5dd449d317ed2bc8b8d1 KIP-497 + v = append(v, + 0, // 56 alter isr + ) + return v }