txns: sleep 200ms on commit, preventing rebalance / new commit
See the updated transaction docs. This sleep will not happen if the user
requires stable fetch offsets and the cluster is 2.5+.

Ideally, 200ms is fast enough, especially since this is preventing an
incredibly rare scenario. However, the scenario is such that if it happens,
unknown, undetectable duplicates could occur. 200ms
may not be long enough in light of this, but the thought here is to
allow the cluster at least a little bit of a heads up before allowing a
rebalance, which itself requires JoinGroup and SyncGroup to complete
before OffsetFetch can be issued to cause this problem.
twmb committed Jan 6, 2022
1 parent 12eaa1e commit a059901
Showing 3 changed files with 89 additions and 13 deletions.
54 changes: 44 additions & 10 deletions docs/transactions.md
@@ -30,9 +30,8 @@ KIP-447?
## The problem

[KIP-447](https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics)
bills itself as producer scalability for exactly once semantics. Mostly, this
is a KIP to add a bit more safety to EOS, and is mostly tailored for the Java
client due to how it is implemented.
bills itself as producer scalability for exactly once semantics. This
is a KIP to add more safety to EOS.

Before KIP-447, Kafka Streams was implemented to consume from only one
partition, modify records it consumed, and produce back to a new topic. Streams
@@ -89,8 +88,12 @@ blip of time and then later reconnects to commit.

## The franz-go approach

The franz-go client supports KIP-447, but allows consuming multiple partitions as
an EOS consumer/producer even on older (pre 2.5.0) Kafka clusters.
The franz-go client supports KIP-447, but allows consuming multiple partitions
as an EOS consumer/producer even on older (pre 2.5) Kafka clusters. There is
a very small risk of duplicates with the approach this client chooses; you can
read on below for more details. Alternatively, you can use this client exactly
like the Java client, but as with the Java client, this requires extra special
care.
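
For orientation, here is a minimal consume-modify-produce sketch of the session
flow this section describes. The broker, topic, group, and transactional ID
names below are placeholders, and error handling is abbreviated:

```go
package main

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	sess, err := kgo.NewGroupTransactSession(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker
		kgo.TransactionalID("my-txn-id"),  // placeholder transactional ID
		kgo.ConsumerGroup("my-group"),     // placeholder group
		kgo.ConsumeTopics("input-topic"),  // placeholder input topic
		kgo.FetchIsolationLevel(kgo.ReadCommitted()),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer sess.Close()

	ctx := context.Background()
	for {
		fetches := sess.PollFetches(ctx)
		if fetches.IsClientClosed() {
			return
		}
		if err := sess.Begin(); err != nil { // begin a transaction for this batch
			log.Fatal(err)
		}
		fetches.EachRecord(func(r *kgo.Record) {
			out := kgo.StringRecord(string(r.Value) + "-processed")
			out.Topic = "output-topic" // placeholder output topic
			sess.Produce(ctx, out, nil)
		})
		// End flushes produced records, commits the consumed offsets within
		// the transaction, and ends it; committed reports whether the commit
		// actually went through (false means the batch will be reprocessed).
		committed, err := sess.End(ctx, kgo.TryCommit)
		if err != nil {
			log.Fatal(err)
		}
		if !committed {
			log.Println("transaction aborted; records will be reprocessed")
		}
	}
}
```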

To start, unlike the Java client, franz-go does not require a separate client
and producer. Instead, both are merged into one "client", and by merging them,
@@ -108,7 +111,7 @@ not, and then we will commit the transaction. To work around this, the franz-go
client forces a successful heartbeat (and a successful response) immediately
before committing the transaction. **If a heartbeat immediately before
committing is successful, then we know we can commit within the session
timeout**. Only a little bit more remains.
timeout**. Still, more remains.
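
A conceptual sketch of that invariant, with hypothetical stand-in functions
rather than this client's internals:

```go
package main

import (
	"context"
	"fmt"
)

// Hypothetical stand-ins for the client's internals, for illustration only.
func forceHeartbeat(ctx context.Context) error     { return nil }
func commitOffsetsInTxn(ctx context.Context) error { return nil }

// commitWithinSession illustrates the ordering described above: a heartbeat
// that succeeds immediately before the commit proves the member is still in
// the group, so the commit that follows is issued within the session timeout.
func commitWithinSession(ctx context.Context) error {
	if err := forceHeartbeat(ctx); err != nil {
		// The member may already be out of the group; committing now could
		// produce duplicates, so surface the error instead of committing.
		return fmt.Errorf("heartbeat before commit failed: %w", err)
	}
	return commitOffsetsInTxn(ctx)
}

func main() {
	fmt.Println(commitWithinSession(context.Background()))
}
```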

Even if we commit immediately before ending a transaction, it is possible that
our commit will take so long that a rebalance happens before the commit
@@ -130,7 +133,38 @@ If a rebalance happens while committing, the OnRevoked callback is blocked
until the `EndTxn` request completes, meaning either the `EndTxn` will complete
successfully before the member is allowed to rebalance, or the `EndTxn` will
hang long enough for the member to be booted. In either scenario, we avoid our
problem.

Thus, although we do support 2.5.0+ behavior, the client itself works around
duplicates in a pre-2.5.0 world with a lot of edge case handling.
problem. Again though, more remains.

After `EndTxn`, it is possible that a rebalance could immediately happen.
Within Kafka when a transaction ends, Kafka propagates a commit marker to all
partitions that were a part of the transaction. If a rebalance finishes and the
new consumer fetches offsets _before_ the commit marker is propagated, then the
new consumer will fetch the previously committed offsets, not the newly
committed offsets. There is nothing a client can do to reliably prevent this
scenario. Here, franz-go takes a heuristic approach: the assumption is that
inter-broker communication is inevitably faster than broker `<=>` client
communication. On successful commit, if the client is not speaking to a 2.5+
cluster (KIP-447 cluster) _or_ the client does not have
`RequireStableFetchOffsets` enabled, then the client will sleep 200ms before
releasing the lock that allows a rebalance to continue. The assumption is that
200ms is enough time for Kafka to propagate transactional markers: the
propagation should finish before a client is able to re-join, have a new
leader assign partitions, sync the assignment, and issue the offset fetch
request. In effect, the 200ms here is an attempt to provide KIP-447
semantics (waiting for stable fetch offsets) in the place it matters most even
though the cluster does not support the wait officially. Internally, the sleep
is concurrent and only blocks a rebalance from beginning; it does not block
you from starting a new transaction (but it does prevent you from _ending_
a new transaction).
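
The gist of that gate, distilled from the pkg/kgo/txn.go change later in this
commit (the function below is illustrative, not the client's actual code):

```go
package main

import (
	"fmt"
	"time"
)

// releaseRebalanceLock illustrates the decision described above: if the
// cluster speaks KIP-447 (2.5+) and the user requires stable fetch offsets,
// Kafka itself blocks offset fetches for pending transactions, so the lock
// gating a rebalance can be released immediately. Otherwise, a concurrent
// 200ms sleep gives the brokers a head start to propagate commit markers
// before a rebalanced member can fetch offsets.
func releaseRebalanceLock(kip447, requireStable, committed bool, unlock func()) {
	if (kip447 && requireStable) || !committed {
		unlock()
		return
	}
	go func() {
		time.Sleep(200 * time.Millisecond)
		unlock()
	}()
}

func main() {
	done := make(chan struct{})
	releaseRebalanceLock(false, false, true, func() { close(done) })
	<-done
	fmt.Println("rebalance may now proceed")
}
```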

One last flaw of the above approach is that a lot of it is dependent on timing.
If the servers you are running on do not have reliable clocks and may be very
out of sync, then the timing aspects above may not work. However, it is likely
your cluster will have other issues if some broker clocks are very off. It is
recommended to have alerts on NTP clock drift.

Thus, although we do support 2.5+ behavior, the client itself works around
duplicates in a pre-2.5 world with a lot of edge case handling. It is
_strongly_ recommended to use a 2.5+ cluster and to always enable
`RequireStableFetchOffsets`. The option itself has more documentation on
what other settings may need to be tweaked.
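
As a sketch of that recommendation (assuming this client's
`RequireStableFetchOffsets` and `TransactionTimeout` options; the 10s value
echoes the config documentation changed below, and all names are placeholders):

```go
package example

import (
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// newSession sketches the options the recommendation above is about; the
// broker, transactional ID, group, and topic names are placeholders.
func newSession() (*kgo.GroupTransactSession, error) {
	return kgo.NewGroupTransactSession(
		kgo.SeedBrokers("localhost:9092"),
		kgo.TransactionalID("my-txn-id"),
		kgo.ConsumerGroup("my-group"),
		kgo.ConsumeTopics("input-topic"),

		// On 2.5+ clusters, brokers block offset fetches for partitions with
		// pending transactions, closing the duplicate window described above
		// (and skipping this client's 200ms sleep).
		kgo.RequireStableFetchOffsets(),

		// Because stable-offset fetching can block consumption, keep the
		// transactional timeout small so a hung transaction clears quickly.
		kgo.TransactionTimeout(10 * time.Second),
	)
}
```
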
4 changes: 3 additions & 1 deletion pkg/kgo/config.go
@@ -1309,7 +1309,9 @@ func HeartbeatInterval(interval time.Duration) GroupOpt {
// that a transactional producer could be committing to.
//
// With this option, Kafka will block group consumers from fetching offsets for
// partitions that are in an active transaction.
// partitions that are in an active transaction. This option is **strongly**
// recommended to help prevent duplication problems. See this repo's KIP-447
// doc to learn more.
//
// Because this can block consumption, it is strongly recommended to set
// transactional timeouts to a small value (10s) rather than the default 60s.
44 changes: 42 additions & 2 deletions pkg/kgo/txn.go
@@ -27,6 +27,10 @@ const (
// GroupTransactSession abstracts away the proper way to begin a transaction
// and more importantly how to end a transaction when consuming in a group,
// modifying records, and producing (EOS transaction).
//
// If you are running Kafka 2.5+, it is strongly recommended that you also use
// RequireStableFetchOffsets. See that config option's documentation for more
// details.
type GroupTransactSession struct {
cl *Client

@@ -62,6 +66,13 @@ type GroupTransactSession struct {
// rebalance timeout, but this is just one request with no cpu logic. With a
// proper rebalance timeout, this single request will not fail and the commit
// will succeed properly.
//
// If this client detects you are talking to a pre-2.5 cluster, OR if you have
// not enabled RequireStableFetchOffsets, the client will sleep for 200ms after
// a successful commit to allow Kafka's txn markers to propagate. This is not
// foolproof in the event of some extremely unlikely communication patterns and
// **potentially** could allow duplicates. See this repo's transactions doc
// for more details.
func NewGroupTransactSession(opts ...Opt) (*GroupTransactSession, error) {
s := &GroupTransactSession{
revokedCh: make(chan struct{}),
@@ -219,7 +230,7 @@ func (s *GroupTransactSession) failed() bool {
// and prevents new requests (multiple requests are issued at the end of a
// transact session). Thus, while a context is allowed, it is strongly
// recommended to not cancel it.
func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry) (bool, error) {
func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry) (committed bool, err error) {
defer func() {
s.failMu.Lock()
s.revoked = false
@@ -253,12 +264,14 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry
var commitErr error
var g *groupConsumer

kip447 := false
if wantCommit && !failed {
var commitErrs []string

committed := make(chan struct{})
g = s.cl.commitTransactionOffsets(context.Background(), postcommit,
func(_ *kmsg.TxnOffsetCommitRequest, resp *kmsg.TxnOffsetCommitResponse, err error) {
kip447 = resp.Version >= 3
defer close(committed)
if err != nil {
commitErrs = append(commitErrs, err.Error())
@@ -318,7 +331,33 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry
}

s.failMu.Lock()
defer s.failMu.Unlock()

// If we know we are KIP-447 and the user is requiring stable, we can
// unlock immediately because Kafka will itself block a rebalance
// fetching offsets from outstanding transactions.
//
// If either of these are false, we spin up a goroutine that sleeps for
// 200ms before unlocking to give Kafka a chance to avoid some odd race
// that would permit duplicates (i.e., what KIP-447 is preventing).
//
// This 200ms is not perfect, but it should be enough time on a
// stable cluster. On an unstable cluster, I still expect clients to be
// slower than intra-cluster communication, but there is a risk.
if kip447 && s.cl.cfg.requireStable {
defer s.failMu.Unlock()
} else {
defer func() {
if committed {
s.cl.cfg.logger.Log(LogLevelDebug, "sleeping 200ms before allowing a rebalance to continue to give Kafka a chance to write txn markers and avoid duplicates")
go func() {
time.Sleep(200 * time.Millisecond)
s.failMu.Unlock()
}()
} else {
s.failMu.Unlock()
}
}()
}

tryCommit := !s.failed() && commitErr == nil && !hasAbortableCommitErr && okHeartbeat
willTryCommit := wantCommit && tryCommit
@@ -368,6 +407,7 @@ retryUnattempted:
return false, endTxnErr

default: // both errs nil
committed = willTryCommit
return willTryCommit, nil
}
}
