txns: make even safer (& drop default txn timeout to 40s)
See embedded comments. By ensuring our txn timeout is less than the
session timeout and the rebalance timeout, and by forcing a heartbeat
before we send EndTxn, we can be pretty sure that our transaction
will either complete successfully before the next heartbeat, or the
transaction will time out. This helps avoid the KIP-447 situation for
pre-2.5.0 clusters.
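For illustration, a minimal sketch of a client configured with the ordering this commit relies on (txn timeout < session timeout < rebalance timeout). The broker address, transactional ID, and group name are hypothetical, and topic subscription options are omitted:

	package main

	import (
		"time"

		"github.com/twmb/franz-go/pkg/kgo"
	)

	func main() {
		cl, err := kgo.NewClient(
			kgo.SeedBrokers("localhost:9092"), // hypothetical broker
			kgo.TransactionalID("my-txn-id"),  // hypothetical transactional ID
			// Keep the transaction timeout below the session timeout so a
			// transaction cannot outlive the group session that produced it.
			kgo.TransactionTimeout(40*time.Second),
		)
		if err != nil {
			panic(err)
		}
		defer cl.Close()

		// txn timeout (40s) < session timeout (45s) < rebalance timeout (60s).
		sess := cl.AssignGroupTransactSession("my-group",
			kgo.SessionTimeout(45*time.Second),
			kgo.RebalanceTimeout(60*time.Second),
		)
		_ = sess
	}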
twmb committed May 12, 2021
1 parent 076bb39 commit 939cba2
Showing 3 changed files with 122 additions and 31 deletions.
12 changes: 7 additions & 5 deletions pkg/kgo/config.go
@@ -302,7 +302,7 @@ func defaultCfg() cfg {
metadataMaxAge: 5 * time.Minute,
metadataMinAge: 10 * time.Second,

txnTimeout: 60 * time.Second,
txnTimeout: 40 * time.Second,
acks: AllISRAcks(),
compression: []CompressionCodec{SnappyCompression(), NoCompression()},
maxRecordBatchBytes: 1000000, // Kafka max.message.bytes default is 1000012
@@ -800,10 +800,12 @@ func TransactionalID(id string) ProducerOpt {
}

// TransactionTimeout sets the time allowed for a transaction, overriding the
// default 60s. It may be a good idea to set this under the rebalance timeout
// for a group, so that a produce will not complete successfully after the
// consumer group has already moved the partitions the consumer/producer is
// working on from one group member to another.
// default 40s. It is a good idea to keep this less than a group's session
// timeout, so that a group member will always be alive for the duration of a
// transaction even if connectivity dies. This helps prevent a transaction
// finishing after a rebalance, which is problematic pre-Kafka 2.5.0. If you
// are on Kafka 2.5.0+, then you can use the RequireStableFetchOffsets option
// when assigning the group, and you can set this to whatever you would like.
//
// Transaction timeouts begin when the first record is produced within a
// transaction, not when a transaction begins.
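Continuing the configuration sketch above: as the new comment notes, on Kafka 2.5.0+ the timeout juggling is unnecessary if the group requires stable fetch offsets. A hedged fragment of that alternative (group name hypothetical):

	// On 2.5.0+ brokers, KIP-447 fencing plus stable fetch offsets let the
	// transaction timeout be whatever you would like; the broker will not
	// hand uncommitted offsets to a rebalanced member.
	sess := cl.AssignGroupTransactSession("my-group",
		kgo.RequireStableFetchOffsets(),
	)
	_ = sess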
33 changes: 28 additions & 5 deletions pkg/kgo/consumer_group.go
@@ -62,8 +62,16 @@ func Balancers(balancers ...GroupBalancer) GroupOpt {
// timeout, the broker will remove the member from the group and initiate a
// rebalance.
//
// This corresponds to Kafka's session.timeout.ms setting and must be within
// the broker's group.min.session.timeout.ms and group.max.session.timeout.ms.
// If you are using a GroupTransactSession for EOS, wish to lower this, and are
talking to a Kafka cluster pre-2.5.0, consider lowering the
// TransactionTimeout. If you do not, you risk a transaction finishing after a
// group has rebalanced, which could lead to duplicate processing. If you are
// talking to a Kafka 2.5.0+ cluster, you can safely use the
// RequireStableFetchOffsets group option and prevent any problems.
//
// This option corresponds to Kafka's session.timeout.ms setting and must be
// within the broker's group.min.session.timeout.ms and
// group.max.session.timeout.ms.
func SessionTimeout(timeout time.Duration) GroupOpt {
return groupOpt{func(cfg *groupConsumer) { cfg.sessionTimeout = timeout }}
}
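For context, the loop these timeouts protect is the usual consume-transform-produce cycle around a GroupTransactSession. A rough fragment, reusing cl and sess from the configuration sketch above and assuming the PollFetches/RecordIter/Produce calls of this era's client API; "out-topic" is a placeholder and error handling is abbreviated:

	// runLoop polls records, reproduces them inside a transaction, and ends
	// the transaction, retrying when the group rebalanced mid-transaction.
	func runLoop(ctx context.Context, cl *kgo.Client, sess *kgo.GroupTransactSession) error {
		for {
			fetches := cl.PollFetches(ctx)
			if err := ctx.Err(); err != nil {
				return err
			}

			if err := sess.Begin(); err != nil {
				return err
			}

			iter := fetches.RecordIter()
			for !iter.Done() {
				rec := iter.Next()
				// Produce within the transaction; a real program would
				// track produce errors in the promise and abort on failure.
				cl.Produce(ctx, &kgo.Record{Topic: "out-topic", Value: rec.Value},
					func(*kgo.Record, error) {})
			}

			// A false first return means the group rebalanced or the
			// commit failed: the transaction aborted, and the records
			// will simply be fetched and processed again.
			if _, err := sess.End(ctx, kgo.TryCommit); err != nil {
				return err
			}
		}
	}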
@@ -283,6 +291,14 @@ type groupConsumer struct {

rejoinCh chan struct{} // cap 1; sent to if subscription changes (regex)

// For EOS, before we commit, we force a heartbeat. If the client and
// group member are both configured properly, then the transactional
// timeout will be less than the session timeout. By forcing a
// heartbeat before the commit, if the heartbeat was successful, then
// we ensure that we will complete the transaction within the group
// session, meaning we will not commit after the group has rebalanced.
heartbeatForceCh chan func(error)

// The following two are only updated in the manager / join&sync loop
lastAssigned map[string][]int32 // only updated in join&sync loop
nowAssigned map[string][]int32 // only updated in join&sync loop
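The new heartbeatForceCh above carries a callback into the heartbeat loop so a caller can force a beat and learn its result. A stripped-down, standalone sketch of this force-and-wait pattern (not the library's code; the 3s tick is arbitrary):

	package main

	import (
		"fmt"
		"time"
	)

	// heartbeat stands in for the real heartbeat request.
	func heartbeat() error { return nil }

	// loop beats on a ticker, and beats immediately when a caller sends a
	// callback over forceCh, reporting the result through that callback.
	func loop(forceCh <-chan func(error), stop <-chan struct{}) {
		ticker := time.NewTicker(3 * time.Second)
		defer ticker.Stop()
		for {
			var force func(error)
			select {
			case <-ticker.C:
			case force = <-forceCh:
			case <-stop:
				return
			}
			err := heartbeat()
			if force != nil {
				force(err)
			}
		}
	}

	func main() {
		forceCh := make(chan func(error))
		stop := make(chan struct{})
		go loop(forceCh, stop)

		// Force a beat and wait for its result, as End now does before
		// finishing a transaction.
		done := make(chan struct{})
		var hbErr error
		forceCh <- func(err error) { hbErr = err; close(done) }
		<-done
		fmt.Println("forced heartbeat error:", hbErr)
		close(stop)
	}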
@@ -402,9 +418,10 @@ func (cl *Client) AssignGroup(group string, opts ...GroupOpt) {

tps: newTopicsPartitions(),

using: make(map[string]int),
rejoinCh: make(chan struct{}, 1),
reSeen: make(map[string]struct{}),
using: make(map[string]int),
rejoinCh: make(chan struct{}, 1),
heartbeatForceCh: make(chan func(error)),
reSeen: make(map[string]struct{}),

sessionTimeout: 45000 * time.Millisecond,
rebalanceTimeout: 60000 * time.Millisecond,
@@ -919,12 +936,15 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio

for {
var err error
var force func(error)
heartbeat = false
select {
case <-cooperativeFastCheck:
heartbeat = true
case <-ticker.C:
heartbeat = true
case force = <-g.heartbeatForceCh:
heartbeat = true
case <-g.rejoinCh:
// If a metadata update changes our subscription,
// we just pretend we are rebalancing.
@@ -959,6 +979,9 @@
err = kerr.ErrorForCode(resp.ErrorCode)
}
g.cl.cfg.logger.Log(LogLevelDebug, "heartbeat complete", "err", err)
if force != nil {
force(err)
}
}

// The first error either triggers a clean revoke and metadata
108 changes: 87 additions & 21 deletions pkg/kgo/txn.go
@@ -32,8 +32,12 @@ type GroupTransactSession struct {

cooperative bool

revokeMu sync.Mutex
revoked bool
failMu sync.Mutex

revoked bool
revokedCh chan struct{} // closed once when revoked is set; reset after End
lost bool
lostCh chan struct{} // closed once when lost is set; reset after End
}
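revokedCh and lostCh behave as one-shot latches that End later re-arms. A minimal standalone sketch of that latch shape (again, not the library's code):

	package main

	import "sync"

	// latch trips at most once per generation and can be re-armed.
	type latch struct {
		mu      sync.Mutex
		tripped bool
		ch      chan struct{}
	}

	func newLatch() *latch { return &latch{ch: make(chan struct{})} }

	// Trip closes the channel once; later trips are no-ops.
	func (l *latch) Trip() {
		l.mu.Lock()
		defer l.mu.Unlock()
		if !l.tripped {
			l.tripped = true
			close(l.ch)
		}
	}

	// Done returns a channel that is closed once the latch has tripped,
	// suitable for use in a select.
	func (l *latch) Done() <-chan struct{} {
		l.mu.Lock()
		defer l.mu.Unlock()
		return l.ch
	}

	// Reset re-arms the latch, as End's defer does for revoked and lost.
	func (l *latch) Reset() {
		l.mu.Lock()
		defer l.mu.Unlock()
		l.tripped = false
		l.ch = make(chan struct{})
	}

	func main() {
		l := newLatch()
		l.Trip()
		<-l.Done() // returns immediately: the latch tripped
		l.Reset()  // re-armed for the next transaction
	}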

// AssignGroupTransactSession is exactly the same as AssignGroup, but wraps the
@@ -76,20 +80,44 @@ func (cl *Client) AssignGroupTransactSession(group string, opts ...GroupOpt) *Gr

userRevoked := g.onRevoked
g.onRevoked = func(ctx context.Context, rev map[string][]int32) {
s.revokeMu.Lock()
defer s.revokeMu.Unlock()
s.failMu.Lock()
defer s.failMu.Unlock()
if s.revoked {
return
}

if s.cooperative && len(rev) == 0 && !s.revoked {
cl.cfg.logger.Log(LogLevelInfo, "transact session in on_revoke with nothing to revoke; allowing next commit")
} else {
cl.cfg.logger.Log(LogLevelInfo, "transact session in on_revoke; aborting next commit if we are currently in a transaction")
s.revoked = true
close(s.revokedCh)
}

if userRevoked != nil {
userRevoked(ctx, rev)
}
}

userLost := g.onLost
g.onLost = func(ctx context.Context, lost map[string][]int32) {
s.failMu.Lock()
defer s.failMu.Unlock()
if s.lost {
return
}

cl.cfg.logger.Log(LogLevelInfo, "transact session in on_lost; aborting next commit if we are currently in a transaction")
s.lost = true
close(s.lostCh)

if userLost != nil {
userLost(ctx, lost)
} else if userRevoked != nil {
userRevoked(ctx, lost)
}
}

return s
}

@@ -139,6 +167,10 @@ func (s *GroupTransactSession) Begin() error {
return s.cl.BeginTransaction()
}

func (s *GroupTransactSession) failed() bool {
return s.revoked || s.lost
}

// End ends a transaction, committing if commit is true, if the group did not
// rebalance since the transaction began, and if committing offsets is
// successful. If commit is false, the group has rebalanced, or any partition
@@ -156,9 +188,12 @@ func (s *GroupTransactSession) Begin() error {
// recommended to not cancel it.
func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry) (bool, error) {
defer func() {
s.revokeMu.Lock()
s.failMu.Lock()
s.revoked = false
s.revokeMu.Unlock()
s.revokedCh = make(chan struct{})
s.lost = false
s.lostCh = make(chan struct{})
s.failMu.Unlock()
}()

switch commit {
@@ -174,20 +209,22 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry

wantCommit := bool(commit)

s.revokeMu.Lock()
revoked := s.revoked
s.failMu.Lock()
failed := s.failed()

precommit := s.cl.CommittedOffsets()
postcommit := s.cl.UncommittedOffsets()
s.revokeMu.Unlock()
s.failMu.Unlock()

var oldGeneration bool
var commitErr error
if wantCommit && !revoked {
var g *groupConsumer

if wantCommit && !failed {
var commitErrs []string

committed := make(chan struct{})
s.cl.commitTransactionOffsets(context.Background(), postcommit,
g = s.cl.commitTransactionOffsets(context.Background(), postcommit,
func(_ *kmsg.TxnOffsetCommitRequest, resp *kmsg.TxnOffsetCommitResponse, err error) {
defer close(committed)
if err != nil {
Expand Down Expand Up @@ -215,14 +252,42 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry
}
}

s.revokeMu.Lock()
defer s.revokeMu.Unlock()
// Now that we have committed our offsets, before we allow them to be
// used, we force a heartbeat. By forcing a heartbeat, if there is no
// error, then we know we have up to RebalanceTimeout to write our
// EndTxnRequest without a problem.
//
// We should not be booted from the group if we receive an ok
// heartbeat, meaning that, as mentioned, we should be able to end the
// transaction safely.
var okHeartbeat bool
if g != nil && commitErr == nil {
waitHeartbeat := make(chan struct{})
var heartbeatErr error
select {
case g.heartbeatForceCh <- func(err error) {
defer close(waitHeartbeat)
heartbeatErr = err
}:
select {
case <-waitHeartbeat:
okHeartbeat = heartbeatErr == nil
case <-s.revokedCh:
case <-s.lostCh:
}
case <-s.revokedCh:
case <-s.lostCh:
}
}

s.failMu.Lock()
defer s.failMu.Unlock()

tryCommit := !s.revoked && commitErr == nil && !oldGeneration
tryCommit := !s.failed() && commitErr == nil && !oldGeneration && okHeartbeat
willTryCommit := wantCommit && tryCommit

s.cl.cfg.logger.Log(LogLevelInfo, "transaction session ending",
"was_revoked", s.revoked,
"was_failed", s.failed(),
"want_commit", wantCommit,
"can_try_commit", tryCommit,
"will_try_commit", willTryCommit,
Expand Down Expand Up @@ -547,13 +612,13 @@ func (cl *Client) commitTransactionOffsets(
ctx context.Context,
uncommitted map[string]map[int32]EpochOffset,
onDone func(*kmsg.TxnOffsetCommitRequest, *kmsg.TxnOffsetCommitResponse, error),
) {
) *groupConsumer {
cl.cfg.logger.Log(LogLevelDebug, "in commitTransactionOffsets", "with", uncommitted)
defer cl.cfg.logger.Log(LogLevelDebug, "left commitTransactionOffsets")

if cl.cfg.txnID == nil {
onDone(nil, nil, errNotTransactional)
return
return nil
}

// Before committing, ensure we are at least in a transaction. We
@@ -563,18 +628,18 @@
if !cl.producer.inTxn {
onDone(nil, nil, errNotInTransaction)
cl.producer.txnMu.Unlock()
return
return nil
}
cl.producer.txnMu.Unlock()

g, ok := cl.consumer.loadGroup()
if !ok {
onDone(new(kmsg.TxnOffsetCommitRequest), new(kmsg.TxnOffsetCommitResponse), errNotGroup)
return
return nil
}
if len(uncommitted) == 0 {
onDone(new(kmsg.TxnOffsetCommitRequest), new(kmsg.TxnOffsetCommitResponse), nil)
return
return g
}

g.mu.Lock()
Expand All @@ -585,12 +650,13 @@ func (cl *Client) commitTransactionOffsets(
if onDone != nil {
onDone(nil, nil, err)
}
return
return g
}
g.offsetsAddedToTxn = true
}

g.commitTxn(ctx, uncommitted, onDone)
return g
}

// Ties a transactional producer to a group. Since this requires a producer ID,
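From the caller's side, nothing about End's contract changes: a false return still means the transaction aborted and the work is safe to retry. A hedged usage fragment, reusing sess and ctx from the sketches above:

	committed, err := sess.End(ctx, kgo.TryCommit)
	switch {
	case err != nil:
		// Flushing or ending failed outright; the client's state should
		// be inspected before continuing.
		return err
	case !committed:
		// The group rebalanced, the forced heartbeat failed, or the
		// offset commit was rejected: the transaction aborted, and the
		// records will be reprocessed. Poll and try again.
	default:
		// Committed: the produced records and the consumed offsets are
		// visible atomically.
	}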