diff --git a/.github/workflows/lint-and-test.yml b/.github/workflows/lint-and-test.yml index 43e6f046..632ff551 100644 --- a/.github/workflows/lint-and-test.yml +++ b/.github/workflows/lint-and-test.yml @@ -37,7 +37,7 @@ jobs: container: golang:1.20.3 services: kafka: - image: bitnami/kafka:latest + image: bitnami/kafka:3.5 ports: - 9092:9092 env: diff --git a/.golangci.yml b/.golangci.yml index 0ad253b4..600552cc 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,7 +1,7 @@ # We may as well allow multiple golangci-lint invocations at once. run: allow-parallel-runners: true - go: "1.19" + go: "1.21" # golangci-lint by default ignores some staticcheck and vet raised issues that # are actually important to catch. The following ensures that we do not ignore @@ -68,7 +68,7 @@ linters-settings: # # https://github.com/mvdan/gofumpt/issues/137 gofumpt: - lang-version: "1.19" + lang-version: "1.21" extra-rules: true gosec: @@ -76,7 +76,6 @@ linters-settings: - G104 # unhandled errors, we exclude for the same reason we do not use errcheck - G404 # we want math/rand - # Gocritic is a meta linter that has very good lints, and most of the # experimental ones are very good too. We opt into everything, which helps # us when we upgrade golangci-lint, and we specifically opt out of a batch. @@ -164,5 +163,5 @@ linters-settings: # contexts for beneficial reasons, and we disable the SSLv3 deprecation # warning because this is solely for a debug log. staticcheck: - go: "1.19" + go: "1.21" checks: ["all", "-SA1012", "-SA1019"] diff --git a/generate/definitions/24_add_partitions_to_txn b/generate/definitions/24_add_partitions_to_txn index cdec6913..dc8b757b 100644 --- a/generate/definitions/24_add_partitions_to_txn +++ b/generate/definitions/24_add_partitions_to_txn @@ -2,28 +2,60 @@ // partitions in the request. Before producing any records to a partition in // the transaction, that partition must have been added to the transaction with // this request. -AddPartitionsToTxnRequest => key 24, max version 3, flexible v3+, txn coordinator +// +// Versions 3 and below are exclusively used by clients and versions 4 and +// above are used by brokers. +// +// Version 4 adds VerifyOnly field to check if partitions are already in +// transaction and adds support to batch multiple transactions. +AddPartitionsToTxnRequest => key 24, max version 4, flexible v3+, txn coordinator // TransactionalID is the transactional ID to use for this request. - TransactionalID: string + TransactionalID: string // v0-v3 // ProducerID is the producer ID of the client for this transactional ID // as received from InitProducerID. - ProducerID: int64 + ProducerID: int64 // v0-v3 // ProducerEpoch is the producer epoch of the client for this transactional ID // as received from InitProducerID. - ProducerEpoch: int16 + ProducerEpoch: int16 // v0-v3 // Topics are topics to add as part of the producer side of a transaction. - Topics: [=>] + Topics: [=>] // v0-v3 // Topic is a topic name. Topic: string // Partitions are partitions within a topic to add as part of the producer // side of a transaction. Partitions: [int32] + // The list of transactions to add partitions to, for v4+, for brokers only. + // The fields in this are batch broker requests that duplicate the above fields + // and thus are undocumented (except VerifyOnly, which is new). + Transactions: [=>] // v4+ + TransactionalID: string + ProducerID: int64 + ProducerEpoch: int16 + // VerifyOnly signifies if we want to check if the partition is in the + // transaction rather than add it. + VerifyOnly: bool + Topics: [=>] + Topic: string + Partitions: [int32] // AddPartitionsToTxnResponse is a response to an AddPartitionsToTxnRequest. AddPartitionsToTxnResponse => ThrottleMillis(1) + // The response top level error code. + ErrorCode: int16 // v4+ + // Results categorized by transactional ID, v4+ only, for brokers only. + // The fields duplicate v3 and below fields (except TransactionalID) and + // are left undocumented. + Transactions: [=>] // v4+ + // The transactional id corresponding to the transaction. + TransactionalID: string + Topics: [=>] + Topic: string + Partitions: [=>] + Partition: int32 + ErrorCode: int16 // Topics are responses to topics in the request. - Topics: [=>] + Topics: [=>] // v0-v3 // Topic is a topic being responded to. Topic: string // Partitions are responses to partitions in the request. diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index caf98a6d..475270ed 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -41,9 +41,8 @@ func (o Offset) String() string { return fmt.Sprintf("{%d e%d ce%d}", o.at, o.epoch, o.currentEpoch) } else if o.relative > 0 { return fmt.Sprintf("{%d+%d e%d ce%d}", o.at, o.relative, o.epoch, o.currentEpoch) - } else { - return fmt.Sprintf("{%d-%d e%d ce%d}", o.at, -o.relative, o.epoch, o.currentEpoch) } + return fmt.Sprintf("{%d-%d e%d ce%d}", o.at, -o.relative, o.epoch, o.currentEpoch) } // EpochOffset returns this offset as an EpochOffset, allowing visibility into diff --git a/pkg/kgo/helpers_test.go b/pkg/kgo/helpers_test.go index 4e8bc2fd..f7b8e97f 100644 --- a/pkg/kgo/helpers_test.go +++ b/pkg/kgo/helpers_test.go @@ -89,7 +89,7 @@ func testLogger() Logger { num := loggerNum.Add(1) pfx := strconv.Itoa(int(num)) return BasicLogger(os.Stderr, testLogLevel, func() string { - return time.Now().Format("[15:04:05 ") + pfx + "]" + return time.Now().UTC().Format("[15:04:05.999 ") + pfx + "]" }) } diff --git a/pkg/kgo/ring.go b/pkg/kgo/ring.go index 43ee72a6..3ef989f4 100644 --- a/pkg/kgo/ring.go +++ b/pkg/kgo/ring.go @@ -260,11 +260,10 @@ func (r *ringBatchPromise) dropPeek() (next batchPromise, more bool) { return next, false } return r.overflow[0], true - } else { - r.overflow = r.overflow[1:] - if len(r.overflow) > 0 { - return r.overflow[0], true - } - return next, false } + r.overflow = r.overflow[1:] + if len(r.overflow) > 0 { + return r.overflow[0], true + } + return next, false } diff --git a/pkg/kgo/txn_test.go b/pkg/kgo/txn_test.go index 9a7f8d8f..63a6f5c8 100644 --- a/pkg/kgo/txn_test.go +++ b/pkg/kgo/txn_test.go @@ -28,7 +28,9 @@ func TestTxnEtl(t *testing.T) { go func() { cl, err := NewClient( getSeedBrokers(), - WithLogger(BasicLogger(os.Stderr, testLogLevel, nil)), + WithLogger(BasicLogger(os.Stderr, testLogLevel, func() string { + return time.Now().UTC().Format("15:04:05.999") + " " + })), TransactionalID("p"+randsha()), TransactionTimeout(2*time.Minute), MaxBufferedRecords(10000), diff --git a/pkg/kmsg/generated.go b/pkg/kmsg/generated.go index c9c240cd..10987d2f 100644 --- a/pkg/kmsg/generated.go +++ b/pkg/kmsg/generated.go @@ -19800,34 +19800,97 @@ func NewAddPartitionsToTxnRequestTopic() AddPartitionsToTxnRequestTopic { return v } +type AddPartitionsToTxnRequestTransactionTopic struct { + Topic string + + Partitions []int32 + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags // v3+ +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to AddPartitionsToTxnRequestTransactionTopic. +func (v *AddPartitionsToTxnRequestTransactionTopic) Default() { +} + +// NewAddPartitionsToTxnRequestTransactionTopic returns a default AddPartitionsToTxnRequestTransactionTopic +// This is a shortcut for creating a struct and calling Default yourself. +func NewAddPartitionsToTxnRequestTransactionTopic() AddPartitionsToTxnRequestTransactionTopic { + var v AddPartitionsToTxnRequestTransactionTopic + v.Default() + return v +} + +type AddPartitionsToTxnRequestTransaction struct { + TransactionalID string + + ProducerID int64 + + ProducerEpoch int16 + + // VerifyOnly signifies if we want to check if the partition is in the + // transaction rather than add it. + VerifyOnly bool + + Topics []AddPartitionsToTxnRequestTransactionTopic + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags // v3+ +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to AddPartitionsToTxnRequestTransaction. +func (v *AddPartitionsToTxnRequestTransaction) Default() { +} + +// NewAddPartitionsToTxnRequestTransaction returns a default AddPartitionsToTxnRequestTransaction +// This is a shortcut for creating a struct and calling Default yourself. +func NewAddPartitionsToTxnRequestTransaction() AddPartitionsToTxnRequestTransaction { + var v AddPartitionsToTxnRequestTransaction + v.Default() + return v +} + // AddPartitionsToTxnRequest begins the producer side of a transaction for all // partitions in the request. Before producing any records to a partition in // the transaction, that partition must have been added to the transaction with // this request. +// +// Versions 3 and below are exclusively used by clients and versions 4 and +// above are used by brokers. +// +// Version 4 adds VerifyOnly field to check if partitions are already in +// transaction and adds support to batch multiple transactions. type AddPartitionsToTxnRequest struct { // Version is the version of this message used with a Kafka broker. Version int16 // TransactionalID is the transactional ID to use for this request. - TransactionalID string + TransactionalID string // v0-v3 // ProducerID is the producer ID of the client for this transactional ID // as received from InitProducerID. - ProducerID int64 + ProducerID int64 // v0-v3 // ProducerEpoch is the producer epoch of the client for this transactional ID // as received from InitProducerID. - ProducerEpoch int16 + ProducerEpoch int16 // v0-v3 // Topics are topics to add as part of the producer side of a transaction. - Topics []AddPartitionsToTxnRequestTopic + Topics []AddPartitionsToTxnRequestTopic // v0-v3 + + // The list of transactions to add partitions to, for v4+, for brokers only. + // The fields in this are batch broker requests that duplicate the above fields + // and thus are undocumented (except VerifyOnly, which is new). + Transactions []AddPartitionsToTxnRequestTransaction // v4+ // UnknownTags are tags Kafka sent that we do not know the purpose of. UnknownTags Tags // v3+ } func (*AddPartitionsToTxnRequest) Key() int16 { return 24 } -func (*AddPartitionsToTxnRequest) MaxVersion() int16 { return 3 } +func (*AddPartitionsToTxnRequest) MaxVersion() int16 { return 4 } func (v *AddPartitionsToTxnRequest) SetVersion(version int16) { v.Version = version } func (v *AddPartitionsToTxnRequest) GetVersion() int16 { return v.Version } func (v *AddPartitionsToTxnRequest) IsFlexible() bool { return v.Version >= 3 } @@ -19852,7 +19915,7 @@ func (v *AddPartitionsToTxnRequest) AppendTo(dst []byte) []byte { _ = version isFlexible := version >= 3 _ = isFlexible - { + if version >= 0 && version <= 3 { v := v.TransactionalID if isFlexible { dst = kbin.AppendCompactString(dst, v) @@ -19860,15 +19923,15 @@ func (v *AddPartitionsToTxnRequest) AppendTo(dst []byte) []byte { dst = kbin.AppendString(dst, v) } } - { + if version >= 0 && version <= 3 { v := v.ProducerID dst = kbin.AppendInt64(dst, v) } - { + if version >= 0 && version <= 3 { v := v.ProducerEpoch dst = kbin.AppendInt16(dst, v) } - { + if version >= 0 && version <= 3 { v := v.Topics if isFlexible { dst = kbin.AppendCompactArrayLen(dst, len(v)) @@ -19903,6 +19966,76 @@ func (v *AddPartitionsToTxnRequest) AppendTo(dst []byte) []byte { } } } + if version >= 4 { + v := v.Transactions + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.TransactionalID + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.ProducerID + dst = kbin.AppendInt64(dst, v) + } + { + v := v.ProducerEpoch + dst = kbin.AppendInt16(dst, v) + } + { + v := v.VerifyOnly + dst = kbin.AppendBool(dst, v) + } + { + v := v.Topics + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.Topic + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.Partitions + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := v[i] + dst = kbin.AppendInt32(dst, v) + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + } + } if isFlexible { dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) dst = v.UnknownTags.AppendEach(dst) @@ -19926,7 +20059,7 @@ func (v *AddPartitionsToTxnRequest) readFrom(src []byte, unsafe bool) error { isFlexible := version >= 3 _ = isFlexible s := v - { + if version >= 0 && version <= 3 { var v string if unsafe { if isFlexible { @@ -19943,15 +20076,15 @@ func (v *AddPartitionsToTxnRequest) readFrom(src []byte, unsafe bool) error { } s.TransactionalID = v } - { + if version >= 0 && version <= 3 { v := b.Int64() s.ProducerID = v } - { + if version >= 0 && version <= 3 { v := b.Int16() s.ProducerEpoch = v } - { + if version >= 0 && version <= 3 { v := s.Topics a := v var l int32 @@ -20018,6 +20151,129 @@ func (v *AddPartitionsToTxnRequest) readFrom(src []byte, unsafe bool) error { v = a s.Topics = v } + if version >= 4 { + v := s.Transactions + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]AddPartitionsToTxnRequestTransaction, l)...) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.TransactionalID = v + } + { + v := b.Int64() + s.ProducerID = v + } + { + v := b.Int16() + s.ProducerEpoch = v + } + { + v := b.Bool() + s.VerifyOnly = v + } + { + v := s.Topics + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]AddPartitionsToTxnRequestTransactionTopic, l)...) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.Topic = v + } + { + v := s.Partitions + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]int32, l)...) + } + for i := int32(0); i < l; i++ { + v := b.Int32() + a[i] = v + } + v = a + s.Partitions = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + } + v = a + s.Topics = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + } + v = a + s.Transactions = v + } if isFlexible { s.UnknownTags = internalReadTags(&b) } @@ -20045,6 +20301,73 @@ func NewAddPartitionsToTxnRequest() AddPartitionsToTxnRequest { return v } +type AddPartitionsToTxnResponseTransactionTopicPartition struct { + Partition int32 + + ErrorCode int16 + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags // v3+ +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to AddPartitionsToTxnResponseTransactionTopicPartition. +func (v *AddPartitionsToTxnResponseTransactionTopicPartition) Default() { +} + +// NewAddPartitionsToTxnResponseTransactionTopicPartition returns a default AddPartitionsToTxnResponseTransactionTopicPartition +// This is a shortcut for creating a struct and calling Default yourself. +func NewAddPartitionsToTxnResponseTransactionTopicPartition() AddPartitionsToTxnResponseTransactionTopicPartition { + var v AddPartitionsToTxnResponseTransactionTopicPartition + v.Default() + return v +} + +type AddPartitionsToTxnResponseTransactionTopic struct { + Topic string + + Partitions []AddPartitionsToTxnResponseTransactionTopicPartition + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags // v3+ +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to AddPartitionsToTxnResponseTransactionTopic. +func (v *AddPartitionsToTxnResponseTransactionTopic) Default() { +} + +// NewAddPartitionsToTxnResponseTransactionTopic returns a default AddPartitionsToTxnResponseTransactionTopic +// This is a shortcut for creating a struct and calling Default yourself. +func NewAddPartitionsToTxnResponseTransactionTopic() AddPartitionsToTxnResponseTransactionTopic { + var v AddPartitionsToTxnResponseTransactionTopic + v.Default() + return v +} + +type AddPartitionsToTxnResponseTransaction struct { + // The transactional id corresponding to the transaction. + TransactionalID string + + Topics []AddPartitionsToTxnResponseTransactionTopic + + // UnknownTags are tags Kafka sent that we do not know the purpose of. + UnknownTags Tags // v3+ +} + +// Default sets any default fields. Calling this allows for future compatibility +// if new fields are added to AddPartitionsToTxnResponseTransaction. +func (v *AddPartitionsToTxnResponseTransaction) Default() { +} + +// NewAddPartitionsToTxnResponseTransaction returns a default AddPartitionsToTxnResponseTransaction +// This is a shortcut for creating a struct and calling Default yourself. +func NewAddPartitionsToTxnResponseTransaction() AddPartitionsToTxnResponseTransaction { + var v AddPartitionsToTxnResponseTransaction + v.Default() + return v +} + type AddPartitionsToTxnResponseTopicPartition struct { // Partition is a partition being responded to. Partition int32 @@ -20137,15 +20460,23 @@ type AddPartitionsToTxnResponse struct { // This request switched at version 1. ThrottleMillis int32 + // The response top level error code. + ErrorCode int16 // v4+ + + // Results categorized by transactional ID, v4+ only, for brokers only. + // The fields duplicate v3 and below fields (except TransactionalID) and + // are left undocumented. + Transactions []AddPartitionsToTxnResponseTransaction // v4+ + // Topics are responses to topics in the request. - Topics []AddPartitionsToTxnResponseTopic + Topics []AddPartitionsToTxnResponseTopic // v0-v3 // UnknownTags are tags Kafka sent that we do not know the purpose of. UnknownTags Tags // v3+ } func (*AddPartitionsToTxnResponse) Key() int16 { return 24 } -func (*AddPartitionsToTxnResponse) MaxVersion() int16 { return 3 } +func (*AddPartitionsToTxnResponse) MaxVersion() int16 { return 4 } func (v *AddPartitionsToTxnResponse) SetVersion(version int16) { v.Version = version } func (v *AddPartitionsToTxnResponse) GetVersion() int16 { return v.Version } func (v *AddPartitionsToTxnResponse) IsFlexible() bool { return v.Version >= 3 } @@ -20170,7 +20501,80 @@ func (v *AddPartitionsToTxnResponse) AppendTo(dst []byte) []byte { v := v.ThrottleMillis dst = kbin.AppendInt32(dst, v) } - { + if version >= 4 { + v := v.ErrorCode + dst = kbin.AppendInt16(dst, v) + } + if version >= 4 { + v := v.Transactions + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.TransactionalID + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.Topics + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.Topic + if isFlexible { + dst = kbin.AppendCompactString(dst, v) + } else { + dst = kbin.AppendString(dst, v) + } + } + { + v := v.Partitions + if isFlexible { + dst = kbin.AppendCompactArrayLen(dst, len(v)) + } else { + dst = kbin.AppendArrayLen(dst, len(v)) + } + for i := range v { + v := &v[i] + { + v := v.Partition + dst = kbin.AppendInt32(dst, v) + } + { + v := v.ErrorCode + dst = kbin.AppendInt16(dst, v) + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + } + } + if isFlexible { + dst = kbin.AppendUvarint(dst, 0+uint32(v.UnknownTags.Len())) + dst = v.UnknownTags.AppendEach(dst) + } + } + } + if version >= 0 && version <= 3 { v := v.Topics if isFlexible { dst = kbin.AppendCompactArrayLen(dst, len(v)) @@ -20243,7 +20647,134 @@ func (v *AddPartitionsToTxnResponse) readFrom(src []byte, unsafe bool) error { v := b.Int32() s.ThrottleMillis = v } - { + if version >= 4 { + v := b.Int16() + s.ErrorCode = v + } + if version >= 4 { + v := s.Transactions + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]AddPartitionsToTxnResponseTransaction, l)...) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.TransactionalID = v + } + { + v := s.Topics + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]AddPartitionsToTxnResponseTransactionTopic, l)...) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + var v string + if unsafe { + if isFlexible { + v = b.UnsafeCompactString() + } else { + v = b.UnsafeString() + } + } else { + if isFlexible { + v = b.CompactString() + } else { + v = b.String() + } + } + s.Topic = v + } + { + v := s.Partitions + a := v + var l int32 + if isFlexible { + l = b.CompactArrayLen() + } else { + l = b.ArrayLen() + } + if !b.Ok() { + return b.Complete() + } + a = a[:0] + if l > 0 { + a = append(a, make([]AddPartitionsToTxnResponseTransactionTopicPartition, l)...) + } + for i := int32(0); i < l; i++ { + v := &a[i] + v.Default() + s := v + { + v := b.Int32() + s.Partition = v + } + { + v := b.Int16() + s.ErrorCode = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + } + v = a + s.Partitions = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + } + v = a + s.Topics = v + } + if isFlexible { + s.UnknownTags = internalReadTags(&b) + } + } + v = a + s.Transactions = v + } + if version >= 0 && version <= 3 { v := s.Topics a := v var l int32 diff --git a/pkg/sasl/aws/aws.go b/pkg/sasl/aws/aws.go index 685e1d7b..89f9cc33 100644 --- a/pkg/sasl/aws/aws.go +++ b/pkg/sasl/aws/aws.go @@ -201,7 +201,7 @@ func task1(host, qps string) []byte { // Finally, we add our empty body. // // HexEncode(Hash(RequestPayload)) - const emptyBody = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" //nolint:gosec // this is a defined constant hash of an empty body + const emptyBody = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" return append(canon, emptyBody...) }