Skip to content

Commit

Permalink
txn: detect a fatal txnal client when beginning transactions
Browse files Browse the repository at this point in the history
Previously, may have tried to recover the client when ending a
transaction. Now, we also try to recover when beginning (which can help
clear out errors from EndTxn itself), and we return an error from Begin
if recovery is impossible.
  • Loading branch information
twmb committed Jun 2, 2021
1 parent a9691bd commit afa1209
Showing 1 changed file with 62 additions and 34 deletions.
96 changes: 62 additions & 34 deletions pkg/kgo/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,20 +343,32 @@ retryUnattempted:
}

// BeginTransaction sets the client to a transactional state, erroring if there
// is no transactional ID or if the client is already in a transaction.
// is no transactional ID, or if the producer is currently in a fatal
// (unrecoverable) state, or if the client is already in a transaction.
//
// This must not be called concurrently with other client functions.
func (cl *Client) BeginTransaction() error {
if cl.cfg.txnID == nil {
return errNotTransactional
}

cl.producer.txnMu.Lock()
defer cl.producer.txnMu.Unlock()

if cl.producer.inTxn {
return errors.New("invalid attempt to begin a transaction while already in a transaction")
}

needRecover, didRecover, err := cl.maybeRecoverProducerID()
if needRecover && !didRecover {
cl.cfg.logger.Log(LogLevelInfo, "unable to begin transaction due to unrecoverable producer id error", "err", err)
return fmt.Errorf("producer ID has a fatal, unrecoverable error, err: %v", err)
}

cl.producer.inTxn = true
atomic.StoreUint32(&cl.producer.producingTxn, 1) // allow produces for txns now
cl.cfg.logger.Log(LogLevelInfo, "beginning transaction", "transactional_id", *cl.cfg.txnID)

return nil
}

Expand Down Expand Up @@ -471,38 +483,13 @@ func (cl *Client) EndTransaction(ctx context.Context, commit TransactionEndTry)
return kerr.OperationNotAttempted
}

switch err.(type) {
case *kerr.Error:
kip360 := cl.producer.idVersion >= 3 && (err == kerr.UnknownProducerID || err == kerr.InvalidProducerIDMapping)
kip588 := cl.producer.idVersion >= 4 && (err == kerr.InvalidProducerEpoch || false /* TODO err == kerr.TransactionTimedOut */)

recoverable := kip360 || kip588
if !recoverable {
return err // fatal, unrecoverable
}

// At this point, nothing is being produced and the
// producer ID loads with an error. Before we allow
// production to continue, we reset all sequence
// numbers. Storing errReloadProducerID will reset the
// id / epoch appropriately and everything will work as
// per KIP-360.
cl.resetAllProducerSequences()

// With UnknownProducerID and v3 init id, we can recover.
// No sense issuing an abort request, though.
cl.producer.id.Store(&producerID{
id: id,
epoch: epoch,
err: errReloadProducerID,
})
// If we recovered the producer ID, we return early, since
// there is no reason to issue an abort now that the id is
// different. Otherwise, we issue our EndTxn which will likely
// fail, but that is ok, we will just return error.
_, didRecover, _ := cl.maybeRecoverProducerID()
if didRecover {
return nil

default:
// If this is not a kerr.Error, then it was some arbitrary
// client error. We can try the EndTxnRequest in the hopes
// that our id / epoch is valid, but the id / epoch may
// have never loaded (and thus will be -1 / -1).
}
}

Expand All @@ -529,13 +516,54 @@ func (cl *Client) EndTransaction(ctx context.Context, commit TransactionEndTry)
// If the returned error is still a Kafka error, this is fatal and we
// need to fail our producer ID we loaded above.
var ke *kerr.Error
if err != nil && errors.As(err, &ke) && !ke.Retriable {
if errors.As(err, &ke) && !ke.Retriable {
cl.failProducerID(id, epoch, err)
}

return err
}

// This returns if it is necessary to recover the producer ID (it has an
// error), whether it is possible to recover, and, if not, the error.
//
// We call this when beginning a transaction or when ending with an abort.
func (cl *Client) maybeRecoverProducerID() (necessary, did bool, err error) {
id, epoch, err := cl.producerID()
if err == nil {
return false, false, nil
}

ke, ok := err.(*kerr.Error)
if !ok {
return true, false, err
}

kip360 := cl.producer.idVersion >= 3 && (ke == kerr.UnknownProducerID || ke == kerr.InvalidProducerIDMapping)
kip588 := cl.producer.idVersion >= 4 && (ke == kerr.InvalidProducerEpoch || false /* TODO err == kerr.TransactionTimedOut */)

recoverable := kip360 || kip588
if !recoverable {
return true, false, err // fatal, unrecoverable
}

// At this point, the producer ID loads with error; anything being
// concurrently produced (which should be nothing, per calling this at
// the beginning or end of a transaction) will load producer IDs with
// an error.
//
// Before we allow production to continue, we reset all sequence
// numbers. Storing errReloadProducerID will reset the id / epoch
// appropriately and everything will work as per KIP-360.
cl.resetAllProducerSequences()

cl.producer.id.Store(&producerID{
id: id,
epoch: epoch,
err: errReloadProducerID,
})
return true, true, nil
}

// If a transaction is begun too quickly after finishing an old transaction,
// Kafka may still be finalizing its commit / abort and will return a
// concurrent transactions error. We handle that by retrying for a bit.
Expand Down Expand Up @@ -658,7 +686,7 @@ func (cl *Client) addOffsetsToTxn(ctx context.Context, group string) error {
// If the returned error is still a Kafka error, this is fatal and we
// need to fail our producer ID we created just above.
var ke *kerr.Error
if err != nil && errors.As(err, &ke) && !ke.Retriable {
if errors.As(err, &ke) && !ke.Retriable {
cl.failProducerID(id, epoch, err)
}

Expand Down

0 comments on commit afa1209

Please sign in to comment.