From 0fd1959d691ecfaa9c2b12687ff50daa9fe12c3a Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Thu, 10 Oct 2024 19:53:26 -0600 Subject: [PATCH] kgo: support Kafka 3.8's kip-890 modifications STILL NOT ALL OF KIP-890, despite what I originally coded. Kafka 3.8 only added support for TransactionAbortable. Producers still need to send AddPartitionsToTxn. --- pkg/kerr/kerr.go | 20 ++++++++++++++++++++ pkg/kgo/sink.go | 11 ++++++++--- pkg/kgo/txn.go | 15 +++++++++++---- pkg/kversion/kversion.go | 18 +++++++++++++++++- 4 files changed, 56 insertions(+), 8 deletions(-) diff --git a/pkg/kerr/kerr.go b/pkg/kerr/kerr.go index 731a23a1..1f408783 100644 --- a/pkg/kerr/kerr.go +++ b/pkg/kerr/kerr.go @@ -190,6 +190,21 @@ var ( MismatchedEndpointType = &Error{"MISMATCHED_ENDPOINT_TYPE", 114, false, "The request was sent to an endpoint of the wrong type."} UnsupportedEndpointType = &Error{"UNSUPPORTED_ENDPOINT_TYPE", 115, false, "This endpoint type is not supported yet."} UnknownControllerID = &Error{"UNKNOWN_CONTROLLER_ID", 116, false, "This controller ID is not known"} + + // UnknownSubscriptionID = &Error{"UNKNOWN_SUBSCRIPTION_ID", 117, false, "Client sent a push telemetry request with an invalid or outdated subscription ID."} + // TelemetryTooLarge = &Error{"TELEMETRY_TOO_LARGE", 118, false, "Client sent a push telemetry request larger than the maximum size the broker will accept."} + // InvalidRegistration = &Error{"INVALID_REGISTRATION", 119, false, "The controller has considered the broker registration to be invalid."} + + TransactionAbortable = &Error{"TRANSACTION_ABORTABLE", 120, false, "The server encountered an error with the transaction. The client can abort the transaction to continue using this transactional ID."} + + // InvalidRecordState = &Error{"INVALID_RECORD_STATE", 121, false, "The record state is invalid. 
The acknowledgement of delivery could not be completed."} + // ShareSessionNotFound = &Error{"SHARE_SESSION_NOT_FOUND", 122, false, "The share session was not found."} + // InvalidShareSessionEpoch = &Error{"INVALID_SHARE_SESSION_EPOCH", 123, false, "The share session epoch is invalid."} + // FencedStateEpoch = &Error{"FENCED_STATE_EPOCH", 124, false, "The share coordinator rejected the request because the share-group state epoch did not match."} + // InvalidVoterKey = &Error{"INVALID_VOTER_KEY", 125, false, "The voter key doesn't match the receiving replica's key."} + // DuplicateVoter = &Error{"DUPLICATE_VOTER", 126, false, "The voter is already part of the set of voters."} + // VoterNotFound = &Error{"VOTER_NOT_FOUND", 127, false, "The voter is not part of the set of voters."} + // InvalidRegularExpression = &Error{"INVALID_REGULAR_EXPRESSION", 128, false, "The regular expression is not valid."} ) var code2err = map[int16]error{ @@ -312,4 +327,9 @@ var code2err = map[int16]error{ 115: UnsupportedEndpointType, // "" 116: UnknownControllerID, // "" + // 117: UnknownSubscriptionID, // KIP-714 f1819f448 KAFKA-15778 & KAFKA-15779 + // 118: TelemetryTooLarge, // "" + // 119: InvalidRegistration, // KIP-858 f467f6bb4 KAFKA-15361 + + 120: TransactionAbortable, // KIP-890 2e8d69b78 KAFKA-16314 } diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index 6d0f3dfe..fa66e304 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -406,6 +406,7 @@ func (s *sink) produce(sem <-chan struct{}) bool { if txnReq != nil { // txnReq can fail from: + // - TransactionAbortable // - retry failure // - auth failure // - producer id mapping / epoch errors @@ -417,6 +418,10 @@ func (s *sink) produce(sem <-chan struct{}) bool { batchesStripped, err := s.doTxnReq(req, txnReq) if err != nil { switch { + case errors.Is(err, kerr.TransactionAbortable): + // If we get TransactionAbortable, we continue into producing. 
+ // The produce will fail with the same error, and this is the + // only way to notify the user to abort the txn. case isRetryableBrokerErr(err) || isDialNonTimeoutErr(err): s.cl.bumpRepeatedLoadErr(err) s.cl.cfg.logger.Log(LogLevelWarn, "unable to AddPartitionsToTxn due to retryable broker err, bumping client's buffered record load errors by 1 and retrying", "err", err) @@ -431,8 +436,8 @@ func (s *sink) produce(sem <-chan struct{}) bool { // with produce request vs. end txn (KAFKA-12671) s.cl.failProducerID(id, epoch, err) s.cl.cfg.logger.Log(LogLevelError, "fatal AddPartitionsToTxn error, failing all buffered records (it is possible the client can recover after EndTransaction)", "broker", logID(s.nodeID), "err", err) + return false } - return false } // If we stripped everything, ensure we backoff to force a @@ -563,7 +568,7 @@ func (s *sink) issueTxnReq( continue } for _, partition := range topic.Partitions { - if err := kerr.ErrorForCode(partition.ErrorCode); err != nil { + if err := kerr.ErrorForCode(partition.ErrorCode); err != nil && err != kerr.TransactionAbortable { // see below for txn abortable // OperationNotAttempted is set for all partitions that are authorized // if any partition is unauthorized _or_ does not exist. We simply remove // unattempted partitions and treat them as retryable. 
@@ -2057,7 +2062,7 @@ func (b *recBatch) tryBuffer(pr promisedRec, produceVersion, maxBatchBytes int32 ////////////// func (*produceRequest) Key() int16 { return 0 } -func (*produceRequest) MaxVersion() int16 { return 10 } +func (*produceRequest) MaxVersion() int16 { return 11 } func (p *produceRequest) SetVersion(v int16) { p.version = v } func (p *produceRequest) GetVersion() int16 { return p.version } func (p *produceRequest) IsFlexible() bool { return p.version >= 9 } diff --git a/pkg/kgo/txn.go b/pkg/kgo/txn.go index 25cfd443..68cba7cd 100644 --- a/pkg/kgo/txn.go +++ b/pkg/kgo/txn.go @@ -281,7 +281,8 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry errors.Is(err, kerr.CoordinatorLoadInProgress), errors.Is(err, kerr.NotCoordinator), errors.Is(err, kerr.ConcurrentTransactions), - errors.Is(err, kerr.UnknownServerError): + errors.Is(err, kerr.UnknownServerError), + errors.Is(err, kerr.TransactionAbortable): return true } return false @@ -408,6 +409,11 @@ retry: willTryCommit = false goto retry + case errors.Is(endTxnErr, kerr.TransactionAbortable): + s.cl.cfg.logger.Log(LogLevelInfo, "end transaction returned TransactionAbortable; retrying as abort") + willTryCommit = false + goto retry + case errors.Is(endTxnErr, kerr.UnknownServerError): s.cl.cfg.logger.Log(LogLevelInfo, "end transaction with commit unknown server error; retrying") after := time.NewTimer(s.cl.cfg.retryBackoff(tries)) @@ -517,7 +523,7 @@ const ( // Deprecated: Kafka 3.6 removed support for the hacky behavior that // this option was abusing. Thus, as of Kafka 3.6, this option does not // work against Kafka. This option also has never worked for Redpanda - // becuse Redpanda always strictly validated that partitions were a + // because Redpanda always strictly validated that partitions were a // part of a transaction. 
Later versions of Kafka and Redpanda will // remove the need for AddPartitionsToTxn at all and thus this option // ultimately will be unnecessary anyway. @@ -820,8 +826,9 @@ func (cl *Client) UnsafeAbortBufferedRecords() { // // If the producer ID has an error and you are trying to commit, this will // return with kerr.OperationNotAttempted. If this happened, retry -// EndTransaction with TryAbort. Not other error is retryable, and you should -// not retry with TryAbort. +// EndTransaction with TryAbort. If this returns kerr.TransactionAbortable, you +// can retry with TryAbort. No other error is retryable, and you should not +// retry with TryAbort. // // If records failed with UnknownProducerID and your Kafka version is at least // 2.5, then aborting here will potentially allow the client to recover for diff --git a/pkg/kversion/kversion.go b/pkg/kversion/kversion.go index 3081c346..11a06c8c 100644 --- a/pkg/kversion/kversion.go +++ b/pkg/kversion/kversion.go @@ -67,6 +67,7 @@ var versions = []struct { {"v3.5", V3_5_0()}, {"v3.6", V3_6_0()}, {"v3.7", V3_7_0()}, + {"v3.8", V3_8_0()}, } // VersionStrings returns all recognized versions, minus any patch, that can be @@ -520,6 +521,7 @@ func V3_4_0() *Versions { return zkBrokerOf(max340) } func V3_5_0() *Versions { return zkBrokerOf(max350) } func V3_6_0() *Versions { return zkBrokerOf(max360) } func V3_7_0() *Versions { return zkBrokerOf(max370) } +func V3_8_0() *Versions { return zkBrokerOf(max380) } func zkBrokerOf(lks listenerKeys) *Versions { return &Versions{lks.filter(zkBroker)} @@ -1158,8 +1160,22 @@ var max370 = nextMax(max360, func(v listenerKeys) listenerKeys { return v }) +var max380 = nextMax(max370, func(v listenerKeys) listenerKeys { + // KAFKA-16314 2e8d69b78ca52196decd851c8520798aa856c073 KIP-890 + // Then error rename in cf1ba099c0723f9cf65dda4cd334d36b7ede6327 + v[0].inc() // 11 produce + v[10].inc() // 5 find coordinator + v[22].inc() // 5 init producer id + v[24].inc() // 5 add partitions to txn 
+ v[25].inc() // 4 add offsets to txn + v[26].inc() // 4 end txn + v[28].inc() // 4 txn offset commit + + return v +}) + var ( - maxStable = max370 + maxStable = max380 maxTip = nextMax(maxStable, func(v listenerKeys) listenerKeys { return v })