GroupTransactSession Close hangs, preventing restart #753

Closed
iamnoah opened this issue Jun 3, 2024 · 5 comments · Fixed by #760
Labels: enhancement, has pr, minor

Comments

@iamnoah
Contributor

iamnoah commented Jun 3, 2024

We have a GroupTransactSession that we use in a slightly unusual way. We don't actually consume a topic, but instead use offset commit metadata as the cursor for our transactional producer (see #559).

The session is created with these options:

		kgo.ConsumerGroup(...),
		kgo.ConsumeTopics(...),
		kgo.TransactionalID(...),
		kgo.FetchIsolationLevel(kgo.ReadCommitted()),
		kgo.BlockRebalanceOnPoll(),
		kgo.DisableAutoCommit(),
		kgo.FetchMaxWait(200 * time.Millisecond),
		kgo.OnPartitionsAssigned(func(ctx context.Context, client *kgo.Client, m map[string][]int32) {
			client.PauseFetchPartitions(m) // never actually fetch anything, we don't care what is in the topic
			// ... + some non-blocking internal state management
		}),
		kgo.OnOffsetsFetched(func(ctx context.Context, client *kgo.Client, response *kmsg.OffsetFetchResponse) error {
			// internal state management
			return nil
		}),
		kgo.OnPartitionsRevoked(func(ctx context.Context, client *kgo.Client, m map[string][]int32) {
			client.ForceMetadataRefresh() // ... + some state management
		}),

When we hit the MSK auth error described in #731, we try to bail out and restart the process. Sometimes, though, we get stuck in LeaveGroupContext:

goroutine 235 [select, 16 minutes]:
github.com/twmb/franz-go/pkg/kgo.(*Client).LeaveGroupContext(0xc000481008, {0x14d91a8, 0xc000d58c80?})
	github.com/twmb/franz-go@v1.15.1/pkg/kgo/consumer_group.go:254 +0x131
github.com/twmb/franz-go/pkg/kgo.(*Client).close(0xc000481008, {0x14d91a8, 0xc000d58c80})
	github.com/twmb/franz-go@v1.15.1/pkg/kgo/client.go:1006 +0x1ba
github.com/twmb/franz-go/pkg/kgo.(*Client).Close(...)
	github.com/twmb/franz-go@v1.15.1/pkg/kgo/client.go:993
github.com/twmb/franz-go/pkg/kgo.(*GroupTransactSession).Close(...)
	github.com/twmb/franz-go@v1.15.1/pkg/kgo/txn.go:157

This keeps the process from exiting, even though our producer can no longer produce.

The loop that invokes Close looks like this (simplified):

	defer client.Close()

	for {
		// doing this to ensure we have updated partition assignments
		client.PollFetches(nil)

		// ... load some data ...

		client.ProduceSync(...)
		
		client.End(kgo.PreTxnCommitFnContext(ctx, func(request *kmsg.TxnOffsetCommitRequest) error {
			request.Topics = []kmsg.TxnOffsetCommitRequestTopic{
				{
					Topic: ...,
					Partitions: []kmsg.TxnOffsetCommitRequestTopicPartition{
						{
							Metadata: currentCursor,
						},
					},
				},
			}
			return nil
		}), kgo.TryCommit)
	}

Other franz-go goroutines at the time of the above trace:

goroutine 82259 [sync.Cond.Wait, 16 minutes]:
sync.runtime_notifyListWait(0xc000e4bb90, 0x0)
	runtime/sema.go:569 +0x159
sync.(*Cond).Wait(0x54aa10?)
	sync/cond.go:70 +0x85
github.com/twmb/franz-go/pkg/kgo.(*consumer).waitAndAddRebalance(0xc000481698)
	github.com/twmb/franz-go@v1.15.1/pkg/kgo/consumer.go:261 +0xb4
github.com/twmb/franz-go/pkg/kgo.(*Client).LeaveGroupContext.func1()
	github.com/twmb/franz-go@v1.15.1/pkg/kgo/consumer_group.go:246 +0x32
created by github.com/twmb/franz-go/pkg/kgo.(*Client).LeaveGroupContext in goroutine 235
	github.com/twmb/franz-go@v1.15.1/pkg/kgo/consumer_group.go:245 +0xda

goroutine 242 [chan receive, 16 minutes]:
github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).setupAssignedAndHeartbeat(0xc000d42c80)
	github.com/twmb/franz-go@v1.15.1/pkg/kgo/consumer_group.go:887 +0x42b
github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).manage(0xc000d42c80)
	github.com/twmb/franz-go@v1.15.1/pkg/kgo/consumer_group.go:392 +0x199
created by github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).findNewAssignments in goroutine 241
	github.com/twmb/franz-go@v1.15.1/pkg/kgo/consumer_group.go:1786 +0x9f9

goroutine 226 [select]:
github.com/twmb/franz-go/pkg/kgo.(*Client).reapConnectionsLoop(0xc000481008)
	github.com/twmb/franz-go@v1.15.1/pkg/kgo/broker.go:558 +0x155
created by github.com/twmb/franz-go/pkg/kgo.NewClient in goroutine 1
	github.com/twmb/franz-go@v1.15.1/pkg/kgo/client.go:519 +0xbdd

goroutine 225 [select, 2 minutes]:
github.com/twmb/franz-go/pkg/kgo.(*Client).updateMetadataLoop(0xc000481008)
	github.com/twmb/franz-go@v1.15.1/pkg/kgo/metadata.go:173 +0x1ef
created by github.com/twmb/franz-go/pkg/kgo.NewClient in goroutine 1
	github.com/twmb/franz-go@v1.15.1/pkg/kgo/client.go:518 +0xb9b

goroutine 82291 [select]:
github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).heartbeat(0xc000d42c80, 0xc00119d3e0, 0xc0011122e8)
	github.com/twmb/franz-go@v1.15.1/pkg/kgo/consumer_group.go:940 +0x26c
github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).setupAssignedAndHeartbeat.func1()
	github.com/twmb/franz-go@v1.15.1/pkg/kgo/consumer_group.go:850 +0x11c
created by github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).setupAssignedAndHeartbeat in goroutine 242
	github.com/twmb/franz-go@v1.15.1/pkg/kgo/consumer_group.go:847 +0x385

goroutine 82292 [sync.Cond.Wait, 16 minutes]:
sync.runtime_notifyListWait(0xc000e4bb90, 0x1)
	runtime/sema.go:569 +0x159
sync.(*Cond).Wait(0xc0005ad801?)
	sync/cond.go:70 +0x85
github.com/twmb/franz-go/pkg/kgo.(*consumer).waitAndAddRebalance(0xc000481698)
	github.com/twmb/franz-go@v1.15.1/pkg/kgo/consumer.go:261 +0xb4
github.com/twmb/franz-go/pkg/kgo.(*assignRevokeSession).assign.func1()
	github.com/twmb/franz-go@v1.15.1/pkg/kgo/consumer_group.go:793 +0x8a
created by github.com/twmb/franz-go/pkg/kgo.(*assignRevokeSession).assign in goroutine 242
	github.com/twmb/franz-go@v1.15.1/pkg/kgo/consumer_group.go:784 +0x85

Dependencies:

	github.com/twmb/franz-go v1.15.1
	github.com/twmb/franz-go/pkg/kadm v1.9.0
	github.com/twmb/franz-go/pkg/kmsg v1.7.0

What could be causing this? I suspect the ForceMetadataRefresh call and/or the fact that we don't commit offsets in OnPartitionsRevoked. Looking through the code, though, I don't understand how it can hang where it does, since the consumer group ctx should already have been canceled.

@twmb
Owner

twmb commented Jun 4, 2024

Are you exiting the poll loop somewhere without allowing a rebalance? You may want to look at CloseAllowingRebalance().

twmb added the waiting label Jun 4, 2024
@iamnoah
Contributor Author

iamnoah commented Jun 5, 2024

@twmb thanks, I will try that. GroupTransactSession doesn't expose that directly, so should I replace GroupTransactSession.Close with GroupTransactSession.Client().CloseAllowingRebalance()?

@twmb
Owner

twmb commented Jun 7, 2024

That will work, but I'll leave this issue open as a reminder to spot check if any APIs are worth mirroring to GroupTransactSession.

twmb added the enhancement label and removed the waiting label Jun 7, 2024
@twmb
Owner

twmb commented Jun 8, 2024

From a quick audit, I think only AllowRebalance and CloseAllowingRebalance are worth adding to GroupTransactSession; for other APIs, a user can go through Client().
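
The trade-off in that audit is ordinary Go method forwarding. Below is a hypothetical sketch (the `client` and `session` types are stand-ins, not kgo's), assuming the session keeps its client in a private field, as the `Client()` accessor suggests, rather than embedding it. Each promoted API is then a one-line forwarder, and everything else stays reachable through `Client()`:

```go
package main

import "fmt"

// client is a hypothetical stand-in for *kgo.Client; it only records calls.
type client struct{ calls []string }

func (c *client) AllowRebalance() { c.calls = append(c.calls, "AllowRebalance") }

func (c *client) CloseAllowingRebalance() { c.calls = append(c.calls, "CloseAllowingRebalance") }

func (c *client) Close() { c.calls = append(c.calls, "Close") }

// session is a hypothetical stand-in for GroupTransactSession. Because the
// client is held in an unexported field (not embedded), no client method is
// promoted automatically; each one must be forwarded explicitly.
type session struct{ cl *client }

// Client is the escape hatch for any API the session does not mirror.
func (s *session) Client() *client { return s.cl }

// Close was already mirrored before the change.
func (s *session) Close() { s.cl.Close() }

// AllowRebalance and CloseAllowingRebalance are the two forwarders
// proposed in the audit.
func (s *session) AllowRebalance() { s.cl.AllowRebalance() }

func (s *session) CloseAllowingRebalance() { s.cl.CloseAllowingRebalance() }

func main() {
	s := &session{cl: &client{}}
	s.AllowRebalance()                  // direct call on the session
	s.Client().CloseAllowingRebalance() // the workaround via Client()
	fmt.Println(s.Client().calls)
}
```

Both routes reach the same underlying client, so promoting the two methods is purely a convenience for BlockRebalanceOnPoll users rather than a behavior change.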

twmb added a commit that referenced this issue Jun 8, 2024
…ssion

These two functions are basically required if you use the
BlockRebalanceOnPoll function when initializing a GroupTransactSession,
so it seems worth promoting these to direct APIs on GTS.

Closes #753.
twmb added the has pr label Jun 8, 2024
twmb added the minor label Oct 11, 2024
twmb closed this as completed in #760 Oct 15, 2024
@twmb
Owner

twmb commented Oct 15, 2024

Bit of a delay on my side ... releasing this evening.
