diff --git a/pkg/kgo/txn.go b/pkg/kgo/txn.go index 7abd727c..a5bab259 100644 --- a/pkg/kgo/txn.go +++ b/pkg/kgo/txn.go @@ -343,7 +343,10 @@ retryUnattempted: } // BeginTransaction sets the client to a transactional state, erroring if there -// is no transactional ID or if the client is already in a transaction. +// is no transactional ID, or if the producer is currently in a fatal +// (unrecoverable) state, or if the client is already in a transaction. +// +// This must not be called concurrently with other client functions. func (cl *Client) BeginTransaction() error { if cl.cfg.txnID == nil { return errNotTransactional @@ -351,12 +354,21 @@ func (cl *Client) BeginTransaction() error { cl.producer.txnMu.Lock() defer cl.producer.txnMu.Unlock() + if cl.producer.inTxn { return errors.New("invalid attempt to begin a transaction while already in a transaction") } + + needRecover, didRecover, err := cl.maybeRecoverProducerID() + if needRecover && !didRecover { + cl.cfg.logger.Log(LogLevelInfo, "unable to begin transaction due to unrecoverable producer id error", "err", err) + return fmt.Errorf("producer ID has a fatal, unrecoverable error, err: %v", err) + } + cl.producer.inTxn = true atomic.StoreUint32(&cl.producer.producingTxn, 1) // allow produces for txns now cl.cfg.logger.Log(LogLevelInfo, "beginning transaction", "transactional_id", *cl.cfg.txnID) + return nil } @@ -471,38 +483,13 @@ func (cl *Client) EndTransaction(ctx context.Context, commit TransactionEndTry) return kerr.OperationNotAttempted } - switch err.(type) { - case *kerr.Error: - kip360 := cl.producer.idVersion >= 3 && (err == kerr.UnknownProducerID || err == kerr.InvalidProducerIDMapping) - kip588 := cl.producer.idVersion >= 4 && (err == kerr.InvalidProducerEpoch || false /* TODO err == kerr.TransactionTimedOut */) - - recoverable := kip360 || kip588 - if !recoverable { - return err // fatal, unrecoverable - } - - // At this point, nothing is being produced and the - // producer ID loads with an error. Before we allow - // production to continue, we reset all sequence - // numbers. Storing errReloadProducerID will reset the - // id / epoch appropriately and everything will work as - // per KIP-360. - cl.resetAllProducerSequences() - - // With UnknownProducerID and v3 init id, we can recover. - // No sense issuing an abort request, though. - cl.producer.id.Store(&producerID{ - id: id, - epoch: epoch, - err: errReloadProducerID, - }) + // If we recovered the producer ID, we return early, since + // there is no reason to issue an abort now that the id is + // different. Otherwise, we issue our EndTxn which will likely + // fail, but that is ok, we will just return error. + _, didRecover, _ := cl.maybeRecoverProducerID() + if didRecover { return nil - - default: - // If this is not a kerr.Error, then it was some arbitrary - // client error. We can try the EndTxnRequest in the hopes - // that our id / epoch is valid, but the id / epoch may - // have never loaded (and thus will be -1 / -1). } } @@ -529,13 +516,54 @@ func (cl *Client) EndTransaction(ctx context.Context, commit TransactionEndTry) // If the returned error is still a Kafka error, this is fatal and we // need to fail our producer ID we loaded above. var ke *kerr.Error - if err != nil && errors.As(err, &ke) && !ke.Retriable { + if errors.As(err, &ke) && !ke.Retriable { cl.failProducerID(id, epoch, err) } return err } +// This returns if it is necessary to recover the producer ID (it has an +// error), whether it is possible to recover, and, if not, the error. +// +// We call this when beginning a transaction or when ending with an abort. +func (cl *Client) maybeRecoverProducerID() (necessary, did bool, err error) { + id, epoch, err := cl.producerID() + if err == nil { + return false, false, nil + } + + ke, ok := err.(*kerr.Error) + if !ok { + return true, false, err + } + + kip360 := cl.producer.idVersion >= 3 && (ke == kerr.UnknownProducerID || ke == kerr.InvalidProducerIDMapping) + kip588 := cl.producer.idVersion >= 4 && (ke == kerr.InvalidProducerEpoch || false /* TODO err == kerr.TransactionTimedOut */) + + recoverable := kip360 || kip588 + if !recoverable { + return true, false, err // fatal, unrecoverable + } + + // At this point, the producer ID loads with error; anything being + // concurrently produced (which should be nothing, per calling this at + // the beginning or end of a transaction) will load producer IDs with + // an error. + // + // Before we allow production to continue, we reset all sequence + // numbers. Storing errReloadProducerID will reset the id / epoch + // appropriately and everything will work as per KIP-360. + cl.resetAllProducerSequences() + + cl.producer.id.Store(&producerID{ + id: id, + epoch: epoch, + err: errReloadProducerID, + }) + return true, true, nil +} + // If a transaction is begun too quickly after finishing an old transaction, // Kafka may still be finalizing its commit / abort and will return a // concurrent transactions error. We handle that by retrying for a bit. @@ -658,7 +686,7 @@ func (cl *Client) addOffsetsToTxn(ctx context.Context, group string) error { // If the returned error is still a Kafka error, this is fatal and we // need to fail our producer ID we created just above. var ke *kerr.Error - if err != nil && errors.As(err, &ke) && !ke.Retriable { + if errors.As(err, &ke) && !ke.Retriable { cl.failProducerID(id, epoch, err) }