Skip to content

Commit

Permalink
client.EndTransaction: if offsets were added to transaction, ensure w…
Browse files Browse the repository at this point in the history
…e commit

First, this fixes setting offsetsAddedToTxn to true. That was never
done, which inadvertently would have called AddOffsetsToTxn more than we
needed to if we did not only call it in Session.End.

If a client consumes within a transaction but does not produce, when we
end the transaction, before this commit we actually avoided sending an
EndTxn request. This was because no partition was produced to.

As it turns out, because we consumed, we issue AddOffsetsToTxn.
Internally within Kafka, this adds the __consumer_offsets partition
responsible for this group to the transaction itself, and this falls
into AddPartitionsToTxn, which itself is the logic that actually begins
a transaction.

So, if we consume at all, we need to commit so that we properly end the
transaction. If we do not, we leave the transaction open, and
potentially if we are low volume enough, later time out.

Finally, because AddOffsetsToTxn can trigger the start of a transaction,
we also need to handle the CONCURRENT_TRANSACTIONS error that can crop
up if we start a transaction too quickly after finishing the old. This
was already handled in sink.go, so this copies the code straight from
that and changes a few logging statements.
  • Loading branch information
twmb committed Mar 29, 2021
1 parent 938651e commit 2dc7d3f
Showing 1 changed file with 44 additions and 8 deletions.
52 changes: 44 additions & 8 deletions pkg/kgo/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strings"
"sync"
"sync/atomic"
"time"

"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kmsg"
Expand Down Expand Up @@ -118,9 +119,7 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry
s.revokeMu.Unlock()
}()
if err := s.cl.Flush(ctx); err != nil {
// We do not abort here, since any error is the context
// closing.
return false, err
return false, err // we do not abort below, because an error here is ctx closing
}

wantCommit := bool(commit)
Expand Down Expand Up @@ -322,11 +321,24 @@ func (cl *Client) EndTransaction(ctx context.Context, commit TransactionEndTry)

atomic.StoreUint32(&cl.producer.producingTxn, 0) // forbid any new produces while ending txn

defer func() {
if g, ok := cl.consumer.loadGroup(); ok {
// anyAdded tracks if any partitions were added to this txn, because
// any partitions written to triggers AddPartitionToTxn, which triggers
// the txn to actually begin within Kafka.
//
// If we consumed at all but did not produce, the transaction ending
// issues AddOffsetsToTxn, which internally adds a __consumer_offsets
// partition to the transaction. Thus, if we added offsets, then we
// also produced.
var anyAdded bool
g, ok := cl.consumer.loadGroup()
if ok {
if g.offsetsAddedToTxn {
g.offsetsAddedToTxn = false
anyAdded = true
}
}()
} else {
cl.cfg.logger.Log(LogLevelDebug, "transaction ending, no group loaded; this must be a producer-only transaction, not consume-modify-produce EOS")
}

if !cl.producer.inTxn {
return ErrNotInTransaction
Expand All @@ -335,7 +347,6 @@ func (cl *Client) EndTransaction(ctx context.Context, commit TransactionEndTry)

// After the flush, no records are being produced to, and we can set
// addedToTxn to false outside of any mutex.
var anyAdded bool
for _, parts := range cl.loadTopics() {
for _, part := range parts.load().partitions {
if part.records.addedToTxn {
Expand Down Expand Up @@ -472,6 +483,7 @@ func (cl *Client) commitTransactionOffsets(
}
return
}
g.offsetsAddedToTxn = true
}

g.commitTxn(ctx, uncommitted, onDone)
Expand All @@ -487,6 +499,9 @@ func (cl *Client) addOffsetsToTxn(ctx context.Context, group string) error {
return err
}

start := time.Now()
tries := 0
start:
cl.cfg.logger.Log(LogLevelInfo, "issuing AddOffsetsToTxn",
"txn", *cl.cfg.txnID,
"producerID", id,
Expand All @@ -502,7 +517,28 @@ func (cl *Client) addOffsetsToTxn(ctx context.Context, group string) error {
if err != nil {
return err
}
return kerr.ErrorForCode(resp.ErrorCode)
err = kerr.ErrorForCode(resp.ErrorCode)

// Similar to our ConcurrentTransactions retries in sink.go, we may
// commit offsets without producing, and the commit causes the
// transaction to begin. If we commit too quickly, Kafka may not have
// completely finalized the prior transaction. We have to retry.
if err == kerr.ConcurrentTransactions && time.Since(start) < 10*time.Second {
tries++
cl.cfg.logger.Log(LogLevelInfo, "AddOffsetsToTxn failed with CONCURRENT_TRANSACTIONS, which may be because we ended a txn and began producing in a new txn too quickly; backing off and retrying",
"backoff", 100*time.Millisecond,
"since_request_tries_start", time.Since(start),
"tries", tries,
)
select {
case <-time.After(100 * time.Millisecond):
case <-cl.ctx.Done():
cl.cfg.logger.Log(LogLevelError, "abandoning AddOffsetsToTxn retry due to client ctx quitting")
return err
}
goto start
}
return err
}

// commitTxn is ALMOST EXACTLY THE SAME as commit, but changed for txn types
Expand Down

0 comments on commit 2dc7d3f

Please sign in to comment.