diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go
index 27830562..ffd879fb 100644
--- a/pkg/kgo/config.go
+++ b/pkg/kgo/config.go
@@ -302,7 +302,7 @@ func defaultCfg() cfg {
 		metadataMaxAge: 5 * time.Minute,
 		metadataMinAge: 10 * time.Second,

-		txnTimeout:          60 * time.Second,
+		txnTimeout:          40 * time.Second,
 		acks:                AllISRAcks(),
 		compression:         []CompressionCodec{SnappyCompression(), NoCompression()},
 		maxRecordBatchBytes: 1000000, // Kafka max.message.bytes default is 1000012
@@ -800,10 +800,12 @@ func TransactionalID(id string) ProducerOpt {
 }

 // TransactionTimeout sets the allowed for a transaction, overriding the
-// default 60s. It may be a good idea to set this under the rebalance timeout
-// for a group, so that a produce will not complete successfully after the
-// consumer group has already moved the partitions the consumer/producer is
-// working on from one group member to another.
+// default 40s. It is a good idea to keep this less than a group's session
+// timeout, so that a group member will always be alive for the duration of a
+// transaction even if connectivity dies. This helps prevent a transaction
+// finishing after a rebalance, which is problematic pre-Kafka 2.5.0. If you
+// are on Kafka 2.5.0+, then you can use the RequireStableFetchOffsets option
+// when assigning the group, and you can set this to whatever you would like.
 //
 // Transaction timeouts begin when the first record is produced within a
 // transaction, not when a transaction begins.
diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go
index 1573894c..d61770a6 100644
--- a/pkg/kgo/consumer_group.go
+++ b/pkg/kgo/consumer_group.go
@@ -62,8 +62,16 @@ func Balancers(balancers ...GroupBalancer) GroupOpt {
 // timeout, the broker will remove the member from the group and initiate a
 // rebalance.
 //
-// This corresponds to Kafka's session.timeout.ms setting and must be within
-// the broker's group.min.session.timeout.ms and group.max.session.timeout.ms.
+// If you are using a GroupTransactSession for EOS, wish to lower this, and are
+// talking to a Kafka cluster pre 2.5.0, consider lowering the
+// TransactionTimeout. If you do not, you risk a transaction finishing after a
+// group has rebalanced, which could lead to duplicate processing. If you are
+// talking to a Kafka 2.5.0+ cluster, you can safely use the
+// RequireStableFetchOffsets group option and prevent any problems.
+//
+// This option corresponds to Kafka's session.timeout.ms setting and must be
+// within the broker's group.min.session.timeout.ms and
+// group.max.session.timeout.ms.
 func SessionTimeout(timeout time.Duration) GroupOpt {
 	return groupOpt{func(cfg *groupConsumer) { cfg.sessionTimeout = timeout }}
 }
@@ -283,6 +291,14 @@ type groupConsumer struct {

 	rejoinCh chan struct{} // cap 1; sent to if subscription changes (regex)

+	// For EOS, before we commit, we force a heartbeat. If the client and
+	// group member are both configured properly, then the transactional
+	// timeout will be less than the session timeout. By forcing a
+	// heartbeat before the commit, if the heartbeat was successful, then
+	// we ensure that we will complete the transaction within the group
+	// session, meaning we will not commit after the group has rebalanced.
+	heartbeatForceCh chan func(error)
+
 	// The following two are only updated in the manager / join&sync loop
 	lastAssigned map[string][]int32 // only updated in join&sync loop
 	nowAssigned  map[string][]int32 // only updated in join&sync loop
@@ -402,9 +418,10 @@ func (cl *Client) AssignGroup(group string, opts ...GroupOpt) {

 		tps: newTopicsPartitions(),

-		using:    make(map[string]int),
-		rejoinCh: make(chan struct{}, 1),
-		reSeen:   make(map[string]struct{}),
+		using:            make(map[string]int),
+		rejoinCh:         make(chan struct{}, 1),
+		heartbeatForceCh: make(chan func(error)),
+		reSeen:           make(map[string]struct{}),

 		sessionTimeout:   45000 * time.Millisecond,
 		rebalanceTimeout: 60000 * time.Millisecond,
@@ -919,12 +936,15 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio

 	for {
 		var err error
+		var force func(error)
 		heartbeat = false
 		select {
 		case <-cooperativeFastCheck:
 			heartbeat = true
 		case <-ticker.C:
 			heartbeat = true
+		case force = <-g.heartbeatForceCh:
+			heartbeat = true
 		case <-g.rejoinCh:
 			// If a metadata update changes our subscription,
 			// we just pretend we are rebalancing.
@@ -959,6 +979,9 @@ func (g *groupConsumer) heartbeat(fetchErrCh <-chan error, s *assignRevokeSessio
 				err = kerr.ErrorForCode(resp.ErrorCode)
 			}
 			g.cl.cfg.logger.Log(LogLevelDebug, "heartbeat complete", "err", err)
+			if force != nil {
+				force(err)
+			}
 		}

 		// The first error either triggers a clean revoke and metadata
diff --git a/pkg/kgo/txn.go b/pkg/kgo/txn.go
index 936367fa..f19c4c6c 100644
--- a/pkg/kgo/txn.go
+++ b/pkg/kgo/txn.go
@@ -32,8 +32,12 @@ type GroupTransactSession struct {

 	cooperative bool

-	revokeMu sync.Mutex
-	revoked  bool
+	failMu sync.Mutex
+
+	revoked   bool
+	revokedCh chan struct{} // closed once when revoked is set; reset after End
+	lost      bool
+	lostCh    chan struct{} // closed once when lost is set; reset after End
 }

 // AssignGroupTransactSession is exactly the same as AssignGroup, but wraps the
@@ -76,13 +80,18 @@ func (cl *Client) AssignGroupTransactSession(group string, opts ...GroupOpt) *Gr

 	userRevoked := g.onRevoked
 	g.onRevoked = func(ctx context.Context, rev map[string][]int32) {
-		s.revokeMu.Lock()
-		defer s.revokeMu.Unlock()
+		s.failMu.Lock()
+		defer s.failMu.Unlock()
+		if s.revoked {
+			return
+		}
+
 		if s.cooperative && len(rev) == 0 && !s.revoked {
 			cl.cfg.logger.Log(LogLevelInfo, "transact session in on_revoke with nothing to revoke; allowing next commit")
 		} else {
 			cl.cfg.logger.Log(LogLevelInfo, "transact session in on_revoke; aborting next commit if we are currently in a transaction")
 			s.revoked = true
+			close(s.revokedCh)
 		}

 		if userRevoked != nil {
@@ -90,6 +99,25 @@ func (cl *Client) AssignGroupTransactSession(group string, opts ...GroupOpt) *Gr
 		}
 	}

+	userLost := g.onLost
+	g.onLost = func(ctx context.Context, lost map[string][]int32) {
+		s.failMu.Lock()
+		defer s.failMu.Unlock()
+		if s.lost {
+			return
+		}
+
+		cl.cfg.logger.Log(LogLevelInfo, "transact session in on_lost; aborting next commit if we are currently in a transaction")
+		s.lost = true
+		close(s.lostCh)
+
+		if userLost != nil {
+			userLost(ctx, lost)
+		} else if userRevoked != nil {
+			userRevoked(ctx, lost)
+		}
+	}
+
 	return s
 }

@@ -139,6 +167,10 @@ func (s *GroupTransactSession) Begin() error {
 	return s.cl.BeginTransaction()
 }

+func (s *GroupTransactSession) failed() bool {
+	return s.revoked || s.lost
+}
+
 // End ends a transaction, committing if commit is true, if the group did not
 // rebalance since the transaction began, and if committing offsets is
 // successful. If commit is false, the group has rebalanced, or any partition
@@ -156,9 +188,12 @@ func (s *GroupTransactSession) Begin() error {
 // recommended to not cancel it.
 func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry) (bool, error) {
 	defer func() {
-		s.revokeMu.Lock()
+		s.failMu.Lock()
 		s.revoked = false
-		s.revokeMu.Unlock()
+		s.revokedCh = make(chan struct{})
+		s.lost = false
+		s.lostCh = make(chan struct{})
+		s.failMu.Unlock()
 	}()

 	switch commit {
@@ -174,20 +209,22 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry

 	wantCommit := bool(commit)

-	s.revokeMu.Lock()
-	revoked := s.revoked
+	s.failMu.Lock()
+	failed := s.failed()
 	precommit := s.cl.CommittedOffsets()
 	postcommit := s.cl.UncommittedOffsets()
-	s.revokeMu.Unlock()
+	s.failMu.Unlock()

 	var oldGeneration bool
 	var commitErr error
-	if wantCommit && !revoked {
+	var g *groupConsumer
+
+	if wantCommit && !failed {
 		var commitErrs []string

 		committed := make(chan struct{})
-		s.cl.commitTransactionOffsets(context.Background(), postcommit,
+		g = s.cl.commitTransactionOffsets(context.Background(), postcommit,
 			func(_ *kmsg.TxnOffsetCommitRequest, resp *kmsg.TxnOffsetCommitResponse, err error) {
 				defer close(committed)
 				if err != nil {
@@ -215,14 +252,42 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry
 		}
 	}

-	s.revokeMu.Lock()
-	defer s.revokeMu.Unlock()
+	// Now that we have committed our offsets, before we allow them to be
+	// used, we force a heartbeat. By forcing a heartbeat, if there is no
+	// error, then we know we have up to RebalanceTimeout to write our
+	// EndTxnRequest without a problem.
+	//
+	// We should not be booted from the group if we receive an ok
+	// heartbeat, meaning that, as mentioned, we should be able to end the
+	// transaction safely.
+	var okHeartbeat bool
+	if g != nil && commitErr == nil {
+		waitHeartbeat := make(chan struct{})
+		var heartbeatErr error
+		select {
+		case g.heartbeatForceCh <- func(err error) {
+			defer close(waitHeartbeat)
+			heartbeatErr = err
+		}:
+			select {
+			case <-waitHeartbeat:
+				okHeartbeat = heartbeatErr == nil
+			case <-s.revokedCh:
+			case <-s.lostCh:
+			}
+		case <-s.revokedCh:
+		case <-s.lostCh:
+		}
+	}
+
+	s.failMu.Lock()
+	defer s.failMu.Unlock()

-	tryCommit := !s.revoked && commitErr == nil && !oldGeneration
+	tryCommit := !s.failed() && commitErr == nil && !oldGeneration && okHeartbeat
 	willTryCommit := wantCommit && tryCommit

 	s.cl.cfg.logger.Log(LogLevelInfo, "transaction session ending",
-		"was_revoked", s.revoked,
+		"was_failed", s.failed(),
 		"want_commit", wantCommit,
 		"can_try_commit", tryCommit,
 		"will_try_commit", willTryCommit,
@@ -547,13 +612,13 @@ func (cl *Client) commitTransactionOffsets(
 	ctx context.Context,
 	uncommitted map[string]map[int32]EpochOffset,
 	onDone func(*kmsg.TxnOffsetCommitRequest, *kmsg.TxnOffsetCommitResponse, error),
-) {
+) *groupConsumer {
 	cl.cfg.logger.Log(LogLevelDebug, "in commitTransactionOffsets", "with", uncommitted)
 	defer cl.cfg.logger.Log(LogLevelDebug, "left commitTransactionOffsets")

 	if cl.cfg.txnID == nil {
 		onDone(nil, nil, errNotTransactional)
-		return
+		return nil
 	}

 	// Before committing, ensure we are at least in a transaction. We
@@ -563,18 +628,18 @@ func (cl *Client) commitTransactionOffsets(
 	if !cl.producer.inTxn {
 		onDone(nil, nil, errNotInTransaction)
 		cl.producer.txnMu.Unlock()
-		return
+		return nil
 	}
 	cl.producer.txnMu.Unlock()

 	g, ok := cl.consumer.loadGroup()
 	if !ok {
 		onDone(new(kmsg.TxnOffsetCommitRequest), new(kmsg.TxnOffsetCommitResponse), errNotGroup)
-		return
+		return nil
 	}
 	if len(uncommitted) == 0 {
 		onDone(new(kmsg.TxnOffsetCommitRequest), new(kmsg.TxnOffsetCommitResponse), nil)
-		return
+		return g
 	}

 	g.mu.Lock()
@@ -585,12 +650,13 @@ func (cl *Client) commitTransactionOffsets(
 			if onDone != nil {
 				onDone(nil, nil, err)
 			}
-			return
+			return g
 		}
 		g.offsetsAddedToTxn = true
 	}

 	g.commitTxn(ctx, uncommitted, onDone)
+	return g
 }

 // Ties a transactional producer to a group. Since this requires a producer ID,
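
For context, a consume-modify-produce loop built on this API might look roughly like the sketch below. The sketch is not part of the patch: the broker address, topic names, group, and transactional ID are placeholders, the option names follow the pre-v1.0 AssignGroup-style API this diff modifies, and error handling is trimmed to keep the flow visible.

package main

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Placeholder brokers and IDs; a real client would also configure
	// logging, isolation level, and proper error handling.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.TransactionalID("my-transactional-id"),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// The on_revoke / on_lost hooks wired by AssignGroupTransactSession
	// make End abort the transaction if the group rebalances or the
	// member is lost while a transaction is open.
	sess := cl.AssignGroupTransactSession("my-group",
		kgo.GroupTopics("input-topic"),  // placeholder input topic
		kgo.RequireStableFetchOffsets(), // Kafka 2.5.0+, per the TransactionTimeout docs
	)

	ctx := context.Background()
	for {
		fetches := cl.PollFetches(ctx) // fetch/partition error handling elided

		if err := sess.Begin(); err != nil {
			panic(err)
		}

		for iter := fetches.RecordIter(); !iter.Done(); {
			rec := iter.Next()
			cl.Produce(ctx, &kgo.Record{Topic: "output-topic", Value: rec.Value}, nil)
		}

		// End commits the polled offsets within the transaction, forces a
		// heartbeat (added in this patch), and only then ends the
		// transaction. committed is false, and the transaction is aborted,
		// if the group was revoked or lost, an offset commit failed, or the
		// forced heartbeat errored.
		committed, err := sess.End(ctx, kgo.TryCommit)
		if err != nil {
			panic(err)
		}
		if !committed {
			continue // aborted: the records will simply be re-consumed
		}
	}
}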